In [2]:
import os

import pandas as pd
import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
# Locations of the service-account key file and the GCS connector jar.
# Both can be overridden through environment variables so the notebook is not
# tied to a single machine; the defaults are the paths it was originally run
# with, so behavior is unchanged when the variables are unset.
credentials_location = os.environ.get(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/home/alex/.google/credentials/sanguine-form-376720-9f11d64bdbbd.json',
)
gcs_connector_jar = os.environ.get(
    'GCS_CONNECTOR_JAR',
    '/home/alex/lib/gcs-connector-hadoop3-latest.jar',
)

# Local Spark configuration: use all local cores, load the GCS connector jar
# and enable service-account authentication with the key file above.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set('spark.jars', gcs_connector_jar)
    .set('spark.hadoop.google.cloud.auth.service.account.enable', 'true')
    .set('spark.hadoop.google.cloud.auth.service.account.json.keyfile', credentials_location)
)

In [4]:
# Reuse the running SparkContext when this cell is executed again. Calling the
# SparkContext constructor a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once". Note that `conf` is only applied
# when no context exists yet.
sc = SparkContext.getOrCreate(conf=conf)

23/03/02 17:01:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/02 17:01:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
# Hadoop-level settings for the gs:// scheme: connector implementation classes
# plus service-account authentication using the key file.
hadoop_conf = sc._jsc.hadoopConfiguration()

for hadoop_key, hadoop_value in [
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", credentials_location),
    ("fs.gs.auth.service.account.enable", "true"),
]:
    hadoop_conf.set(hadoop_key, hadoop_value)

In [6]:
# Create (or fetch) the SparkSession using the configuration of the
# SparkContext created above.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [7]:
# Load every parquet file two directory levels below unified/green.
df_green = spark.read.parquet(
    'gs://iskanderrus_dez_data_lake_sanguine-form-376720/unified/green/*/*'
)

                                                                                

In [8]:
# Display the column names of df_green as loaded.
df_green.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [9]:
# Drop the "lpep_" prefix from the timestamp columns so that df_green uses the
# names pickup_datetime / dropoff_datetime.
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

In [10]:
# Preview the first 5 rows of df_green after the rename.
df_green.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2|2020-01-11 04:05:54|2020-01-11 04:13:49|                 N|       1.0|         129|         129|            1.0|         0.81|        6.5|  0.5|    0.5|      0.71

                                                                                

In [11]:
# Load every parquet file two directory levels below unified/yellow.
df_yellow = spark.read.parquet(
    'gs://iskanderrus_dez_data_lake_sanguine-form-376720/unified/yellow/*/*'
)

In [12]:
# Display the column names of df_yellow as loaded.
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [13]:
# Drop the "tpep_" prefix from the timestamp columns so that df_yellow uses the
# names pickup_datetime / dropoff_datetime.
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [14]:
# Preview the first 5 rows of df_yellow after the rename.
df_yellow.show(5)

[Stage 3:>                                                          (0 + 1) / 1]

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1|2020-01-02 18:49:18|2020-01-02 19:06:24|            0.0|          2.7|       1.0|                 N|         263|         230|           1|       13.0|  3.5|    0.5|      3.45|         0

                                                                                

In [15]:
# Columns present in both DataFrames, kept in df_green's column order.
# The lookup set is built once instead of on every iteration, and the
# comprehension keeps the loop variable out of the notebook namespace.
yellow_columns = set(df_yellow.columns)
common_cols = [name for name in df_green.columns if name in yellow_columns]

In [16]:
# Display the list of columns shared by df_green and df_yellow.
common_cols

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [17]:
# Keep only the shared columns and tag every row with its service type.
df_green_sel = (
    df_green
    .select(*common_cols)
    .withColumn('service_type', F.lit('green'))
)

In [18]:
# Keep only the shared columns and tag every row with its service type.
df_yellow_sel = (
    df_yellow
    .select(*common_cols)
    .withColumn('service_type', F.lit('yellow'))
)

In [19]:
# Stack the two selections into one DataFrame. Both sides were built from
# common_cols plus service_type, so their columns line up by position.
df_trips_data = df_green_sel.union(df_yellow_sel)

In [20]:
# Preview the first 5 rows of the combined DataFrame.
df_trips_data.show(5)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2020-01-11 04:05:54|2020-01-11 04:13:49|                 N|       1.0|         129|         129|            1.0|         0.81|        6.5|  0.5|    0.5|      0.71|         0.0|       

In [21]:
# Row count per service type, computed with the DataFrame API.
(
    df_trips_data
    .groupBy('service_type')
    .count()
    .show()
)



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2802931|
|      yellow|55553400|
+------------+--------+



                                                                                

