## TLC Trip Record Data
The yellow and green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data used in the attached datasets were collected and provided to the NYC Taxi and Limousine Commission (TLC) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs (TPEP/LPEP). The trip data was not created by the TLC, and TLC makes no representations as to the accuracy of these data.

The For-Hire Vehicle (“FHV”) trip records include fields capturing the dispatching base license number and the pick-up date, time, and taxi zone location ID (shape file below). These records are generated from the FHV Trip Record submissions made by bases. Note: The TLC publishes base trip record data as submitted by the bases, and we cannot guarantee or confirm their accuracy or completeness. Therefore, this may not represent the total amount of trips dispatched by all TLC-licensed bases. The TLC performs routine reviews of the records and takes enforcement actions when necessary to ensure, to the extent possibvle, complete and accurate information.

## Data source
https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

## Method
- Using the pipline , import CSV files using pyarrow: 
- PySpark SQL to merge files, transform data, and analyse the Data.

## Download csv files and transform them to paquet files
Using the pipeline explained in the Readme file. you can download csv files and transform them to paquet files.

## What we do here
Here first initialize the spark. Please install pyspark first. Then we
1. Read all parquet files for green and yellow taxi and check schema
2. Create two extra columns for *'hourOfDay'* and *'dayofweek'*
3. Merge all files in parquet and avra format
4. Apply some queries to check the data

Then we merge all parquet files, transform files to avra, transform data, and analyse the Data
For the initial run, we would like to have a new file.  After the runing the code, we refuse to replace the files. 

## Starting the spark to calculate some observable and query the data

In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
     .builder\
     .master("local[*]")\
     .config("spark.executor.memory", "8g")\
     .config("spark.driver.memory", "8g")\
     .config("spark.memory.offHeap.enabled",True)\
     .config("spark.memory.offHeap.size","8g")   \
     .appName("taxi-Data")\
     .getOrCreate()
print(spark.sparkContext.getConf().getAll())
print(spark)

22/04/11 11:19:11 WARN Utils: Your hostname, LAPTOP-5GKK0U9L resolves to a loopback address: 127.0.1.1; using 172.29.201.166 instead (on interface eth0)
22/04/11 11:19:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/11 11:19:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/04/11 11:19:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


