# Big Data Processing Systems — 2020/21
## Project nº 2

Suppose you want to process information about taxi rides in a city to better understand the
behavior of the community and help taxi drivers maximize their profit.
To this end, you have a data set with information about taxi trips, including the pickup
and drop-off locations.

### PySpark SQL Exercises

Loading Data:

In [114]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.master('local[*]') \
    .appName('Exercise').getOrCreate()


fields = [
    StructField('TripID', StringType(), True),
    StructField('TaxiID', StringType(), True),
    StructField('TripStartTimestamp', TimestampType(), True),
    StructField('TripEndTimestamp', TimestampType(), True),
    StructField('TripSeconds', DoubleType(), True),
    StructField('TripMiles', DoubleType(), True),
    StructField('PickupRegionID', StringType(), True),
    StructField('DropoffRegionID', StringType(), True),
    StructField('PickupCommunity', DoubleType(), True),
    StructField('DropoffCommunity', DoubleType(), True),
    StructField('Fare', DoubleType(), True),
    StructField('Tips', DoubleType(), True),
    StructField('Tolls', DoubleType(), True),
    StructField('Extras', DoubleType(), True),
    StructField('TripTotal', DoubleType(), True),
    StructField('PaymentType', StringType(), True),
    StructField('Company', StringType(), True),
    StructField('PickupCentroidLatitude', DoubleType(), True),
    StructField('PickupCentroidLongitude', DoubleType(), True),
    StructField('PickupCentroidLocation', StringType(), True),
    StructField('DropoffCentroidLatitude', DoubleType(), True),
    StructField('DropoffCentroidLongitude', DoubleType(), True),
    StructField('DropoffCentroidLocation', StringType(), True)]

# https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
df = spark.read.csv('Taxi_Trips_151MB.csv', sep=';', schema=StructType(fields), timestampFormat="M/d/y h:m:s a")
df.createOrReplaceTempView("taxi_trips")
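The `timestampFormat` above uses Spark's datetime pattern letters (`M` month, `d` day, `y` year, `h` clock hour 1-12, `m` minute, `s` second, `a` AM/PM marker). To see which raw strings that pattern is meant to accept, here is a rough plain-Python counterpart using `datetime.strptime`. It is a sketch only: the sample strings are hypothetical (the raw CSV values are not shown here), and `parse_trip_timestamp` is an illustrative helper, not part of the project.

```python
from datetime import datetime

# Rough Python counterpart of Spark's "M/d/y h:m:s a" pattern (an assumption, not an
# exact replica of Spark's parser): %I is the 12-hour clock and %p is the AM/PM marker.
PYTHON_TIMESTAMP_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def parse_trip_timestamp(raw):
    """Parse a raw timestamp string such as '01/02/2016 12:10:00 AM'."""
    return datetime.strptime(raw, PYTHON_TIMESTAMP_FORMAT)


# Hypothetical raw values: 12 AM is midnight (hour 0), 3 PM is hour 15.
print(parse_trip_timestamp("01/02/2016 12:10:00 AM"))  # 2016-01-02 00:10:00
print(parse_trip_timestamp("6/15/2016 3:05:00 PM"))    # 2016-06-15 15:05:00
```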

#### 1. (2 points) To have a rough understanding of the fluctuation in the demand for taxis per month of the year, create an index reporting:
What is the accumulated number of taxi trips per month?
Output is expected to have two columns: (month_number, #total_trips). In the example below
the values are not real and you may get different ones.

In [115]:
%%time

spark.sql('select lpad(month(TripStartTimestamp), 2, "0") as month_number,'
          '     count(*) as `#total_trips` '
          'from taxi_trips '
          'group by month(TripStartTimestamp)') \
    .cache().show(truncate=False)

+------------+------------+
|month_number|#total_trips|
+------------+------------+
|12          |29252       |
|01          |30357       |
|06          |35016       |
|03          |35260       |
|05          |34979       |
|09          |31466       |
|04          |32884       |
|08          |32747       |
|07          |32141       |
|10          |33618       |
|11          |30017       |
|02          |31013       |
+------------+------------+

CPU times: user 3.26 ms, sys: 6.51 ms, total: 9.77 ms
Wall time: 3.33 s
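Spark does not guarantee any row order after a `group by`, which is why the months above come out shuffled; appending `order by month_number` to the query would list them from 01 to 12. The plain-Python sketch below runs the same count-per-zero-padded-month logic, with that ordering, on hypothetical timestamps rather than the real data (`trips_per_month` is an illustrative name). One difference from the SQL: it drops missing timestamps, whereas `group by month(TripStartTimestamp)` would collect them in a separate NULL group.

```python
from collections import Counter
from datetime import datetime


def trips_per_month(start_timestamps):
    """Count trips per calendar month, keyed by a zero-padded month number."""
    counts = Counter(f"{ts.month:02d}" for ts in start_timestamps if ts is not None)
    # Sorting by key is what an ORDER BY month_number clause would add.
    return sorted(counts.items())


# Hypothetical trip start times (not taken from the data set).
sample = [
    datetime(2016, 1, 3, 8, 15),
    datetime(2016, 1, 20, 17, 40),
    datetime(2016, 2, 1, 9, 0),
    None,  # missing timestamp, skipped by this sketch
    datetime(2016, 12, 31, 23, 59),
]
print(trips_per_month(sample))  # [('01', 2), ('02', 1), ('12', 1)]
```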


#### 2. (3 points) To have a rough understanding of the taxi service, create an index reporting:
For each pickup region, report the list of unique dropoff regions.
Output is expected to have two columns: (pickup_region_ID, list_of_dropoff_region_ID). In the
example below the values are not real and you should get different ones (the output is nicely
wrapped in this text to facilitate reading).

In [116]:
%%time

spark.sql('select PickupRegionID as pickup_region_ID,'
          'collect_list(DropoffRegionID) as list_of_dropoff_region_ID '
          'from (select distinct PickupRegionID,DropoffRegionID '
          '      from taxi_trips'
          '      where PickupRegionID is not null and DropoffRegionID is not null) '
          'group by PickupRegionID') \
    .cache().show(truncate=False)

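The query deduplicates (pickup, dropoff) pairs in a subquery before calling `collect_list`. Spark's `collect_set` would deduplicate directly (like `collect_list`, it ignores null values), although neither function guarantees an order inside the resulting list. Below is a plain-Python sketch of the same grouping on hypothetical trip pairs: the region IDs are borrowed from the outputs in this notebook, but the pairs are invented, and `dropoffs_per_pickup` is an illustrative name. Each list is sorted only to make the result deterministic.

```python
from collections import defaultdict


def dropoffs_per_pickup(trips):
    """Map each pickup region to the sorted list of its distinct dropoff regions."""
    regions = defaultdict(set)
    for pickup, dropoff in trips:
        # Mirror the WHERE clause: ignore trips whose region is missing (None or empty).
        if pickup and dropoff:
            regions[pickup].add(dropoff)  # a set keeps each dropoff region once
    return {pickup: sorted(dropoffs) for pickup, dropoffs in regions.items()}


# Hypothetical (pickup, dropoff) pairs.
trips = [
    ("17031081500", "17031839100"),
    ("17031081500", "17031320100"),
    ("17031081500", "17031839100"),  # duplicate pair, kept once
    ("17031320100", None),           # missing dropoff, ignored
    ("17031320100", "17031081500"),
]
print(dropoffs_per_pickup(trips))
```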

#### 3. (4 points) To have a rough understanding of the total price of each taxi ride (based on the pickup and drop-off regions), create an index reporting:
What is the expected charge/cost of a taxi ride, given the pickup region ID, the weekday
(0=Monday, 6=Sunday) and time in format “hour AM/PM”?
Output is expected to have two columns: (pickup, avg_total_trip_cost), where pickup combines the pickup region ID,
the weekday and the hour. In the example below the values are not real and you should get different ones.

In [117]:
%%time

# define a UDF returning the day of the week as a number (0=Monday, 6=Sunday),
# as the date_format function doesn't allow extracting the weekday as a numeric value
def day_of_week(d):
    # TripStartTimestamp is nullable, so return None instead of failing on missing values
    return d.weekday() if d is not None else None


# register the UDF with an integer return type so it can be used inside SQL statements
spark.udf.register("day_of_week", day_of_week, IntegerType())


spark.sql("select concat(PickupRegionID,'_',day_of_week(tripstarttimestamp),date_format(tripstarttimestamp, '_hh_a')) "
          "as pickup, round(avg(TripTotal),2) as avg_total_trip_cost "
          "from taxi_trips "
          "where PickupRegionID is not null and PickupRegionID <> '' "
          "group by PickupRegionID,tripstarttimestamp") \
    .cache().show(truncate=False)


+-------------------+-------------------+
|pickup             |avg_total_trip_cost|
+-------------------+-------------------+
|17031081600_3_12_PM|5.85               |
|17031071500_5_12_PM|3.25               |
|17031050200_5_08_PM|5.65               |
|17031081500_5_12_AM|12.45              |
|17031222200_4_10_PM|5.25               |
|17031080202_3_09_AM|7.45               |
|17031842300_1_08_AM|43.8               |
|17031081000_5_03_PM|7.05               |
|17031080202_3_11_PM|10.44              |
|17031081500_5_10_AM|10.45              |
|17031838100_3_10_PM|7.05               |
|17031320100_5_10_AM|8.25               |
|17031320100_3_03_PM|5.85               |
|17031062800_5_09_PM|8.05               |
|17031081500_5_12_AM|13.05              |
|17031061902_6_10_AM|6.45               |
|17031320100_4_05_PM|6.45               |
|17031320100_2_09_AM|39.25              |
|17031081300_2_05_PM|5.05               |
|17031081201_2_10_AM|10.94              |
+-------------------+-------------------+
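The query above groups by `PickupRegionID` and the full `tripstarttimestamp`, so each group is a single start instant rather than a (region, weekday, hour) bucket. That is why the key `17031081500_5_12_AM` appears twice in the output with different averages. Grouping by `PickupRegionID` together with the same weekday and hour expressions used inside `concat` would give one average per key. Spark SQL also provides a built-in `weekday()` function with the same 0=Monday convention, which would avoid the Python UDF. The plain-Python sketch below illustrates the intended bucketing on hypothetical trips; `avg_cost_by_key` is an illustrative name, and unlike Spark's `avg`, it simply skips rows with a missing total.

```python
from collections import defaultdict
from datetime import datetime


def avg_cost_by_key(trips):
    """Average trip total per '<region>_<weekday>_<hh>_<AM|PM>' key."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of totals, number of trips]
    for region, start, trip_total in trips:
        # Skip rows with a missing region, start time or total.
        if not region or start is None or trip_total is None:
            continue
        # weekday(): 0 = Monday ... 6 = Sunday; '%I_%p' renders midnight as '12_AM'.
        key = f"{region}_{start.weekday()}_{start.strftime('%I_%p')}"
        totals[key][0] += trip_total
        totals[key][1] += 1
    return {key: round(s / n, 2) for key, (s, n) in totals.items()}


# Hypothetical trips: (pickup region, start time, trip total).
trips = [
    ("17031081500", datetime(2016, 1, 2, 0, 10), 10.0),   # Saturday, 12 AM
    ("17031081500", datetime(2016, 1, 2, 0, 45), 13.0),   # same bucket
    ("17031320100", datetime(2016, 1, 4, 15, 30), 5.85),  # Monday, 3 PM
    ("", datetime(2016, 1, 4, 15, 30), 7.0),              # missing region, skipped
]
print(avg_cost_by_key(trips))
# {'17031081500_5_12_AM': 11.5, '17031320100_0_03_PM': 5.85}
```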

In [118]:
%%time

# variant without the UDF (the weekday is left out of the key), used only to compare processing time

spark.sql("select concat(PickupRegionID,'_',date_format(tripstarttimestamp, '_hh_a')) "
          "as pickup, round(avg (TripTotal),2) as avg_total_trip_cost "
          "from taxi_trips "
          "where PickupRegionID is not null and PickupRegionID <> '' "
          "group by PickupRegionID,tripstarttimestamp") \
    .cache().show(truncate=False)

+------------------+-------------------+
|pickup            |avg_total_trip_cost|
+------------------+-------------------+
|17031081600__12_PM|5.85               |
|17031071500__12_PM|3.25               |
|17031050200__08_PM|5.65               |
|17031081500__12_AM|12.45              |
|17031222200__10_PM|5.25               |
|17031080202__09_AM|7.45               |
|17031842300__08_AM|43.8               |
|17031081000__03_PM|7.05               |
|17031080202__11_PM|10.44              |
|17031081500__10_AM|10.45              |
|17031838100__10_PM|7.05               |
|17031320100__10_AM|8.25               |
|17031320100__03_PM|5.85               |
|17031062800__09_PM|8.05               |
|17031081500__12_AM|13.05              |
|17031061902__10_AM|6.45               |
|17031320100__05_PM|6.45               |
|17031320100__09_AM|39.25              |
|17031081300__05_PM|5.05               |
|17031081201__10_AM|10.94              |
+------------------+-------------------+
only showing top

#### 4. (1 to 3 points) An optional 4th index (different from the one proposed for map-reduce) that answers a non-trivial and interesting question over the given data set.

Top 3 companies by number of trips from the most popular pickup regions (at least 5000 trips from there).

In [119]:
%%time

# top 3 companies per pickup region, for the most popular regions (at least 5000 trips from there)
spark.sql('with trips_per_company as ( '
          '    select pickupregionid,company, count(*) ntrips '
          '    from taxi_trips '
          '    where company is not null and company <> "" '
          '       and pickupregionid is not null and pickupregionid <> "" '
          '    group by pickupregionid,company), '
          'ranked_trips_per_company as ( '
          '    select pickupregionid,company, ntrips, '
          '           row_number() over (partition by pickupregionid order by ntrips desc) r, '
          '           sum(ntrips) over (partition by pickupregionid) total_sum '
          '    from trips_per_company), '
          'filtered_data as ( '
          '    select pickupregionid,company, total_sum,ntrips,r '
          '    from ranked_trips_per_company '
          '    where r <= 3 and total_sum >= 5000) '
          # sort in the outer query: an ORDER BY inside a CTE does not guarantee the final row order
          'select pickupregionid,company,ntrips '
          'from filtered_data '
          'order by total_sum desc, pickupregionid asc, r asc') \
    .cache().show(truncate=False)

+--------------+---------------------------------+------+
|pickupregionid|company                          |ntrips|
+--------------+---------------------------------+------+
|17031839100   |Taxi Affiliation Services        |8485  |
|17031839100   |Flash Cab                        |3485  |
|17031839100   |Yellow Cab                       |2308  |
|17031320100   |Taxi Affiliation Services        |4567  |
|17031320100   |Flash Cab                        |1768  |
|17031320100   |Blue Ribbon Taxi Association Inc.|1396  |
|17031980000   |Taxi Affiliation Services        |2955  |
|17031980000   |Flash Cab                        |1404  |
|17031980000   |Dispatch Taxi Affiliation        |950   |
|17031081500   |Taxi Affiliation Services        |3452  |
|17031081500   |Flash Cab                        |1201  |
|17031081500   |Choice Taxi Association          |1002  |
|17031081700   |Taxi Affiliation Services        |3110  |
|17031081700   |Flash Cab                        |1476  |
|17031081700  
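
The window-function query can be mirrored in plain Python to make its semantics explicit: count trips per (region, company), sum those counts per region, keep the regions whose total reaches the threshold, and take the three busiest companies in each. The sketch below uses hypothetical trips and a lowered threshold so the example stays small; `top_companies` is an illustrative name. One deliberate difference: `row_number()` breaks ties between companies with equal counts arbitrarily, whereas this sketch breaks them alphabetically so the result is reproducible.

```python
from collections import Counter, defaultdict


def top_companies(trips, k=3, min_trips=5000):
    """Top-k companies per pickup region, for regions with at least min_trips trips."""
    # Like the first CTE: count trips per (region, company), ignoring missing values.
    per_pair = Counter((region, company) for region, company in trips if region and company)
    by_region = defaultdict(list)
    for (region, company), n in per_pair.items():
        by_region[region].append((company, n))
    # Like sum(ntrips) over (partition by pickupregionid).
    totals = {region: sum(n for _, n in rows) for region, rows in by_region.items()}
    result = []
    # Busiest regions first, then by region ID, as in the ORDER BY clause.
    for region in sorted(totals, key=lambda r: (-totals[r], r)):
        if totals[region] < min_trips:
            continue
        # Rank by trip count (descending), breaking ties by company name.
        ranked = sorted(by_region[region], key=lambda row: (-row[1], row[0]))
        result.extend((region, company, n) for company, n in ranked[:k])
    return result


# Hypothetical trips: (pickup region, company).
trips = (
    [("17031839100", "Taxi Affiliation Services")] * 3
    + [("17031839100", "Flash Cab")] * 2
    + [("17031839100", "Yellow Cab"), ("17031839100", "Choice Taxi Association")]
    + [("17031320100", "Flash Cab")]  # region below the threshold
    + [("17031320100", None)]         # missing company, ignored
)
print(top_companies(trips, k=3, min_trips=5))
```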

In [120]:
spark.stop()