In [0]:
# Notebook that stores multi-purpose helper functions for the silver layer

In [0]:
def checkduplicates(df, idcol):
    """Show every value of ``idcol`` that appears on more than one row of ``df``.

    Rows are counted per group with ``count("*")`` so that a repeated null key
    is reported as well; counting the key column itself ignores nulls and
    would always give 0 for the null group.

    Note: ``col`` and ``count`` are not imported in this notebook, so they are
    expected to already be available in the namespace (pyspark.sql.functions).

    Args:
        df: Spark DataFrame to inspect.
        idcol: Name of the column whose values are expected to be unique.

    Returns:
        Whatever ``DataFrame.display()`` returns when duplicates exist,
        otherwise ``None`` after printing a message.
    """
    duplicated = (
        df.groupBy(col(idcol))
        .agg(count("*").alias("count"))
        .filter(col("count") > 1)
    )

    # Fetching a single row is enough to know whether any duplicate exists,
    # which avoids a full count before the frame is displayed.
    if duplicated.take(1):
        return duplicated.display()

    print("No duplicated values found.")
    return None



In [0]:
def deduplicate(df, idcol, timecol=None):
    """Return ``df`` with a single row kept per value of ``idcol``.

    Args:
        df: Spark DataFrame to deduplicate.
        idcol: Name of the column identifying a record.
        timecol: Optional name of a column used to rank rows sharing the same
            ``idcol``; the row that sorts first in descending order is kept.
            When falsy, ``dropDuplicates`` picks the surviving row.

    Returns:
        The deduplicated DataFrame.
    """
    # Without a ranking column there is nothing to order by.
    if not timecol:
        return df.dropDuplicates([idcol])

    # Rank rows inside each id, highest timecol first, and keep rank 1 only.
    newest_first = Window.partitionBy(idcol).orderBy(col(timecol).desc())
    ranked = df.withColumn('row_num', row_number().over(newest_first))
    return ranked.filter(col('row_num') == 1).drop('row_num')


In [0]:
def graphbycolumnd(df, col):
    """Print the number of rows per distinct value of ``col``, most frequent first.

    Rows are counted with ``count("*")`` so the null group shows its real row
    count; counting the column itself ignores nulls and reports 0 for it.

    Note: the ``col`` parameter shadows any ``col`` function from
    pyspark.sql.functions inside this body, so only ``count`` and ``desc`` are
    taken from the notebook namespace here.

    Args:
        df: Spark DataFrame to summarise.
        col: Column (name) to group by.

    Returns:
        The result of ``DataFrame.show()`` on the frequency table.
    """
    frequencies = (
        df.groupBy(col)
        .agg(count("*").alias("count"))
        .orderBy(desc("count"))
    )

    return frequencies.show()

In [0]:

def iqr_outlier(dataframe, column, fac=1.5, min_value=1, rel_error=0.01):
    """Split ``dataframe`` into cleaned rows and outliers using the IQR rule.

    A row is an outlier when its ``column`` value is below ``q1 - fac * iqr``,
    above ``q3 + fac * iqr``, or below ``min_value``.

    Rows whose value is null satisfy neither filter: they are absent from both
    outputs when outliers are found, but stay in the returned frame when no
    outlier is found (the input is returned untouched in that case).

    Args:
        dataframe: Spark DataFrame to analyse.
        column: Name of the numeric column to test.
        fac: Multiplier applied to the IQR to build the bounds.
        min_value: Absolute floor; values below it are always outliers.
            Pass ``None`` to disable the floor. Defaults to 1.
        rel_error: Relative error handed to ``approxQuantile``. Defaults to 0.01.

    Returns:
        Tuple ``(cleaned_df, outliers_df)``.
    """
    # Calculate Q1, Q3, and IQR
    quantiles = dataframe.approxQuantile(column, [0.25, 0.75], rel_error)

    # approxQuantile yields an empty list when the column holds no non-null
    # values, so no bounds can be derived and nothing can be flagged.
    if len(quantiles) < 2:
        print("No outlier")
        return dataframe, dataframe.limit(0)

    q1, q3 = quantiles
    iqr = q3 - q1

    # Define the upper and lower bounds for outliers
    lower_bound = q1 - fac * iqr
    upper_bound = q3 + fac * iqr

    # Build the predicate once so the outlier and cleaned filters cannot drift apart.
    value = col(column)
    is_outlier = (value < lower_bound) | (value > upper_bound)
    if min_value is not None:
        is_outlier = is_outlier | (value < min_value)

    outliers_df = dataframe.filter(is_outlier)

    # One fetched row is enough to know whether any outlier exists.
    if outliers_df.take(1):
        print("Outlier found")
        outliers_df.display()
        cleaned_df = dataframe.filter(~is_outlier)
        return cleaned_df, outliers_df

    print("No outlier")
    return dataframe, outliers_df

