#### Origin
bronze table

In [0]:

# Location of the source (bronze) table, addressed as <catalog>.<schema>.<table>.
CATALOG_NAME = "databricks_hackathon"
SCHEMA_NAME_ORIG = "bronze"
TABLE_NAME = "timeseries_bronze_adv01"
# Load the bronze table as a Spark DataFrame.
# `spark` is not defined in this notebook; presumably the session injected by the runtime.
df_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME_ORIG}.{TABLE_NAME}")

In [0]:
# Print the (rows, columns) "shape" of the bronze DataFrame.
# count() is a Spark action, so this line triggers a job over the table.
n_rows = df_bronze.count()
n_cols = len(df_bronze.columns)
print("spark df shape: ", (n_rows, n_cols))

#### Destination
silver table

In [0]:
# Destination schema that will hold the cleaned (silver) tables.
CATALOG_NAME = "databricks_hackathon"
SCHEMA_NAME_DEST = "silver"
# IF NOT EXISTS makes this statement safe to re-run.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME_DEST}")

# Silver table
Objective : Create a table timeseries_silver from timeseries_raw_AdV02

Select : columns with > 80% filling

Convert : fill NaN / null values using forward fill

Add : Derivatives of the columns 'Biomass' and 'Conductivity'

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit

def filter_nulls_threshold(df_bronze, threshold=0.1):
    """
    Keep only the columns whose fraction of null values is below ``threshold``.

    A column is kept when ``non_null_count / total_rows > 1 - threshold``
    (e.g. ``threshold=0.2`` keeps columns that are more than 80% filled).

    Columns for which Spark does not report a count in ``summary("count")``
    (it only covers numeric and string columns) are always kept, as are all
    columns when the DataFrame is empty.

    Parameters
    ----------
    df_bronze : pyspark.sql.DataFrame
        Input DataFrame.
    threshold : float, default 0.1
        Maximum tolerated fraction of null values per column.

    Returns
    -------
    pyspark.sql.DataFrame
        The retained columns plus a constant ``filter_null_threshold`` column
        recording the threshold that was applied.
    """
    total_rows = df_bronze.count()

    # Only the non-null count is needed, so ask for that single statistic
    # instead of the full describe() (mean / stddev / min / max).
    count_df = df_bronze.summary("count")
    count_row = count_df.first()
    counted_columns = set(count_df.columns) - {"summary"}

    # Minimum fraction of non-null values a column needs in order to be kept.
    min_fill_ratio = 1 - threshold

    columns_to_keep = []
    for col_name in df_bronze.columns:
        # No statistic available (non numeric/string column, or empty table): keep.
        if total_rows == 0 or count_row is None or col_name not in counted_columns:
            columns_to_keep.append(col_name)
            continue

        try:
            non_null_count = int(count_row[col_name])
        except (TypeError, ValueError):
            # Count could not be parsed: be conservative and keep the column.
            columns_to_keep.append(col_name)
            continue

        if non_null_count / total_rows > min_fill_ratio:
            columns_to_keep.append(col_name)

    # Project the retained columns and record the threshold used.
    df_filtered = df_bronze.select(columns_to_keep).withColumn(
        "filter_null_threshold", lit(threshold)
    )

    return df_filtered

# df_filtered = filter_nulls_threshold(df_bronze, threshold=0.2)

#### Count the null values per column

In [0]:
from pyspark.sql.functions import col, sum, when

def count_nulls_per_column(df):
    """Return a ``{column name: null count}`` dict for a Spark DataFrame.

    Every column is turned into a 1/0 "is null" flag and the flags are summed
    in a single aggregation. Only ``isNull()`` is tested, so float NaN values
    are not counted.
    """
    # `sum`, `when` and `col` are the pyspark.sql.functions versions imported
    # in this cell, not Python builtins.
    null_flag_totals = []
    for column_name in df.columns:
        is_null_flag = when(col(column_name).isNull(), 1).otherwise(0)
        null_flag_totals.append(sum(is_null_flag).alias(column_name))

    # The global aggregation yields exactly one row.
    totals_row = df.agg(*null_flag_totals).collect()[0]
    return totals_row.asDict()

