# Python Spark- DataFrames - Project 2



This notebook exemplifies the execution of a Spark program in Python, using the DataFrames interface.
In this example, spark runs in standalone mode and reads data from the local filesystem, while in cluster mode data is read typically from HDFS dsitributed file system.

Spark documentation available at:
https://spark.apache.org/docs/2.3.1/


### Download the dataset "Taxi"

In [1]:
!wget -O Taxi_small.csv https://www.dropbox.com/s/mi1el58o88hd5u8/Taxi_Trips_151MB.csv?dl=0
    

--2020-12-08 17:22:10--  https://www.dropbox.com/s/mi1el58o88hd5u8/Taxi_Trips_151MB.csv?dl=0
Resolving www.dropbox.com (www.dropbox.com)... 162.125.68.1, 2620:100:6024:1::a27d:4401
Connecting to www.dropbox.com (www.dropbox.com)|162.125.68.1|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: /s/raw/mi1el58o88hd5u8/Taxi_Trips_151MB.csv [following]
--2020-12-08 17:22:10--  https://www.dropbox.com/s/raw/mi1el58o88hd5u8/Taxi_Trips_151MB.csv
Reusing existing connection to www.dropbox.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://uc5b01a0bd495c758bbe0b7d626b.dl.dropboxusercontent.com/cd/0/inline/BEtGQCZlZyJ_hlIilaBqbJ8VYFShKUiaHH9msio0hxRRRrchs7gAQn6Cps33lq-W7imHzzet_OfF4Ee9RfPY8YUPaakB4qxh57BqBkyXTp0kVnZqUTMSNhGX8Vt8aLg59Ec/file# [following]
--2020-12-08 17:22:11--  https://uc5b01a0bd495c758bbe0b7d626b.dl.dropboxusercontent.com/cd/0/inline/BEtGQCZlZyJ_hlIilaBqbJ8VYFShKUiaHH9msio0hxRRRrchs7gAQn6Cps33lq-W7imHzzet_OfF4Ee9Rf

## 1 - How many trips were started in each year present in the data set?

In [7]:
%%time

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('dataframes_exercise1').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_small.csv')
    logRows = lines.filter( lambda line : len(line) > 0 )   \
                        .map( lambda line : line.split(';') ) \
                        .map( lambda arr : Row(Trip_ID = arr[0], Taxi_ID = arr[1], Year = arr[2].split(" ")[0].split("/")[2], \
                                               Trip_End = arr[3], Trip_Seconds = arr[4], Trip_Miles = arr[5],\
                                               Pickup_ID = arr[6], Dropoff_ID = arr[7], Pickup_area =  arr[8], \
                                               Dropoff_area = arr[9], Fare = arr[10], Tips = arr[11], Tolls = arr[12], \
                                               Extras = arr[13], Trip_Total= arr[14],Payment_type = arr[15] , \
                                               Company = arr[16], Pickup_Centroid_Lat  = arr[17], \
                                               Pickup_Centroid_Lon = arr[18], Pickup_Centroid_Loc = arr[19], \
                                               Dropoff_Centroid_Lat = arr[20], Dropoff_Centroid_Lon = arr[21], \
                                               Dropoff_Centroid_Lo = arr[22] ))
    
    logRowsDF = spark.createDataFrame( logRows )
    
    year =  logRowsDF.groupBy('Year').count()
    
    year = year.orderBy('Year',ascending=True).show()

    
    sc.stop()
except:
    print(err)
    sc.stop()


+----+-----+
|Year|count|
+----+-----+
|2013|54409|
|2014|74753|
|2015|64761|
|2016|63628|
|2017|50006|
|2018|41567|
|2019|32797|
|2020| 6829|
+----+-----+

CPU times: user 60 ms, sys: 39.3 ms, total: 99.2 ms
Wall time: 10.5 s


## 2 - For each of the 24 hours of the day, how many taxi trips there were, what was their average trip miles and trip total cost?

### Exercise version 2A - pre-processing during Spark-RDD map transformation

