In [0]:
# Expose the target environment as a notebook dropdown widget, then read the
# currently selected value into `environment` for the cells below.
dbutils.widgets.dropdown(
    name='environment',
    defaultValue='dev',
    choices=['dev', 'uat', 'prod'],
    label='select your environment',
)
environment = dbutils.widgets.get('environment')

In [0]:
# %pip install great_expectations==1.10.0

In [0]:
#dbutils.library.restartPython()

In [0]:
%run "./04_common"

In [0]:
import great_expectations_common as gec
# Debug aid: print the attribute names available on the imported module.
print(dir(gec))

In [0]:
import importlib
import great_expectations_common as gec

# 1. Force a reload of the module so the already-imported `gec` reflects the
#    current source of great_expectations_common.
importlib.reload(gec)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Explicit schema handed to load_raw_traffic for the raw traffic CSV files.
# Columns are listed in order as (name, type) pairs; every field keeps
# StructField's default nullability.
schema_traffic = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("Record_ID", IntegerType()),
        ("Count_point_id", IntegerType()),
        ("Direction_of_travel", StringType()),
        ("Year", IntegerType()),
        ("Count_date", StringType()),
        ("hour", IntegerType()),
        ("Region_id", IntegerType()),
        ("Region_name", StringType()),
        ("Local_authority_name", StringType()),
        ("Road_name", StringType()),
        ("Road_Category_ID", IntegerType()),
        ("Start_junction_road_name", StringType()),
        ("End_junction_road_name", StringType()),
        ("Latitude", DoubleType()),
        ("Longitude", DoubleType()),
        ("Link_length_km", DoubleType()),
        ("Pedal_cycles", IntegerType()),
        ("Two_wheeled_motor_vehicles", IntegerType()),
        ("Cars_and_taxis", IntegerType()),
        ("Buses_and_coaches", IntegerType()),
        ("LGV_Type", IntegerType()),
        ("HGV_Type", IntegerType()),
        ("EV_Car", IntegerType()),
        ("EV_Bike", IntegerType()),
    )
])

In [0]:
# Explicit schema handed to load_raw_roads for the raw roads CSV files.
# Columns are listed in order as (name, type) pairs; every field keeps
# StructField's default nullability.
schema_roads = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ('Road_ID', IntegerType()),
        ('Road_Category_Id', IntegerType()),
        ('Road_Category', StringType()),
        ('Region_ID', IntegerType()),
        ('Region_Name', StringType()),
        ('Total_Link_Length_Km', DoubleType()),
        ('Total_Link_Length_Miles', DoubleType()),
        ('All_Motor_Vehicles', DoubleType()),
    )
])

In [0]:
def load_raw_traffic(schema):
    """Build the Auto Loader streaming reader for the raw traffic CSV files.

    Reads files matching ``raw_traffic*.csv`` under ``landing_url`` with the
    ``cloudFiles`` source in CSV format, applies the supplied schema, and
    appends an ``Extract_Time`` column populated by ``current_timestamp()``.

    ``spark``, ``checkpoint_url`` and ``landing_url`` are not defined in this
    cell; presumably they come from the notebook environment / the
    ``04_common`` notebook -- confirm.

    Args:
        schema: Schema applied to the incoming CSV files.

    Returns:
        The streaming DataFrame with the extra ``Extract_Time`` column.
    """
    schema_location = checkpoint_url + 'rawTraffic_load/schema'

    csv_reader = (
        spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'csv')
        .option('cloudFiles.schemaLocation', schema_location)
        .option('header', 'true')
        .option("pathGlobFilter", "raw_traffic*.csv")
        .schema(schema)
    )

    # Stamp every row with the time at which it was picked up.
    traffic_stream = csv_reader.load(landing_url).withColumn(
        'Extract_Time', current_timestamp()
    )

    print('read raw traffic successfully')
    return traffic_stream

In [0]:
def load_raw_roads(schema):
    """Build the Auto Loader streaming reader for the raw roads CSV files.

    Reads files matching ``raw_roads*.csv`` under ``landing_url`` with the
    ``cloudFiles`` source in CSV format and applies the supplied schema.
    No extra columns are added.

    ``spark``, ``checkpoint_url`` and ``landing_url`` are not defined in this
    cell; presumably they come from the notebook environment / the
    ``04_common`` notebook -- confirm.

    Args:
        schema: Schema applied to the incoming CSV files.

    Returns:
        The streaming DataFrame produced by the reader.
    """
    schema_location = checkpoint_url + 'rawRoads_load/schema'

    csv_reader = (
        spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'csv')
        .option('cloudFiles.schemaLocation', schema_location)
        .option('header', 'true')
        .option("pathGlobFilter", "raw_roads*.csv")
        .schema(schema)
    )
    roads_stream = csv_reader.load(landing_url)

    print('read raw roads successfully')
    return roads_stream