In [22]:
# Register the combined DataFrame as the temporary view 'trips_data' so it can
# be queried with spark.sql; an existing view of that name is replaced.
df_trips_data.createOrReplaceTempView('trips_data')

In [23]:
# Same per-service row count as the groupBy cell, expressed in SQL against the
# 'trips_data' temporary view.
spark.sql("""
SELECT 
    service_type, 
    count(1) as trips_count
FROM 
    trips_data
GROUP BY
    service_type; 
""").show()



+------------+-----------+
|service_type|trips_count|
+------------+-----------+
|       green|    2802931|
|      yellow|   55553400|
+------------+-----------+





In [24]:
# Monthly revenue report: one row per (pickup location, month, service type)
# with summed amount columns, the trip count, and average passenger count and
# trip distance, ordered by location and month.
#
# NOTE(review): EXTRACT(MONTH FROM pickup_datetime) yields only the month
# number, so trips from the same calendar month of different years land in the
# same group. If the input spans more than one year and per-year figures are
# wanted, group on date_trunc('month', pickup_datetime) instead — confirm the
# intended semantics before changing, since that alters the output schema.
df_result = spark.sql("""
SELECT 
    PULocationID as revenue_location,
    EXTRACT(MONTH FROM pickup_datetime) AS revenue_month, 
    service_type, 
    
    SUM(fare_amount)  AS revenue_monthly_fare,
    SUM(extra)  AS revenue_monthly_extra,
    SUM(mta_tax)  AS revenue_monthly_mta_tax,
    SUM(tip_amount)  AS revenue_monthly_tip_amount,
    SUM(tolls_amount)  AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge)  AS revenue_monthly_improvement_surcharge,
    SUM(total_amount)  AS revenue_monthly_total_amount,
    SUM(congestion_surcharge)  AS revenue_monthly_congestion_surcharge,
    
    COUNT(*) AS total_monthly_trips, 
    AVG(passenger_count) AS avg_monthly_passenger_count, 
    AVG(trip_distance) as avg_monthly_trip_distance

FROM 
    trips_data
GROUP BY
    revenue_location, revenue_month, service_type
ORDER BY revenue_location ASC, revenue_month ASC; 
""")

In [25]:
# Write the report as a single part file, replacing any previous output at
# the same location.
(
    df_result
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet('gs://iskanderrus_dez_data_lake_sanguine-form-376720/reports/monthly_revenue_by_service_type.parquet')
)

                                                                                

In [29]:
# Read the report back through its output directory rather than one specific
# part file. Spark generates a new part-file name on every write, so a
# hardcoded part-00000-<uuid> path stops working once the write cell is re-run.
df = pd.read_parquet('gs://iskanderrus_dez_data_lake_sanguine-form-376720/reports/monthly_revenue_by_service_type.parquet')

In [30]:
# Show the first 24 rows of the report as read back into pandas.
df.head(24)

Unnamed: 0,revenue_location,revenue_month,service_type,revenue_monthly_fare,revenue_monthly_extra,revenue_monthly_mta_tax,revenue_monthly_tip_amount,revenue_monthly_tolls_amount,revenue_monthly_improvement_surcharge,revenue_monthly_total_amount,revenue_monthly_congestion_surcharge,total_monthly_trips,avg_monthly_passenger_count,avg_monthly_trip_distance
0,1,1,green,491.0,0.0,0.0,74.41,0.0,1.5,566.91,0.0,5,1.2,0.0
1,1,1,yellow,65249.83,33.5,115.0,9530.27,2913.45,237.3,78086.85,7.5,808,1.582609,0.929022
2,1,2,green,384.5,1.0,0.5,53.34,0.0,1.5,440.84,0.0,5,1.0,2.8
3,1,2,yellow,52619.82,14.5,89.5,6900.12,2021.26,192.9,61845.05,5.0,656,1.509924,1.071829
4,1,3,green,481.0,0.0,1.0,90.76,0.0,2.1,574.86,0.0,7,1.285714,0.0
5,1,3,yellow,31782.28,15.8,55.5,4158.57,1201.93,118.2,37347.28,15.0,400,1.433584,1.317425
6,1,4,green,407.05,0.0,0.5,1.0,0.0,1.2,412.25,0.0,4,1.0,4.6825
7,1,4,yellow,7635.41,6.8,17.0,630.21,124.6,26.7,8448.22,7.5,92,1.247191,2.431848
8,1,5,yellow,9761.66,1.0,23.0,1339.19,227.05,39.3,11396.2,2.5,144,1.608696,0.966458
9,1,6,green,194.0,0.0,0.0,31.66,0.0,0.6,226.26,0.0,2,1.5,0.03


In [31]:
# (rows, columns) of the report DataFrame.
df.shape

(6217, 14)