In [6]:
%%time

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('dataframes_exercise2a').getOrCreate()
sc = spark.sparkContext

try :
    
    lines = sc.textFile('Taxi_small.csv')
    logRows = lines.filter(lambda line : len(line) > 0 ) \
        .map(lambda line : line.strip()) \
        .map(lambda line : line.split(';')) \
        .filter(lambda arr: arr[2] != '' and arr[5] != '' and arr[5]!='0') \
        .map(lambda arr: arr[:10]+list([float(arr[10].replace(',',''))])+arr[11:] if (arr[10]!='') else (arr[:10]+list([0])+arr[11:]))\
        .map(lambda arr: arr[:11]+list([float(arr[11].replace(',',''))])+arr[12:] if (arr[11]!='') else (arr[:11]+list([0])+arr[12:]))\
        .map(lambda arr: arr[:12]+list([float(arr[12].replace(',',''))])+arr[13:] if (arr[12]!='') else (arr[:12]+list([0])+arr[13:]))\
        .map(lambda arr: arr[:13]+list([float(arr[13].replace(',',''))])+arr[14:] if (arr[13]!='') else (arr[:13]+list([0])+arr[14:]))\
        .map(lambda arr: arr[:14]+list([float(arr[14].replace(',',''))])+arr[15:] if (arr[14]!='') else (arr[:14]+list([0])+arr[15:]))\
        .filter(lambda arr: arr[10]!=0 or arr[11]!=0 or arr[12]!=0 or arr[13]!=0 or arr[14]!=0) \
        .map(lambda arr: arr if (arr[10]+ arr[11]+ arr[12]+arr[13]==arr[14] )\
                             else (arr[:14]+[arr[10]+ arr[11]+ arr[12]+arr[13]]+arr[15:]))\
        .map( lambda arr : Row( hour = arr[2][-11:-9] + " " + arr[2][-2:], miles=float(arr[5].replace(',','')),\
                               cost= arr[14]))
    
    tripRowsDF = spark.createDataFrame( logRows )
    
    
    taxi = tripRowsDF.groupBy("hour").agg(count('hour').alias('trips'),
        round(avg("miles"),2).alias('avg_miles'),
        round(avg("cost"),2).alias('avg_cost'))
    
    taxi.orderBy("hour").show(taxi.count())
   
    
    
    
except Exception as err:
    print(err)
    sc.stop()


+-----+-----+---------+--------+
| hour|trips|avg_miles|avg_cost|
+-----+-----+---------+--------+
|01 AM| 8537|     3.22|   14.25|
|01 PM|15809|     4.31|   16.57|
|02 AM| 6637|     3.12|   13.05|
|02 PM|15589|     4.58|   17.01|
|03 AM| 4783|     3.42|   13.44|
|03 PM|16351|     4.54|    18.0|
|04 AM| 3325|     5.26|   16.64|
|04 PM|17371|     4.18|   17.19|
|05 AM| 3110|     7.81|   22.76|
|05 PM|19029|     3.84|   15.63|
|06 AM| 4572|     6.85|   20.81|
|06 PM|20326|     3.68|   15.07|
|07 AM| 8427|     4.79|   17.18|
|07 PM|19858|     3.77|   15.68|
|08 AM|12819|     3.65|   14.41|
|08 PM|17032|     4.15|   16.32|
|09 AM|14182|     4.15|   15.31|
|09 PM|15428|     4.35|   16.75|
|10 AM|13728|     4.27|   16.09|
|10 PM|14384|     3.99|   15.88|
|11 AM|14911|     4.45|   17.04|
|11 PM|12487|     4.01|   15.33|
|12 AM|10244|     3.74|   14.56|
|12 PM|15821|     4.24|   16.58|
+-----+-----+---------+--------+

CPU times: user 141 ms, sys: 32.8 ms, total: 173 ms
Wall time: 16.7 s


### Exercise version 2B - pre-processing of data after DataFrame creation

