In [1]:
import requests
import pandas as pd
import aco_lib
import os
import datetime

### Downloading Dataset 

In [113]:
taxi_type = 'green'
year = '2020'

# download_file(f'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{taxi_type}/{taxi_type}_tripdata_{year}-01.csv.gz', f'data/raw/{taxi_type}/{year}')

aco_lib.download_file(f'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{taxi_type}/{taxi_type}_tripdata_{year}-01.csv.gz', f'../data/raw/{taxi_type}/{year}')

aco_lib.download_file('https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv', f'../data/lookup')

'taxi_zone_lookup.csv'

In [33]:
for taxi_type in ['green', 'yellow']:
    for year in ['2020']:
        for month in range(1,13):
            fmonth = f'{month:02d}'

            result = aco_lib.download_file(f'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{taxi_type}/{taxi_type}_tripdata_{year}-{fmonth}.csv.gz', f'../data/raw/{taxi_type}/{year}')

            print(result)


green_tripdata_2020-01.csv.gz
green_tripdata_2020-02.csv.gz
green_tripdata_2020-03.csv.gz
green_tripdata_2020-04.csv.gz
green_tripdata_2020-05.csv.gz
green_tripdata_2020-06.csv.gz
green_tripdata_2020-07.csv.gz
green_tripdata_2020-08.csv.gz
green_tripdata_2020-09.csv.gz
green_tripdata_2020-10.csv.gz
green_tripdata_2020-11.csv.gz
green_tripdata_2020-12.csv.gz
yellow_tripdata_2020-01.csv.gz
yellow_tripdata_2020-02.csv.gz
yellow_tripdata_2020-03.csv.gz
yellow_tripdata_2020-04.csv.gz
yellow_tripdata_2020-05.csv.gz
yellow_tripdata_2020-06.csv.gz
yellow_tripdata_2020-07.csv.gz
yellow_tripdata_2020-08.csv.gz
yellow_tripdata_2020-09.csv.gz
yellow_tripdata_2020-10.csv.gz
yellow_tripdata_2020-11.csv.gz
yellow_tripdata_2020-12.csv.gz


### Importing pyspark

In [50]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import DataFrame as SparkDataFrame

In [51]:
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('test') \
        .getOrCreate()

In [52]:
def read_local_csv(spark: SparkSession, year: int, month: int, months_n: int, taxi_type: str, schema: types.StructType) -> SparkDataFrame:
    
    """
    Function to read local csv

    Args:
      - spark (SparkSession): The spark session
      - dir_path (string): It is the path for the main.py
      - year (int): The year of data taken
      - month (int): The month of data taken
      - months_n (int): The number of months back of data retrieved
      - taxi_type (str): The type of taxi
      - schema (types.StructType): The schema used for the data
    """

    date_lst = aco_lib.get_last_months(months_n, year, month)

    path_lst = [f'../data/raw/{taxi_type}/{date_year}/{taxi_type}_tripdata_{date_year}-{date_month:02d}.csv' for date_year, date_month in date_lst]

    
    df = spark.read \
          .option('header', 'true') \
          .schema(schema) \
          .csv(path_lst)

    return df

In [53]:
df = read_local_csv(spark, 2020, 12, 6, 'green', schema_green)
df.show()

AnalysisException: Path does not exist: file:/home/aco/nyc_taxi/data/raw/green/2020/green_tripdata_2020-12.csv

### Reading csv files 

In [54]:
df = spark.read \
    .option('header', 'true') \
    .csv(f'../data/raw/green/2020/green_tripdata_2020-04.csv.gz')

                                                                                

In [55]:
df.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-04-01 00:44:02|  2020-04-01 00:52:23|                 N|         1|          42|          41|              1|         1.68|          8|  0.5|    0.

In [56]:
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



## Handling data type using pandas 

green data

In [7]:
df_pandas = pd.read_csv('../data/raw/green/2020/green_tripdata_2020-01.csv', nrows=1000)
df_pandas.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2019-12-18 15:52:30,2019-12-18 15:54:39,N,1,264,264,5,0.0,3.5,0.5,0.5,0.01,0.0,,0.3,4.81,1,1,0.0
1,2,2020-01-01 00:45:58,2020-01-01 00:56:39,N,5,66,65,2,1.28,20.0,0.0,0.0,4.06,0.0,,0.3,24.36,1,2,0.0
2,2,2020-01-01 00:41:38,2020-01-01 00:52:49,N,1,181,228,1,2.47,10.5,0.5,0.5,3.54,0.0,,0.3,15.34,1,1,0.0
3,1,2020-01-01 00:52:46,2020-01-01 01:14:21,N,1,129,263,2,6.3,21.0,3.25,0.5,0.0,0.0,,0.3,25.05,2,1,2.75
4,1,2020-01-01 00:19:57,2020-01-01 00:30:56,N,1,210,150,1,2.3,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,1,1,0.0


