In [3]:
# Location of the raw yellow-taxi trip file (2009-01) in Google Cloud Storage.
bucket_name = "uber-data-analytics-raw"
path = "gs://{}/yellow_tripdata_2009-01.parquet".format(bucket_name)

# Load the Parquet file into a Spark DataFrame.
# NOTE(review): `spark` is not created anywhere in this notebook; presumably the
# session is injected by the hosting environment - confirm.
df = spark.read.parquet(path)

In [10]:
# Preview the first two rows of the raw trip data.
df.show(2)

+-----------+--------------------+---------------------+---------------+-------------+----------+---------+---------+-----------------+----------+---------+------------+--------+---------+-------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|Trip_Distance| Start_Lon|Start_Lat|Rate_Code|store_and_forward|   End_Lon|  End_Lat|Payment_Type|Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+-------------+----------+---------+---------+-----------------+----------+---------+------------+--------+---------+-------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|         2.63|-73.991957|40.721567|     null|             null|-73.993803|40.695922|        CASH|     8.9|      0.5|   null|    0.0|      0.0|      9.4|
|        VTS| 2009-01-04 03:31:00|  2009-01-04 03:38:00|              3|         4.55|-73.982102| 40

In [7]:
# Print column names and types of the raw data; note both timestamp columns are read as strings.
df.printSchema()