In [8]:
%%time

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('dataframes_exercise2b').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_small.csv')
    logRows = lines.filter(lambda line : len(line) > 0 ) \
        .map(lambda line : line.strip()) \
        .map(lambda line : line.split(';')) \
        .map( lambda arr : Row( hour = arr[2][-11:-9] + " " + arr[2][-2:], 
                               miles = arr[5], cost= arr[14], Fare = arr[10], Tips = arr[11], Tolls = arr[12], \
                                               Extras = arr[13]))
    
    taxistrips = spark.createDataFrame( logRows )
    
    
    processed=taxistrips.where((col('miles')!="")& (col('miles')!="0"))
    
    processed=processed.withColumn("Miles",regexp_replace(col("miles"), ",", "")).drop(processed.miles)
   
    
    processed=processed.withColumn('fare',when(processed.Fare !='',regexp_replace(col("Fare"), ",", "")).otherwise(0))\
    .drop(processed.Fare)\
    .withColumn('tips',when(processed.Tips !='',regexp_replace(col("Tips"), ",", "")).otherwise(0))\
    .drop(processed.Tips)\
    .withColumn('tolls',when(processed.Tolls !='',regexp_replace(col("Tolls"), ",", "")).otherwise(0))\
    .drop(processed.Tolls)\
    .withColumn('extras',when(processed.Extras !='',regexp_replace(col("Extras"), ",", "")).otherwise(0))\
    .drop(processed.Extras)\
    .withColumn('t_cost',when(processed.cost !='',regexp_replace(col("cost"), ",", "")).otherwise(0))\
    .drop(processed.cost)
    
    processed=processed.where((col('fare')!=0) | (col('tips')!=0) | (col('tolls')!=0) | (col('extras')!=0) | (col('t_cost')!=0))
    
    
    processed=processed.withColumn('total_cost',when((processed.fare+processed.tips+processed.tolls+processed.extras \
                                  !=processed.t_cost)&(processed.fare+processed.tips+processed.tolls+processed.extras \
                                  !=0),processed.fare+processed.tips+processed.tolls+processed.extras)\
                                   .otherwise(processed.t_cost))\
    .drop(processed.t_cost)
    
   
   
    
    taxi = processed.groupBy("hour").agg(count('hour').alias('trips'),
        round(avg("Miles"),2).alias('avg_miles'),
        round(avg("total_cost"),2).alias('avg_cost'))
    
    taxi.orderBy("hour").show(taxi.count())
    
    
    
    
except Exception as err:
    print(err)
    sc.stop()

+-----+-----+---------+--------+
| hour|trips|avg_miles|avg_cost|
+-----+-----+---------+--------+
|01 AM| 8528|     3.21|   14.26|
|01 PM|15752|     4.28|   16.63|
|02 AM| 6634|     3.12|   13.05|
|02 PM|15519|     4.55|   17.09|
|03 AM| 4774|     3.39|   13.47|
|03 PM|16268|      4.5|   18.09|
|04 AM| 3296|     5.21|   16.79|
|04 PM|17307|     4.15|   17.25|
|05 AM| 3052|     7.68|   23.19|
|05 PM|18987|     3.82|   15.66|
|06 AM| 4516|      6.8|   21.07|
|06 PM|20291|     3.66|    15.1|
|07 AM| 8348|     4.74|   17.34|
|07 PM|19822|     3.74|   15.71|
|08 AM|12726|     3.61|   14.51|
|08 PM|17005|     4.13|   16.35|
|09 AM|14122|     4.12|   15.37|
|09 PM|15397|     4.33|   16.78|
|10 AM|13690|     4.24|   16.14|
|10 PM|14367|     3.98|    15.9|
|11 AM|14875|     4.43|   17.09|
|11 PM|12472|      4.0|   15.34|
|12 AM|10231|     3.73|   14.58|
|12 PM|15791|     4.23|   16.61|
+-----+-----+---------+--------+

CPU times: user 174 ms, sys: 165 ms, total: 338 ms
Wall time: 23.5 s


