In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Expose an "environment" text widget (empty by default) and read its current
# value; the write cells use it as the prefix of the target catalog name.
dbutils.widgets.text(
    name="environment",
    defaultValue="",
    label="please type environment",
)
current_environment = dbutils.widgets.get("environment")

In [0]:
%python
# Look up the storage URL of the `checkpoints` external location
# (first row, first column of the DESCRIBE result restricted to "url").
checkpoint_location = (
    spark.sql(""" DESCRIBE EXTERNAL LOCATION `checkpoints` """)
    .select("url")
    .collect()[0][0]
)

# Same lookup for the `landing` external location that holds the raw files.
landing_location = (
    spark.sql(""" DESCRIBE EXTERNAL LOCATION `landing` """)
    .select("url")
    .collect()[0][0]
)

### Create the read_Traffic_data() function


In [0]:
def read_Traffic_data():
    """Define the Auto Loader stream over the raw traffic CSV files.

    Uses the notebook-level globals ``spark``, ``checkpoint_location`` and
    ``landing_location``. The reader is configured with the ``cloudFiles``
    source in CSV mode with a header row and an explicit 24-column schema,
    loads from ``<landing_location>/raw_traffic/`` and appends an
    ``Extract_Time`` column set to ``current_timestamp()``.

    Returns:
        The DataFrame produced by the ``spark.readStream`` chain.
    """
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
    from pyspark.sql.functions import current_timestamp

    # (column name, Spark type) pairs, in the order the schema declares them.
    column_specs = [
        ("Record_ID", IntegerType),
        ("Count_point_id", IntegerType),
        ("Direction_of_travel", StringType),
        ("Year", IntegerType),
        ("Count_date", StringType),
        ("hour", IntegerType),
        ("Region_id", IntegerType),
        ("Region_name", StringType),
        ("Local_authority_name", StringType),
        ("Road_name", StringType),
        ("Road_Category_ID", IntegerType),
        ("Start_junction_road_name", StringType),
        ("End_junction_road_name", StringType),
        ("Latitude", DoubleType),
        ("Longitude", DoubleType),
        ("Link_length_km", DoubleType),
        ("Pedal_cycles", IntegerType),
        ("Two_wheeled_motor_vehicles", IntegerType),
        ("Cars_and_taxis", IntegerType),
        ("Buses_and_coaches", IntegerType),
        ("LGV_Type", IntegerType),
        ("HGV_Type", IntegerType),
        ("EV_Car", IntegerType),
        ("EV_Bike", IntegerType),
    ]
    schema = StructType(
        [StructField(col_name, col_type()) for col_name, col_type in column_specs]
    )

    schema_location = f"{checkpoint_location}/rawTrafficload/schemaInfer"
    source_path = landing_location + "/raw_traffic/"

    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.format", "csv")
    reader = reader.option("cloudFiles.schemaLocation", schema_location)
    reader = reader.option("header", "true")
    loaded = reader.schema(schema).load(source_path)
    traffic_stream = loaded.withColumn("Extract_Time", current_timestamp())

    # These messages are printed as soon as the stream has been defined.
    print('Reading Success raw_traffic !!')
    print("______________________")

    return traffic_stream




## Create the read_Road_data() function (raw_roads)

In [0]:
def read_Road_data():
    """Define the Auto Loader stream over the raw roads CSV files.

    Uses the notebook-level globals ``spark``, ``checkpoint_location`` and
    ``landing_location``. The reader is configured with the ``cloudFiles``
    source in CSV mode with a header row and an explicit 8-column schema,
    loads from ``<landing_location>/raw_roads/`` and appends an
    ``Extract_Time`` column set to ``current_timestamp()``.

    Returns:
        The DataFrame produced by the ``spark.readStream`` chain.
    """
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
    from pyspark.sql.functions import current_timestamp

    # (column name, Spark type) pairs, in the order the schema declares them.
    column_specs = [
        ('Road_ID', IntegerType),
        ('Road_Category_Id', IntegerType),
        ('Road_Category', StringType),
        ('Region_ID', IntegerType),
        ('Region_Name', StringType),
        ('Total_Link_Length_Km', DoubleType),
        ('Total_Link_Length_Miles', DoubleType),
        ('All_Motor_Vehicles', DoubleType),
    ]
    schema = StructType(
        [StructField(col_name, col_type()) for col_name, col_type in column_specs]
    )

    schema_location = f"{checkpoint_location}/rawRoadload/schemaInfer"
    source_path = landing_location + "/raw_roads/"

    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.format", "csv")
    reader = reader.option("cloudFiles.schemaLocation", schema_location)
    reader = reader.option("header", "true")
    loaded = reader.schema(schema).load(source_path)
    road_stream = loaded.withColumn("Extract_Time", current_timestamp())

    # These messages are printed as soon as the stream has been defined.
    print('Reading Success rawRoad !!')
    print("______________________")

    return road_stream

