In [2]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession, Row
from datetime import datetime, date
from pyspark.sql import functions as F
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

In [3]:
df_green = spark.read.parquet("/datasets/green_tripdata_2020-01.parquet")
df_yellow = spark.read.parquet("/datasets/yellow_tripdata_2020-01.parquet")
#df_green.columns
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [4]:
df_yellow = df_yellow \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime") 
df_green = df_green \
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime") 

In [7]:
same_columns = list(set(df_yellow.columns).intersection(set(df_green.columns)))

In [11]:
df_green_intersect = df_green.select(same_columns).withColumn("service_type", F.lit("green"))
df_yellow_intersect = df_yellow.select(same_columns).withColumn("service_type", F.lit("yellow"))
df_join = df_green_intersect.unionAll(df_yellow_intersect)
df_join.show()

+------------+-------------+------------------+----------+-----+---------------------+-------+--------+-----------+------------+------------+----------+------------+--------------------+---------------+-------------------+-------------------+------------+------------+
|tolls_amount|trip_distance|store_and_fwd_flag|RatecodeID|extra|improvement_surcharge|mta_tax|VendorID|fare_amount|total_amount|payment_type|tip_amount|DOLocationID|congestion_surcharge|passenger_count|    pickup_datetime|   dropoff_datetime|PULocationID|service_type|
+------------+-------------+------------------+----------+-----+---------------------+-------+--------+-----------+------------+------------+----------+------------+--------------------+---------------+-------------------+-------------------+------------+------------+
|         0.0|          0.0|                 N|       1.0|  0.5|                  0.3|    0.5|       2|        3.5|        4.81|         1.0|      0.01|         264|                 0.0|       

In [13]:
df_join.createOrReplaceTempView("trips_data")
spark.sql(""" 
    SELECT * FROM trips_data limit 10;
""")

DataFrame[tolls_amount: double, trip_distance: double, store_and_fwd_flag: string, RatecodeID: double, extra: double, improvement_surcharge: double, mta_tax: double, VendorID: bigint, fare_amount: double, total_amount: double, payment_type: double, tip_amount: double, DOLocationID: bigint, congestion_surcharge: double, passenger_count: double, pickup_datetime: timestamp_ntz, dropoff_datetime: timestamp_ntz, PULocationID: bigint, service_type: string]

In [21]:
spark.sql(""" 
    SELECT date_trunc('day', pickup_datetime) as operating_day, SUM(fare_amount) as total_fare FROM trips_data WHERE pickup_datetime  like '2020-01-%' GROUP BY date_trunc('day', pickup_datetime) order by operating_day;
""").show()

+-------------------+------------------+
|      operating_day|        total_fare|
+-------------------+------------------+
|2020-01-01 00:00:00|2534312.9700000323|
|2020-01-02 00:00:00| 2454027.490000019|
|2020-01-03 00:00:00| 2586000.640000038|
|2020-01-04 00:00:00| 2477682.280000023|
|2020-01-05 00:00:00|2381192.0900000334|
|2020-01-06 00:00:00|2628954.4500000756|
|2020-01-07 00:00:00| 2816743.490000075|
|2020-01-08 00:00:00|2990385.9300000835|
|2020-01-09 00:00:00| 3256853.190000062|
|2020-01-10 00:00:00| 3255155.520000089|
|2020-01-11 00:00:00|2926044.6000000434|
|2020-01-12 00:00:00| 2671900.150000052|
|2020-01-13 00:00:00| 2952113.180000025|
|2020-01-14 00:00:00|2969501.2100000256|
|2020-01-15 00:00:00| 2993489.930000032|
|2020-01-16 00:00:00|3269153.5200000303|
|2020-01-17 00:00:00| 3314316.640000031|
|2020-01-18 00:00:00| 2405617.920000019|
|2020-01-19 00:00:00|2347138.9700000198|
|2020-01-20 00:00:00| 2259042.620000005|
+-------------------+------------------+
only showing top