## 3 For each of the 24 hours of the day, which are the (up to) 5 most popular routes (pairs pickup/dropoff regions) according to the the total number of taxi trips? Also reportand the average fare (total trip cost).

### Exercise version 3A - pre-processing dataset during Spark-RDD map transformation

In [9]:
%%time

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('dataframes_exercise3a').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_small.csv')
    logRows = lines.filter(lambda line : len(line) > 0 ) \
        .map(lambda line : line.strip()) \
        .map(lambda line : line.split(';')) \
        .filter(lambda arr: arr[2] != '' and arr[5] != '' and arr[5]!='0' and arr[6] != '' and arr[7] != '' ) \
        .map(lambda arr: arr[:10]+list([float(arr[10].replace(',',''))])+arr[11:] if (arr[10]!='') else (arr[:10]+list([0])+arr[11:]))\
        .map(lambda arr: arr[:11]+list([float(arr[11].replace(',',''))])+arr[12:] if (arr[11]!='') else (arr[:11]+list([0])+arr[12:]))\
        .map(lambda arr: arr[:12]+list([float(arr[12].replace(',',''))])+arr[13:] if (arr[12]!='') else (arr[:12]+list([0])+arr[13:]))\
        .map(lambda arr: arr[:13]+list([float(arr[13].replace(',',''))])+arr[14:] if (arr[13]!='') else (arr[:13]+list([0])+arr[14:]))\
        .map(lambda arr: arr[:14]+list([float(arr[14].replace(',',''))])+arr[15:] if (arr[14]!='') else (arr[:14]+list([0])+arr[15:]))\
        .filter(lambda arr: arr[10]!=0 or arr[11]!=0 or arr[12]!=0 or arr[13]!=0 or arr[14]!=0) \
        .map(lambda arr: arr if (arr[10]+ arr[11]+ arr[12]+arr[13]==arr[14] )\
                             else (arr[:14]+[arr[10]+ arr[11]+ arr[12]+arr[13]]+arr[15:]))\
        .map( lambda arr : Row(hour_location = (arr[2][11:][:2] + " " + arr[2][11:][9:] + "_" + arr[6] + "_" + arr[7]),
                                miles = float(arr[5].replace(',','')), cost= arr[14]))
    
    logRowsDF = spark.createDataFrame( logRows )

    
    route = logRowsDF.groupBy('hour_location').agg(count('hour_location').alias('#trips'),
        round(avg('miles'),2).alias('avg_miles'),
        round(avg('cost'),2).alias('avg_cost')).orderBy('hour_location','#trips', ascending=False)
    
    
    route = route.select(split(col('hour_location'),'_').getItem(0).alias('hour'),
                         split(col('hour_location'),'_').getItem(1).alias('Pick_up'),
                         split(col('hour_location'),'_').getItem(2).alias('Drop_off'),
                         '#trips','avg_miles','avg_cost')
   

    
    window = Window.partitionBy(route['hour']).orderBy(route['#trips'].desc())
    
    
    top = route.select('*',row_number().over(window).alias('rank')).filter(col('rank')<=5)
    top.orderBy('hour','rank').drop('rank').show(120)
    
    
    
except Exception as err:
    print(err)
    sc.stop()

+-----+-----------+-----------+------+---------+--------+
| hour|    Pick_up|   Drop_off|#trips|avg_miles|avg_cost|
+-----+-----------+-----------+------+---------+--------+
|01 AM|17031081700|17031081700|    63|      0.7|    7.15|
|01 AM|17031081700|17031081800|    56|     0.71|    6.94|
|01 AM|17031081700|17031839100|    35|     0.92|     7.9|
|01 AM|17031081700|17031320100|    34|     0.94|     7.6|
|01 AM|17031081800|17031081800|    32|     0.62|    5.97|
|01 PM|17031839100|17031839100|   316|     0.74|    6.83|
|01 PM|17031320100|17031839100|   219|     0.91|    7.53|
|01 PM|17031839100|17031320100|   215|     0.98|    7.48|
|01 PM|17031081500|17031839100|   144|      1.1|    8.01|
|01 PM|17031081700|17031839100|   136|     0.76|    7.13|
|02 AM|17031081700|17031081800|    53|     0.62|    6.63|
|02 AM|17031081700|17031081700|    44|     0.74|    6.91|
|02 AM|17031081700|17031320100|    35|     0.95|    7.75|
|02 AM|17031081800|17031081800|    27|     1.15|     8.4|
|02 AM|1703108

