In [2]:
# findspark.init() locates the Spark installation and makes pyspark importable.
# It must run BEFORE pyspark is imported, otherwise it has no effect on which
# pyspark the imports below bind to.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# "local[*]": run Spark in-process with one worker thread per logical core.
spark_session = SparkSession.builder.master("local[*]").appName('spark_sql').getOrCreate()

23/03/03 03:02:02 WARN FileSystem: Cannot load filesystem: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem Unable to get public no-arg constructor
23/03/03 03:02:02 WARN FileSystem: java.lang.NoClassDefFoundError: com/google/api/client/http/HttpRequestInitializer
23/03/03 03:02:02 WARN FileSystem: java.lang.ClassNotFoundException: com.google.api.client.http.HttpRequestInitializer


In [3]:
# Load every green-taxi parquet file under the month sub-directories (path is
# relative to the notebook's working directory).
# NOTE(review): the saved output of this cell is an AnalysisException, yet later
# cells use df_green -- the notebook appears to have been run out of order or
# from a different working directory; confirm before relying on outputs.
df_green = spark_session.read.format('parquet').load("data/pq/green/*/*")

AnalysisException: Path does not exist: file:/home/idowuilekura/Desktop/dataengineering_zoomcamp/week_five/data/pq/green/*/*

In [4]:
# Preview the first rows (20 is Spark's default preview size).
df_green.show(n=20)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       1| 2020-01-24 22:00:56|  2020-01-24 22:06:12|                 N|         1|          95|         196|              1|         null|        6.0|  0.5|    0.

In [5]:
# Load every yellow-taxi parquet file under the month sub-directories.
df_yellow = spark_session.read.format('parquet').load("data/pq/yellow/*/*")

In [6]:
# Inspect green's column names before lining them up with yellow's.
list(df_green.columns)

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [11]:
# Drop the green-specific "lpep_" prefix so the timestamp columns share
# names with the yellow dataset.
for old_name, new_name in [('lpep_pickup_datetime', 'pickup_datetime'),
                           ('lpep_dropoff_datetime', 'dropoff_datetime')]:
    df_green = df_green.withColumnRenamed(old_name, new_name)

In [12]:
# Drop the yellow-specific "tpep_" prefix so the timestamp columns share
# names with the green dataset.
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [14]:
# Snapshot of yellow's column names (after the timestamp renames).
yellow_column = list(df_yellow.columns)

In [15]:
# Snapshot of green's column names (after the timestamp renames).
green_column = list(df_green.columns)

