In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DateType, DoubleType, StringType, BooleanType, FloatType
from pyspark.sql.functions import date_format

# From CSV to Delta

In [0]:
table_dict={"riders":["rider_id","first","last","address","birthday","startdate","enddate","member_flag"],
      "stations":["station_id","name","latitude","logtitude"],
      "payments":["payment_id","date","amount","account_number"],
      "trips":["trip_id","rideable_type","start_at","ent_at","start_station_id","end_station_id","member_id"]
     }

In [0]:
for tablename, columns in table_dict.items():
    print(tablename, columns)
    df =spark.read.format("csv").option("inferSchema","false").option("sep",",").load("/FileStore/bronze/{}.csv".format(tablename))
    df = df.toDF(*columns)
    
    print("before dropduplicate row count",df.count())
    df.drop_duplicates()
    df.dropDuplicates([columns[0]])
    print("after dropduplicate row count",df.count())
    
    display(df)
    df.write.format("delta").mode("overwrite").save("/delta/bronze/{}".format(tablename)) 

rider_id,first,last,address,birthday,startdate,enddate,member_flag
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True


station_id,name,latitude,logtitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


payment_id,date,amount,account_number
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


trip_id,rideable_type,start_at,ent_at,start_station_id,end_station_id,member_id
89E7AA6C29227EFF,classic_bike,2021-02-12 16:14:56,2021-02-12 16:21:43,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24 15:43:33,2021-02-24 15:49:05,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01 17:47:42,2021-02-01 17:48:33,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11 18:33:53,2021-02-11 18:35:09,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27 15:13:39,2021-02-27 15:36:36,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20 08:59:42,2021-02-20 09:17:04,KP1705001026,KP1705001026,57068


# CREATE DELTA TABLE
## for Bronze

In [0]:

spark.sql("CREATE TABLE IF NOT EXISTS RIDERS USING DELTA LOCATION '/delta/bronze/riders'")
spark.sql("CREATE TABLE IF NOT EXISTS STATIONS USING DELTA LOCATION '/delta/bronze/stations'")
spark.sql("CREATE TABLE IF NOT EXISTS PAYMENTS USING DELTA LOCATION '/delta/bronze/payments'")
spark.sql("CREATE TABLE IF NOT EXISTS TRIPS USING DELTA LOCATION '/delta/bronze/trips'")

# Transform to fact and dimension from Bronze
## for Gold

In [0]:
rider_df = spark.sql('SELECT rider_id,birthday,startdate,member_flag FROM RIDERS')
rider_df.printSchema()


rider_df =rider_df.withColumn("AccountStartAge", F.floor(F.datediff(F.current_timestamp(), F.col("birthday"))/365.25).cast(IntegerType()))
rider_df =rider_df.withColumn('rider_id', F.col('rider_id').cast(IntegerType())).withColumn('birthday', F.col('birthday').cast(DateType())).withColumn('startdate', F.col('startdate').cast(DateType())).withColumn('member_flag', F.col('member_flag').cast(BooleanType()))

rider_df.printSchema()
display(rider_df)

#rider_df.write.format("delta").mode("overwrite").option("mergeSchema","true").save("/delta/gold/riders_dimension")

rider_df.write.format("delta").mode("append").saveAsTable("rider_dimension")

rider_id,birthday,startdate,member_flag,AccountStartAge
1000,1989-02-13,2019-04-23,True,33
1001,1976-08-10,2019-11-01,True,46
1002,1998-08-10,2022-02-04,True,24
1003,1999-03-29,2019-08-26,False,23
1004,1969-04-11,2019-09-14,True,53
1005,1974-08-27,2020-03-24,False,48
1006,2004-01-30,2020-11-27,True,18
1007,1988-01-11,2016-12-11,False,34
1008,1987-02-21,2021-03-28,True,35
1009,1981-02-07,2020-06-12,True,41


In [0]:
stations_df = spark.sql('SELECT station_id,name FROM STATIONS')
stations_df.printSchema()


stations_df =stations_df.withColumn('station_id', F.col('station_id').cast(StringType())).withColumn('name', F.col('name').cast(StringType()))

stations_df.printSchema()
display(stations_df)

#stations_df.write.format("delta").mode("overwrite").save("/delta/gold/stations_dimension")
stations_df.write.format("delta").mode("append").saveAsTable("station_dimension")