root
 |-- vendor_name: string (nullable = true)
 |-- Trip_Pickup_DateTime: string (nullable = true)
 |-- Trip_Dropoff_DateTime: string (nullable = true)
 |-- Passenger_Count: long (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Start_Lon: double (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Rate_Code: double (nullable = true)
 |-- store_and_forward: double (nullable = true)
 |-- End_Lon: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)



In [17]:
import datetime

def _parse_pickup(row):
    """Parse the row's second field (Trip_Pickup_DateTime) into a datetime."""
    return datetime.datetime.strptime(row[1], '%Y-%m-%d %H:%M:%S')


# Parse every pickup timestamp through the RDD API and display the two largest values.
s = df.rdd.map(_parse_pickup)
s.top(2)

[datetime.datetime(2009, 1, 31, 23, 59, 59),
 datetime.datetime(2009, 1, 31, 23, 59, 59)]

In [18]:
# Inspect the Python type of the mapped RDD.
type(s)

pyspark.rdd.PipelinedRDD

In [96]:
# Start the datetime dimension from the two raw timestamp columns.
datetime_dim = df.select('Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime')
datetime_dim.printSchema()

root
 |-- Trip_Pickup_DateTime: string (nullable = true)
 |-- Trip_Dropoff_DateTime: string (nullable = true)



In [97]:
# Row count before de-duplication (one row per trip).
datetime_dim.count()

14092413

In [98]:
# Keep one row per distinct (pickup, dropoff) pair, then report how many remain.
datetime_dim = datetime_dim.drop_duplicates()
datetime_dim.count()

8604662

In [147]:
from pyspark.sql.functions import monotonically_increasing_id
# Assign a surrogate key to every distinct (pickup, dropoff) pair.
#
# monotonically_increasing_id() is non-deterministic: the value depends on the
# partition a row lands in and its position there. Because this DataFrame sits
# downstream of a shuffle (dropDuplicates) and Spark re-evaluates the plan for
# every action, an uncached dimension can hand out different IDs each time it
# is used, so keys in the fact table would not match keys in the dimension.
# Caching pins the assigned IDs; count() materialises the cache.
datetime_dim = datetime_dim.withColumn('Datetime_ID', monotonically_increasing_id()).cache()
datetime_dim.count()

8604662

In [100]:
from pyspark.sql.functions import to_timestamp
# Convert both string columns to proper timestamps, replacing them in place.
datetime_dim = (
    datetime_dim
    .withColumn('Trip_Pickup_DateTime', to_timestamp(datetime_dim['Trip_Pickup_DateTime']))
    .withColumn('Trip_Dropoff_DateTime', to_timestamp(datetime_dim['Trip_Dropoff_DateTime']))
)

In [148]:
# Print the current schema of the datetime dimension.
# NOTE(review): the saved output was captured after later cells had run (it
# already lists the derived Pickup_*/Dropoff_* columns), so a fresh
# top-to-bottom run will print fewer columns here.
datetime_dim.printSchema()

root
 |-- Datetime_ID: long (nullable = false)
 |-- Trip_Pickup_DateTime: timestamp (nullable = true)
 |-- Pickup_Hour: integer (nullable = true)
 |-- Pickup_Day: integer (nullable = true)
 |-- Pickup_Month: integer (nullable = true)
 |-- Pickup_Year: integer (nullable = true)
 |-- Pickup_Weekday: integer (nullable = true)
 |-- Trip_Dropoff_DateTime: timestamp (nullable = true)
 |-- Dropoff_Hour: integer (nullable = true)
 |-- Dropoff_Day: integer (nullable = true)
 |-- Dropoff_Month: integer (nullable = true)
 |-- Dropoff_Year: integer (nullable = true)
 |-- Dropoff_Weekday: integer (nullable = true)



In [102]:
from pyspark.sql.functions import hour,dayofmonth,year,dayofweek,month
# Derive hour / day-of-month / month / year / day-of-week columns for both
# timestamps. Columns are appended in the same order as before: every
# Pickup_* part first, then every Dropoff_* part.
_DATE_PARTS = (
    ('Hour', hour),
    ('Day', dayofmonth),
    ('Month', month),
    ('Year', year),
    ('Weekday', dayofweek),
)

for _prefix in ('Pickup', 'Dropoff'):
    _source = 'Trip_' + _prefix + '_DateTime'
    for _suffix, _extract in _DATE_PARTS:
        datetime_dim = datetime_dim.withColumn(
            _prefix + '_' + _suffix,
            _extract(datetime_dim[_source]),
        )

In [103]:
# Preview three rows to check the derived date-part columns.
datetime_dim.show(3)

+--------------------+---------------------+-----------+-----------+----------+------------+-----------+--------------+------------+-----------+-------------+------------+---------------+
|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Datetime_ID|Pickup_Hour|Pickup_Day|Pickup_Month|Pickup_Year|Pickup_Weekday|Dropoff_Hour|Dropoff_Day|Dropoff_Month|Dropoff_Year|Dropoff_Weekday|
+--------------------+---------------------+-----------+-----------+----------+------------+-----------+--------------+------------+-----------+-------------+------------+---------------+
| 2009-01-01 00:01:00|  2009-01-01 00:08:42|          0|          0|         1|           1|       2009|             5|           0|          1|            1|        2009|              5|
| 2009-01-01 00:09:53|  2009-01-01 00:28:46|          1|          0|         1|           1|       2009|             5|           0|          1|            1|        2009|              5|
| 2009-01-01 00:13:21|  2009-01-01 00:38:55|          2|    

In [104]:
# Schema after adding the date-part columns (still in insertion order).
datetime_dim.printSchema()

root
 |-- Trip_Pickup_DateTime: timestamp (nullable = true)
 |-- Trip_Dropoff_DateTime: timestamp (nullable = true)
 |-- Datetime_ID: long (nullable = false)
 |-- Pickup_Hour: integer (nullable = true)
 |-- Pickup_Day: integer (nullable = true)
 |-- Pickup_Month: integer (nullable = true)
 |-- Pickup_Year: integer (nullable = true)
 |-- Pickup_Weekday: integer (nullable = true)
 |-- Dropoff_Hour: integer (nullable = true)
 |-- Dropoff_Day: integer (nullable = true)
 |-- Dropoff_Month: integer (nullable = true)
 |-- Dropoff_Year: integer (nullable = true)
 |-- Dropoff_Weekday: integer (nullable = true)



In [105]:
# Reorder columns: surrogate key first, then pickup fields, then dropoff fields.
datetime_dim = datetime_dim.select(
    'Datetime_ID',
    'Trip_Pickup_DateTime',
    'Pickup_Hour',
    'Pickup_Day',
    'Pickup_Month',
    'Pickup_Year',
    'Pickup_Weekday',
    'Trip_Dropoff_DateTime',
    'Dropoff_Hour',
    'Dropoff_Day',
    'Dropoff_Month',
    'Dropoff_Year',
    'Dropoff_Weekday',
)

In [107]:
# Confirm the final column order of the datetime dimension.
datetime_dim.printSchema()

root
 |-- Datetime_ID: long (nullable = false)
 |-- Trip_Pickup_DateTime: timestamp (nullable = true)
 |-- Pickup_Hour: integer (nullable = true)
 |-- Pickup_Day: integer (nullable = true)
 |-- Pickup_Month: integer (nullable = true)
 |-- Pickup_Year: integer (nullable = true)
 |-- Pickup_Weekday: integer (nullable = true)
 |-- Trip_Dropoff_DateTime: timestamp (nullable = true)
 |-- Dropoff_Hour: integer (nullable = true)
 |-- Dropoff_Day: integer (nullable = true)
 |-- Dropoff_Month: integer (nullable = true)
 |-- Dropoff_Year: integer (nullable = true)
 |-- Dropoff_Weekday: integer (nullable = true)



In [None]:
# Preview five rows of the finished datetime dimension.
datetime_dim.show(5)

+-----------+--------------------+-----------+----------+------------+-----------+--------------+---------------------+------------+-----------+-------------+------------+---------------+
|Datetime_ID|Trip_Pickup_DateTime|Pickup_Hour|Pickup_Day|Pickup_Month|Pickup_Year|Pickup_Weekday|Trip_Dropoff_DateTime|Dropoff_Hour|Dropoff_Day|Dropoff_Month|Dropoff_Year|Dropoff_Weekday|
+-----------+--------------------+-----------+----------+------------+-----------+--------------+---------------------+------------+-----------+-------------+------------+---------------+
|          0| 2009-01-01 00:01:00|          0|         1|           1|       2009|             5|  2009-01-01 00:08:42|           0|          1|            1|        2009|              5|
|          1| 2009-01-01 00:09:53|          0|         1|           1|       2009|             5|  2009-01-01 00:28:46|           0|          1|            1|        2009|              5|
|          2| 2009-01-01 00:13:21|          0|         1|   

In [109]:
# Distinct pickup coordinates form the pickup-location dimension.
pickup_coords = df.select('Start_Lon', 'Start_Lat')
pickup_location_dim = pickup_coords.dropDuplicates()
pickup_location_dim.printSchema()

root
 |-- Start_Lon: double (nullable = true)
 |-- Start_Lat: double (nullable = true)



In [149]:
# Number of distinct pickup coordinate pairs.
pickup_location_dim.count()

13297284

In [111]:
# Add a surrogate key and put it first.
#
# The result is cached because monotonically_increasing_id() depends on
# partition id and row position: without caching, each later action (the
# fact-table join, a write of this dimension, ...) re-runs the shuffle from
# dropDuplicates and may assign different IDs to the same coordinates.
pickup_location_dim = (
    pickup_location_dim
    .withColumn('Pickup_Location_ID', monotonically_increasing_id())
    .select('Pickup_Location_ID', 'Start_Lon', 'Start_Lat')
    .cache()
)
pickup_location_dim.printSchema()

root
 |-- Pickup_Location_ID: long (nullable = false)
 |-- Start_Lon: double (nullable = true)
 |-- Start_Lat: double (nullable = true)



In [113]:
# Dropoff-location dimension: distinct dropoff coordinates plus a surrogate key.
#
# Cached so the IDs handed out by monotonically_increasing_id() (which depend
# on partition id and row position after the dropDuplicates shuffle) stay the
# same across every later action that reads this dimension.
drop_location_dim = (
    df.select('End_Lon', 'End_Lat')
    .dropDuplicates()
    .withColumn('Drop_Location_ID', monotonically_increasing_id())
    .select('Drop_Location_ID', 'End_Lon', 'End_Lat')
    .cache()
)
drop_location_dim.printSchema()

root
 |-- Drop_Location_ID: long (nullable = false)
 |-- End_Lon: double (nullable = true)
 |-- End_Lat: double (nullable = true)



In [150]:
# Number of distinct dropoff coordinate pairs.
drop_location_dim.count()

13458194

In [152]:
# Passenger-count dimension: distinct passenger counts plus a surrogate key.
#
# Cached so the IDs from monotonically_increasing_id() are assigned once and
# reused; the function is non-deterministic across re-evaluations of a
# shuffled plan, which would otherwise break the key match with the fact table.
passenger_count_dim = (
    df.select('Passenger_Count')
    .dropDuplicates()
    .withColumn('Passenger_Count_ID', monotonically_increasing_id())
    .select('Passenger_Count_ID', 'Passenger_Count')
    .cache()
)
passenger_count_dim.printSchema()

root
 |-- Passenger_Count_ID: long (nullable = false)
 |-- Passenger_Count: long (nullable = true)



In [153]:
# Number of distinct passenger-count values.
passenger_count_dim.count()

8

In [116]:
# Trip-distance dimension: distinct distances plus a surrogate key.
#
# Cached so the IDs from monotonically_increasing_id() are fixed after the
# first action instead of being regenerated (possibly differently) every time
# the shuffled plan is re-evaluated.
trip_distance_dim = (
    df.select('Trip_Distance')
    .dropDuplicates()
    .withColumn('Trip_Distance_ID', monotonically_increasing_id())
    .select('Trip_Distance_ID', 'Trip_Distance')
    .cache()
)
trip_distance_dim.printSchema()

root
 |-- Trip_Distance_ID: long (nullable = false)
 |-- Trip_Distance: double (nullable = true)



In [154]:
# Number of distinct trip-distance values.
trip_distance_dim.count()

5716

In [156]:
from pyspark.sql import functions as f
from pyspark.sql import types as t

def mapping(x):
    """Return the rate-code description for ``x``, or None if it is not a known code.

    Known codes are the integers 1 through 6; a float that compares equal to
    one of them (e.g. ``2.0``) resolves to the same entry.
    """
    descriptions = (
        'Standard rate',
        'JFK',
        'Newark',
        'Nassau or Westchester',
        'Negotiated fare',
        'Group ride',
    )
    # Codes are consecutive and start at 1, in the order listed above.
    description_by_code = dict(enumerate(descriptions, start=1))
    return description_by_code.get(x)


# Wrap the rate-code lookup as a Spark UDF returning a string.
# The function object is passed directly instead of through a lambda: a lambda
# would look up the global name `mapping` only when the UDF is used, and a
# later cell rebinds `mapping` to a different (payment-type) lookup.
function = f.udf(mapping, t.StringType())

# Rate-code dimension: distinct Rate_Code values plus a surrogate key.
# Cached so the IDs from monotonically_increasing_id() stay fixed across
# actions instead of being regenerated on each re-evaluation.
rate_code_dim = (
    df.select('Rate_Code')
    .dropDuplicates()
    .withColumn('Rate_Code_ID', monotonically_increasing_id())
    .select('Rate_Code_ID', 'Rate_Code')
    .cache()
)
# A Rate_Code_Name column is intentionally not added here; apply `function`
# to rate_code_dim['Rate_Code'] if the descriptive name is needed.

In [159]:
# Display the rate-code dimension; the saved output shows a single row whose Rate_Code is null.
rate_code_dim.show()

+------------+---------+
|Rate_Code_ID|Rate_Code|
+------------+---------+
|360777252864|     null|
+------------+---------+



In [161]:
# Show any rows that actually carry a rate code.
# SQL comparisons against NULL (`!= null`) evaluate to NULL, never true, so
# such a filter returns no rows regardless of the data; IS NOT NULL is the
# correct null test.
df.select('Rate_Code').filter("Rate_Code IS NOT NULL").show()

+---------+
|Rate_Code|
+---------+
+---------+



In [181]:
# Distinct raw payment-type labels (the source mixes letter case, e.g. "CASH" and "Cash").
payment_labels = df.select('Payment_Type')
payment_type_dim = payment_labels.dropDuplicates()
payment_type_dim.show()

+------------+
|Payment_Type|
+------------+
|   No Charge|
|        CASH|
|      Credit|
|        Cash|
|     Dispute|
|      CREDIT|
+------------+



In [178]:
from pyspark.sql import functions as f

def switch(x):
    """Return ``x`` upper-cased, or None when ``x`` is None.

    The None guard matters because this runs as a Spark UDF over a nullable
    column: a null value arrives as None, and calling ``.upper()`` on it would
    raise AttributeError and fail the whole job.
    """
    if x is None:
        return None
    return x.upper()

# Column-level upper-casing used to normalise payment-type labels.
# Spark's built-in upper() replaces the Python UDF that wrapped `switch`: it is
# called the same way (n(column) -> column), passes nulls through as null
# instead of raising, and avoids shipping every row to a Python worker.
n = f.upper

In [185]:
# Upper-case the labels, then keep a copy of the normalised label as the
# human-readable name column.
upper_label = n(payment_type_dim['Payment_Type'])
payment_type_dim = payment_type_dim.withColumn('Payment_Type', upper_label)
payment_type_dim = payment_type_dim.withColumn(
    'Payment_Type_Name',
    payment_type_dim['Payment_Type'],
)

payment_type_dim.show()

+------------+-----------------+
|Payment_Type|Payment_Type_Name|
+------------+-----------------+
|     DISPUTE|          DISPUTE|
|   NO CHARGE|        NO CHARGE|
|        CASH|             CASH|
|      CREDIT|           CREDIT|
+------------+-----------------+



In [184]:
# Collapse labels that became identical after upper-casing.
payment_type_dim = payment_type_dim.drop_duplicates()
payment_type_dim.show()

+------------+
|Payment_Type|
+------------+
|     DISPUTE|
|   NO CHARGE|
|        CASH|
|      CREDIT|
+------------+



In [187]:

def mapping(x):
    """Return the numeric payment-type code for an upper-case label, or None if unknown.

    NOTE: this reuses the name ``mapping`` from the rate-code cell earlier in
    the notebook and replaces that definition once this cell runs.
    """
    code_by_label = dict([
        ('CREDIT', 1),
        ('DISPUTE', 2),
        ('NO CHARGE', 3),
        ('CASH', 4),
        ('UNKNOWN', 5),
        ('VOIDED TRIP', 6),
    ])
    return code_by_label.get(x)

# UDF translating a payment-type label to its numeric code.
# The return type is declared as integer: f.udf defaults to a string return
# type, which would store the codes as strings rather than numbers.
p = f.udf(mapping, t.IntegerType())

# Replace the label in Payment_Type with its code; the label stays in Payment_Type_Name.
payment_type_dim = payment_type_dim.withColumn('Payment_Type', p(payment_type_dim['Payment_Type_Name']))
payment_type_dim.show()
    

+------------+-----------------+
|Payment_Type|Payment_Type_Name|
+------------+-----------------+
|           2|          DISPUTE|
|           3|        NO CHARGE|
|           4|             CASH|
|           1|           CREDIT|
+------------+-----------------+



In [194]:
# Add the surrogate key for the payment-type dimension.
# Cached because monotonically_increasing_id() depends on partition id and row
# position; without caching, each later action re-evaluates the plan and may
# assign different IDs than the ones already joined into the fact table.
payment_type_dim = payment_type_dim.withColumn('Payment_Type_ID', monotonically_increasing_id()).cache()
payment_type_dim.show()

+------------+-----------------+---------------+
|Payment_Type|Payment_Type_Name|Payment_Type_ID|
+------------+-----------------+---------------+
|           2|          DISPUTE|   214748364800|
|           3|        NO CHARGE|   472446402560|
|           4|             CASH|   523986010112|
|           1|           CREDIT|  1666447310848|
+------------+-----------------+---------------+



In [192]:
# Experiment: pair each row with a consecutive index via the RDD API.
# As the saved output shows, the original columns end up nested in a single
# `_1` column with the index in `_2`; `g` is not used by any later cell shown here.
g = payment_type_dim.rdd.zipWithIndex().toDF()

In [193]:
# Display the zipWithIndex result.
g.show()

+--------------+---+
|            _1| _2|
+--------------+---+
|  [2, DISPUTE]|  0|
|[3, NO CHARGE]|  1|
|     [4, CASH]|  2|
|   [1, CREDIT]|  3|
+--------------+---+



In [204]:
# First draft of the fact table: vendor plus the monetary measure columns.
measure_columns = ['Vendor_Name', 'Fare_Amt', 'surcharge', 'mta_tax', 'Tip_Amt', 'Tolls_Amt', 'Total_Amt']
fact_table = df.select(*measure_columns)
fact_table.show()

+-----------+-----------------+---------+-------+-------+---------+---------+
|Vendor_Name|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+-----------------+---------+-------+-------+---------+---------+
|        VTS|              8.9|      0.5|   null|    0.0|      0.0|      9.4|
|        VTS|             12.1|      0.5|   null|    2.0|      0.0|     14.6|
|        VTS|             23.7|      0.0|   null|   4.74|      0.0|    28.44|
|        DDS|             14.9|      0.5|   null|   3.05|      0.0|    18.45|
|        DDS|              3.7|      0.0|   null|    0.0|      0.0|      3.7|
|        DDS|              6.1|      0.5|   null|    0.0|      0.0|      6.6|
|        DDS|              5.7|      0.0|   null|    1.0|      0.0|      6.7|
|        VTS|              6.1|      0.5|   null|    0.0|      0.0|      6.6|
|        CMT|8.699999999999998|      0.0|   null|    1.3|      0.0|     10.0|
|        CMT|              5.9|      0.0|   null|    0.0|      0

In [205]:
def m(x):
    """Return the numeric vendor id for a vendor code, or None if the code is unknown."""
    vendor_ids = dict(CMT=1, VTS=2, DDS=3)
    return vendor_ids.get(x)

# UDF translating a vendor code to its numeric id.
# The return type is declared as integer: f.udf defaults to a string return
# type, which would make Vendor_ID a string column.
v = f.udf(m, t.IntegerType())

fact_table = fact_table.withColumn('Vendor_ID', v(fact_table['Vendor_Name']))
fact_table.show()

+-----------+-----------------+---------+-------+-------+---------+---------+---------+
|Vendor_Name|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|Vendor_ID|
+-----------+-----------------+---------+-------+-------+---------+---------+---------+
|        VTS|              8.9|      0.5|   null|    0.0|      0.0|      9.4|        2|
|        VTS|             12.1|      0.5|   null|    2.0|      0.0|     14.6|        2|
|        VTS|             23.7|      0.0|   null|   4.74|      0.0|    28.44|        2|
|        DDS|             14.9|      0.5|   null|   3.05|      0.0|    18.45|        3|
|        DDS|              3.7|      0.0|   null|    0.0|      0.0|      3.7|        3|
|        DDS|              6.1|      0.5|   null|    0.0|      0.0|      6.6|        3|
|        DDS|              5.7|      0.0|   null|    1.0|      0.0|      6.7|        3|
|        VTS|              6.1|      0.5|   null|    0.0|      0.0|      6.6|        2|
|        CMT|8.699999999999998| 

In [206]:
# Row count of the draft fact table (one row per trip).
fact_table.count()

14092413

In [234]:
# Upper-case Payment_Type on the trip rows so it matches Payment_Type_Name in
# the payment-type dimension when the two are joined.
normalised_payment = n(df['Payment_Type'])
df = df.withColumn('Payment_Type', normalised_payment)
df.show()

+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|         Start_Lon|Start_Lat|Rate_Code|store_and_forward|           End_Lon|  End_Lat|Payment_Type|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|              2.63|        -73.991957|40.721567|     null|             null|        -73.993803|40.695922|        CASH|              8.9|      0.5|   null|    0.0| 

In [235]:
# Attach every dimension's surrogate key to the trip rows. Each join uses the
# default join type and matches on the dimension's natural-key column(s).
# NOTE(review): per the printed schemas, df holds the two timestamp columns as
# strings while datetime_dim holds them as timestamps, so that join relies on
# Spark's implicit type coercion - confirm this is intended.
dfl = (
    df
    .join(passenger_count_dim, on='Passenger_Count')
    .join(trip_distance_dim, on='Trip_Distance')
    .join(datetime_dim, on=['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime'])
    .join(pickup_location_dim, on=['Start_Lon', 'Start_Lat'])
    .join(drop_location_dim, on=['End_Lon', 'End_Lat'])
    .join(payment_type_dim, df.Payment_Type == payment_type_dim.Payment_Type_Name)
)

In [237]:
# Final fact table: vendor, the six dimension keys, then the monetary measures.
fact_table = dfl.select(
    'Vendor_Name',
    'Datetime_ID',
    'Pickup_Location_ID',
    'Drop_Location_ID',
    'Passenger_Count_ID',
    'Trip_Distance_ID',
    'Payment_Type_ID',
    'Fare_Amt',
    'surcharge',
    'mta_tax',
    'Tip_Amt',
    'Tolls_Amt',
    'Total_Amt',
)
fact_table.show()

+-----------+-------------+------------------+----------------+------------------+----------------+---------------+-----------------+---------+-------+-------+---------+-----------------+
|Vendor_Name|  Datetime_ID|Pickup_Location_ID|Drop_Location_ID|Passenger_Count_ID|Trip_Distance_ID|Payment_Type_ID|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|        Total_Amt|
+-----------+-------------+------------------+----------------+------------------+----------------+---------------+-----------------+---------+-------+-------+---------+-----------------+
|        CMT| 472446438536|      231928283310|     17179919176|      919123001344|    274877906944|   214748364800|              6.5|      0.0|   null|    0.0|      0.0|              6.5|
|        CMT|  17179888002|     1382979478439|     17179878459|      592705486848|   1288490188800|   214748364800|              5.3|      0.0|   null|    0.0|      0.0|              5.3|
|        CMT| 833223689205|       68719532223|     171799254

In [238]:
# Row count after the joins; the saved output equals the raw trip count, i.e. no rows were lost or duplicated.
fact_table.count()

14092413

In [None]:
# Append the numeric vendor id derived from Vendor_Name.
vendor_id = v(fact_table['Vendor_Name'])
fact_table = fact_table.withColumn('Vendor_ID', vendor_id)
fact_table.show()

+-----------+-------------+------------------+----------------+------------------+----------------+---------------+-----------------+---------+-------+-------+---------+-----------------+---------+
|Vendor_Name|  Datetime_ID|Pickup_Location_ID|Drop_Location_ID|Passenger_Count_ID|Trip_Distance_ID|Payment_Type_ID|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|        Total_Amt|Vendor_ID|
+-----------+-------------+------------------+----------------+------------------+----------------+---------------+-----------------+---------+-------+-------+---------+-----------------+---------+
|        CMT| 532575980194|      266287975017|            2653|      592705486848|   1546188226573|   214748364800|             36.1|      0.0|   null|    0.0|     15.0|             51.1|        1|
|        CMT| 901943170969|      824633782061|           61881|      592705486848|   1245540515840|   214748364800|              6.1|      0.0|   null|    0.0|      0.0|              6.1|        1|
|        C