In [0]:
# Declare a notebook text widget named "env" whose default value is the empty string.
# The value is read back in the next cell and selects the target environment.
dbutils.widgets.text("env","")

In [0]:
# Read the current value of the "env" widget. It is passed to the write
# functions, which interpolate it into the catalog name traffic_analysis_<env>.
# NOTE(review): the widget default is "", so running without setting it yields
# an empty env -- confirm every job/caller supplies a value.
env = dbutils.widgets.get("env")

In [0]:
# Base directory under which the read/write functions place their per-dataset
# schema and checkpoint sub-directories.
# NOTE(review): this value ends with "/", and the functions join it as
# f"{checkpointLocation}/raw_.../...", so the resulting paths contain "//".
# Confirm the storage layer treats that the same as a single "/" before
# normalising it, since changing a checkpoint path can restart a stream.
checkpointLocation = "/Volumes/workspace/storage/containers/checkpoints/"
# Landing directories handed to the streaming readers as their load path.
raw_traffic_file_path = "/Volumes/workspace/storage/containers/landing/raw_traffic/"
raw_roads_file_path = "/Volumes/workspace/storage/containers/landing/raw_roads/"

## Create Read and Write Functions

In [0]:
def read_raw_traffic_data(checkpointLocation, raw_traffic_file_path):
    """Create the streaming reader for the raw traffic CSV landing folder.

    The files are read with the ``cloudFiles`` source in CSV mode, with a
    header row and an explicit schema. The ``Extract_Time`` column declared in
    the schema is then overwritten with ``current_timestamp()``.

    Args:
        checkpointLocation: Base directory; the ``cloudFiles`` schema location
            is set to ``<checkpointLocation>/raw_traffic/schema``.
        raw_traffic_file_path: Directory the stream loads files from.

    Returns:
        The streaming DataFrame (nothing is executed until it is written).
    """
    from pyspark.sql.functions import current_timestamp
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

    # (column name, Spark type) pairs, in the order the schema declares them.
    traffic_columns = [
        ("Record_ID", IntegerType()),
        ("Count_point_id", IntegerType()),
        ("Direction_of_travel", StringType()),
        ("Year", IntegerType()),
        ("Count_date", StringType()),
        ("hour", IntegerType()),
        ("Region_id", IntegerType()),
        ("Region_name", StringType()),
        ("Local_authority_name", StringType()),
        ("Road_name", StringType()),
        ("Road_Category_ID", IntegerType()),
        ("Start_junction_road_name", StringType()),
        ("End_junction_road_name", StringType()),
        ("Latitude", DoubleType()),
        ("Longitude", DoubleType()),
        ("Link_length_km", DoubleType()),
        ("Pedal_cycles", IntegerType()),
        ("Two_wheeled_motor_vehicles", IntegerType()),
        ("Cars_and_taxis", IntegerType()),
        ("Buses_and_coaches", IntegerType()),
        ("LGV_Type", IntegerType()),
        ("HGV_Type", IntegerType()),
        ("EV_Car", IntegerType()),
        ("EV_Bike", IntegerType()),
        ("Extract_Time", TimestampType()),
    ]
    traffic_record_schema = StructType(
        [StructField(col_name, col_type) for col_name, col_type in traffic_columns]
    )

    schema_location = f"{checkpointLocation}/raw_traffic/schema"

    # Configure the source step by step, then attach schema and load path.
    source = spark.readStream.format("cloudFiles")
    source = source.option("cloudFiles.format", 'csv')
    source = source.option("cloudFiles.schemaLocation", schema_location)
    source = source.option("header", True)
    loaded = source.schema(traffic_record_schema).load(raw_traffic_file_path)

    # Stamp every row with the time it was picked up by this stream.
    traffic_stream = loaded.withColumn("Extract_Time", current_timestamp())

    print("Read Success for Raw Traffic")
    print("*****************************************************************")

    return traffic_stream


In [0]:
def write_raw_traffic_data(readStream, environment, checkpointLocation):
    """Write the raw traffic stream to its bronze Delta table and wait for completion.

    Args:
        readStream: Streaming DataFrame to persist.
        environment: Suffix of the target catalog; rows are appended to
            ``traffic_analysis_<environment>.bronze.raw_traffic``.
        checkpointLocation: Base directory; the stream checkpoint is kept at
            ``<checkpointLocation>/raw_traffic/checkpoint``.

    Returns:
        None. The call blocks until the streaming query has terminated.

    Raises:
        ValueError: If ``environment`` is ``None``, empty or only whitespace.
    """
    # Fail fast: a blank environment would otherwise target the catalog
    # "traffic_analysis_" and only fail later with a less obvious error.
    if environment is None or not str(environment).strip():
        raise ValueError(
            f"environment must be a non-empty value (e.g. 'dev'); got {environment!r}"
        )

    writeStream = (readStream.writeStream
             .format("delta")
             .option("checkpointLocation", f"{checkpointLocation}/raw_traffic/checkpoint")
             .outputMode("append")
             .queryName("rawTrafficWriteStream")
             .trigger(availableNow=True)
             # Each identifier part is backtick-quoted so the environment
             # suffix may contain characters such as "-".
             .toTable(f"`traffic_analysis_{environment}`.`bronze`.`raw_traffic`"))
    # Block until the triggered query finishes before reporting success.
    writeStream.awaitTermination()
    print("Write Success for Raw Traffic")
    print("*****************************************************************")
    

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
from pyspark.sql.functions import current_timestamp