In [0]:
def write_raw_traffic(rawTraffic_stream, environment):
    """Start the validated write of the raw traffic stream and return its query.

    Every micro-batch is handed to ``gec.validate_and_insert_process_batch``
    together with the target table coordinates. The query is started with
    ``trigger(availableNow=True)`` and its checkpoint lives under
    ``checkpoint_url + 'rawTraffic_load/checkpoint'``.

    ``db_name``, ``checkpoint_url`` and ``gec`` are not defined in this cell;
    ``gec`` is imported earlier in the notebook, the other two presumably
    come from the ``04_common`` notebook -- confirm.

    Args:
        rawTraffic_stream: Streaming DataFrame to write.
        environment: Environment name interpolated into the catalog name
            ``traffic_{environment}_catalog``.

    Returns:
        The object returned by ``.start()``. It was previously discarded,
        which left callers unable to wait for the run or inspect a failure
        raised while processing a batch. Callers that ignore the return
        value are unaffected.
    """
    catalog_name = f"traffic_{environment}_catalog"
    schema_name = db_name
    table_name = "raw_traffic"

    def _process_batch(batch_df, batch_id):
        # Delegate validation and insertion of one micro-batch to the shared helper.
        return gec.validate_and_insert_process_batch(
            df=batch_df,
            batch_id=batch_id,
            table_name=table_name,
            catalog=catalog_name,
            schema=schema_name,
        )

    query = (
        rawTraffic_stream.writeStream
        .foreachBatch(_process_batch)
        .option("checkpointLocation", checkpoint_url + 'rawTraffic_load/checkpoint')
        .trigger(availableNow=True)
        .start()
    )

    # The query has only been started at this point; it may still be running.
    print(f'Successfully started writing {table_name} to {catalog_name}')
    return query


In [0]:
def write_raw_roads(rawRoads_stream, environment):
    """Start the validated write of the raw roads stream and return its query.

    Every micro-batch is handed to ``gec.validate_and_insert_process_batch``
    together with the target table coordinates. The query is started with
    ``trigger(availableNow=True)`` and its checkpoint lives under
    ``checkpoint_url + 'rawRoads_load/checkpoint'``.

    ``db_name``, ``checkpoint_url`` and ``gec`` are not defined in this cell;
    ``gec`` is imported earlier in the notebook, the other two presumably
    come from the ``04_common`` notebook -- confirm.

    Args:
        rawRoads_stream: Streaming DataFrame to write.
        environment: Environment name interpolated into the catalog name
            ``traffic_{environment}_catalog``.

    Returns:
        The object returned by ``.start()``. It was previously discarded,
        which left callers unable to wait for the run or inspect a failure
        raised while processing a batch. Callers that ignore the return
        value are unaffected.
    """
    catalog_name = f"traffic_{environment}_catalog"
    schema_name = db_name
    table_name = "raw_roads"

    def _process_batch(batch_df, batch_id):
        # Delegate validation and insertion of one micro-batch to the shared helper.
        return gec.validate_and_insert_process_batch(
            df=batch_df,
            batch_id=batch_id,
            table_name=table_name,
            catalog=catalog_name,
            schema=schema_name,
        )

    query = (
        rawRoads_stream.writeStream
        .foreachBatch(_process_batch)
        .option("checkpointLocation", checkpoint_url + 'rawRoads_load/checkpoint')
        .trigger(availableNow=True)
        .start()
    )

    # The query has only been started at this point; it may still be running.
    print(f'Successfully started writing {table_name} to {catalog_name}')
    return query

In [0]:
# Build the two streaming readers from the explicit schemas declared above.
rawTraffic_stream = load_raw_traffic(schema_traffic)
rawRoads_stream = load_raw_roads(schema_roads)



In [0]:
# Start the raw traffic write for the environment selected in the widget.
write_raw_traffic(rawTraffic_stream, environment)

In [0]:
# Start the raw roads write for the environment selected in the widget.
write_raw_roads(rawRoads_stream,environment)