In [18]:
spark.createDataFrame(df_pandas).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('VendorID', LongType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', LongType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', LongType(), True), StructField('trip_type', LongType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [57]:
schema_green = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True), 
    types.StructField('lpep_pickup_datetime', types.TimestampType(), True), 
    types.StructField('lpep_dropoff_datetime', types.TimestampType(), True), 
    types.StructField('store_and_fwd_flag', types.StringType(), True), 
    types.StructField('RatecodeID', types.IntegerType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('passenger_count', types.IntegerType(), True), 
    types.StructField('trip_distance', types.DoubleType(), True), 
    types.StructField('fare_amount', types.DoubleType(), True), 
    types.StructField('extra', types.DoubleType(), True), 
    types.StructField('mta_tax', types.DoubleType(), True), 
    types.StructField('tip_amount', types.DoubleType(), True), 
    types.StructField('tolls_amount', types.DoubleType(), True), 
    types.StructField('ehail_fee', types.DoubleType(), True), 
    types.StructField('improvement_surcharge', types.DoubleType(), True), 
    types.StructField('total_amount', types.DoubleType(), True), 
    types.StructField('payment_type', types.IntegerType(), True), 
    types.StructField('trip_type', types.IntegerType(), True), 
    types.StructField('congestion_surcharge', types.DoubleType(), True)])

In [10]:
type(schema_green)

pyspark.sql.types.StructType

In [59]:
df = spark.read \
    .option("header", "true") \
    .schema(schema_green) \
    .csv('../data/raw/green/2020/green_tripdata_2020-04.csv.gz')

In [46]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



yellow data

In [47]:
df_pandas = pd.read_csv('../data/raw/yellow/2020/yellow_tripdata_2020-01.csv', nrows=1000)
df_pandas.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2020-01-01 00:28:15,2020-01-01 00:33:03,1,1.2,1,N,238,239,1,6.0,3.0,0.5,1.47,0.0,0.3,11.27,2.5
1,1,2020-01-01 00:35:39,2020-01-01 00:43:04,1,1.2,1,N,239,238,1,7.0,3.0,0.5,1.5,0.0,0.3,12.3,2.5
2,1,2020-01-01 00:47:41,2020-01-01 00:53:52,1,0.6,1,N,238,238,1,6.0,3.0,0.5,1.0,0.0,0.3,10.8,2.5
3,1,2020-01-01 00:55:23,2020-01-01 01:00:14,1,0.8,1,N,238,151,1,5.5,0.5,0.5,1.36,0.0,0.3,8.16,0.0
4,2,2020-01-01 00:01:58,2020-01-01 00:04:16,1,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,4.8,0.0


In [48]:
spark.createDataFrame(df_pandas).schema

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


StructType([StructField('VendorID', LongType(), True), StructField('tpep_pickup_datetime', StringType(), True), StructField('tpep_dropoff_datetime', StringType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True)])

In [55]:
schema_yellow = types.StructType([
types.StructField('VendorID', types.IntegerType(), True), 
types.StructField('tpep_pickup_datetime', types.TimestampType(), True), 
types.StructField('tpep_dropoff_datetime', types.TimestampType(), True), 
types.StructField('passenger_count', types.IntegerType(), True), 
types.StructField('trip_distance', types.DoubleType(), True), 
types.StructField('RatecodeID', types.IntegerType(), True), 
types.StructField('store_and_fwd_flag', types.StringType(), True), 
types.StructField('PULocationID', types.IntegerType(), True), 
types.StructField('DOLocationID', types.IntegerType(), True), 
types.StructField('payment_type', types.IntegerType(), True), 
types.StructField('fare_amount', types.DoubleType(), True), 
types.StructField('extra', types.DoubleType(), True), 
types.StructField('mta_tax', types.DoubleType(), True), 
types.StructField('tip_amount', types.DoubleType(), True), 
types.StructField('tolls_amount', types.DoubleType(), True), 
types.StructField('improvement_surcharge', types.DoubleType(), True), 
types.StructField('total_amount', types.DoubleType(), True), 
types.StructField('congestion_surcharge', types.DoubleType(), True)])



## Write into parquet 


green data

In [53]:
for month in range(1,13):
    fmonth = f'{month:02d}'
    df = spark.read \
    .option("header", "true") \
    .schema(schema_green) \
    .csv(f'../data/raw/green/2020/green_tripdata_2020-{fmonth}.csv')

    df = df.repartition(24)

    df.write.parquet(f'../data/parquet/green/2020/{fmonth}/', mode='overwrite')

                                                                                


yellow data

In [56]:
for month in range(1,13):
    fmonth = f'{month:02d}'
    df = spark.read \
    .option("header", "true") \
    .schema(schema_yellow) \
    .csv(f'../data/raw/yellow/2020/yellow_tripdata_2020-{fmonth}.csv')

    df = df.repartition(24)

    df.write.parquet(f'../data/parquet/yellow/2020/{fmonth}/', mode='overwrite')

                                                                                

23/07/11 15:02:35 WARN BasicWriteTaskStatsTracker: Expected 1 files, but only saw 0. This could be due to the output format not writing empty files, or files being not immediately visible in the filesystem.


[Stage 90:==>                                                      (1 + 4) / 24]

23/07/11 15:02:35 WARN BasicWriteTaskStatsTracker: Expected 1 files, but only saw 0. This could be due to the output format not writing empty files, or files being not immediately visible in the filesystem.


