In [53]:
# Setup cell: make the local Spark installation importable, import helpers, and start a session.
import findspark
# findspark.init() runs before any pyspark import so that pyspark can be located and imported.
findspark.init()
from itertools import chain  # used below to flatten code->name dicts into create_map() arguments
from pyspark.sql import SparkSession
# NOTE(review): the star import supplies col, lit, when, hour, create_map, ... used throughout,
# but it also shadows Python builtins of the same name (e.g. sum/min/max/round); explicit
# imports would be safer once every later cell's usage is known.
from pyspark.sql.functions import *
# getOrCreate() reuses an already-running session if there is one, so re-running this cell is safe.
spark = SparkSession.builder.getOrCreate()

In [68]:
# Load the raw trips, drop exact duplicate rows, and give every trip a surrogate key (trip_id).
# monotonically_increasing_id() is non-deterministic: its value depends on which partition a row
# lands in and its position there, and drop_duplicates() shuffles the data. Every dimension table
# and each fact-table join below re-reads uber_df, so without materialising it each evaluation
# could hand out different trip_ids and the joins would silently pair the wrong rows.
# localCheckpoint() computes the IDs once (eagerly, by default) and truncates the lineage, so all
# downstream uses see the same trip_id values.
uber_df = (
    spark.read.csv('data/uber_data.csv', header=True, inferSchema=True)
         .drop_duplicates()
         .withColumn("trip_id", monotonically_increasing_id())
         .localCheckpoint()
)
uber_df

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, RatecodeID: int, store_and_fwd_flag: string, dropoff_longitude: double, dropoff_latitude: double, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, trip_id: bigint]

In [55]:
# Extract hours, day, month, year, weekday and create new columns.
def _datetime_parts(ts_col, prefix):
    """Return the timestamp column followed by its hour/day/month/year/weekday parts.

    Args:
        ts_col: name of a timestamp column in uber_df.
        prefix: prefix for the derived column names, e.g. "pick" -> pick_hour, pick_day, ...

    Returns:
        A list of Columns: ts_col, <prefix>_hour, <prefix>_day, <prefix>_month,
        <prefix>_year, <prefix>_weekday. dayofweek() numbers days 1 (Sunday) .. 7 (Saturday).
    """
    return [
        col(ts_col),
        hour(ts_col).alias(f"{prefix}_hour"),
        dayofmonth(ts_col).alias(f"{prefix}_day"),
        month(ts_col).alias(f"{prefix}_month"),
        year(ts_col).alias(f"{prefix}_year"),
        dayofweek(ts_col).alias(f"{prefix}_weekday"),
    ]


# Datetime dimension: one row per trip, with pickup and dropoff timestamps broken into parts.
# datetime_id is a copy of trip_id rather than a second monotonically_increasing_id() call, so the
# key the fact table joins on identifies the same trip by construction.
datetime_dim = uber_df.select(
    col("trip_id").alias("datetime_id"),
    *_datetime_parts("tpep_pickup_datetime", "pick"),
    *_datetime_parts("tpep_dropoff_datetime", "drop"),
)



In [56]:
# Passenger-count and trip-distance dimensions: one row per trip. Each key is a copy of trip_id
# (not an independent monotonically_increasing_id() call), so it lines up with the fact table's
# trip_id by construction.
passenger_count_dim = uber_df.select(
    col("trip_id").alias("passenger_count_id"),
    "passenger_count",
)

trip_distance_dim = uber_df.select(
    col("trip_id").alias("trip_distance_id"),
    "trip_distance",
)

In [57]:
# Rate-code dimension: one row per trip with the numeric RatecodeID and its description.
# Built with the same dict -> create_map() lookup as the payment-type dimension. A map lookup
# returns null for keys that are missing (and for a null RatecodeID), so codes outside 1-6 get a
# null rate_code_name.
rate_code_dict = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}
# Flatten {k1: v1, k2: v2, ...} into [lit(k1), lit(v1), lit(k2), lit(v2), ...] for create_map().
rate_code_expr = create_map([lit(x) for x in chain(*rate_code_dict.items())])

# rate_code_id is a copy of trip_id, so it lines up with the fact table's trip_id by construction.
rate_code_dim = uber_df.select(
    col("trip_id").alias("rate_code_id"),
    "RatecodeID",
    rate_code_expr[col("RatecodeID")].alias("rate_code_name"),
)

In [58]:
# Pickup and dropoff location dimensions: one row per trip with its coordinates. Each key is a
# copy of trip_id, so it lines up with the fact table's trip_id by construction.
pickup_location_dim = uber_df.select(
    col("trip_id").alias("pickup_location_id"),
    "pickup_latitude",
    "pickup_longitude",
)

dropoff_location_dim = uber_df.select(
    col("trip_id").alias("dropoff_location_id"),
    "dropoff_latitude",
    "dropoff_longitude",
)

In [59]:
# Payment-type dimension: one row per trip with the numeric payment_type and its description.
payment_type_dict = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}
# Flatten {k1: v1, k2: v2, ...} into [lit(k1), lit(v1), lit(k2), lit(v2), ...] for create_map();
# codes missing from the dict look up as a null payment_type_name.
mapping_expr = create_map([lit(x) for x in chain(*payment_type_dict.items())])

# payment_type_id is a copy of trip_id, so it lines up with the fact table's trip_id by construction.
payment_type_dim = uber_df.select(
    col("trip_id").alias("payment_type_id"),
    "payment_type",
    mapping_expr[col("payment_type")].alias("payment_type_name"),
)

# Sanity check: each distinct code once with its name. dropDuplicates() gives no output order,
# so sort to keep the displayed table reproducible across runs.
(
    payment_type_dim.dropDuplicates(["payment_type"])
                    .select("payment_type", "payment_type_name")
                    .orderBy("payment_type")
                    .show(truncate=False)
)

+------------+-----------------+
|payment_type|payment_type_name|
+------------+-----------------+
|1           |Credit card      |
|2           |Cash             |
|3           |No charge        |
|4           |Dispute          |
+------------+-----------------+



In [67]:
# Fact table: one row per trip with the trip-level measures and one surrogate key per dimension.
# Each dimension has one row per trip whose <name>_id is meant to equal that trip's trip_id, so
# each inner join lines the dimension row up with its trip.
# Pairs are kept together (instead of two parallel lists indexed by position) and named so the
# builtin id() is not shadowed.
dimension_joins = [
    ('passenger_count_id', passenger_count_dim),
    ('trip_distance_id', trip_distance_dim),
    ('rate_code_id', rate_code_dim),
    ('pickup_location_id', pickup_location_dim),
    ('dropoff_location_id', dropoff_location_dim),
    ('datetime_id', datetime_dim),
    ('payment_type_id', payment_type_dim),
]

fact_table = uber_df
for key_col, dim_df in dimension_joins:
    fact_table = fact_table.join(dim_df, on=col('trip_id') == col(key_col), how='inner')

# Keep only the keys and measures; the dimension attributes duplicated by the joins are dropped.
fact_table = fact_table.select(
    'trip_id', 'VendorID', 'datetime_id', 'passenger_count_id', 'trip_distance_id',
    'rate_code_id', 'store_and_fwd_flag', 'pickup_location_id', 'dropoff_location_id',
    'payment_type_id', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
    'improvement_surcharge', 'total_amount',
)
fact_table

DataFrame[trip_id: bigint, VendorID: int, datetime_id: bigint, passenger_count_id: bigint, trip_distance_id: bigint, rate_code_id: bigint, store_and_fwd_flag: string, pickup_location_id: bigint, dropoff_location_id: bigint, payment_type_id: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double]