In [0]:
from pyspark.sql.functions import *

In [0]:
def uploadfiledelta(df, filename):
    """Write ``df`` as Delta to ``/delta/<filename>``, replacing existing data.

    Args:
        df: Spark DataFrame to persist.
        filename: path segment appended verbatim to ``/delta/``.
    """
    target_path = "/delta/" + filename
    writer = df.write.format("delta").mode("overwrite")
    writer.save(target_path)

In [0]:
def readfilecsv(filename, header=True, base_dir="/FileStore/tables/"):
    """Read ``<base_dir><filename>.csv`` into a Spark DataFrame.

    Schema inference is disabled, so every column is loaded as a string and
    callers are expected to cast types themselves.

    Args:
        filename: CSV base name, without the ``.csv`` extension.
        header: whether the first line of the file is a header row. Defaults
            to True, the previous hard-coded behaviour.
            NOTE(review): the callers visible here rename every column
            positionally with ``toDF`` right after reading, which hints that
            the files may have no header row. If so, pass ``header=False``.
            Otherwise the first data row of each file is consumed as column
            names and lost.
        base_dir: directory prefix that ``filename`` is appended to.

    Returns:
        The DataFrame produced by ``spark.read ... .load(...)``.
    """
    return (
        spark.read.format("csv")
        .option("inferSchema", "false")
        .option("header", "true" if header else "false")
        .option("sep", ",")
        .load(base_dir + filename + ".csv")
    )

In [0]:
# Recursively delete dbfs:/delta/payments (the same operation as `%fs rm -r`).
# NOTE(review): no visible cell writes to this exact path (the bronze cell
# saves to /delta/bronzepayments), so this may be leftover cleanup from an
# earlier layout. Confirm whether it is still needed.
dbutils.fs.rm("dbfs:/delta/payments", True)

In [0]:
# Raw dataset base names; readfilecsv appends ".csv" and reads them from
# /FileStore/tables/.
file = ["payments", "riders", "stations", "trips"]

# Column names assigned positionally (via toDF) to each raw dataset, in the
# same order as `file`.
payments_col = ["payment_id", "date", "amount", "rider_id"]
riders_col = [
    "rider_id", "first", "last", "address", "birthday",
    "account_start_date", "account_end_date", "is_member",
]
stations_col = ["station_id", "name", "latitude", "longitude"]
trips_col = [
    "trip_id", "rideable_type", "start_at", "ended_at",
    "start_station_id", "end_station_id", "rider_id",
]



In [0]:
# Bronze layer: land each raw CSV as a Delta dataset, using the column names
# declared in the previous cell. All columns are still strings at this stage
# because readfilecsv disables schema inference.
# NOTE(review): the target is "/delta/" + "bronze" + name with no separator
# (for example /delta/bronzepayments). It is kept as-is because the silver
# CREATE TABLE cell points at exactly these locations.
# NOTE(review): readfilecsv reads with header=true, but the columns are then
# renamed positionally with toDF. If the CSVs have no header row, the first
# data row of each file is being dropped. Confirm against the source files.
bronze_frames = {}
for name, columns in zip(file, [payments_col, riders_col, stations_col, trips_col]):
    df_bronze = readfilecsv(name).toDF(*columns)
    uploadfiledelta(df_bronze, "bronze" + name)
    bronze_frames[name] = df_bronze

# Keep the per-dataset names, in the same positional order as `file`.
df_payments, df_riders, df_stations, df_trips = (bronze_frames[name] for name in file)

In [0]:
# Register each bronze Delta location as a silver_<name> table.
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE raises
# "table already exists" on every run after the first, which breaks
# Restart & Run All.
# NOTE(review): the location is "/delta/bronze" + name with no separator
# (for example /delta/bronzepayments). It must match where the bronze cell
# writes, so it is left unchanged.
for name in file:
    spark.sql(
        "CREATE TABLE IF NOT EXISTS silver_" + name
        + " USING DELTA LOCATION '/delta/bronze" + name + "'"
    )

