In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType

##Create Transformation Payments table

In [0]:
payments = spark.table("default.payments")
display(payments.head(10))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


In [0]:
payments.dropDuplicates(["payment_id"]).write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("default.transformation_payments")

## Create Transformation Stations table

In [0]:
stations = spark.table("default.stations")
display(stations.head(10))

station_id,name,latitude,longitude
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001
TA1306000012,Wells St & Huron St,41.89475366666667,-87.63440200000001


In [0]:
stations.dropDuplicates(["station_id"]).write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("default.transformation_stations")

## Create Transformation Riders table

In [0]:
riders = spark.table("default.riders")
display(riders.head(10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True
1010,Tracy,Austin,92973 Mary Ville,1996-04-07,2019-12-27,,True


In [0]:
riders.dropDuplicates(["rider_id"]).write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("default.transformation_riders")

## Create Transformation Trips table

In [0]:
trips = spark.table("default.trips")
display(trips.head(10))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24 15:43:33,2021-02-24 15:49:05,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01 17:47:42,2021-02-01 17:48:33,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11 18:33:53,2021-02-11 18:35:09,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27 15:13:39,2021-02-27 15:36:36,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20 08:59:42,2021-02-20 09:17:04,KP1705001026,KP1705001026,57068
A977EB7FE7F5CD3A,classic_bike,2021-02-20 08:58:16,2021-02-20 08:58:41,KP1705001026,KP1705001026,32712


In [0]:
trips.dropDuplicates(["trip_id"]).write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("default.transformation_trips")

## Create Bike Type table

In [0]:
# bike_list = trips.select('rideable_type').distinct().collect()
bike_list = trips.rdd.map(lambda x: x.rideable_type).collect()
bike_list = list(set(bike_list))

bike_type_list = []
for i in bike_list:
    bike_type_list.append([bike_list.index(i), i])
# bike_type = (bike_list, index =[1, 2, 3], columns =['Bike Type'])

In [0]:
bike_columns = ["biketype_id", "biketype"]
biketype_df = spark.createDataFrame(data=bike_type_list, schema = bike_columns)
display(biketype_df)

biketype_id,biketype
0,electric_bike
1,classic_bike
2,docked_bike


In [0]:
biketype_df.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("default.transformation_biketypes")

## Create table Date table

In [0]:
day_df = spark.sql('SELECT start_at, ended_at FROM transformation_trips')
startDate, endDate = day_df.select(min('start_at'), max('ended_at')).first()
spark.sql(f"SELECT EXPLODE(SEQUENCE(TO_DATE('{startDate}'), TO_DATE('{endDate}'), INTERVAL 1 DAY)) as date").createOrReplaceTempView('dates')

In [0]:
%sql SELECT * FROM dates

date
2021-02-01
2021-02-02
2021-02-03
2021-02-04
2021-02-05
2021-02-06
2021-02-07
2021-02-08
2021-02-09
2021-02-10


In [0]:
transformation_date = spark.sql("SELECT \
                        date \
                        , DAY(date) AS Day \
                        , MONTH(date) AS Month \
                        , YEAR(date) AS Year \
                        , WEEKDAY(date) AS Weekday \
                        , CASE QUARTER(date) WHEN 1 THEN \"Q1\" WHEN 2 THEN \"Q2\" WHEN 3 THEN \"Q3\" WHEN 4 THEN \"Q4\" END AS Quarter \
                        FROM dates"
                      )

transformation_date.printSchema()
display (transformation_date)

root
 |-- date: date (nullable = false)
 |-- Day: integer (nullable = false)
 |-- Month: integer (nullable = false)
 |-- Year: integer (nullable = false)
 |-- Weekday: integer (nullable = false)
 |-- Quarter: string (nullable = true)



date,Day,Month,Year,Weekday,Quarter
2021-02-01,1,2,2021,0,Q1
2021-02-02,2,2,2021,1,Q1
2021-02-03,3,2,2021,2,Q1
2021-02-04,4,2,2021,3,Q1
2021-02-05,5,2,2021,4,Q1
2021-02-06,6,2,2021,5,Q1
2021-02-07,7,2,2021,6,Q1
2021-02-08,8,2,2021,0,Q1
2021-02-09,9,2,2021,1,Q1
2021-02-10,10,2,2021,2,Q1


In [0]:
transformation_date.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("default.transformation_date")