### Exercise version 3B - pre-processing after DataFrame creation and concatenation

In [10]:
%%time

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('dataframes_exercise3b').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_small.csv')
    logRows = lines.filter(lambda line : len(line) > 0 ) \
        .map(lambda line : line.strip()) \
        .map(lambda line : line.split(';')) \
        .filter(lambda arr: arr[2] != '' and arr[5] != '' and arr[6] != '' and arr[7] != '' ) \
        .map( lambda arr : Row(hour_location = (arr[2][11:][:2] + " " + arr[2][11:][9:] + "_" + arr[6] + "_" + arr[7]),
                                miles = arr[5], cost= arr[14],Fare = arr[10], Tips = arr[11], \
                               Tolls = arr[12], Extras = arr[13] ))
    
    tripRowsDF = spark.createDataFrame( logRows ) 
    
    
    processed=tripRowsDF.where((col('miles')!='')& (col('miles')!='0'))
    processed=processed.withColumn("Miles",regexp_replace(col("miles"), ",", "")).drop(processed.miles)
    processed=processed.withColumn('fare',when(processed.Fare !='',regexp_replace(col("Fare"), ",", "")).otherwise(0))\
    .drop(processed.Fare)\
    .withColumn('tips',when(processed.Tips !='',regexp_replace(col("Tips"), ",", "")).otherwise(0))\
    .drop(processed.Tips)\
    .withColumn('tolls',when(processed.Tolls !='',regexp_replace(col("Tolls"), ",", "")).otherwise(0))\
    .drop(processed.Tolls)\
    .withColumn('extras',when(processed.Extras !='',regexp_replace(col("Extras"), ",", "")).otherwise(0))\
    .drop(processed.Extras)\
    .withColumn('t_cost',when(processed.cost !='',regexp_replace(col("cost"), ",", "")).otherwise(0))\
    .drop(processed.cost)
    
   
    
    
    processed=processed.where((col('fare')!=0) | (col('tips')!=0) | (col('tolls')!=0) | (col('extras')!=0) | (col('t_cost')!=0))
    
    
    processed=processed.withColumn('total_cost',when(processed.fare+processed.tips+processed.tolls+processed.extras \
                                  !=processed.t_cost,processed.fare+processed.tips+processed.tolls+processed.extras)\
                                   .otherwise(processed.t_cost))\
    .drop(processed.t_cost)
    
    
    
    route = processed.groupBy('hour_location').agg(count('hour_location').alias('#trips'),
        round(avg('miles'),2).alias('avg_miles'),
        round(avg('total_cost'),2).alias('avg_cost')).orderBy('hour_location','#trips', ascending=False)
    
    
    route = route.select(split(col('hour_location'),'_').getItem(0).alias('hour'),
                         split(col('hour_location'),'_').getItem(1).alias('Pick_up'),
                         split(col('hour_location'),'_').getItem(2).alias('Drop_off'),
                         '#trips','avg_miles','avg_cost')
   
    route=route.where((col('hour')!='')& (col('Pick_up')!='0') & (col('Drop_off')!='0'))

    
    window = Window.partitionBy(route['hour']).orderBy(route['#trips'].desc())
    
    
    top = route.select('*',row_number().over(window).alias('rank')).filter(col('rank')<=5)
    top.orderBy('hour','rank').drop('rank').show(120)
    
    
    
except Exception as err:
    print(err)
    sc.stop()
    