## Create the write_rawRoad_data() function

In [0]:
def write_rawRoad_data(StreamingDF, environment):
    """Write the raw roads stream to the bronze ``raw_roads`` Delta table.

    The stream is appended to ``<environment>_catalog.bronze.raw_roads`` with
    its checkpoint under ``<checkpoint_location>/rawRoadload/checkpt``
    (``checkpoint_location`` is a notebook-level global). The query uses the
    ``availableNow`` trigger and this function blocks on
    ``awaitTermination()`` before printing the success messages.

    Args:
        StreamingDF: Streaming DataFrame to persist, e.g. the result of
            ``read_Road_data()``.
        environment: Prefix of the target catalog name (``dev`` ->
            ``dev_catalog``).

    Raises:
        ValueError: If ``environment`` is ``None``, blank, or contains a
            backtick. The value is interpolated into a backtick-quoted
            identifier and the notebook widget defaults to an empty string,
            so fail before any stream is started instead of targeting
            ``_catalog``.
    """
    env_text = "" if environment is None else str(environment)
    if not env_text.strip() or "`" in env_text:
        raise ValueError(
            "environment must be a non-empty catalog prefix without backticks, "
            f"got {environment!r}"
        )

    write_stream = (
        StreamingDF.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_location}/rawRoadload/checkpt")
        .outputMode("append")
        .queryName('rawRoadwritestream')
        .trigger(availableNow=True)
        .toTable(f"`{environment}_catalog`.`bronze`.`raw_roads`")
    )
    # Block until the query started by toTable() has terminated.
    write_stream.awaitTermination()
    print("write Success raw road")
    print("-------------------------")

## Create the write_Traffic_data() function

In [0]:
def write_Traffic_data(StreamingDF, environment):
    """Write the raw traffic stream to the bronze ``raw_traffic`` Delta table.

    The stream is appended to ``<environment>_catalog.bronze.raw_traffic``
    with its checkpoint under ``<checkpoint_location>/rawTrafficload/checkpt``
    (``checkpoint_location`` is a notebook-level global). The query uses the
    ``availableNow`` trigger and this function blocks on
    ``awaitTermination()`` before printing the success messages.

    Args:
        StreamingDF: Streaming DataFrame to persist, e.g. the result of
            ``read_Traffic_data()``.
        environment: Prefix of the target catalog name (``dev`` ->
            ``dev_catalog``).

    Raises:
        ValueError: If ``environment`` is ``None``, blank, or contains a
            backtick. The value is interpolated into a backtick-quoted
            identifier and the notebook widget defaults to an empty string,
            so fail before any stream is started instead of targeting
            ``_catalog``.
    """
    env_text = "" if environment is None else str(environment)
    if not env_text.strip() or "`" in env_text:
        raise ValueError(
            "environment must be a non-empty catalog prefix without backticks, "
            f"got {environment!r}"
        )

    write_stream = (
        StreamingDF.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_location}/rawTrafficload/checkpt")
        .outputMode("append")
        .queryName('rawTrafficwritestream')
        .trigger(availableNow=True)
        .toTable(f"`{environment}_catalog`.`bronze`.`raw_traffic`")
    )
    # Block until the query started by toTable() has terminated.
    write_stream.awaitTermination()
    print("write Success raw traffic")
    print("-------------------------")

In [0]:
# Define the raw traffic stream, then write it to the bronze table of the
# catalog selected through the "environment" widget.
read_df_raw_Traffic = read_Traffic_data()
write_Traffic_data(
    StreamingDF=read_df_raw_Traffic,
    environment=current_environment,
)

In [0]:
# Keep the roads stream under its own name so that read_df_raw_Traffic is not
# overwritten with road data.
read_df_raw_Road = read_Road_data()
write_rawRoad_data(read_df_raw_Road, current_environment)

In [0]:
%sql
-- Highest Record_ID currently present in the bronze raw_traffic table.
-- NOTE(review): the catalog is hard-coded to `dev_catalog`, while the write
-- functions target `<environment>_catalog` — confirm this check is meant to be dev-only.
SELECT max(Record_ID) FROM `dev_catalog`.`bronze`.`raw_traffic`