
### Extracting the checkpoint, bronze, and silver container URLs

In [0]:
def _external_location_url(location_name):
    """Return the `url` field reported by DESCRIBE EXTERNAL LOCATION for `location_name`."""
    described = spark.sql(f"""DESCRIBE EXTERNAL LOCATION `{location_name}`""")
    # The first column of the first collected row is the selected `url` value.
    return described.select('url').collect()[0][0]


checkpoints = _external_location_url('checkpoints')
bronze = _external_location_url('bronze')
silver = _external_location_url('silver')

In [0]:
# Show the three resolved URLs, one per line.
print(checkpoints, bronze, silver, sep='\n')

In [0]:
# Text widget that lets the caller choose the target environment; empty by default.
# NOTE(review): `env` is read here but not used by any cell visible in this notebook.
dbutils.widgets.text(
    name="env",
    defaultValue="",
    label="Enter the environment in lower case",
)
env = dbutils.widgets.get("env")

In [0]:
# Open the bronze raw_traffic table as a streaming source.
# NOTE(review): the catalog is hard-coded to `dev_catalog` even though an `env`
# widget is defined above — confirm whether this should be derived from `env`.
df_bronzeTraffic = spark.readStream.table("`dev_catalog`.`bronze`.`raw_traffic`")

### Handling duplicate rows

In [0]:
def removeDuplicate(df):
    """Return `df` with fully duplicated rows removed.

    DataFrames are immutable: `dropDuplicates()` returns a NEW DataFrame and
    leaves the receiver untouched, so its result must be captured and returned
    (the previous version discarded it and handed back the original frame).

    Args:
        df: DataFrame (batch or streaming) to de-duplicate.

    Returns:
        A new DataFrame containing only distinct rows of `df`.
    """
    print('Remove duplication values: ',end='')
    # NOTE(review): on a streaming DataFrame without a watermark, dropDuplicates
    # keeps all previously seen rows as state — confirm this is acceptable.
    df_dedup = df.dropDuplicates()
    print('Success!')
    return df_dedup

In [0]:
# Handling Null Values by replacing them

In [0]:
def handle_NULLs(df,columns):
    """Replace NULLs in `columns` with type-appropriate defaults.

    Two passes are made over the same column subset: the first fills with the
    string 'Unknown', the second fills with the number 0. The progress messages
    now name the column type each pass targets (the second message previously
    said "String" while performing the numeric fill).

    Args:
        df: DataFrame to clean.
        columns: Column names passed as `subset` to both `fillna` calls.

    Returns:
        A new DataFrame with both fills applied.
    """
    print('Handling NULL values on String columns with "Unknown" ',end='')
    df_string = df.fillna('Unknown',subset=columns)
    print('Success!')

    print('Handling NULL values on Numeric columns with "0" ',end='')
    df_clean = df_string.fillna(0,subset=columns)
    print('Success!')

    return df_clean

### Getting count of electric vehicles by creating new column

In [0]:
from pyspark.sql.functions import col
def ev_Count(df):
    """Append an `Electric_Vehicles_Count` column equal to `EV_Car + EV_Bike`.

    Args:
        df: DataFrame that exposes `EV_Car` and `EV_Bike` columns.

    Returns:
        The DataFrame returned by `withColumn` with the extra column.
    """
    print('Creating Electric Vehicles Count Column : ', end='')
    # Build the sum expression first, then attach it under the new column name.
    ev_total = col('EV_Car') + col('EV_Bike')
    with_ev_total = df.withColumn('Electric_Vehicles_Count', ev_total)
    print('Success!! ')
    return with_ev_total


#### Calling functions

In [0]:
# Step 1: pass the bronze stream through the duplicate-removal helper.

df_dups = removeDuplicate(df_bronzeTraffic)

# Step 2: replace NULLs, passing every column name of the frame as the subset.
df_nulls = handle_NULLs(df_dups,df_dups.schema.names)

# Step 3: add the combined electric-vehicle count column.

df_ev = ev_Count(df_nulls)

In [0]:
# Print the schema of the frame after the electric-vehicle count step.
df_ev.printSchema()

### Creating a column with the count of all motor vehicles

In [0]:
from pyspark.sql.functions import col
def motor_vehicles_count(df):
    """Append a `Mortor_Vehicles_Count` column summing all vehicle-count columns.

    The new column is the left-to-right sum of `Two_wheeled_motor_vehicles`,
    `Cars_and_taxis`, `Buses_and_coaches`, `LGV_Type`, `HGV_Type` and
    `Electric_Vehicles_Count`.

    Args:
        df: DataFrame that exposes the six columns listed above.

    Returns:
        The DataFrame returned by `withColumn` with the extra column.
    """
    print('Creating Motor Vehicles Count Column : ', end='')
    component_names = [
        'Two_wheeled_motor_vehicles',
        'Cars_and_taxis',
        'Buses_and_coaches',
        'LGV_Type',
        'HGV_Type',
        'Electric_Vehicles_Count',
    ]
    # Fold left-to-right so the expression matches a plain `a + b + c + ...` chain.
    running_total = col(component_names[0])
    for component in component_names[1:]:
        running_total = running_total + col(component)
    # NOTE(review): the output column name is spelled 'Mortor_Vehicles_Count';
    # kept as-is because it is part of the resulting schema.
    with_total = df.withColumn('Mortor_Vehicles_Count', running_total)
    print('Success!! ')
    return with_total

In [0]:
# Add the total motor-vehicle count on top of the electric-vehicle frame.
df_mv = motor_vehicles_count(df_ev)

### Transforming time

In [0]:
from pyspark.sql.functions import current_timestamp
def create_TransformedTime(df):
    """Append a `Transformed_Time` column holding `current_timestamp()`.

    Args:
        df: DataFrame to stamp.

    Returns:
        The DataFrame returned by `withColumn` with the extra column.
    """
    print('Creating Transformed time column:',end='')
    stamp_expr = current_timestamp()
    stamped = df.withColumn('Transformed_Time', stamp_expr)
    print('Success')
    return stamped

In [0]:
# Stamp each row with the transformation timestamp before writing to silver.
df_transformedTime = create_TransformedTime(df_mv)


### Write the transformed data to silver Traffic table

In [0]:

# Append the transformed stream to the silver_traffic Delta table, using a
# checkpoint under the `checkpoints` location and an availableNow trigger.
# NOTE(review): the target catalog is hard-coded to `dev_catalog`; the `env`
# widget value is not used here — confirm this is intended.
(
    df_transformedTime.writeStream
    .format("delta")
    .option("checkpointLocation", f'{checkpoints}/SilverTrafficLoad/Checkpt')
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("`dev_catalog`.`silver`.`silver_traffic`")
)