In [0]:
%run "/Workspace/Project/04.Common Notebook"


In [0]:
# Notebook parameter: the environment prefix used to build catalog names
# of the form "<env>_catalog" in the cells below.
dbutils.widgets.text("Enter the environment","")
env = dbutils.widgets.get("Enter the environment").strip()

# Fail fast: the widget defaults to an empty string, which would otherwise
# produce table names such as "_catalog.bronze.raw_roads" further down.
if not env:
    raise ValueError(
        'Widget "Enter the environment" is empty; set it before running this notebook.'
    )

In [0]:
from pyspark.sql.functions import *

In [0]:
def read_raw_road_data(environment):
    """Open a streaming read on the bronze raw-roads table.

    Args:
        environment: Prefix of the catalog name; the table read is
            ``<environment>_catalog.bronze.raw_roads``.

    Returns:
        The streaming DataFrame returned by ``spark.readStream.table``.
    """
    bronze_table = f"{environment}_catalog.bronze.raw_roads"
    return spark.readStream.table(bronze_table)
            

In [0]:
def adding_columns(df):
    """Derive the road category name / road type columns and stamp the transform time.

    Args:
        df: DataFrame containing a ``Road_Category`` column.

    Returns:
        ``df`` with ``Road_Category`` cast to string and three columns added:
        ``road_category_name`` (label for the category code, ``"NA"`` when the
        code is not recognised), ``Road_type`` (``"Major"`` / ``"Minor"``) and
        ``TRANSFORMED_TIME`` (``current_timestamp()``).
    """
    # Print status before adding the new columns
    print("Creating road category name column: ", end=" ")

    # Cast 'Road_Category' to string so the code comparisons below are string comparisons
    df = df.withColumn("Road_Category", col("Road_Category").cast("string"))

    # Map each 'Road_Category' code to its descriptive name
    df = df.withColumn(
        "road_category_name",
        when(col("Road_Category") == "TA", "class A Trunk Road")
        .when(col("Road_Category") == "TM", "class A Trunk Motor")
        .when(col("Road_Category") == "PA", "class A Principal Road")
        .when(col("Road_Category") == "PM", "class B Principal Motorway")
        .when(col("Road_Category") == "M", "class B Road")
        .otherwise("NA"),  # Default value if none match
    )

    # Classify by the name prefix. There is no otherwise(), so rows whose
    # road_category_name is "NA" get a null Road_type.
    # NOTE(review): the "class A" label was previously written as "Mojar"; rows
    # already loaded with that spelling may need a backfill - confirm downstream.
    df = df.withColumn(
        "Road_type",
        when(col("road_category_name").like("class A%"), "Major")
        .when(col("road_category_name").like("class B%"), "Minor"),
    )

    # Add the 'TRANSFORMED_TIME' column used for incremental loading
    df = df.withColumn('TRANSFORMED_TIME', current_timestamp())

    # Print success confirmation
    print("Success !!")
    print("****************************************************************************************")

    # Return the modified DataFrame
    return df

In [0]:
def road_renaming_columns(df, columns):
    """Rename the given columns of ``df`` to their UPPERCASE form.

    Args:
        df: DataFrame (batch or streaming) whose columns are renamed.
        columns: Iterable of existing column names to convert to upper case.

    Returns:
        A DataFrame in which every listed column is renamed to ``name.upper()``.
    """
    # Print message before starting to rename
    print('Renaming columns (converting to UPPERCASE only): ', end=' ')

    # Rename each listed column to its UPPERCASE name
    for col_name in columns:
        df = df.withColumnRenamed(col_name, col_name.upper())

    # Print success message
    print('Success !!')
    print("****************************************************************************************")

    # Preview only static DataFrames. On Databricks, display() of a streaming
    # DataFrame starts a separate streaming query, which a rename helper
    # should not launch as a side effect.
    if not df.isStreaming:
        display(df)

    # Return the DataFrame with renamed columns
    return df


In [0]:
def write_raw_road_SilverTable(df, environment):
    """Append the streaming road data to the silver Delta table and wait for completion.

    Args:
        df: Streaming DataFrame to write.
        environment: Prefix of the catalog name; the target table is
            ``<environment>_catalog.silver.silver_Roads``.

    Returns:
        None.

    Raises:
        Whatever ``StreamingQuery.awaitTermination()`` raises when the
        streaming query terminates with an error.
    """
    print('Starting to write the Silver Road Table:', end=' ')

    # Write the streaming DataFrame to the silver Delta table.
    # `checkpoint_path` is not defined in this notebook - presumably it comes
    # from the Common Notebook pulled in by %run; confirm.
    query = (
        df.writeStream
        .format("delta")
        # Path spelling is kept exactly as-is: changing the checkpoint location
        # would make the stream start from a fresh checkpoint.
        .option("checkpointLocation", checkpoint_path + "/SilverawRoadLoad/Checkpt")
        .outputMode("append")
        .queryName("SilverRoadWriteStream")
        .trigger(availableNow=True)
        .toTable(f"{environment}_catalog.silver.silver_Roads")
    )

    # toTable() only starts the query and returns immediately. Block until the
    # availableNow run has finished so that "Success !!" (and any cell that
    # reads the table afterwards) reflects a completed write, and so that a
    # failed query surfaces here instead of being silently ignored.
    query.awaitTermination()

    print('Success !!')
    print("****************************************************************************************")

In [0]:
# Bronze -> silver road pipeline. Every name assigned in this cell is
# notebook-global, and each step consumes the result of the previous one.

# Open the streaming read on the bronze road table for the selected environment
Read_road_df = read_raw_road_data(env)

# remove_duplicates / handling_nulls are not defined in this notebook -
# presumably they come from the Common Notebook loaded via %run; confirm.
rm_df = remove_duplicates(Read_road_df)
rm_col = rm_df.columns

# handling_nulls receives the de-duplicated DataFrame and its full column list
null_df = handling_nulls(rm_df, rm_col)

# Add road_category_name, Road_type and TRANSFORMED_TIME
add_df = adding_columns(null_df)

# Column list taken after the new columns were added, so they are renamed too
df_col = add_df.columns

# Rename every column to UPPERCASE
result_df = road_renaming_columns(add_df, df_col)


# Write the result to <env>_catalog.silver.silver_Roads
write_raw_road_SilverTable(result_df, env)

In [0]:
%sql
-- Row count of the silver road table.
-- NOTE(review): the catalog is hard-coded to dev_catalog, while the write above
-- targets `<env>_catalog`; this count only reflects the run when env is "dev".
SELECT COUNT(*) FROM `dev_catalog`.`silver`.`silver_roads`