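## Silver Layer Ingestion

Streams the green, yellow, and FHV bronze trip tables into cleaned, enriched tables in the `silver-layer` schema of the `demo-taxi` catalog, then combines green and yellow trips into a single `all_trips` table.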


In [0]:
%sql
-- Sanity check: number of rows currently in the green bronze table.
SELECT COUNT(*)
FROM `demo-taxi`.`bronze-layer`.green_bronze

In [0]:
%sql
-- Make sure the silver schema exists in the demo-taxi catalog.
-- The schema name is spelled `silver-layer` (lowercase), matching every query
-- and streaming target in this notebook.
USE CATALOG `demo-taxi`;
CREATE SCHEMA IF NOT EXISTS `silver-layer`;


In [0]:
%sql
-- Preview raw green bronze rows before the silver transformations.
SELECT *
FROM `demo-taxi`.`bronze-layer`.green_bronze
LIMIT 100

In [0]:
from pyspark.sql.functions import col, to_date, hour,when,lit,cast,expr,dayofweek,nullif
 
def process_green_silver():
    """Stream pending rows from the green bronze table into ``green_silver``.

    Rows are kept only when passenger_count is strictly between 0 and 6, both
    lpep pickup/dropoff timestamps are non-null, trip_distance < 500 and
    fare_amount < 1000. The surviving rows then get:

    * ``lpep_pickup_datetime`` / ``lpep_dropoff_datetime`` renamed to
      ``pickup_datetime`` / ``dropoff_datetime``;
    * ``trip_duration_minutes`` (minute difference between the two timestamps);
    * ``trip_type`` replaced by a label: 1 -> Hail, 2 -> Dispatch, else Unknown;
    * ``payment_description`` derived from ``payment_type``;
    * pickup/dropoff date, day of week and hour;
    * ``fare_per_mile`` computed with ``try_divide``.

    The stream uses an ``availableNow`` trigger and the call blocks on
    ``awaitTermination()`` until the query stops.
    """
    trip_type_label = (
        when(col("trip_type") == 1, "Hail")
        .when(col("trip_type") == 2, "Dispatch")
        .otherwise("Unknown")
    )
    payment_label = (
        when(col("payment_type") == 1, "Credit card")
        .when(col("payment_type") == 2, "Cash")
        .when(col("payment_type") == 3, "No charge")
        .when(col("payment_type") == 4, "Dispute")
        .otherwise("Unknown")
    )

    green_clean = (
        spark.readStream.table("`demo-taxi`.`bronze-layer`.green_bronze")
        .filter("passenger_count > 0 and lpep_pickup_datetime is not null and lpep_dropoff_datetime is not null and passenger_count < 6 and trip_distance < 500 and fare_amount<1000")
        .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
        .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    )

    # Applied one at a time, in this order, so the resulting column order of
    # the silver table is fixed by this list.
    derived_columns = [
        ("trip_duration_minutes", expr("datediff(MINUTE,pickup_datetime,dropoff_datetime)")),
        ("trip_type", trip_type_label),
        ("payment_description", payment_label),
        ("pickup_date", to_date(col("pickup_datetime"))),
        ("dropoff_date", to_date(col("dropoff_datetime"))),
        ("pickup_dayofweek", dayofweek(col("pickup_datetime"))),
        ("dropoff_dayofweek", dayofweek(col("dropoff_datetime"))),
        ("pickup_hour", hour(col("pickup_datetime"))),
        ("dropoff_hour", hour(col("dropoff_datetime"))),
        ("fare_per_mile", expr("try_divide(fare_amount,trip_distance)")),
    ]
    for column_name, column_expr in derived_columns:
        green_clean = green_clean.withColumn(column_name, column_expr)

    green_query = (
        green_clean.writeStream
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/demo-taxi/bronze-layer/taxi_landing/checkpoint/silver/green/")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable("`demo-taxi`.`silver-layer`.green_silver")
    )
    green_query.awaitTermination()


# Run immediately: the preview cell right below reads green_silver.
process_green_silver()


In [0]:
%sql
-- Inspect the green silver output written by process_green_silver().
SELECT *
FROM `demo-taxi`.`silver-layer`.green_silver
LIMIT 100

In [0]:
%sql
-- Peek at raw yellow bronze rows.
SELECT *
FROM `demo-taxi`.`bronze-layer`.yellow_bronze
LIMIT 10

In [0]:

def process_yellow_silver():
    """Stream pending rows from the yellow bronze table into ``yellow_silver``.

    Rows are kept only when passenger_count is strictly between 0 and 6, both
    tpep pickup/dropoff timestamps are non-null, trip_distance < 500 and
    fare_amount < 1000. The surviving rows then get:

    * ``tpep_pickup_datetime`` / ``tpep_dropoff_datetime`` renamed to
      ``pickup_datetime`` / ``dropoff_datetime``;
    * ``trip_duration_minutes`` (minute difference between the two timestamps),
      computed exactly once;
    * ``payment_description`` derived from ``payment_type``;
    * pickup/dropoff date, day of week and hour;
    * ``fare_per_mile`` computed with ``try_divide``.

    The stream uses an ``availableNow`` trigger and the call blocks on
    ``awaitTermination()`` until the query stops.
    """
    yellow_query = (
            spark.readStream.table("`demo-taxi`.`bronze-layer`.yellow_bronze")
            .filter("passenger_count > 0 and tpep_pickup_datetime is not null and tpep_dropoff_datetime is not null and passenger_count < 6 and trip_distance < 500 and fare_amount<1000")
            .withColumnRenamed("tpep_pickup_datetime","pickup_datetime")
            .withColumnRenamed("tpep_dropoff_datetime","dropoff_datetime")
            .withColumn("trip_duration_minutes",expr("datediff(MINUTE,pickup_datetime,dropoff_datetime)"))
            .withColumn("payment_description",when(col("payment_type")==1,"Credit card")
                                    .when(col("payment_type")==2,"Cash")
                                    .when(col("payment_type")==3,"No charge")
                                    .when(col("payment_type")==4,"Dispute")
                                    .otherwise("Unknown"))
            .withColumn("pickup_date", to_date(col("pickup_datetime")))
            .withColumn("dropoff_date", to_date(col("dropoff_datetime")))
            .withColumn("pickup_dayofweek", dayofweek(col("pickup_datetime")))
            .withColumn("dropoff_dayofweek", dayofweek(col("dropoff_datetime")))
            .withColumn("pickup_hour",hour(col("pickup_datetime")))
            .withColumn("dropoff_hour",hour(col("dropoff_datetime")))
            .withColumn("fare_per_mile",expr("try_divide(fare_amount,trip_distance)"))
            .writeStream
            .outputMode("append")
            .option("checkpointLocation", "/Volumes/demo-taxi/bronze-layer/taxi_landing/checkpoint/silver/yellow/")
            .option("mergeSchema", "true")
            .trigger(availableNow=True)
            .toTable("`demo-taxi`.`silver-layer`.yellow_silver")
            )
    # toTable() returns a StreamingQuery; block until the availableNow run ends.
    yellow_query.awaitTermination()



In [0]:
%sql
-- Preview raw FHV (for-hire vehicle) bronze rows.
SELECT *
FROM `demo-taxi`.`bronze-layer`.fhv_bronze
LIMIT 100

In [0]:

def process_fhv_silver():
    """Stream pending rows from the FHV bronze table into ``fhv_silver``.

    Renames the camel-cased ``dropOff_datetime`` to ``dropoff_datetime``. It then
    adds pickup/dropoff date, hour and day of week, plus
    ``trip_duration_minutes``.

    NOTE(review): unlike the green/yellow ingestions, no row filter is applied
    here. Rows with NULL pickup/dropoff timestamps pass through with NULL
    derived columns. Confirm this is intended.

    The stream uses an ``availableNow`` trigger and the call blocks on
    ``awaitTermination()`` until the query stops.
    """
    fhv_clean = (
        spark.readStream.table("`demo-taxi`.`bronze-layer`.fhv_bronze")
        .withColumnRenamed("dropOff_datetime", "dropoff_datetime")
    )

    # Applied sequentially in this order, which fixes the silver column order.
    enrichments = (
        ("pickup_date", to_date(col("pickup_datetime"))),
        ("dropoff_date", to_date(col("dropoff_datetime"))),
        ("pickup_hour", hour(col("pickup_datetime"))),
        ("dropoff_hour", hour(col("dropoff_datetime"))),
        ("pickup_dayofweek", dayofweek(col("pickup_datetime"))),
        ("dropoff_dayofweek", dayofweek(col("dropoff_datetime"))),
        ("trip_duration_minutes", expr("datediff(MINUTE,pickup_datetime,dropoff_datetime)")),
    )
    for target, derivation in enrichments:
        fhv_clean = fhv_clean.withColumn(target, derivation)

    fhv_query = (
        fhv_clean.writeStream
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/demo-taxi/bronze-layer/taxi_landing/checkpoint/silver/fhv/")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable("`demo-taxi`.`silver-layer`.fhv_silver")
    )
    fhv_query.awaitTermination()

   





In [0]:
def process_silver_tables():
    """Run every bronze -> silver ingestion in a fixed order: green, yellow, FHV.

    Each step blocks until its streaming query terminates, so the steps run
    one after another rather than concurrently.
    """
    for ingest in (process_green_silver, process_yellow_silver, process_fhv_silver):
        ingest()

In [0]:
# Run the green, yellow and FHV bronze -> silver ingestions in sequence.
process_silver_tables()

In [0]:
%sql
-- Quick look at the green silver output.
-- WARNING: the TRUNCATE statements below are deliberately commented out; they
-- empty the silver tables. NOTE(review): truncating the target tables does not
-- touch the streaming checkpoints under .../taxi_landing/checkpoint/silver/.
-- Re-running the ingestion afterwards would presumably not re-read bronze rows
-- that were already processed. Clear the matching checkpoints as well if a full
-- reload is intended.
-- truncate table `demo-taxi`.`silver-layer`.green_silver;
-- truncate table `demo-taxi`.`silver-layer`.yellow_silver;
-- truncate table `demo-taxi`.`silver-layer`.fhv_silver;
    
select * from `demo-taxi`.`silver-layer`.green_silver limit 10


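### All Trips

Combine the green and yellow silver tables into a single `all_trips` table. Only the columns both tables share are kept, and each row is tagged with its `taxi_type`.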


In [0]:

# Batch-read both silver tables (not streaming: all_trips is fully rebuilt each run).
green_df = spark.read.table("`demo-taxi`.`silver-layer`.green_silver")
yellow_df = spark.read.table("`demo-taxi`.`silver-layer`.yellow_silver")

# Columns present in both tables, in green's column order. Both sides are
# projected onto this same ordered list, so the positional union lines up.
yellow_columns = set(yellow_df.columns)
columns_list = [name for name in green_df.columns if name in yellow_columns]

green_tagged = green_df.select(columns_list).selectExpr("*", "cast('green' as string) as taxi_type")
yellow_tagged = yellow_df.select(columns_list).selectExpr("*", "cast('yellow' as string) as taxi_type")

all_trips_writer = green_tagged.union(yellow_tagged).write.mode("overwrite")
all_trips_writer.saveAsTable("`demo-taxi`.`silver-layer`.all_trips")




In [0]:
%sql
-- Preview the combined green + yellow trips table.
SELECT *
FROM `demo-taxi`.`silver-layer`.all_trips
LIMIT 100