### Silver Layer
This is the cleaned and processed data layer. At this stage, data is transformed from its raw form into a structured form, which involves defining the data types for each column. This layer is typically used for exploratory data analysis and to create derived datasets.

In [0]:
from pyspark.sql.types import StringType, IntegerType, DecimalType, DateType, BooleanType, StructType, StructField
from pyspark.sql.functions import when, col, to_date, udf, asc, desc, sum as Fsum, to_timestamp

In [0]:
# Create the schema for the silver layer if it doesn't exist, so the
# saveAsTable('silver.<name>') calls in the cells below have a schema to write into.
spark.sql('CREATE SCHEMA IF NOT EXISTS silver')

DataFrame[]

Create tables for the silver layer from the bronze layer including type conversion.
1. stations
2. riders
3. payments
4. trips

In [0]:
# Build the silver stations frame in one pass: read the bronze Delta files,
# keep a single row per station_id, and cast both coordinates to DecimalType(9,6).
df_stations = (
    spark.read.format('delta')
    .load('/delta/bronze_stations')
    .dropDuplicates(['station_id'])
    .withColumn('latitude', col('latitude').cast(DecimalType(9, 6)))
    .withColumn('longitude', col('longitude').cast(DecimalType(9, 6)))
)

# Show the resulting schema and rows before persisting.
df_stations.printSchema()
display(df_stations)

# Overwrite the managed Delta table silver.stations with the typed result.
stations_writer = df_stations.write.format('delta').mode('overwrite')
stations_writer.saveAsTable('silver.stations')

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: decimal(9,6) (nullable = true)
 |-- longitude: decimal(9,6) (nullable = true)



station_id,name,latitude,longitude
13001,Michigan Ave & Washington St,41.883984,-87.624684
13006,LaSalle St & Washington St,41.882664,-87.63253
13008,Millennium Park,41.881032,-87.624084
13011,Canal St & Adams St,41.879255,-87.639904
13016,St. Clair St & Erie St,41.894345,-87.622798
13017,Franklin St & Chicago Ave,41.896747,-87.635668
13021,Clinton St & Lake St,41.885637,-87.641823
13022,Streeter Dr & Grand Ave,41.892278,-87.612043
13028,900 W Harrison St,41.874754,-87.649807
13029,Field Museum,41.865312,-87.617867


In [0]:
# Build the silver riders frame: read the bronze Delta files, keep a single row
# per rider_id (deduplicated on the raw value, before the integer cast), then
# convert the id, the three date columns, and the membership flag.
df_riders = (
    spark.read.format('delta')
    .load('/delta/bronze_riders')
    .dropDuplicates(['rider_id'])
    .withColumn('rider_id', col('rider_id').cast(IntegerType()))
    .withColumn('birthday', to_date(col('birthday'), 'yyyy-MM-dd'))
    .withColumn('account_start_date', to_date(col('account_start_date'), 'yyyy-MM-dd'))
    .withColumn('account_end_date', to_date(col('account_end_date'), 'yyyy-MM-dd'))
    # Only the exact string 'True' maps to True; every other value,
    # including null, falls through to False.
    .withColumn('is_member', when(col('is_member') == 'True', True).otherwise(False))
)

# Show the resulting schema and rows before persisting.
df_riders.printSchema()
display(df_riders)

# Overwrite the managed Delta table silver.riders with the typed result.
riders_writer = df_riders.write.format('delta').mode('overwrite')
riders_writer.saveAsTable('silver.riders')

root
 |-- rider_id: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- address: string (nullable = true)
 |-- birthday: date (nullable = true)
 |-- account_start_date: date (nullable = true)
 |-- account_end_date: date (nullable = true)
 |-- is_member: boolean (nullable = false)



rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1090,Julie,Ortiz,513 Leonard Turnpike,1992-08-23,2021-07-07,,True
1159,David,Burns,8343 William Pike Suite 694,1991-09-21,2020-10-30,,True
1436,Vincent,Clark,47252 Daniel Rapids,1985-04-05,2021-01-20,,True
1512,Megan,Johnson,57672 Mccormick Course,1979-08-11,2019-12-27,,True
1572,John,Peterson,8859 Joshua Throughway Suite 812,2000-01-25,2021-03-09,,True
2069,Darlene,Pineda,88311 Roman Circles Suite 312,2001-01-06,2019-01-05,,True
2088,Zachary,Anderson,85166 James Walk Suite 516,2004-05-20,2015-07-23,2021-12-01,True
2136,Jonathan,Dixon,78500 Hunt Light,1980-06-07,2020-06-09,,True
2162,Deborah,Burton,65735 Rebecca Pike,1984-07-03,2021-04-22,2021-08-01,False
2294,Diana,Jacobs,74386 Elizabeth Terrace Suite 286,1977-09-03,2019-07-09,,True