Out[21]: DataFrame[]

In [0]:
# Load the four silver tables (registered by the CREATE TABLE cell) as DataFrames.
(
    df_silver_payments,
    df_silver_riders,
    df_silver_stations,
    df_silver_trips,
) = (
    spark.read.table(table_name)
    for table_name in ("silver_payments", "silver_riders", "silver_stations", "silver_trips")
)


In [0]:
# Station dimension: silver stations with the coordinate columns cast from
# string to float. Other columns are left untouched.
df_dim_station = df_silver_stations
for coordinate in ("latitude", "longitude"):
    df_dim_station = df_dim_station.withColumn(coordinate, col(coordinate).cast("float"))

In [0]:
# Payment fact: silver payments with each string column cast to its target type.
payment_casts = [
    ("amount", "float"),
    ("rider_id", "int"),
    ("date", "date"),
    ("payment_id", "int"),
]
df_fact_payment = df_silver_payments
for column_name, spark_type in payment_casts:
    df_fact_payment = df_fact_payment.withColumn(column_name, col(column_name).cast(spark_type))

In [0]:
# Rider dimension: silver riders with typed columns and a derived age column.
df_dim_rider = (
    df_silver_riders
    .withColumn("rider_id", col("rider_id").cast("int"))
    .withColumn("birthday", col("birthday").cast("date"))
    .withColumn("account_start_date", col("account_start_date").cast("date"))
    .withColumn("account_end_date", col("account_end_date").cast("date"))
    .withColumn("is_member", col("is_member").cast("boolean"))
    # Whole years between birthday and account_start_date. Both columns are
    # already DateType at this point, so months_between takes them directly.
    # The former to_timestamp(..., "MM-dd-yyyy") wrappers did not match the
    # ISO-formatted data, and Spark does not apply a parse pattern to date
    # input, so they only misled readers.
    # NOTE(review): despite its name, "account_age" is the rider's age at
    # account start, not how long the account has existed. The name is kept
    # because it is a column of gold_dim_rider.
    .withColumn(
        "account_age",
        floor(months_between(col("account_start_date"), col("birthday")) / lit(12)),
    )
)


                                

In [0]:
# rider_id -> birthday lookup joined onto the trip fact. The key is exposed
# as "id" so it does not collide with the trips' own rider_id column.
df_rider_birthday = df_dim_rider.select(col("birthday"), col("rider_id").alias("id"))

In [0]:

# Trip fact: typed trip timestamps, trip duration in minutes, and the rider's
# age at trip start.
df_trips_typed = (
    df_silver_trips
    .withColumn("start_at", to_timestamp(col("start_at")))
    .withColumn("ended_at", to_timestamp(col("ended_at")))
    .withColumnRenamed("start_at", "started_date")
    .withColumnRenamed("ended_at", "ended_date")
    # Duration in minutes, rounded to 0 decimals. `round` here is
    # pyspark.sql.functions.round because the star import shadows the builtin.
    .withColumn(
        "total_time_trip",
        round((unix_timestamp("ended_date") - unix_timestamp("started_date")) / 60),
    )
    .withColumn("rider_id", col("rider_id").cast("int"))
)

# Inner join: trips whose rider_id has no row in df_rider_birthday are dropped
# from the fact table.
# NOTE(review): confirm this is intended. A left join would keep those trips
# with a null ride_age.
df_fact_trip = df_trips_typed.join(
    df_rider_birthday, df_trips_typed.rider_id == df_rider_birthday.id, "inner"
)

# started_date is a timestamp and birthday is a date, so months_between takes
# them directly. The former to_timestamp(..., "MM-dd-yyyy") wrappers did not
# match the data and are not applied by Spark to already-typed columns.
df_fact_trip = df_fact_trip.withColumn(
    "ride_age",
    floor(months_between(col("started_date"), col("birthday")) / lit(12)),
)
df_fact_trip = df_fact_trip.select(
    "trip_id", "rider_id", "started_date", "ended_date",
    "start_station_id", "end_station_id", "rideable_type",
    "ride_age", "total_time_trip",
)
                              


                                