+-----+-----------+-----------+------+---------+--------+
| hour|    Pick_up|   Drop_off|#trips|avg_miles|avg_cost|
+-----+-----------+-----------+------+---------+--------+
|01 AM|17031081700|17031081700|    63|      0.7|    7.15|
|01 AM|17031081700|17031081800|    56|     0.71|    6.94|
|01 AM|17031081700|17031839100|    35|     0.92|     7.9|
|01 AM|17031081700|17031320100|    34|     0.94|     7.6|
|01 AM|17031081800|17031081800|    32|     0.62|    5.97|
|01 PM|17031839100|17031839100|   316|     0.74|    6.83|
|01 PM|17031320100|17031839100|   219|     0.91|    7.53|
|01 PM|17031839100|17031320100|   215|     0.98|    7.48|
|01 PM|17031081500|17031839100|   144|      1.1|    8.01|
|01 PM|17031081700|17031839100|   136|     0.76|    7.13|
|02 AM|17031081700|17031081800|    53|     0.62|    6.63|
|02 AM|17031081700|17031081700|    44|     0.74|    6.91|
|02 AM|17031081700|17031320100|    35|     0.95|    7.75|
|02 AM|17031081800|17031081800|    27|     1.15|     8.4|
|02 AM|1703108

### Exercise version 3C - pre-processing after DataFrame creation and no concatenation before aggregation

In [11]:
%%time

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('dataframes_exercise3c').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_small.csv')
    logRows = lines.filter(lambda line : len(line) > 0 ) \
        .map(lambda line : line.strip()) \
        .map(lambda line : line.split(';')) \
        .filter(lambda arr: arr[2] != '' and arr[5] != '' and arr[6] != '' and arr[7] != '' ) \
        .map( lambda arr : Row(hour = (arr[2][11:][:2] + " " + arr[2][11:][9:]),pickup= arr[6], dropoff=arr[7],
                                miles = arr[5], cost= arr[14],Fare = arr[10], Tips = arr[11], \
                               Tolls = arr[12], Extras = arr[13] ))
    
    tripRowsDF = spark.createDataFrame( logRows ) 
    
    
    processed=tripRowsDF.where((col('miles')!='')& (col('miles')!='0') & (col('hour')!='')& (col('pickup')!='')&\
                               (col('dropoff')!=''))
    processed=processed.withColumn("Miles",regexp_replace(col("miles"), ",", "")).drop(processed.miles)
   
    processed=processed.withColumn('fare',when(processed.Fare !='',regexp_replace(col("Fare"), ",", "")).otherwise(0))\
    .drop(processed.Fare)\
    .withColumn('tips',when(processed.Tips !='',regexp_replace(col("Tips"), ",", "")).otherwise(0))\
    .drop(processed.Tips)\
    .withColumn('tolls',when(processed.Tolls !='',regexp_replace(col("Tolls"), ",", "")).otherwise(0))\
    .drop(processed.Tolls)\
    .withColumn('extras',when(processed.Extras !='',regexp_replace(col("Extras"), ",", "")).otherwise(0))\
    .drop(processed.Extras)\
    .withColumn('t_cost',when(processed.cost !='',regexp_replace(col("cost"), ",", "")).otherwise(0))\
    .drop(processed.cost)
    
    processed=processed.where((col('fare')!=0) | (col('tips')!=0) | (col('tolls')!=0) | (col('extras')!=0) | (col('t_cost')!=0))
    
    
    processed=processed.withColumn('total_cost',when(processed.fare+processed.tips+processed.tolls+processed.extras \
                                  !=processed.t_cost,processed.fare+processed.tips+processed.tolls+processed.extras)\
                                   .otherwise(processed.t_cost))\
    .drop(processed.t_cost)
    
    
    
    route = processed.groupBy('hour','pickup','dropoff').agg(count('hour').alias('#trips'),
        round(avg('miles'),2).alias('avg_miles'),
        round(avg('total_cost'),2).alias('avg_cost')).orderBy('hour','pickup','dropoff','#trips', ascending=False)
    
    
    
    window = Window.partitionBy(route['hour']).orderBy(route['#trips'].desc())
    
    
    top = route.select('*',row_number().over(window).alias('rank')).filter(col('rank')<=5)
    top.orderBy('hour','rank').drop('rank').show(120)
    
    
    