def read_raw_roads_data(checkpointLocation, raw_road_file_path):
    """Create the streaming reader for the raw roads CSV landing folder.

    The files are read with the ``cloudFiles`` source in CSV mode, with a
    header row and an explicit schema.

    Args:
        checkpointLocation: Base directory; the ``cloudFiles`` schema location
            is set to ``<checkpointLocation>/raw_roads/schema``.
        raw_road_file_path: Directory the stream loads files from.

    Returns:
        The streaming DataFrame (nothing is executed until it is written).
    """
    # (column name, Spark type) pairs, in the order the schema declares them.
    road_columns = [
        ("Road_ID", IntegerType()),
        ("Road_Category_Id", IntegerType()),
        ("Road_Category", StringType()),
        ("Region_ID", IntegerType()),
        ("Region_Name", StringType()),
        ("Total_Link_Length_Km", DoubleType()),
        ("Total_Link_Length_Miles", DoubleType()),
        ("All_Motor_Vehicles", DoubleType()),
    ]
    road_record_schema = StructType(
        [StructField(col_name, col_type) for col_name, col_type in road_columns]
    )

    schema_location = f"{checkpointLocation}/raw_roads/schema"

    # Configure the source step by step, then attach schema and load path.
    source = spark.readStream.format("cloudFiles")
    source = source.option("cloudFiles.format", "csv")
    source = source.option("cloudFiles.schemaLocation", schema_location)
    source = source.option("header", True)
    roads_stream = source.schema(road_record_schema).load(raw_road_file_path)

    print("Read Success for Raw Roads")
    print("*****************************************************************")
    return roads_stream

In [0]:
def write_raw_roads_data(readStream, environment, checkpointLocation):
    """Write the raw roads stream to its bronze Delta table and wait for completion.

    Args:
        readStream: Streaming DataFrame to persist.
        environment: Suffix of the target catalog; rows are appended to
            ``traffic_analysis_<environment>.bronze.raw_roads``.
        checkpointLocation: Base directory; the stream checkpoint is kept at
            ``<checkpointLocation>/raw_roads/checkpoint``.

    Returns:
        None. The call blocks until the streaming query has terminated.

    Raises:
        ValueError: If ``environment`` is ``None``, empty or only whitespace.
    """
    # Fail fast: a blank environment would otherwise target the catalog
    # "traffic_analysis_" and only fail later with a less obvious error.
    if environment is None or not str(environment).strip():
        raise ValueError(
            f"environment must be a non-empty value (e.g. 'dev'); got {environment!r}"
        )

    writeStream = (readStream.writeStream
                   .format("delta")
                   .option("checkpointLocation", f"{checkpointLocation}/raw_roads/checkpoint")
                   .outputMode("append")
                   .queryName("rawRoadsWriteStream")
                   .trigger(availableNow=True)
                   # Backtick-quote each identifier part, matching
                   # write_raw_traffic_data, so the same environment value
                   # (e.g. one containing "-") resolves for both tables.
                   .toTable(f"`traffic_analysis_{environment}`.`bronze`.`raw_roads`")
    )
    # Block until the triggered query finishes before reporting success.
    writeStream.awaitTermination()
    print("Write Success for Raw Roads")
    print("*****************************************************************")


## Calling Functions

In [0]:
# Raw traffic: build the streaming reader, then run the bronze write to completion.
read_raw_traffic_df = read_raw_traffic_data(
    checkpointLocation=checkpointLocation,
    raw_traffic_file_path=raw_traffic_file_path,
)
write_raw_traffic_data(
    readStream=read_raw_traffic_df,
    environment=env,
    checkpointLocation=checkpointLocation,
)

# Raw roads: same two steps, executed after the traffic write has finished.
read_raw_roads_df = read_raw_roads_data(
    checkpointLocation=checkpointLocation,
    raw_road_file_path=raw_roads_file_path,
)
write_raw_roads_data(
    readStream=read_raw_roads_df,
    environment=env,
    checkpointLocation=checkpointLocation,
)