In [0]:
def generate_series(start, stop, interval):
    """Return a single-column DataFrame of evenly spaced timestamps.

    ``start`` and ``stop`` are cast by Spark to timestamps and then to epoch
    seconds. ``interval`` is the step in seconds. The series comes from
    ``spark.range``, so ``stop`` itself is excluded. The output column is
    named ``date``.
    """
    bounds_row = (
        spark.createDataFrame([(start, stop)], ("start", "stop"))
        .select([col(bound).cast("timestamp").cast("long") for bound in ("start", "stop")])
        .first()
    )
    start_epoch, stop_epoch = bounds_row
    epoch_seconds = spark.range(start_epoch, stop_epoch, interval)
    return epoch_seconds.select(col("id").cast("timestamp").alias("date"))


In [0]:
# Earliest account_start_date across all riders (None if the column has no values).
# `min` is pyspark.sql.functions.min here, brought in by the star import.
min_date = df_dim_rider.select(min("account_start_date")).first()[0]

In [0]:
# Echo the earliest account start date.
print(f"{min_date}")

2013-01-31


In [0]:
# Date dimension: one row per hour, from the earliest rider account start
# (min_date, computed above) up to but excluding endDate.
if min_date is None:
    raise ValueError("no account_start_date values found; cannot anchor gold_dim_date")
# Derived rather than hard-coded, so new rider data moves the start of the range.
beginDate = str(min_date)
endDate = "2043-01-31"
HOUR_IN_SECONDS = 60 * 60

# generate_series already yields "date" as a timestamp column. The former
# to_timestamp('date', 'EEE, MM/dd/yy hh:mm a') step was therefore a no-op
# (its pattern did not describe this data anyway) and has been removed.
df_dim_date = (
    generate_series(beginDate, endDate, HOUR_IN_SECONDS)
    .withColumn('year', year('date'))
    # NOTE(review): "quater" misspells "quarter". It is kept because
    # gold_dim_date is written with this column name.
    .withColumn('quater', quarter('date'))
    .withColumn('month', month('date'))
    .withColumn('day', dayofmonth('date'))
    .withColumn('hour', hour('date'))
    .withColumn('week', weekofyear('date'))
    .withColumn('dayofweek', dayofweek('date'))
)
display(df_dim_date)

date,year,quater,month,day,hour,week,dayofweek
2013-01-31T00:00:00.000+0000,2013,1,1,31,0,5,5
2013-01-31T01:00:00.000+0000,2013,1,1,31,1,5,5
2013-01-31T02:00:00.000+0000,2013,1,1,31,2,5,5
2013-01-31T03:00:00.000+0000,2013,1,1,31,3,5,5
2013-01-31T04:00:00.000+0000,2013,1,1,31,4,5,5
2013-01-31T05:00:00.000+0000,2013,1,1,31,5,5,5
2013-01-31T06:00:00.000+0000,2013,1,1,31,6,5,5
2013-01-31T07:00:00.000+0000,2013,1,1,31,7,5,5
2013-01-31T08:00:00.000+0000,2013,1,1,31,8,5,5
2013-01-31T09:00:00.000+0000,2013,1,1,31,9,5,5


In [0]:
# Persist the star schema as managed Delta tables, overwriting earlier runs.
gold_tables = [
    ("gold_dim_date", df_dim_date),
    ("gold_fact_trip", df_fact_trip),
    ("gold_dim_rider", df_dim_rider),
    ("gold_fact_payment", df_fact_payment),
    # NOTE(review): "din" looks like a typo for "dim". The name is kept so
    # existing consumers of this table keep working.
    ("gold_din_station", df_dim_station),
]
for table_name, frame in gold_tables:
    frame.write.format("delta").mode("overwrite").saveAsTable(table_name)