except Exception as err:
    print(err)
    sc.stop()
    

+-----+-----------+-----------+------+---------+--------+
| hour|     pickup|    dropoff|#trips|avg_miles|avg_cost|
+-----+-----------+-----------+------+---------+--------+
|01 AM|17031081700|17031081700|    63|      0.7|    7.15|
|01 AM|17031081700|17031081800|    56|     0.71|    6.94|
|01 AM|17031081700|17031839100|    35|     0.92|     7.9|
|01 AM|17031081700|17031320100|    34|     0.94|     7.6|
|01 AM|17031081800|17031081800|    32|     0.62|    5.97|
|01 PM|17031839100|17031839100|   316|     0.74|    6.83|
|01 PM|17031320100|17031839100|   219|     0.91|    7.53|
|01 PM|17031839100|17031320100|   215|     0.98|    7.48|
|01 PM|17031081500|17031839100|   144|      1.1|    8.01|
|01 PM|17031081700|17031839100|   136|     0.76|    7.13|
|02 AM|17031081700|17031081800|    53|     0.62|    6.63|
|02 AM|17031081700|17031081700|    44|     0.74|    6.91|
|02 AM|17031081700|17031320100|    35|     0.95|    7.75|
|02 AM|17031081800|17031081800|    27|     1.15|     8.4|
|02 AM|1703108

# 4 Caculate average and maximum tip for each year.


In [12]:
%%time

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('dataframes_exercise4').getOrCreate()
sc = spark.sparkContext

try :
    lines = sc.textFile('Taxi_small.csv')
    logRows = lines.filter( lambda line : len(line) > 0 )   \
                        .map( lambda line : line.split(';') ) \
                        .filter(lambda arr: arr[2] != ''and arr[11] != '') \
                        .map( lambda arr : Row(Trip_ID = arr[0], Taxi_ID = arr[1], Year = arr[2].split(" ")[0].split("/")[2], \
                                               Trip_End = arr[3], Trip_Seconds = arr[4], Trip_Miles = arr[5],\
                                               Pickup_ID = arr[6], Dropoff_ID = arr[7], Pickup_area =  arr[8], \
                                               Dropoff_area = arr[9], Fare = arr[10], Tips = float(arr[11].replace(' , ', '')), Tolls = arr[12], \
                                               Extras = arr[13], Trip_Total= arr[14],Payment_type = arr[15] , \
                                               Company = arr[16], Pickup_Centroid_Lat  = arr[17], \
                                               Pickup_Centroid_Lon = arr[18], Pickup_Centroid_Loc = arr[19], \
                                               Dropoff_Centroid_Lat = arr[20], Dropoff_Centroid_Lon = arr[21], \
                                               Dropoff_Centroid_Lo = arr[22] ))
    
    logRowsDF = spark.createDataFrame( logRows )
    
    
    tip = logRowsDF.groupBy("Year").agg(round(avg('Tips'),2).alias('Avg_tips'),
        round(max("Tips"),2).alias('Biggest_tip'))
    
    
    tip.orderBy("Year").show()
    
    
    
    
except Exception as err:
    print(err)
    sc.stop()


+----+--------+-----------+
|Year|Avg_tips|Biggest_tip|
+----+--------+-----------+
|2013|    0.96|       99.0|
|2014|    1.12|      120.0|
|2015|    1.37|      100.0|
|2016|    1.49|       60.0|
|2017|    1.55|      261.0|
|2018|    1.72|       90.0|
|2019|    1.86|       59.0|
|2020|     1.5|       30.0|
+----+--------+-----------+

CPU times: user 86.4 ms, sys: 20 ms, total: 106 ms
Wall time: 10.4 s