#null_counts_dict = count_nulls_per_column(df_filtered)
#for a,b in null_counts_dict.items():
#    print(a,"\t\t",b)

#### Forward-fill null values / replace leading "NaN" by "0"

In [0]:
from pyspark.sql.window import Window


def forwardfill_null_values(df_filtered):
    """Forward-fill nulls within each batch, then replace remaining nulls by 0.

    For every column except ``Timestamp`` and ``batch_id``, each null is
    replaced by the last non-null value seen earlier in the same ``batch_id``
    (rows ordered by ``Timestamp``). Nulls that have no earlier value are then
    handled by ``fillna(0)``; per PySpark semantics an integer fill value only
    applies to numeric columns, so nulls in other column types remain.

    Parameters
    ----------
    df_filtered : pyspark.sql.DataFrame
        Must contain the ``Timestamp`` and ``batch_id`` columns.

    Returns
    -------
    pyspark.sql.DataFrame
        Same columns, in the same order, with nulls filled.
    """
    key_columns = ("Timestamp", "batch_id")
    window_spec = (
        Window.partitionBy("batch_id")
        .orderBy("Timestamp")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # Each fill only reads its own column and the (untouched) key columns, so
    # all fills can be expressed in one projection rather than one chained
    # select per column, which keeps the query plan shallow.
    projections = []
    for col_name in df_filtered.columns:
        if col_name in key_columns:
            projections.append(F.col(col_name))
        else:
            projections.append(
                F.last(F.col(col_name), ignorenulls=True)
                .over(window_spec)
                .alias(col_name)
            )
            print(f"filled col {col_name}")

    return df_filtered.select(*projections).fillna(0)

# current_df = forwardfill_null_values(df_filtered)
# null_counts_dict = count_nulls_per_column(current_df)
# for a,b in null_counts_dict.items():
#    print(a,"\t\t",b)

In [0]:
def create_or_merge_table(sdf, table_name):
    """Insert ``sdf`` into the Delta table ``table_name``, creating it if needed.

    * If the table exists, rows of ``sdf`` that have no match on
      (timestamp column, ``batch_id``, ``source_file``) are inserted; matching
      rows are left untouched. The timestamp column depends on the table name
      suffix: ``Timestamp`` for ``...timeseries``, ``Time_Stamp`` for
      ``...capacitance``.
    * Otherwise the table is created from ``sdf``, partitioned by ``project``
      and ``batch_id``.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Data to write.
    table_name : str
        Dot-separated table name (e.g. ``catalog.schema.table``).

    Raises
    ------
    ValueError
        If the table exists but its name ends with neither ``timeseries`` nor
        ``capacitance``, so no merge key is known.
    """
    from delta.tables import DeltaTable

    # Back-quoted fully-qualified name for DeltaTable.forName.
    table_fqn = ".".join(f"`{part}`" for part in table_name.split("."))

    # Timestamp column that is part of the merge key, by table-name suffix.
    timestamp_column_by_suffix = {
        "timeseries": "Timestamp",
        "capacitance": "Time_Stamp",
    }

    # MERGE with existing table
    if spark.catalog.tableExists(table_name):
        timestamp_column = next(
            (
                column
                for suffix, column in timestamp_column_by_suffix.items()
                if table_name.endswith(suffix)
            ),
            None,
        )
        if timestamp_column is None:
            raise ValueError(
                f"No merge key defined for table {table_name!r}: expected a name "
                f"ending with one of {sorted(timestamp_column_by_suffix)}"
            )

        merge_condition = f"""
            target.{timestamp_column} = updates.{timestamp_column} AND
            target.batch_id = updates.batch_id AND
            target.source_file = updates.source_file
        """

        (
            DeltaTable.forName(spark, table_fqn)
            .alias("target")
            .merge(source=sdf.alias("updates"), condition=merge_condition)
            .whenNotMatchedInsertAll()
            .execute()
        )

        print(f"Data MERGED (upserted) into existing table: {table_name}. Duplicates avoided.")

    # create table partitioned by project and batch_id
    else:
        (
            sdf.write
            .format("delta")
            .mode("overwrite")
            .partitionBy("project", "batch_id")
            .saveAsTable(table_name)
        )
        print(f"New table created: {table_name}")


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, when


def calculate_derivative_and_filter(df, columns):
    """Add a discrete derivative and a trend label for each requested column.

    For every name ``c`` in ``columns`` two columns are added:

    * ``{c}_derivative``: ``c`` minus its previous value within the same
      ``batch_id`` (rows ordered by ``Time_Stamp``). This is a plain
      difference; it is not divided by the elapsed time.
    * ``{c}_trend``: ``"increasing"`` / ``"decreasing"`` according to the sign
      of the derivative, ``"no change"`` otherwise. That includes rows where
      the derivative is null, such as the first row of each batch.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain ``Time_Stamp``, ``batch_id``, ``ingestion_time``,
        ``project``, ``source_file`` and every column listed in ``columns``.
    columns : list[str]
        Names of the columns to differentiate.

    Returns
    -------
    pyspark.sql.DataFrame
        ``Time_Stamp``, then ``c``, ``{c}_derivative``, ``{c}_trend`` for each
        requested column (in the order given), then ``ingestion_time``,
        ``project``, ``source_file`` and ``batch_id``.
    """
    w = Window.partitionBy("batch_id").orderBy("Time_Stamp")

    selected = ["Time_Stamp"]
    for c in columns:
        derivative_name = f"{c}_derivative"
        trend_name = f"{c}_trend"

        # Difference with the previous row of the same batch (null on the first row).
        df = df.withColumn(derivative_name, col(c) - lag(c, 1).over(w))
        df = df.withColumn(
            trend_name,
            when(col(derivative_name) > 0, "increasing")
            .when(col(derivative_name) < 0, "decreasing")
            .otherwise("no change")
        )
        selected.extend([c, derivative_name, trend_name])

    # Metadata columns carried through unchanged.
    selected.extend(["ingestion_time", "project", "source_file", "batch_id"])
    return df.select(*selected)

# Quick check of calculate_derivative_and_filter on a single capacitance table.
# Uses CATALOG_NAME set in an earlier cell, and rebinds the notebook-level
# names SCHEMA_NAME_ORIG, TABLE_NAME, df_bronze and df.
SCHEMA_NAME_ORIG = 'bronze'
TABLE_NAME = "capacitance_bronze_adv01"
df_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME_ORIG}.{TABLE_NAME}")
df = calculate_derivative_and_filter(df_bronze,["Biomass","Conductivity"])
# Last expression of the cell: shows the resulting column names.
df.columns