In [0]:
def _validate_schema(df, expected_schema) -> bool:

    actual_schema = df.schema

    # Veryfing the number of columns
    if len(expected_schema.fields) != len(actual_schema.fields):
        return False

    # Verfying the data types and names
    for i, field in enumerate(actual_schema.fields):
        expected_field = expected_schema.fields[i]
        if field.name != expected_field.name or not isinstance(field.dataType, type(expected_field.dataType)):
            print(f"Error: {field.name}")
            print(f"Expected: {expected_field}, Find: {field}")
            return False

    return True


In [0]:

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, functions as F

def upsert_table(transformed_df: DataFrame, target_table: str, primary_keys: list, schema: str, catalog: str, not_matched_by_source_action: str = None, not_matched_by_source_condition: str = None):
    """Upsert ``transformed_df`` into the Delta table ``catalog.schema.target_table``.

    If the target table does not exist it is created from ``transformed_df``
    and the function returns. Otherwise rows are merged on ``primary_keys``:
    matched rows are updated with all source columns and unmatched source rows
    are inserted. Optionally, target rows with no counterpart in the source
    are deleted first.

    Relies on a ``spark`` session being available in the notebook namespace.

    Args:
        transformed_df: Source DataFrame holding the new or changed rows.
        target_table: Name of the target table (without catalog/schema).
        primary_keys: Column name(s) used to match source and target rows.
            A single column may be given as a plain string.
        schema: Schema that holds the target table.
        catalog: Catalog that holds the target table.
        not_matched_by_source_action: When equal to "DELETE" (case-insensitive),
            target rows absent from the source are deleted. Any other value
            is ignored.
        not_matched_by_source_condition: Optional filter expression limiting
            which unmatched target rows are deleted.

    Raises:
        ValueError: If the table already exists and ``primary_keys`` is empty.
    """
    # Construct the full table name with catalog and schema
    full_table_name = f"{catalog}.{schema}.{target_table}"

    # First load: the target does not exist yet, so create it from the incoming data.
    if not spark.catalog.tableExists(full_table_name):
        transformed_df.write.format("delta").saveAsTable(full_table_name)
        print(f"Table {full_table_name} created.")
        return

    # A bare string would otherwise be iterated character by character and
    # produce a merge condition on single-letter column names.
    if isinstance(primary_keys, str):
        primary_keys = [primary_keys]
    else:
        primary_keys = list(primary_keys)
    if not primary_keys:
        raise ValueError("primary_keys must contain at least one column name to build the merge condition.")

    # Creating merge condition
    merge_condition = " AND ".join([f"s.{key} = t.{key}" for key in primary_keys])

    # Handle to the existing Delta table
    delta_table = DeltaTable.forName(spark, full_table_name)

    # Prepare the upsert: update every matched row, insert every new source row.
    merge_builder = (
        delta_table.alias("t")
        .merge(transformed_df.alias("s"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
    )

    # Optional clean-up of target rows that no longer exist in the source.
    # This runs as its own merge, executed before the upsert below.
    # NOTE(review): newer Delta Lake releases expose whenNotMatchedBySourceDelete,
    # which could fold this into the single merge - confirm the runtime version first.
    if not_matched_by_source_action and not_matched_by_source_action.upper() == "DELETE":
        # Target rows with no source row sharing the same primary key
        unmatched_rows = delta_table.toDF().alias("t").join(
            transformed_df.alias("s"),
            on=[F.col(f"t.{key}") == F.col(f"s.{key}") for key in primary_keys],
            how="left_anti"
        )
        # Apply the condition if needed
        if not_matched_by_source_condition:
            unmatched_rows = unmatched_rows.filter(not_matched_by_source_condition)

        delta_table.alias("t").merge(
            unmatched_rows.alias("s"),
            merge_condition
        ).whenMatchedDelete().execute()

    # Execute merge
    merge_builder.execute()

    print("Upsert executed")

In [0]:

def compare_lengths(df1, df2):
    """Report whether two DataFrames contain the same number of rows.

    Args:
        df1: First DataFrame (anything exposing ``count()``).
        df2: Second DataFrame (anything exposing ``count()``).

    Returns:
        A message stating the shared row count, or both counts when they differ.
    """
    rows_first, rows_second = df1.count(), df2.count()

    if rows_first != rows_second:
        return f"The DataFrames have different numbers of rows: {rows_first} and {rows_second}"

    return f"Both DataFrames have the same number of rows: {rows_first}"