[Stage 90:====>                                                    (2 + 4) / 24]

23/07/11 15:02:37 WARN BasicWriteTaskStatsTracker: Expected 1 files, but only saw 0. This could be due to the output format not writing empty files, or files being not immediately visible in the filesystem.




23/07/11 15:02:37 WARN BasicWriteTaskStatsTracker: Expected 1 files, but only saw 0. This could be due to the output format not writing empty files, or files being not immediately visible in the filesystem.


                                                                                

## Spark Dataframe

In [60]:
df = spark.read.parquet('../data/parquet/yellow/2020/01/')

In [61]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



Column selection

In [63]:
df.select('tpep_pickup_datetime', 'tpep_pickup_datetime', 'PULocationID', 'DOLocationID').head(4)

[Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 22, 15, 43), tpep_dropoff_datetime=datetime.datetime(2020, 1, 3, 22, 18, 43), PULocationID=236, DOLocationID=236),
 Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 9, 54, 11), tpep_dropoff_datetime=datetime.datetime(2020, 1, 3, 9, 59, 25), PULocationID=234, DOLocationID=186),
 Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 8, 34, 27), tpep_dropoff_datetime=datetime.datetime(2020, 1, 3, 8, 43, 50), PULocationID=161, DOLocationID=143),
 Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 18, 17, 51), tpep_dropoff_datetime=datetime.datetime(2020, 1, 3, 19, 1, 28), PULocationID=7, DOLocationID=233)]

filtering by values

In [64]:
df.select('tpep_pickup_datetime', 'tpep_pickup_datetime', 'PULocationID', 'DOLocationID').filter(df['VendorID'] == 2).head(4)

[Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 22, 15, 43), tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 22, 15, 43), PULocationID=236, DOLocationID=236),
 Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 9, 54, 11), tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 9, 54, 11), PULocationID=234, DOLocationID=186),
 Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 8, 34, 27), tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 8, 34, 27), PULocationID=161, DOLocationID=143),
 Row(tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 18, 17, 51), tpep_pickup_datetime=datetime.datetime(2020, 1, 3, 18, 17, 51), PULocationID=7, DOLocationID=233)]

## Combining 2 datasets

In [17]:
df_green = spark.read.parquet('../data/parquet/green/*/*')
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')


df_yellow = spark.read.parquet('../data/parquet/yellow/*/*')
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')


In [18]:
set(df_green.columns) & set(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [19]:
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)

In [20]:
common_colums

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

23/07/13 09:23:43 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 56593084 ms exceeds timeout 120000 ms
23/07/13 09:23:46 WARN SparkContext: Killing executors is not supported by current scheduler.
23/07/13 09:23:47 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage

In [72]:
from pyspark.sql import functions as F

df_green_sel = df_green \
    .select(common_colums) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_colums) \
    .withColumn('service_type', F.lit('yellow'))

In [74]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [78]:
df_trips_data.groupBy('service_type').count().show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 1734051|
|      yellow|29986003|
+------------+--------+



                                                                                

### Querying a dataset with temp tables

In [79]:
df_trips_data.createOrReplaceTempView('trips_data')

In [80]:
spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""").show()



+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green| 1734051|
|      yellow|29986003|
+------------+--------+



                                                                                

In [87]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_monthly_passenger_count,
    AVG(trip_distance) AS avg_monthly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [83]:
df_result.write.parquet('../data/report/revenue/')

                                                                                

In [91]:
df_result.coalesce(1).write.parquet('../data/report/revenue/', mode='overwrite')

                                                                                

## Joins in spark

In [93]:
df_result = spark.read.parquet('../data/report/revenue/')

In [94]:
df_result.show()

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_monthly_passenger_count|avg_monthly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+
|          33

In [60]:
df_zones = spark.read.csv('../data/lookup/taxi_zone_lookup.csv', header=True)

In [61]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [63]:
df.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-04-01 00:44:02|  2020-04-01 00:52:23|                 N|         1|          42|          41|              1|         1.68|        8.0|  0.5|    0.

In [64]:
df_result_joined = df.join(df_zones, df['PULocationID'] == df_zones['LocationID'])



In [65]:
for i in ['Borough', 'Zone']:
    df_result_joined = df_result_joined.withColumnRenamed(i,
                                  'prefix_' + i)

In [67]:
df_result_joined.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+----------+--------------+--------------------+------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|LocationID|prefix_Borough|         prefix_Zone|service_zone|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+----------+--------------+------------------

In [105]:
df_result_joined.show()

+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+----------+-------------+--------------------+------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_monthly_passenger_count|avg_monthly_trip_distance|LocationID|      Borough|                Zone|service_zone|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------

In [107]:
df_result_joined.drop('revenue_zone').show()

+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+---------------------------+-------------------------+----------+-------------+--------------------+------------+
|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_monthly_passenger_count|avg_monthly_trip_distance|LocationID|      Borough|                Zone|service_zone|
+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+--------------------------

In [108]:


df_result_joined.drop('LocationID').write.parquet('../data/report/revenue_zones/')