In [0]:

CATALOG_NAME = "databricks_hackathon"
SCHEMA_NAME = "bronze"
SCHEMA_NAME_DEST = "silver"
tables = spark.catalog.listTables(dbName=f"{CATALOG_NAME}.{SCHEMA_NAME}")

# Per-family lists of cleaned DataFrames, one entry per processed bronze table.
cap_dfs = []
time_dfs = []
for table in tables:
    # Only tables whose last "_"-separated token starts with "adv" are processed.
    if not table.name.split("_")[-1].startswith("adv"):
        continue

    # Capacitance data: discrete derivative + selection of the relevant columns.
    if table.name.startswith("capa"):
        print("Processing ", table.name)
        df_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.{table.name}")
        # Use the table that was just read, not a DataFrame left over from
        # another cell, so each capacitance table contributes its own data.
        final_df = calculate_derivative_and_filter(df_bronze, ["Biomass", "Conductivity"])
        cap_dfs.append(final_df)

    # Time-series data: drop sparsely filled columns (threshold passed to
    # filter_nulls_threshold), then fill the remaining null values.
    elif table.name.startswith("time"):
        print("Processing ", table.name)
        df_bronze = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.{table.name}")
        df_filtered = filter_nulls_threshold(df_bronze, threshold=0.1)
        final_df = forwardfill_null_values(df_filtered)
        time_dfs.append(final_df)