station_id,name
525,Glenwood Ave & Touhy Ave
KA1503000012,Clark St & Lake St
637,Wood St & Chicago Ave
13216,State St & 33rd St
18003,Fairbanks St & Superior St
KP1705001026,LaSalle Dr & Huron St
13253,Lincoln Ave & Waveland Ave
KA1503000044,Rush St & Hubbard St
KA1504000140,Winchester Ave & Elston Ave
TA1305000032,Clinton St & Madison St


In [0]:
payments_df = spark.sql('SELECT payment_id,date,amount,account_number FROM PAYMENTS')
payments_df.printSchema()


payments_df =payments_df.withColumn('payment_id', F.col('payment_id').cast(StringType())).withColumn('date', F.col('date').cast(DateType())).withColumn('amount', F.col('amount').cast(IntegerType())).withColumn('account_number', F.col('account_number').cast(IntegerType()))


payments_df.printSchema()
display(payments_df)

#payments_df.write.format("delta").mode("overwrite").save("/delta/gold/payments_fact")
payments_df.write.format("delta").mode("append").saveAsTable("payment_fact")


payment_id,date,amount,account_number
1,2019-05-01,9,1000
2,2019-06-01,9,1000
3,2019-07-01,9,1000
4,2019-08-01,9,1000
5,2019-09-01,9,1000
6,2019-10-01,9,1000
7,2019-11-01,9,1000
8,2019-12-01,9,1000
9,2020-01-01,9,1000
10,2020-02-01,9,1000


In [0]:
trips_df = spark.sql('SELECT trip_id,start_at,ent_at,start_station_id,member_id FROM TRIPS')
trips_df.printSchema()


trips_df = trips_df\
            .withColumn('trip_id', F.col('trip_id').cast(StringType()))\
            .withColumn('start_at', F.col('start_at').cast(TimestampType()))\
            .withColumn('ent_at', F.col('ent_at').cast(TimestampType()))\
            .withColumn('start_station_id', F.col('start_station_id').cast(StringType()))\
            .withColumn('member_id', F.col('member_id').cast(IntegerType()))

trips_df = trips_df\
            .join(rider_df, trips_df.member_id == rider_df.rider_id, "left")\
            .select('trip_id','start_at','ent_at','start_station_id','member_id','birthday')

trips_df.printSchema()
display(trips_df)
                     
trips_df =trips_df\
            .withColumn('Druation', F.col("ent_at").cast("long")-F.col("start_at").cast("long"))\
            .withColumn('Age_at_Time_of_Trip', F.floor(F.datediff(F.col("start_at"), F.col("birthday"))/365.25).cast(IntegerType()))\
            .withColumn('timeDay', date_format('start_at', 'HH'))

trips_df =trips_df\
            .withColumn('start_at', F.col('start_at').cast(DateType()))\
            .withColumn('ent_at', F.col('ent_at').cast(DateType()))\
            .drop("birthday")

trips_df.printSchema()
display(trips_df)

#trips_df.write.format("delta").mode("overwrite").save("/delta/gold/trip_fact")
trips_df.write.format("delta").mode("append").saveAsTable("trip_fact")

trip_id,start_at,ent_at,start_station_id,member_id,birthday
89E7AA6C29227EFF,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,71934,1983-08-26
0FEFDE2603568365,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,47854,1982-12-10
E6159D746B2DBB91,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,70870,1987-12-22
B32D3199F1C2E75B,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,58974,2002-01-07
83E463F23575F4BF,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,39608,1949-06-15
BDAA7E3494E8D545,2021-02-24T15:43:33.000+0000,2021-02-24T15:49:05.000+0000,18003,36267,1994-01-07
A772742351171257,2021-02-01T17:47:42.000+0000,2021-02-01T17:48:33.000+0000,KP1705001026,50104,1987-06-22
295476889D9B79F8,2021-02-11T18:33:53.000+0000,2021-02-11T18:35:09.000+0000,18003,19618,1997-02-14
362087194BA4CC9A,2021-02-27T15:13:39.000+0000,2021-02-27T15:36:36.000+0000,KP1705001026,16732,2001-07-01
21630F715038CCB0,2021-02-20T08:59:42.000+0000,2021-02-20T09:17:04.000+0000,KP1705001026,57068,1973-11-28


