## Create table for OLTP "trips" data

In [0]:
from pyspark.sql.functions import spark_partition_id, col
from functools import reduce

In [0]:
# Load the raw trips data from the bronze layer (stored in Delta format)
raw_trips_df = spark.read.load("/lakehouse/bronze/trips", format="delta")

In [0]:
# Number of rows in the raw bronze data; used as the baseline for later checks
raw_trips_df.count()

4584921

In [0]:
# Checking data skew: count the rows held by each Spark partition.
# Counts of similar size mean the data is spread evenly.
rows_per_partition = raw_trips_df.groupBy(spark_partition_id().alias("Core")).count()
rows_per_partition.show()

+----+-------+
|Core|  count|
+----+-------+
|   0|1158715|
|   1|1158143|
|   2|1158203|
|   3|1109860|
+----+-------+



In [0]:
# Renaming columns: the raw columns are renamed by position, so the order of
# ColumnNames must match the column order of the raw data.
ColumnNames = ["trip_id", "rideable_type", "started_at", "ended_at", "start_station_id", "end_station_id", "rider_id"]
rawColumnNames = raw_trips_df.schema.names

trips_df = raw_trips_df
for position, new_name in enumerate(ColumnNames):
    # Replace the raw name found at this position with the target name
    trips_df = trips_df.withColumnRenamed(rawColumnNames[position], new_name)

trips_df.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- rider_id: string (nullable = true)



In [0]:
# Retyping columns
# Note: "start_station_id" and "end_station_id" are kept as strings rather
# than the integers defined in the project OLTP schema, because some station
# IDs contain non-numeric characters and casting them would lose that
# information.
trips_df = (
    trips_df
    .withColumn("started_at", col("started_at").cast("Timestamp"))
    .withColumn("ended_at", col("ended_at").cast("Timestamp"))
    .withColumn("rider_id", col("rider_id").cast("Integer"))
)
trips_df.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- rider_id: integer (nullable = true)



In [0]:
# Sanity check: the renamed and retyped frame must still contain exactly as
# many rows as the raw input.
rows_before = raw_trips_df.count()
rows_after = trips_df.count()
rows_after == rows_before

True

In [0]:
# Expose trips_df to Spark SQL as the temporary view "oltp_trips",
# which the data-quality queries below read from
trips_df.createOrReplaceTempView('oltp_trips')

In [0]:
# Checking for Null values in every column, including the trip_id key.
# An empty result means no column contains a NULL.
# NULLs in started_at, ended_at or rider_id may also come from values that the
# earlier casts could not parse.
display(spark.sql('''
          SELECT
            trip_id, rideable_type, started_at, ended_at, start_station_id, end_station_id, rider_id
          FROM
            oltp_trips
          WHERE
            trip_id IS NULL
            OR rideable_type IS NULL
            OR started_at IS NULL
            OR ended_at IS NULL
            OR start_station_id IS NULL
            OR end_station_id IS NULL
            OR rider_id IS NULL
        '''))

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id


In [0]:
# Checking for duplicate trip_id values; an empty result means every key is unique.
# count(*) is used rather than count(trip_id): count(<column>) skips NULLs, so
# several rows sharing a NULL trip_id would otherwise never be reported.
display(spark.sql('''
                  SELECT
                    trip_id
                  FROM
                    oltp_trips
                  GROUP BY trip_id
                  HAVING count(*) > 1
                  '''
                ))

trip_id


In [0]:
# Earliest and latest trip start timestamps, to confirm the covered date range
started_at_range_sql = ''' 
                  SELECT 
                    min(started_at), max(started_at)
                  FROM 
                    oltp_trips 
                  '''
display(spark.sql(started_at_range_sql))

min(started_at),max(started_at)
2021-02-01T01:07:04Z,2022-01-31T23:58:37Z


In [0]:
# Earliest and latest trip end timestamps, to confirm the covered date range
ended_at_range_sql = ''' 
                  SELECT 
                    min(ended_at), max(ended_at)
                  FROM 
                    oltp_trips 
                  '''
display(spark.sql(ended_at_range_sql))

min(ended_at),max(ended_at)
2021-02-01T01:47:45Z,2022-02-01T00:12:04Z


In [0]:
# Persist the cleaned trips to the silver layer in Delta format,
# overwriting whatever a previous run stored at that path
silver_writer = trips_df.write.format("delta").mode("overwrite")
silver_writer.save("/lakehouse/silver/oltp_trips")


In [0]:
# Create the Delta table "silver_trips" from the data saved in the silver layer
silver_trips_df = spark.read.format("delta").load("/lakehouse/silver/oltp_trips")
# NOTE(review): this writes the rows a second time under the table name, in
# addition to the files already at the silver path. Confirm the copy is intended.
silver_trips_df.write.format("delta").mode("overwrite").saveAsTable("silver_trips")