In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.types as t
import pyspark.sql.functions as f
from pyspark.sql import Window

In [2]:
spark_session = (SparkSession.builder.master('local').appName('NYC Taxi app').config(conf=SparkConf()).getOrCreate())

In [3]:
spark_session

In [4]:
data = [("Maksym", 19), ("Roman", 19), ("Borys", 19)]
simple_schema = t.StructType([
    t.StructField("FirstName", t.StringType(), True),
    t.StructField("Age", t.IntegerType(), True)
])
simple_df = spark_session.createDataFrame(data, simple_schema)


In [5]:
simple_df.printSchema()

root
 |-- FirstName: string (nullable = true)
 |-- Age: integer (nullable = true)



In [96]:
trip_data_paths = [
    'trip_data_1.csv',
    'trip_data_2.csv',
    'trip_data_3.csv',
    'trip_data_4.csv',
    'trip_data_5.csv',
    'trip_data_6.csv',
    'trip_data_7.csv',
    'trip_data_8.csv',
    'trip_data_9.csv',
    'trip_data_10.csv',
    'trip_data_11.csv',
    'trip_data_12.csv',
] 
trip_fare_paths = [
    'trip_fare_1.csv',
    'trip_fare_2.csv',
    'trip_fare_3.csv',
    'trip_fare_4.csv',
    'trip_fare_5.csv',
    'trip_fare_6.csv',
    'trip_fare_7.csv',
    'trip_fare_8.csv',
    'trip_fare_9.csv',
    'trip_fare_10.csv',
    'trip_fare_11.csv',
    'trip_fare_12.csv',
]

In [97]:
trip_data_schema = t.StructType([
    t.StructField('medallion', t.StringType(), True),
    t.StructField('hack_license', t.StringType(), True),
    t.StructField('vendor_id', t.StringType(), True),
    t.StructField('rate_code', t.IntegerType(), True),
    t.StructField('store_and_fwd_flag', t.StringType(), True),
    t.StructField('pickup_datetime', t.TimestampType(), True),
    t.StructField('dropoff_datetime', t.TimestampType(), True),
    t.StructField('passenger_count', t.IntegerType(), True),
    t.StructField('trip_time_in_secs', t.IntegerType(), True),
    t.StructField('trip_distance', t.FloatType(), True),
    t.StructField('pickup_longitude', t.DoubleType(), True),
    t.StructField('pickup_latitude', t.DoubleType(), True),
    t.StructField('dropoff_longitude', t.DoubleType(), True),
    t.StructField('dropoff_latitude', t.DoubleType(), True)
])

In [98]:
trip_fare_schema = t.StructType([
    t.StructField('medallion', t.StringType(), True),
    t.StructField('hack_license', t.StringType(), True),
    t.StructField('vendor_id', t.StringType(), True),
    t.StructField('pickup_datetime', t.TimestampType(), True),
    t.StructField('payment_type', t.StringType(), True),
    t.StructField('fare_amount', t.FloatType(), True),
    t.StructField('surcharge', t.FloatType(), True),
    t.StructField('mta_tax', t.FloatType(), True),
    t.StructField('tip_amount', t.FloatType(), True),
    t.StructField('tolls_amount', t.FloatType(), True),
    t.StructField('total_amount', t.FloatType(), True)
])

In [99]:
trip_data_df = spark_session.read.csv(
                                    trip_data_paths[0], 
                                    header=True, 
                                    nullValue='null',
                                    dateFormat='yyyy-MM-dd HH:mm:ss',
                                    schema=trip_data_schema
                                    )
trip_fare_df = spark_session.read.csv(
                                     trip_fare_paths[0], 
                                     header=True,
                                     nullValue='null',
                                     dateFormat='yyyy-MM-dd HH:mm:ss',
                                     schema=trip_fare_schema
                                     )