In [13]:
# Columns shared by both datasets (unordered view).
set(df_green.columns).intersection(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [16]:
# Columns present in both datasets, kept in green's column order so both
# selects below produce the same column layout. A set makes each membership
# test O(1) instead of scanning the yellow list for every green column.
yellow_column_set = set(yellow_column)
common_columns = [col for col in green_column if col in yellow_column_set]

In [17]:
# Display the ordered list of shared columns.
list(common_columns)

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [19]:
from pyspark.sql import functions as F

In [28]:
# Keep only the shared columns and tag every row with its source service.
df_green_sel = (
    df_green
    .select(*common_columns)
    .withColumn('service_type', F.lit('green'))
)

In [29]:
# Keep only the shared columns and tag every row with its source service.
df_yellow_sel = (
    df_yellow
    .select(*common_columns)
    .withColumn('service_type', F.lit('yellow'))
)

In [27]:
# NOTE(review): exploratory lookup of F.lit's docstring (F.lit builds a literal
# Column, as used for service_type above); can be removed from the final notebook.
help(F.lit)

Help on function lit in module pyspark.sql.functions:

lit(col: Any) -> pyspark.sql.column.Column
    Creates a :class:`~pyspark.sql.Column` of literal value.
    
    .. versionadded:: 1.3.0
    
    Examples
    --------
    >>> df.select(lit(5).alias('height')).withColumn('spark_user', lit(True)).take(1)
    [Row(height=5, spark_user=True)]



In [30]:
# Stack green and yellow trips into one DataFrame (UNION ALL: no deduplication).
# unionByName matches columns by name rather than by position, so the result
# stays correct even if the two selects ever list columns in a different order
# (unionAll is only a positional alias of union).
df_trips_data = df_green_sel.unionByName(df_yellow_sel)

In [34]:
# Expose the combined trips to Spark SQL as 'trips_data'.
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView
# is its supported replacement.
df_trips_data.createOrReplaceTempView('trips_data')



In [35]:
# (Re)register the combined trips as the SQL view 'trips_data'.
df_trips_data.createOrReplaceTempView(name='trips_data')

In [33]:
# Row count per source service via the DataFrame API (SQL equivalent follows).
trips_per_service = df_trips_data.groupby('service_type').count()
trips_per_service.show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



                                                                                

In [38]:
# Same per-service row count, expressed in Spark SQL against the temp view.
trips_per_service_sql = """
SELECT service_type,
count(1)
FROM 
trips_data 
GROUP BY 1;
"""
spark_session.sql(trips_per_service_sql).show()



+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



                                                                                

In [43]:
# Monthly revenue per pickup zone and service type over the combined trips view.
# NOTE(review): the 'avg_montly_*' aliases are misspelled, but they become the
# column names of the persisted report, so renaming them would change its schema.
monthly_revenue_sql = """
 SELECT 
    -- Reveneue grouping 
    PULocationID as revenue_zone,
    date_trunc('month', pickup_datetime) as revenue_month, 

    service_type, 

    -- Revenue calculation 
    sum(fare_amount) as revenue_monthly_fare,
    sum(extra) as revenue_monthly_extra,
    sum(mta_tax) as revenue_monthly_mta_tax,
    sum(tip_amount) as revenue_monthly_tip_amount,
    sum(tolls_amount) as revenue_monthly_tolls_amount,
    sum(improvement_surcharge) as revenue_monthly_improvement_surcharge,
    sum(total_amount) as revenue_monthly_total_amount,
    sum(congestion_surcharge) as revenue_monthly_congestion_surcharge,

    -- Additional calculations
    avg(passenger_count) as avg_montly_passenger_count,
    avg(trip_distance) as avg_montly_trip_distance

    FROM trips_data
    GROUP BY 
        1,2,3;
"""
df_result = spark_session.sql(monthly_revenue_sql)

In [44]:
# Preview the monthly revenue report (20 rows is Spark's default).
df_result.show(n=20)



+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         127|2020-

                                                                                

In [47]:
# Persist the monthly report in its own directory. The previous target,
# 'data/report/revenue/', is also the parent of the green/yellow/total hourly
# reports written later in this notebook: overwriting it deletes those
# sub-directories, and loose parquet files next to sub-directories can break
# partition discovery when the path is read back.
df_result.write.mode('overwrite').parquet('data/report/revenue/monthly/')

                                                                                

In [56]:
# Register the (renamed) green trips as a SQL view for the hourly report.
df_green.createOrReplaceTempView(name='green_trip_data')

In [98]:
# Hourly revenue and trip count per pickup zone for green trips from 2020 on.
green_hourly_sql = """
 SELECT 
    -- Reveneue grouping 
   
    date_trunc('hour', pickup_datetime) as hour,
     PULocationID as revenue_zone,

    -- Revenue calculation 
    sum(total_amount) as amount,
    COUNT(1) AS number_records
    FROM green_trip_data
    WHERE pickup_datetime >= '2020-01-01 00:00'
    GROUP BY 
        1,2
"""
df_green_revenue = spark_session.sql(green_hourly_sql)

In [102]:
# Persist the green hourly report, replacing any previous run's output.
df_green_revenue.write.mode('overwrite').parquet('data/report/revenue/green')



23/03/02 01:34:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [72]:
# Register the (renamed) yellow trips as a SQL view for the hourly report.
df_yellow.createOrReplaceTempView(name='yellow_trip_data')

In [100]:
# Hourly revenue and trip count per pickup zone for yellow trips from 2020 on.
yellow_hourly_sql = """
 SELECT 
    -- Reveneue grouping 
   
    date_trunc('hour', pickup_datetime) as hour,
     PULocationID as revenue_zone,

    -- Revenue calculation 
    sum(total_amount) as amount,
    COUNT(1) AS number_records
    FROM yellow_trip_data
    WHERE pickup_datetime >= '2020-01-01 00:00'
    GROUP BY 
        1,2
"""
df_yellow_revenue = spark_session.sql(yellow_hourly_sql)


In [101]:
# Persist the yellow hourly report, replacing any previous run's output.
df_yellow_revenue.write.mode('overwrite').parquet('data/report/revenue/yellow')



23/03/02 01:33:15 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [103]:
# Reload both hourly reports from disk so the join starts from persisted results.
df_green_revenue, df_yellow_revenue = (
    spark_session.read.parquet(report_path)
    for report_path in ('data/report/revenue/green', 'data/report/revenue/yellow')
)

In [104]:
# Prefix the metric columns with the service so they stay distinct after the join.
df_green_revenue_tmp = (
    df_green_revenue
    .withColumnRenamed('amount', 'green_amount')
    .withColumnRenamed('number_records', 'green_number_records')
)

In [105]:
# Prefix the metric columns with the service so they stay distinct after the join.
df_yellow_revenue_tmp = df_yellow_revenue
for metric_col, renamed_col in [('amount', 'yellow_amount'),
                                ('number_records', 'yellow_number_records')]:
    df_yellow_revenue_tmp = df_yellow_revenue_tmp.withColumnRenamed(metric_col, renamed_col)

In [106]:
# Full outer join on (hour, zone): keeps pairs recorded by only one service.
df_join = df_green_revenue_tmp.join(
    df_yellow_revenue_tmp,
    ['hour', 'revenue_zone'],
    'outer',
)

In [107]:
# Persist the combined hourly report, replacing any previous run's output.
df_join.write.mode('overwrite').parquet('data/report/revenue/total')

23/03/02 01:35:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [108]:
# Reload the combined hourly report from disk.
df_join = spark_session.read.format('parquet').load('data/report/revenue/total')

In [109]:
# Zone lookup table (LocationID -> Borough / Zone / service_zone).
df_zones = spark_session.read.format('parquet').load('zones/')

In [110]:
# Preview the zone lookup table (20 rows is Spark's default).
df_zones.show(n=20)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [116]:
# Attach zone names to the hourly revenue. This is an inner join, so revenue
# rows whose zone ID has no match in the lookup are dropped.
# NOTE(review): this rebinds df_result, which earlier held the monthly report.
zone_match = df_join['revenue_zone'] == df_zones['LocationID']
df_result = df_join.join(df_zones, on=zone_match, how='inner')

In [117]:
# Persist the zone-annotated report without the redundant LocationID key.
# mode='overwrite' matches the other report writes; the default save mode
# ('errorifexists') makes this cell fail on every re-run once the path exists.
df_result.drop('LocationID').write.parquet('tmp/revenue-zones', mode='overwrite')

23/03/02 01:47:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                