[('spark.app.startTime', '1649668762646'), ('spark.memory.offHeap.size', '8g'), ('spark.sql.warehouse.dir', 'file:/home/alireza/taxi-statistic/spark-warehouse'), ('spark.driver.host', '172.29.201.166'), ('spark.executor.id', 'driver'), ('spark.driver.port', '39441'), ('spark.app.name', 'taxi-Data'), ('spark.rdd.compress', 'True'), ('spark.driver.memory', '8g'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.executor.memory', '8g'), ('spark.submit.pyFiles', ''), ('spark.submit.deployMode', 'client'), ('spark.memory.offHeap.enabled', 'True'), ('spark.app.id', 'local-1649668765436'), ('spark.ui.showConsoleProgress', 'true')]
<pyspark.sql.session.SparkSession object at 0x7fc33ce4a100>


### 1. Read all parquet files for green and yellow taxi and check schema
First we read all files in the *'file_path'* directory created by pipeline and check their schema. 

In [2]:
#Read files for green taxi
file_path="./data/green_par/*"
parqDF_green=spark.read.parquet(file_path)

#print schema and show first five columns
print('Green taxi information:')
parqDF_green.printSchema()
parqDF_green.show(n=5)

#----------------------------------------------------------------
#Read files for yellow taxi
file_path="./data/yellow_par/*2019*"
parqDF_yellow=spark.read.parquet(file_path)

#print schema and show first five columns
print('Yellow taxi information:')
parqDF_yellow.printSchema()
parqDF_yellow.show(n=5)

                                                                                

Green taxi information:
root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: long (nullable = true)



                                                                                

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+
|       2| 2018-10-01 02:05:48|  2018-10-01 02:21:49|                 N|         1|         255|          97|              2|         4.37|       16.0|  0.5|    0.5|       0.0|         0.0|     null|                  0.3|     

### 2. Create two extra columns for *'hourOfDay'* and *'dayofweek'*
create to extra columns and add them to the dataframe.

In [3]:
#create two extra columns for green taxi. and remove 'ehail_fee' for type problem. 
parqDF_green=parqDF_green.drop('ehail_fee')
from pyspark.sql.functions import hour,dayofweek
parqDF_green=parqDF_green.withColumn("hourOfDay", hour('lpep_pickup_datetime')).withColumn( "dayofweek",dayofweek('lpep_pickup_datetime'))


#create two extra columns for yellow taxi
parqDF_yellow=parqDF_yellow.withColumn("hourOfDay", hour('tpep_pickup_datetime')).withColumn( "dayofweek",dayofweek('tpep_pickup_datetime'))


### 3. Merge all files in parquet and avra format
Now we can merge them into parquet and avra format. 

Spark has no intrinsic library to use avra. You should add an external library to spark. 

To load the library, before starting the spark, type the following command in the terminal

```terminal
spark-submit --packages org.apache.spark:spark-avro_2.12:3.2.1 
```


In [4]:
import pyspark.sql.avro

#Write parquet for green taxi
file_parquet="./data/green_merged_parquet"
parqDF_green.write.mode("overwrite").parquet(file_parquet) 

#Write avro for green taxi
file_avro="./data/green_merged_avro"
parqDF_green.write.mode("overwrite").save(file_avro)


#------------------------------------
#Write parquet for yellow taxi
file_parquet="./data/yellow_merged_parquet"
parqDF_yellow.write.mode("overwrite").parquet(file_parquet) 

#Write avra for yellow taxi
file_avro="./data/yellow_merged_avro"
parqDF_yellow.write.mode("overwrite").save(file_avro)

### 4. Apply some queries to check the data
At the moment, our download data contains 36 month of data between 2018-08 and 2021-07. The data set did not updated ofter that time. 

First we define two different queries *'ParquetTable_green'* and *'ParquetTable_yellow'* for the tamplate of SQL.
It containes the follwoing columns:
- *lpep_dropoff_datetime* : drop off date time
- *lpep_pickup_datetime*: pick up date time
- *trip_distance*: trip distance
- *dayofweek*: day of week
- *hourOfDay*: number of passangers
- *taxi*: kind of taxi *'g'* for green taxi and *'y'* for yellow taxi

In [5]:
parqDF_green.createOrReplaceTempView("ParquetTable_green")
parqDF_yellow.createOrReplaceTempView("ParquetTable_yellow")

spark.sql('''    SELECT  lpep_dropoff_datetime,
                         lpep_pickup_datetime,
                         (bigint(lpep_dropoff_datetime)) - (bigint(lpep_pickup_datetime)) AS rideTimesec, 
                         trip_distance,
                         dayofweek, 
                         hourOfDay,
                         passenger_count, 
                         'g' AS taxi
                 FROM ParquetTable_green
                    '''
          ).show()
spark.sql('''    SELECT tpep_dropoff_datetime  AS lpep_dropoff_datetime,
                       tpep_pickup_datetime  AS lpep_pickup_datetime,
                       (bigint(tpep_dropoff_datetime)) - (bigint(tpep_pickup_datetime)) AS rideTimesec, 
                       trip_distance,
                       dayofweek, 
                       hourOfDay,
                       passenger_count, 
                      'y' AS taxi
                FROM ParquetTable_yellow'''
          ).show()

+---------------------+--------------------+-----------+-------------+---------+---------+---------------+----+
|lpep_dropoff_datetime|lpep_pickup_datetime|rideTimesec|trip_distance|dayofweek|hourOfDay|passenger_count|taxi|
+---------------------+--------------------+-----------+-------------+---------+---------+---------------+----+
|  2018-10-01 02:21:49| 2018-10-01 02:05:48|        961|         4.37|        2|        2|              2|   g|
|  2018-10-01 02:31:50| 2018-10-01 02:24:19|        451|         1.45|        2|        2|              2|   g|
|  2018-10-01 02:21:15| 2018-10-01 02:12:06|        549|         2.04|        2|        2|              1|   g|
|  2018-10-01 02:39:23| 2018-10-01 02:34:42|        281|         0.91|        2|        2|              1|   g|
|  2018-10-01 03:01:28| 2018-10-01 02:50:21|        667|         3.53|        2|        2|              1|   g|
|  2018-10-01 02:24:09| 2018-10-01 02:23:15|         54|         0.19|        2|        2|              

### Query for The average distance driven by yellow and green taxis per hour
The average distance driven by green taxi is 27.79 km/h and by yellow taxi is 9.98 km/h.

In [6]:
query1='''WITH time_distance AS (
                SELECT (bigint(to_timestamp(lpep_dropoff_datetime))) - (bigint(to_timestamp(lpep_pickup_datetime))) AS rideTimesec, 
                       trip_distance,
                       'g' AS taxi
                FROM ParquetTable_green
                UNION ALL 
                SELECT (bigint(to_timestamp(tpep_dropoff_datetime))) - (bigint(to_timestamp(tpep_pickup_datetime))) AS rideTimesec, 
                       trip_distance,
                      'y' AS taxi
                FROM ParquetTable_yellow
                    ) 
            SELECT taxi,(sum(trip_distance)/ sum(rideTimesec)*3600) AS average_distancePerHour  
            from  time_distance
            GROUP BY taxi
    '''
spark.sql( query1 ).show()



+----+-----------------------+
|taxi|average_distancePerHour|
+----+-----------------------+
|   g|      27.79075306314514|
|   y|      9.988950865483991|
+----+-----------------------+



                                                                                

### Query for Day of the week in 2019 and 2020 which has the lowest number of single rider trips
Day 1 (Monday) has the lowest number of single rider trips.

In [11]:
query2='''WITH time_distance AS (
                SELECT  lpep_pickup_datetime,
                       dayofweek,
                       passenger_count,
                       'g' AS taxi
                FROM ParquetTable_green
                UNION ALL 
                SELECT  tpep_dropoff_datetime as lpep_pickup_datetime,
                       dayofweek,
                       passenger_count,
                       'y' AS taxi
                FROM ParquetTable_yellow
                    ) 
        SELECT  dayofweek,count(dayofweek) AS count_dayofweek
        FROM time_distance
        WHERE ((lpep_pickup_datetime BETWEEN CAST('2019-01-01 00:00:01' as Timestamp) AND CAST('2021-01-01 00:00:01' as Timestamp)) ) AND passenger_count=1
        GROUP BY dayofweek,passenger_count
        ORDER BY count_dayofweek 
        '''
spark.sql( query2 ).show()




+---------+---------------+---------------+
|dayofweek|count_dayofweek|passenger_count|
+---------+---------------+---------------+
|        1|        7991247|              1|
|        2|        8502436|              1|
|        7|        8867558|              1|
|        3|        9574237|              1|
|        4|        9890865|              1|
|        6|       10007853|              1|
|        5|       10167470|              1|
+---------+---------------+---------------+



                                                                                

### The top 3 of the busiest hours
The top 3 of the busiest hours are 21, 19, and 21.

In [12]:
query3='''WITH time_distance AS (
                SELECT  hourOfDay,
                       'g' AS taxi
                FROM ParquetTable_green
                UNION ALL 
                SELECT hourOfDay,
                      'y' AS taxi
                FROM ParquetTable_yellow
                    ) 
            
    SELECT  hourOfDay,count(hourOfDay) AS count_hour
    FROM time_distance
    GROUP BY hourOfDay
    ORDER BY count_hour DESC
    LIMIT 3
    
    ;'''
spark.sql( query3 ).show()




+---------+----------+
|hourOfDay|count_hour|
+---------+----------+
|       20|   6139115|
|       19|   6036296|
|       21|   5661044|
+---------+----------+



                                                                                

## Bonus
**Q)** Your data scientists want to make future predictions based on weather conditions. How would you expand your pipeline to help your colleagues with this task?

**A:** The weather information can be found some other data bases like https://www.visualcrossing.com/weather/weather-data-services which is a payed database.
We can a function in the pipeline to add some extra columns like temprature and wheather condition ('sunny', 'rainy', ...) based on the above database.

**Q)** Another colleague approaches to you. He is an Excel guru and makes all kind of stuff using this tool forever. So he needs all the available taxi trip records in the XLSX format. Can you re-use your current pipeline? How does this output compares to your existing formats? Do you have performance concerns?

**A)** I recommend him to use https://www.cdata.com/drivers/parquet/excel/#:~:text=The%20Parquet%20Excel%20Add%2DIn,based%20data%20analysis%2C%20and%20more! . This Excel Add-In for Parquet Read, Write, and Update Parquet from Excel. The Parquet Excel Add-In is a powerful tool that allows you to connect with live Parquet data, directly from Microsoft Excel. 
For huge data set XLSX format need huge amound of hard drive. Excel efficency for huge number of columns and rows are so low. 