In [0]:
# Build the silver payments frame: read the bronze Delta files, keep a single
# row per payment_id (deduplicated on the raw value, before the integer cast),
# then convert ids to integers, the date to a DateType and the amount to
# DecimalType(5,2).
df_payments = (
    spark.read.format('delta')
    .load('/delta/bronze_payments')
    .dropDuplicates(['payment_id'])
    .withColumn('payment_id', col('payment_id').cast(IntegerType()))
    .withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))
    # DecimalType(5,2) holds at most 999.99.
    # NOTE(review): confirm no payment amount exceeds that range.
    .withColumn('amount', col('amount').cast(DecimalType(5, 2)))
    .withColumn('rider_id', col('rider_id').cast(IntegerType()))
)

# Show the resulting schema and rows before persisting.
df_payments.printSchema()
display(df_payments)

# Overwrite the managed Delta table silver.payments with the typed result.
payments_writer = df_payments.write.format('delta').mode('overwrite')
payments_writer.saveAsTable('silver.payments')

root
 |-- payment_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: decimal(5,2) (nullable = true)
 |-- rider_id: integer (nullable = true)



payment_id,date,amount,rider_id
296,2021-05-01,9.0,1011
467,2022-01-01,22.76,1018
675,2021-03-01,12.49,1029
691,2022-01-01,9.0,1030
829,2020-12-01,9.0,1035
1090,2019-12-01,9.0,1041
1159,2021-05-01,9.0,1042
1436,2020-06-01,11.79,1052
1512,2016-10-01,9.0,1054
1572,2017-08-01,19.13,1056


In [0]:
# Build the silver trips frame: read the bronze Delta files, keep a single row
# per trip_id, parse both trip boundaries as timestamps and cast rider_id to an
# integer. All other columns are carried over unchanged.
df_trips = (
    spark.read.format('delta')
    .load('/delta/bronze_trips')
    .dropDuplicates(['trip_id'])
    .withColumn('start_at', to_timestamp(col('start_at'), 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('ended_at', to_timestamp(col('ended_at'), 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('rider_id', col('rider_id').cast(IntegerType()))
)

# Show the resulting schema and rows before persisting.
df_trips.printSchema()
display(df_trips)

# Overwrite the managed Delta table silver.trips with the typed result.
trips_writer = df_trips.write.format('delta').mode('overwrite')
trips_writer.saveAsTable('silver.trips')

root
 |-- trip_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- start_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- rider_id: integer (nullable = true)



trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
3E0825E30E2EAEBE,classic_bike,2021-06-17T16:44:39Z,2021-06-17T16:50:09Z,13241,13257,14533
15FE68BD9B95AFCA,classic_bike,2021-06-03T16:45:52Z,2021-06-03T16:57:29Z,TA1307000115,13021,54945
8762A026E41185FF,classic_bike,2021-06-16T10:41:10Z,2021-06-16T10:47:02Z,13196,13021,52072
59F20D3F154DB373,classic_bike,2021-06-08T22:25:58Z,2021-06-08T22:34:43Z,KA1504000135,13021,56331
020BEC6189B807D6,classic_bike,2021-06-13T21:14:15Z,2021-06-13T21:35:54Z,TA1306000006,13021,9783
4B694AC727E2413F,classic_bike,2021-06-13T14:16:20Z,2021-06-13T14:40:34Z,15655,15643,24102
2FB9FF4F282C49B2,classic_bike,2021-06-08T22:13:18Z,2021-06-08T22:20:36Z,KA1503000073,TA1307000164,43429
0AD16EF980F26D55,electric_bike,2021-06-03T18:05:40Z,2021-06-03T18:28:44Z,KA1504000155,KA1504000171,38912
9FAD94344ABDA1F1,classic_bike,2021-06-17T17:27:42Z,2021-06-17T17:39:01Z,LP-,TA1309000033,18332
996631F681B188C0,classic_bike,2021-06-05T10:47:05Z,2021-06-05T11:05:29Z,TA1307000039,TA1309000033,55659