In [0]:
def drop_void_columns(sdf):
    """Return ``sdf`` without its VOID (Spark ``NullType``) columns.

    Such columns typically come from all-null literals whose type was never
    set. The check is done on the type object itself, because the string form
    of a data type is ``NullType()`` / ``void`` and never ``"VoidType"``.
    """
    from pyspark.sql.types import NullType

    cols_to_keep = [
        field.name
        for field in sdf.schema.fields
        if not isinstance(field.dataType, NullType)
    ]
    return sdf.select(cols_to_keep)
    
def create_or_merge_table_sql(sdf, table_name):
    """SQL variant of the upsert: MERGE ``sdf`` into ``table_name`` or create it.

    VOID columns are dropped first, then ``sdf`` is exposed as a temporary
    view named ``updates_temp_<last part of table_name>``.

    * If the table does not exist it is created with
      ``CREATE TABLE ... AS SELECT`` from that view.
    * Otherwise a ``MERGE`` is run on (timestamp column, ``batch_id``,
      ``source_file``): matched rows are updated, unmatched rows inserted.
      The timestamp column is ``Time_Stamp`` for tables whose name ends with
      ``capacitance`` and ``Timestamp`` for every other table.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Data to write.
    table_name : str
        Dot-separated table name (e.g. ``catalog.schema.table``).
    """
    sdf = drop_void_columns(sdf)
    table_fqn = ".".join(f"`{part}`" for part in table_name.split("."))

    # Create a temporary view for the incoming data
    temp_view_name = f"updates_temp_{table_name.split('.')[-1]}"
    sdf.createOrReplaceTempView(temp_view_name)

    # Check existence explicitly instead of parsing the error message of a
    # failed MERGE, whose wording differs between Spark versions.
    if not spark.catalog.tableExists(table_name):
        spark.sql(f"CREATE TABLE {table_fqn} AS SELECT * FROM {temp_view_name}")
        print(f"New table created: {table_name}")
        return

    # Capacitance tables name their time column differently.
    timestamp_column = "Time_Stamp" if table_name.endswith("capacitance") else "Timestamp"

    merge_sql = f"""
        MERGE INTO {table_fqn} AS target
        USING {temp_view_name} AS updates
        ON target.{timestamp_column} = updates.{timestamp_column}
           AND target.batch_id = updates.batch_id
           AND target.source_file = updates.source_file
        WHEN MATCHED THEN
            UPDATE SET *
        WHEN NOT MATCHED THEN
            INSERT *
    """
    spark.sql(merge_sql)

    print(f"Data MERGED (upserted) into existing table: {table_name}. Duplicates avoided.")



In [0]:
# Union the per-table DataFrames of each family and write one silver table per family.
for table_name, df_list in {
    "capacitance": cap_dfs,
    "timeseries": time_dfs
}.items():
    # Nothing was collected for this family: there is nothing to union or write.
    if not df_list:
        print(f"No {table_name} DataFrame to write, skipping.")
        continue

    # Union of all columns across the DataFrames, in first-seen order so the
    # resulting column order is the same on every run.
    all_columns = list(
        dict.fromkeys(c for sdf in df_list for c in sdf.columns)
    )
    # Add missing columns with nulls and align order
    # NOTE(review): missing columns are padded as string-typed nulls; if the same
    # column is numeric in another DataFrame the union may coerce it to string.
    # Confirm this is intended.
    aligned_dfs = [
        sdf.select(
            [
                F.col(c) if c in sdf.columns else F.lit(None).cast("string").alias(c)
                for c in all_columns
            ]
        )
        for sdf in df_list
    ]
    df_all = aligned_dfs[0]
    for i, df_n in enumerate(aligned_dfs[1:]):
        df_all = df_all.unionByName(
            df_n,
            allowMissingColumns=True
        )
        print(
            f"Unioning {table_name} DataFrame {i+1}/{len(df_list)}..."
        )

    create_or_merge_table(
        df_all,
        f"{CATALOG_NAME}.{SCHEMA_NAME_DEST}.{table_name}"
    )