trip_id,start_at,ent_at,start_station_id,member_id,Druation,Age_at_Time_of_Trip,timeDay
89E7AA6C29227EFF,2021-02-12,2021-02-12,525,71934,407,37,16
0FEFDE2603568365,2021-02-14,2021-02-14,525,47854,1171,38,17
E6159D746B2DBB91,2021-02-09,2021-02-09,KA1503000012,70870,532,33,19
B32D3199F1C2E75B,2021-02-02,2021-02-02,637,58974,265,19,17
83E463F23575F4BF,2021-02-23,2021-02-23,13216,39608,914,71,15
BDAA7E3494E8D545,2021-02-24,2021-02-24,18003,36267,332,27,15
A772742351171257,2021-02-01,2021-02-01,KP1705001026,50104,51,33,17
295476889D9B79F8,2021-02-11,2021-02-11,18003,19618,76,23,18
362087194BA4CC9A,2021-02-27,2021-02-27,KP1705001026,16732,1377,19,15
21630F715038CCB0,2021-02-20,2021-02-20,KP1705001026,57068,1042,47,8


In [0]:
#spark.sql("CREATE TABLE IF NOT EXISTS TRIP_FACT USING DELTA LOCATION '/delta/gold/trip_fact'")
#spark.sql("CREATE TABLE IF NOT EXISTS PAYMENT_FACT USING DELTA LOCATION '/delta/gold/payments_fact'")
#spark.sql("CREATE TABLE IF NOT EXISTS STATION_DIMENSION USING DELTA LOCATION '/delta/gold/stations_dimension'")
#spark.sql("CREATE TABLE IF NOT EXISTS RIDER_DIMENSION USING DELTA LOCATION '/delta/gold/riders_dimension'")

# Delta Lake Utility

In [0]:
%sql
DESCRIBE HISTORY RIDERS


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2022-10-11T00:47:02.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,2.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-10-10T23:43:05.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,1.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-10-10T23:34:08.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,0.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-10-10T13:44:50.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12


In [0]:

%sql
OPTIMIZE RIDERS


path,metrics
dbfs:/delta/bronze/riders,"List(1, 2, List(2196373, 2196373, 2196373.0, 1, 2196373), List(603586, 1672344, 1137965.0, 2, 2275930), 0, null, 1, 2, 0, true)"


In [0]:
%sql
DESCRIBE HISTORY RIDERS

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2022-10-11T00:56:50.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [], batchId -> 0, auto -> false)",,List(4340680823722087),1010-004202-lpuvrl68,3.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 2275930, p25FileSize -> 2196373, minFileSize -> 2196373, numAddedFiles -> 1, maxFileSize -> 2196373, p75FileSize -> 2196373, p50FileSize -> 2196373, numAddedBytes -> 2196373)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-10-11T00:47:02.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,2.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-10-10T23:43:05.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,1.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-10-10T23:34:08.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,0.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-10-10T13:44:50.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12


In [0]:
%sql
VACUUM RIDERS

path
dbfs:/delta/bronze/riders


In [0]:
%sql
DESCRIBE HISTORY RIDERS

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
6,2022-10-11T01:02:54.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,VACUUM END,Map(status -> COMPLETED),,List(4340680823722087),1010-004202-lpuvrl68,5.0,SnapshotIsolation,True,"Map(numDeletedFiles -> 0, numVacuumedDirectories -> 1)",,Databricks-Runtime/10.4.x-scala2.12
5,2022-10-11T01:02:51.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,VACUUM START,"Map(retentionCheckEnabled -> true, defaultRetentionMillis -> 604800000)",,List(4340680823722087),1010-004202-lpuvrl68,4.0,SnapshotIsolation,True,Map(numFilesToDelete -> 0),,Databricks-Runtime/10.4.x-scala2.12
4,2022-10-11T00:56:50.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,OPTIMIZE,"Map(predicate -> [], zOrderBy -> [], batchId -> 0, auto -> false)",,List(4340680823722087),1010-004202-lpuvrl68,3.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 2275930, p25FileSize -> 2196373, minFileSize -> 2196373, numAddedFiles -> 1, maxFileSize -> 2196373, p75FileSize -> 2196373, p50FileSize -> 2196373, numAddedBytes -> 2196373)",,Databricks-Runtime/10.4.x-scala2.12
3,2022-10-11T00:47:02.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,2.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
2,2022-10-10T23:43:05.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,1.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
1,2022-10-10T23:34:08.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,0.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
0,2022-10-10T13:44:50.000+0000,318730776510627,student_7pmnze2zl8ta5tuv_00826808@vocareumvocareum.onmicrosoft.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4340680823722087),1010-004202-lpuvrl68,,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 75000, numOutputBytes -> 2275930)",,Databricks-Runtime/10.4.x-scala2.12