In [100]:
for i in range(1, len(trip_data_paths), 1):
    trip_data_df = trip_data_df.union(spark_session.read.csv(
                                    trip_data_paths[i], 
                                    header=True, 
                                    nullValue='null',
                                    dateFormat='yyyy-MM-dd HH:mm:ss',
                                    schema=trip_data_schema
                                    ))
    trip_fare_df = trip_fare_df.union(spark_session.read.csv(
                                     trip_fare_paths[i], 
                                     header=True,
                                     nullValue='null',
                                     dateFormat='yyyy-MM-dd HH:mm:ss',
                                     schema=trip_fare_schema
                                     ))
    

In [12]:
print(trip_data_df.count())
print(trip_fare_df.count())

44516019
44516019


In [13]:
trip_data_df = trip_data_df.dropna()

In [14]:
trip_fare_df = trip_fare_df.dropna()

In [15]:
trip_data_df.explain()

== Physical Plan ==
Union
:- *(1) Filter atleastnnonnulls(14, medallion#4, hack_license#5, vendor_id#6, rate_code#7, store_and_fwd_flag#8, pickup_datetime#9, dropoff_datetime#10, passenger_count#11, trip_time_in_secs#12, trip_distance#13, pickup_longitude#14, pickup_latitude#15, dropoff_longitude#16, dropoff_latitude#17)
:  +- FileScan csv [medallion#4,hack_license#5,vendor_id#6,rate_code#7,store_and_fwd_flag#8,pickup_datetime#9,dropoff_datetime#10,passenger_count#11,trip_time_in_secs#12,trip_distance#13,pickup_longitude#14,pickup_latitude#15,dropoff_longitude#16,dropoff_latitude#17] Batched: false, DataFilters: [atleastnnonnulls(14, medallion#4, hack_license#5, vendor_id#6, rate_code#7, store_and_fwd_flag#8..., Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/maxde/nyc-taxi-spark-project/trip_data_1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<medallion:string,hack_license:string,vendor_id:string,rate_code:int,store_and_fwd_flag:str...
:- *(2) Filt

In [16]:
trip_data_df.show()

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N|2013-01-01 15:11:48|2013-01-01 15:18:10|              4|              382|          1.0|      -73.978165|      40.757977|       -73.989838|       40.751171|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...| 

In [17]:
trip_fare_df.show()

+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|vendor_id|    pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|2013-01-01 15:11:48|         CSH|        6.5|      0.0|    0.5|       0.0|         0.0|         7.0|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      CMT|2013-01-06 00:18:35|         CSH|        6.0|      0.5|    0.5|       0.0|         0.0|         7.0|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      CMT|2013-01-05 18:49:41|         CSH|        5.5|      1.0|    0.5|       0.0|         0.0|         7.0|
|DFD2202EE08F7A8DC...|51EE87E3205C985EF...|      CMT|2013-01-07 23:54:15|   

In [18]:
trip_data_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)



In [19]:
trip_fare_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: float (nullable = true)



In [20]:
trip_data_df.describe().show()

+-------+--------------------+--------------------+---------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+
|summary|           medallion|        hack_license|vendor_id|         rate_code|store_and_fwd_flag|   passenger_count| trip_time_in_secs|     trip_distance|  pickup_longitude|  pickup_latitude| dropoff_longitude| dropoff_latitude|
+-------+--------------------+--------------------+---------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+
|  count|            22418017|            22418017| 22418017|          22418017|          22418017|          22418017|          22418017|          22418017|          22418017|         22418017|          22418017|         22418017|
|   mean|                null|                null|     null|1.0331022141699

In [21]:
trip_fare_df.describe().show()

+-------+--------------------+--------------------+---------+------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+
|summary|           medallion|        hack_license|vendor_id|payment_type|       fare_amount|          surcharge|            mta_tax|        tip_amount|       tolls_amount|      total_amount|
+-------+--------------------+--------------------+---------+------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+
|  count|            44516019|            44516019| 44516019|    44516019|          44516019|           44516019|           44516019|          44516019|           44516019|          44516019|
|   mean|                null|                null|     null|        null|11.811171271636024|0.31866156854682565|0.49840430475150976|1.2947101801197403|0.21223752693827827| 14.13518485850585|
| stddev|                null|          

In [22]:
print(trip_data_df.count())
print(trip_fare_df.count())

22418017
44516019


In [102]:
trip_data_df = trip_data_df.withColumn('pickup_year', 
                                       f.col('pickup_datetime')
                                       .substr(startPos=0, length=4))

trip_data_df = trip_data_df.withColumn('pickup_month',
                                      f.col('pickup_datetime')
                                       .substr(startPos=6, length=2))

trip_data_df = trip_data_df.withColumn('pickup_day',
                                      f.col('pickup_datetime')
                                       .substr(startPos=9, length=2))

trip_data_df = trip_data_df.withColumn('pickup_time',
                                      f.col('pickup_datetime')
                                       .substr(startPos=11, length=8))

trip_data_df = trip_data_df.withColumn('dropoff_year',
                                      f.col('dropoff_datetime')
                                       .substr(startPos=0, length=4))

trip_data_df = trip_data_df.withColumn('dropoff_month',
                                      f.col('dropoff_datetime')
                                      .substr(startPos=6, length=2))

trip_data_df = trip_data_df.withColumn('dropoff_day', 
                                      f.col('dropoff_datetime')
                                      .substr(startPos=9, length=2))

trip_data_df = trip_data_df.withColumn('dropoff_time', 
                                      f.col('dropoff_datetime')
                                      .substr(startPos=11, length=8))



In [103]:
trip_fare_df = trip_fare_df.withColumn('pickup_year', 
                                       f.col('pickup_datetime')
                                       .substr(startPos=0, length=4))

trip_fare_df = trip_fare_df.withColumn('pickup_month',
                                      f.col('pickup_datetime')
                                       .substr(startPos=6, length=2))

trip_fare_df = trip_fare_df.withColumn('pickup_day',
                                      f.col('pickup_datetime')
                                       .substr(startPos=9, length=2))

trip_fare_df = trip_fare_df.withColumn('pickup_time',
                                      f.col('pickup_datetime')
                                       .substr(startPos=11, length=8))

In [25]:
trip_data_df.show()

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+------------+----------+-----------+------------+-------------+-----------+------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_year|pickup_month|pickup_day|pickup_time|dropoff_year|dropoff_month|dropoff_day|dropoff_time|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+------------+----------+-----------+------------+-------------+---------

In [76]:
trip_fare_df.show()

+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+-----------+------------+----------+-----------+
|           medallion|        hack_license|vendor_id|    pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|pickup_year|pickup_month|pickup_day|pickup_time|
+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+-----------+------------+----------+-----------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|2013-01-01 15:11:48|         CSH|        6.5|      0.0|    0.5|       0.0|         0.0|         7.0|       2013|          01|        01|    15:11:4|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      CMT|2013-01-06 00:18:35|         CSH|        6.0|      0.5|    0.5|       0.0|         0.0|         7.0|       2013|          01|        06|   

In [26]:
# 1 Did the drivers fulfill the monthly norm for the number of transportations for January 2013 ?

In [27]:
travel_time_df = trip_data_df.select(f.avg('trip_time_in_secs'))

In [28]:
avg_time_travel_value = travel_time_df.first()['avg(trip_time_in_secs)']

In [29]:
FULL_TIME_WORK_DAY_IN_SECS = 28800

In [30]:
MINIMUM_AMOUNT_OF_TRIPS = round(FULL_TIME_WORK_DAY_IN_SECS/avg_time_travel_value)

In [31]:
broadcast_amount_of_trips = spark_session.sparkContext.broadcast(MINIMUM_AMOUNT_OF_TRIPS)

In [32]:
new_data_df = trip_data_df.filter(
                                  (f.col('pickup_year') == '2013')
                                     &
                                  (f.col('pickup_month') == '01')
                                 )

In [33]:
info_df = new_data_df.groupBy('hack_license')\
                     .agg(f.count('hack_license').alias('total_trips_amount_by_jan_2013'))\
                     .orderBy('total_trips_amount_by_jan_2013', ascending=False) 

In [34]:
info_df = info_df.withColumn('minimal_trips_amount', f.lit(broadcast_amount_of_trips.value))

In [35]:
info_df = info_df.withColumn('is_min_amount_fulfiled', 
                             f.when(info_df.total_trips_amount_by_jan_2013 >= info_df.minimal_trips_amount, 'Y').otherwise('N'))

In [36]:
info_df.show()

+--------------------+------------------------------+--------------------+----------------------+
|        hack_license|total_trips_amount_by_jan_2013|minimal_trips_amount|is_min_amount_fulfiled|
+--------------------+------------------------------+--------------------+----------------------+
|CE625FD96D0FAFC81...|                          1556|                  41|                     Y|
|3D757E111C78F5CAC...|                          1514|                  41|                     Y|
|3AAB94CA53FE93A64...|                          1486|                  41|                     Y|
|03173DD93C1171DA1...|                          1382|                  41|                     Y|
|2BF7915E6DC625234...|                          1379|                  41|                     Y|
|847349F8845A667D9...|                          1325|                  41|                     Y|
|C4E07B8C06FAB8E54...|                          1325|                  41|                     Y|
|3578782ABD6492CEB..

In [37]:
# 2 Where are passengers picked up and dropped off most often?

In [38]:
DECIMAL_PLACES = 3

In [39]:
geolocation_df = trip_data_df.select('pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude')
geolocation_df = geolocation_df.filter(
                                       (f.col('pickup_longitude') != 0.0) & 
                                       (f.col('pickup_latitude') != 0.0) &
                                       (f.col('dropoff_longitude') != 0.0) &
                                       (f.col('dropoff_latitude') != 0.0)
                                      )

geolocation_pickup_df = geolocation_df.withColumns({'pickup_longitude_rounded':f.round(f.col('pickup_longitude'), DECIMAL_PLACES),
                                                  'pickup_latitude_rounded':f.round(f.col('pickup_latitude'), DECIMAL_PLACES)})
geolocation_dropoff_df = geolocation_df.withColumns({'dropoff_longitude_rounded':f.round(f.col('dropoff_longitude'), DECIMAL_PLACES),
                                                 'dropoff_latitude_rounded':f.round(f.col('dropoff_latitude'), DECIMAL_PLACES)})

In [40]:
group_pickup_df = geolocation_pickup_df.groupBy("pickup_longitude_rounded", "pickup_latitude_rounded")\
                  .agg(f.count('pickup_latitude_rounded').alias('total_captured_passanges'))\
                  .orderBy('total_captured_passanges', ascending=False)

In [41]:
group_dropoff_df = geolocation_dropoff_df.groupBy("dropoff_longitude_rounded", "dropoff_latitude_rounded")\
                  .agg(f.count('dropoff_longitude_rounded').alias('total_delivered_passanges'))\
                  .orderBy('total_delivered_passanges', ascending=False)

In [42]:
group_pickup_df.show()

+------------------------+-----------------------+------------------------+
|pickup_longitude_rounded|pickup_latitude_rounded|total_captured_passanges|
+------------------------+-----------------------+------------------------+
|                 -73.994|                 40.751|                  103090|
|                 -73.991|                  40.75|                   94624|
|                 -73.992|                  40.75|                   89453|
|                 -73.863|                 40.769|                   70022|
|                 -73.871|                 40.774|                   66991|
|                 -73.978|                 40.752|                   64506|
|                 -73.873|                 40.774|                   60372|
|                  -73.99|                 40.757|                   58845|
|                 -73.991|                 40.756|                   57877|
|                 -74.006|                  40.74|                   52671|
|           

In [43]:
group_dropoff_df.show()

+-------------------------+------------------------+-------------------------+
|dropoff_longitude_rounded|dropoff_latitude_rounded|total_delivered_passanges|
+-------------------------+------------------------+-------------------------+
|                  -73.991|                   40.75|                   100509|
|                  -73.992|                  40.749|                    75600|
|                  -73.995|                   40.75|                    73773|
|                  -73.991|                  40.751|                    60052|
|                  -73.978|                  40.752|                    52628|
|                  -73.992|                   40.75|                    46980|
|                   -73.99|                  40.757|                    43981|
|                  -73.977|                  40.752|                    43348|
|                  -73.871|                  40.774|                    41521|
|                  -73.979|                  40.762|

In [44]:
#3 What is the average fare compared to the average tip for each driver 

In [45]:
window_func = Window.partitionBy("hack_license")

In [46]:
avg_fare_tip_df = trip_fare_df.withColumn("avg_fare_amount", f.avg("fare_amount").over(window_func))

In [47]:
avg_fare_tip_df = avg_fare_tip_df.withColumn("avg_tip_amount", f.avg("tip_amount").over(window_func))

In [48]:
result_3_df = avg_fare_tip_df.select("hack_license", "avg_fare_amount", "avg_tip_amount").distinct()

In [49]:
result_3_df.show()

+--------------------+------------------+------------------+
|        hack_license|   avg_fare_amount|    avg_tip_amount|
+--------------------+------------------+------------------+
|001EEDEA00E57988E...|12.276877133105803|1.3789419783644836|
|005DC767BC7CA7827...| 9.873570520965693|1.0283418012240941|
|0074114ECFC6F04AA...|10.533576642335767| 1.232364963046717|
|007944DEAF33EF8D4...|10.438325991189428|1.2403414072591958|
|00927C48BA4C1B2B1...| 9.903508771929825|0.9121052570510328|
|00A290CDF574658C1...|13.762094304960195|1.3525719491893795|
|00AE05F56D451E89E...|11.288732394366198|1.2822238690681684|
|00B938EAB7C933C9B...| 9.738257107540173|1.0834301593327809|
|00C2BC1F860FC66BA...|11.253306878306878|1.3253571417098915|
|00E1B4CE8617887CC...| 9.933431952662723|1.0867159735146887|
|00F6456D2C764D4CC...|11.464147286821705|1.3994428259401777|
|0105AF397277017F5...| 9.621951219512194| 1.150139373146283|
|013DB7F394A06CD24...|11.524844720496894|1.1336180104046876|
|013F4117864733D5E...| 1

In [50]:
# 4 How many tips earned each driver during January 2013

In [51]:
january_df = trip_data_df.filter((f.col('pickup_year') == '2013') & (f.col('pickup_month') == '01'))

In [52]:
trip_data_combined = trip_fare_df.join(january_df, ['medallion', 'hack_license', 'pickup_month'])

In [53]:
total_tip_amount_by_driver = trip_data_combined.groupBy("hack_license")\
                            .agg(f.sum("tip_amount").alias("total_tip_amount"))\
                            .orderBy('total_tip_amount', ascending=False)

In [54]:
total_tip_amount_by_driver.show()

+--------------------+------------------+
|        hack_license|  total_tip_amount|
+--------------------+------------------+
|3AAB94CA53FE93A64...| 2716931.219559759|
|2BF7915E6DC625234...|2615370.0248940587|
|CE625FD96D0FAFC81...|  2509625.71612373|
|3D757E111C78F5CAC...|2507880.4385518506|
|C4E07B8C06FAB8E54...|  2190609.24410522|
|23DF80C977D15141F...| 1910286.724994406|
|314466DCAC0F3733D...| 1903747.138882231|
|847349F8845A667D9...| 1757665.495171398|
|FEAF9F0D8BD84ED94...|1751654.6430503726|
|8ED700AFCB22F6AC1...|1744427.5129508972|
|1802778DABBD76460...| 1743358.135653019|
|704A570E66ABB1941...|1686106.5783829056|
|7F95B1590B7DA7B13...|1655020.2986228839|
|615566EB4F4EDB606...| 1650491.952904433|
|C779D022381EA6EFE...|1634595.8315155506|
|9875258E117C159CA...| 1626919.795090329|
|D425F45DE6E2271BA...|1610717.3569993973|
|95FF48E4CF5F5F51C...|1608564.7179958671|
|3578782ABD6492CEB...|1597679.6766882539|
|92E10581A1EF35744...|1592304.4344895035|
+--------------------+------------

In [91]:
# 6 For each driver, which driver has the highest average number of passengers per trip? 

In [56]:
window_spec = Window.partitionBy("hack_license")

In [89]:
new_df = trip_data_df.withColumn("avg_passenger_count", f.avg("passenger_count").over(window_spec))

In [58]:
new_df = new_df.withColumn("rank", f.rank().over(window_spec.orderBy("avg_passenger_count")))

In [59]:
highest_avg_passenger_count_driver = new_df.filter(f.col("rank") == 1).orderBy(f.desc('avg_passenger_count'))

In [60]:
highest_avg_passenger_count_driver = highest_avg_passenger_count_driver.select("hack_license", "avg_passenger_count").distinct()

In [61]:
highest_avg_passenger_count_driver.show()

+--------------------+-------------------+
|        hack_license|avg_passenger_count|
+--------------------+-------------------+
|0008B3E338CE8C337...|                1.5|
|000A4EBF1CEB9C6BD...| 1.3586387434554974|
|0012703023AC1788D...| 1.4365443425076452|
|001EEDEA00E57988E...|               1.25|
|002D30B52456EC2B4...| 1.0678899082568807|
|003A4E7151035D313...| 1.3176605504587156|
|003B75F3C6DD9A234...| 1.0119047619047619|
|003C68DFE1EBE1205...|                1.0|
|005DC767BC7CA7827...| 1.0038143674507312|
|006F4A25C76331739...| 1.5201793721973094|
|00747A7A258EDA144...|   1.02803738317757|
|00759C8ECBA7565D6...| 1.0236220472440944|
|00771DD1930D7E9DE...|  1.289228159457167|
|0077228F0441716B4...| 1.2657856093979443|
|007944DEAF33EF8D4...|  1.171806167400881|
|0085204AEBDD4D803...| 1.3144560357675112|
|0086B9E2B25EA077F...|  1.433188615533044|
|008B6475D6D0D689B...|  1.002669514148425|
|008BB66DD209F8976...| 1.0051546391752577|
|0091088BB742146A7...| 1.0147379912663756|
+----------

In [90]:
# 5 How did the average fare change during the day for each vendor from January to December 2013?

In [105]:
january_to_december_df = trip_fare_df.filter(trip_fare_df.pickup_year == '2013')

In [106]:
window_spec_ = Window.partitionBy("vendor_id", "pickup_month")

In [107]:
january_to_december_df = january_to_december_df.withColumn("avg_fare_amount",
                                                           f.avg("fare_amount")\
                                                           .over(window_spec_))

In [110]:
result_5_df = january_to_december_df.select("vendor_id", "pickup_month", "avg_fare_amount").distinct()

In [109]:
result_5_df.show()

+---------+------------+------------------+
|vendor_id|pickup_month|   avg_fare_amount|
+---------+------------+------------------+
|      CMT|          06|12.458074280997451|
|      VTS|          07|12.486517200709097|
|      CMT|          07|12.206085362879517|
|      VTS|          12|12.792086398021143|
|      CMT|          10| 12.48274666878785|
|      CMT|          11|12.297992846872287|
|      VTS|          06| 12.74885933650377|
|      VTS|          11|12.590354511199076|
|      VTS|          03| 12.16350612955464|
|      CMT|          02|11.591116324910375|
|      CMT|          05| 12.45473378045649|
|      CMT|          01|11.529089919193797|
|      VTS|          10|12.807381541295982|
|      CMT|          03| 11.90858558995323|
|      VTS|          08|12.696985524742793|
|      CMT|          12|12.460453301926826|
|      VTS|          04|12.407652987031982|
|      CMT|          08| 12.45163098429392|
|      CMT|          09| 12.57394519776087|
|      VTS|          01|11.80267

In [68]:
# Write resulted dataframes after transformation to new CSV files 

In [69]:
info_df.toPandas().to_csv('result1.csv')

In [70]:
group_pickup_df.toPandas().to_csv('result2_1.csv')

In [71]:
group_dropoff_df.toPandas().to_csv('result2_2.csv')

In [72]:
result_3_df.toPandas().to_csv('result3.csv')

In [73]:
total_tip_amount_by_driver.toPandas().to_csv('result4.csv')

In [None]:
result_5_df.toPandas().to_csv('result5.csv')

In [None]:
highest_avg_passenger_count_driver.toPandas().to_csv('result6.csv')

In [None]:
spark_session.close()