In [1]:
!hdfs dfs -ls /taxi/raw_parquet/

Found 14 items
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:14 /taxi/raw_parquet/2009
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:18 /taxi/raw_parquet/2010
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:21 /taxi/raw_parquet/2011
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:24 /taxi/raw_parquet/2012
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:27 /taxi/raw_parquet/2013
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:49 /taxi/raw_parquet/2014
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:52 /taxi/raw_parquet/2015
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:54 /taxi/raw_parquet/2016
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:56 /taxi/raw_parquet/2017
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:57 /taxi/raw_parquet/2018
drwxr-xr-x   - cluster supergroup          0 2022-04-26 09:59 /taxi/raw_parquet/2019
drwxr-xr-x   - cluster supergroup          0 2022-

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Convert To Parquet and pick columns") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-04-27 08:16:31,165 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Schema 

As we have seen, the schema changed over the years. Let us build a function that picks the columns we need for future queries and renames them to a common set of column names.

Old:
```
+-----------+--------------------+---------------------+---------------+------------------+-------------------+------------------+---------+-----------------+-------------------+------------------+------------+------------------+---------+-------+-------+---------+------------------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|          Start_Lon|         Start_Lat|Rate_Code|store_and_forward|            End_Lon|           End_Lat|Payment_Type|          Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|         Total_Amt|
+-----------+--------------------+---------------------+---------------+------------------+-------------------+------------------+---------+-----------------+-------------------+------------------+------------+------------------+---------+-------+-------+---------+------------------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|2.6299999999999999|-73.991956999999999|         40.721567|     null|             null|         -73.993803|40.695922000000003|        CASH|8.9000000000000004|      0.5|   null|      0|        0|9.4000000000000004|
|        VTS| 2009-01-04 03:31:00|  2009-01-04 03:38:00|              3|4.5499999999999998|-73.982101999999998|40.736289999999997|     null|             null|-73.955849999999998|40.768030000000003|      Credit|              12.1|      0.5|   null|      2|        0|              14.6|
+-----------+--------------------+---------------------+---------------+------------------+-------------------+------------------+---------+-----------------+-------------------+------------------+------------+------------------+---------+-------+-------+---------+------------------+
```

New:

```
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2017-01-09 11:13:28|  2017-01-09 11:25:45|              1|         3.30|         1|                 N|         263|         161|           1|       12.5|    0|    0.5|         2|           0|                  0.3|        15.3|
|       1| 2017-01-09 11:32:27|  2017-01-09 11:36:01|              1|          .90|         1|                 N|         186|         234|           1|          5|    0|    0.5|      1.45|           0|                  0.3|        7.25|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
```

We pick the following columns (old name on the left, new name on the right):

```
Trip_Pickup_DateTime tpep_pickup_datetime
Trip_Dropoff_DateTime tpep_dropoff_datetime
Passenger_Count passenger_count
Trip_Distance trip_distance
Payment_Type payment_type
Tip_Amt tip_amount
Total_Amt total_amount
```

### Create Folder in HDFS

In [3]:
!hdfs dfs -mkdir /taxi/by_month/

### Testing our approach first

In [20]:
year = "2009"
month = "01"

df = spark.read.parquet(f"/taxi/raw_parquet/{year}/{month}.parquet")\
    .withColumnRenamed("Trip_Pickup_DateTime","pickup_datetime")\
    .withColumnRenamed("Trip_Dropoff_DateTime","dropoff_datetime")\
    .withColumnRenamed("tpep_pickup_datetime","pickup_datetime")\
    .withColumnRenamed("tpep_dropoff_datetime","dropoff_datetime")\
    .withColumnRenamed("Passenger_Count","passenger_count")\
    .withColumnRenamed("Trip_Distance","trip_distance")\
    .withColumnRenamed("Payment_Type","payment_type")\
    .withColumnRenamed("Tip_Amt","tip_amount")\
    .withColumnRenamed("Total_Amt","total_amount")\
    .selectExpr(\
        "cast(pickup_datetime as timestamp)", \
        "cast(dropoff_datetime as timestamp)", \
        "cast(passenger_count as int)", \
        "cast(trip_distance as double)", \
        "cast(payment_type as string)", \
        "cast(tip_amount as double)", \
        "cast(total_amount as double)" \
    )
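
The rename chain above can also be read as a lookup table. The sketch below is plain Python with no Spark involved, and the names `RENAME_MAP`, `UNIFIED_COLUMNS` and `unify` are made up for this illustration. It only shows which columns survive and what they end up being called. The lookup is an exact match on the name, which is an assumption of the sketch and not a statement about how Spark resolves column names.

```python
# Old-schema and new-schema names mapped to the unified names
# (same pairs as in the withColumnRenamed chain).
RENAME_MAP = {
    "Trip_Pickup_DateTime": "pickup_datetime",
    "Trip_Dropoff_DateTime": "dropoff_datetime",
    "tpep_pickup_datetime": "pickup_datetime",
    "tpep_dropoff_datetime": "dropoff_datetime",
    "Passenger_Count": "passenger_count",
    "Trip_Distance": "trip_distance",
    "Payment_Type": "payment_type",
    "Tip_Amt": "tip_amount",
    "Total_Amt": "total_amount",
}

# The seven columns kept by selectExpr.
UNIFIED_COLUMNS = [
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "payment_type",
    "tip_amount",
    "total_amount",
]


def unify(columns):
    """Rename known columns, then keep only the unified ones (input order preserved)."""
    renamed = [RENAME_MAP.get(name, name) for name in columns]
    return [name for name in renamed if name in UNIFIED_COLUMNS]


old_header = ["vendor_name", "Trip_Pickup_DateTime", "Trip_Dropoff_DateTime",
              "Passenger_Count", "Trip_Distance", "Payment_Type",
              "Tip_Amt", "Total_Amt"]
print(unify(old_header))
```

Columns that are not in the table, such as `vendor_name`, are dropped, which mirrors what `selectExpr` does in the cell above.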

In [21]:
df.printSchema()

root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [22]:
df.show(2)

+-------------------+-------------------+---------------+-------------+------------+----------+------------+
|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|payment_type|tip_amount|total_amount|
+-------------------+-------------------+---------------+-------------+------------+----------+------------+
|2009-01-21 02:48:14|2009-01-21 03:05:13|              1|          4.9|      Credit|      3.72|       18.62|
|2009-01-05 16:28:17|2009-01-05 16:33:45|              2|          2.1|        Cash|       0.0|         7.9|
+-------------------+-------------------+---------------+-------------+------------+----------+------------+
only showing top 2 rows

In [23]:
year = "2021"
month = "03"

df = spark.read.parquet(f"/taxi/raw_parquet/{year}/{month}.parquet")\
    .withColumnRenamed("Trip_Pickup_DateTime","pickup_datetime")\
    .withColumnRenamed("Trip_Dropoff_DateTime","dropoff_datetime")\
    .withColumnRenamed("tpep_pickup_datetime","pickup_datetime")\
    .withColumnRenamed("tpep_dropoff_datetime","dropoff_datetime")\
    .withColumnRenamed("Passenger_Count","passenger_count")\
    .withColumnRenamed("Trip_Distance","trip_distance")\
    .withColumnRenamed("Payment_Type","payment_type")\
    .withColumnRenamed("Tip_Amt","tip_amount")\
    .withColumnRenamed("Total_Amt","total_amount")\
    .selectExpr(\
        "cast(pickup_datetime as timestamp)", \
        "cast(dropoff_datetime as timestamp)", \
        "cast(passenger_count as int)", \
        "cast(trip_distance as double)", \
        "cast(payment_type as string)", \
        "cast(tip_amount as double)", \
        "cast(total_amount as double)" \
    )

In [24]:
df.printSchema()

root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [25]:
df.show(2)

+-------------------+-------------------+---------------+-------------+------------+----------+------------+
|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|payment_type|tip_amount|total_amount|
+-------------------+-------------------+---------------+-------------+------------+----------+------------+
|2021-03-05 14:47:10|2021-03-05 15:16:13|              1|          5.4|           1|       6.3|        31.6|
|2021-03-05 16:46:03|2021-03-05 16:52:01|              1|         0.97|           1|       0.0|        10.3|
+-------------------+-------------------+---------------+-------------+------------+----------+------------+
only showing top 2 rows
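
The two `printSchema()` outputs above were compared by eye. A small check could do the same comparison in code. The sketch below relies on `df.dtypes`, which on a Spark DataFrame is a list of `(column name, type name)` pairs; `EXPECTED_DTYPES` and `schema_mismatches` are hypothetical names introduced here, and the example runs on plain lists so that it does not need a Spark session.

```python
from itertools import zip_longest

# Unified schema as (name, type) pairs, in the order printed by printSchema().
EXPECTED_DTYPES = [
    ("pickup_datetime", "timestamp"),
    ("dropoff_datetime", "timestamp"),
    ("passenger_count", "int"),
    ("trip_distance", "double"),
    ("payment_type", "string"),
    ("tip_amount", "double"),
    ("total_amount", "double"),
]


def schema_mismatches(dtypes, expected=EXPECTED_DTYPES):
    """Return (position, found, wanted) for every position that differs.

    A missing column on either side shows up as None.
    """
    return [
        (position, found, wanted)
        for position, (found, wanted) in enumerate(zip_longest(dtypes, expected))
        if found != wanted
    ]


# Simulated result of df.dtypes in which payment_type has the wrong type.
simulated = list(EXPECTED_DTYPES)
simulated[4] = ("payment_type", "bigint")
print(schema_mismatches(simulated))
```

In the notebook the call would be `schema_mismatches(df.dtypes)`, and an empty list would mean the month matches the unified schema.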



## Read and Write

In [26]:
def read_and_write(year, month):
    """Read one raw month, unify column names and types, and write it to /taxi/by_month/."""
    df = (
        spark.read.parquet(f"/taxi/raw_parquet/{year}/{month}.parquet")
        # old-schema timestamp columns -> unified names
        .withColumnRenamed("Trip_Pickup_DateTime", "pickup_datetime")
        .withColumnRenamed("Trip_Dropoff_DateTime", "dropoff_datetime")
        # new-schema timestamp columns -> unified names
        .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
        .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
        # remaining old-schema columns -> the names the new schema already uses
        .withColumnRenamed("Passenger_Count", "passenger_count")
        .withColumnRenamed("Trip_Distance", "trip_distance")
        .withColumnRenamed("Payment_Type", "payment_type")
        .withColumnRenamed("Tip_Amt", "tip_amount")
        .withColumnRenamed("Total_Amt", "total_amount")
        # keep only the seven columns we need, each with one fixed type
        .selectExpr(
            "cast(pickup_datetime as timestamp)",
            "cast(dropoff_datetime as timestamp)",
            "cast(passenger_count as int)",
            "cast(trip_distance as double)",
            "cast(payment_type as string)",
            "cast(tip_amount as double)",
            "cast(total_amount as double)",
        )
    )
    
    df.write.parquet(f"/taxi/by_month/{year}/{month}.parquet")
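
Without further options, `df.write.parquet(...)` refuses to write to a path that already exists, so running the loop a second time would stop at the first month that has already been written. A minimal sketch of one way around this, assuming it is acceptable to replace earlier output, is to set the save mode explicitly. `write_month` is a hypothetical helper and not part of the notebook; `mode("overwrite")` is the standard `DataFrameWriter` save mode.

```python
def write_month(df, year, month, mode="overwrite"):
    """Write one month of unified data and return the output path.

    With mode="overwrite" an existing output for the same month is replaced,
    which makes reruns of the conversion loop repeatable.
    """
    out_path = f"/taxi/by_month/{year}/{month}.parquet"
    df.write.mode(mode).parquet(out_path)
    return out_path
```

Inside `read_and_write`, the last line could then become `write_month(df, year, month)`.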

In [27]:
for year in ["2009", "2010", "2011", "2012", "2013", "2014", "2015", "2016", "2017", "2018", "2019", "2020", "2021", "2022"]:
    for month in ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]:
        !echo processing {year}/{month}
        read_and_write(year, month)

processing 2009/01
processing 2009/02
processing 2009/03
processing 2009/04
processing 2009/05
processing 2009/06
processing 2009/07
processing 2009/08
processing 2009/09
processing 2009/10
processing 2009/11
processing 2009/12
processing 2010/01
processing 2010/02
processing 2010/03
processing 2010/04
processing 2010/05
processing 2010/06
processing 2010/07
processing 2010/08
processing 2010/09
processing 2010/10
processing 2010/11
processing 2010/12
processing 2011/01
processing 2011/02
processing 2011/03
processing 2011/04
processing 2011/05
processing 2011/06
processing 2011/07
processing 2011/08
processing 2011/09
processing 2011/10
processing 2011/11
processing 2011/12
processing 2012/01
processing 2012/02
processing 2012/03
processing 2012/04
processing 2012/05
processing 2012/06
processing 2012/07
processing 2012/08
processing 2012/09
processing 2012/10
processing 2012/11
processing 2012/12
processing 2013/01
processing 2013/02
processing 2013/03
processing 2013/04
processing 2013/05
processing 2013/06
processing 2013/07
processing 2013/08
processing 2013/09
processing 2013/10
processing 2013/11
processing 2013/12
processing 2014/01
processing 2014/02
processing 2014/03
processing 2014/04
processing 2014/05
processing 2014/06
processing 2014/07
processing 2014/08
processing 2014/09
processing 2014/10
processing 2014/11
processing 2014/12
processing 2015/01
processing 2015/02
processing 2015/03
processing 2015/04
processing 2015/05
processing 2015/06
processing 2015/07
processing 2015/08
processing 2015/09
processing 2015/10
processing 2015/11
processing 2015/12
processing 2016/01
processing 2016/02
processing 2016/03
processing 2016/04
processing 2016/05
processing 2016/06
processing 2016/07
processing 2016/08
processing 2016/09
processing 2016/10
processing 2016/11
processing 2016/12
processing 2017/01
processing 2017/02
processing 2017/03
processing 2017/04
processing 2017/05
processing 2017/06
processing 2017/07
processing 2017/08
processing 2017/09
processing 2017/10
processing 2017/11
processing 2017/12
processing 2018/01
processing 2018/02
processing 2018/03
processing 2018/04
processing 2018/05
processing 2018/06
processing 2018/07
processing 2018/08
processing 2018/09
processing 2018/10
processing 2018/11
processing 2018/12
processing 2019/01
processing 2019/02
processing 2019/03
processing 2019/04
processing 2019/05
processing 2019/06
processing 2019/07
processing 2019/08
processing 2019/09
processing 2019/10
processing 2019/11
processing 2019/12
processing 2020/01
processing 2020/02
processing 2020/03
processing 2020/04
processing 2020/05
processing 2020/06
processing 2020/07
processing 2020/08
processing 2020/09
processing 2020/10
processing 2020/11
processing 2020/12
processing 2021/01
processing 2021/02
processing 2021/03
processing 2021/04
processing 2021/05
processing 2021/06
processing 2021/07
processing 2021/08
processing 2021/09
processing 2021/10
processing 2021/11
processing 2021/12
processing 2022/01
processing 2022/02
processing 2022/03

AnalysisException: Path does not exist: hdfs://bdlc-test.el.eee.intern:9000/taxi/raw_parquet/2022/03.parquet
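
The loop stops with this exception because it asks for every month of 2022, while the raw data in this run ends earlier. One way to avoid the error is to generate only the months that are known to exist. The sketch below is plain Python; `month_pairs` is a hypothetical helper, and the default end of `(2022, 2)` is an assumption taken from the output above, where 2022/02 was the last month processed before the failure.

```python
def month_pairs(first=(2009, 1), last=(2022, 2)):
    """Yield (year, month) as zero-padded strings, both ends inclusive."""
    year, month = first
    while (year, month) <= last:
        yield str(year), f"{month:02d}"
        month += 1
        if month > 12:
            # Roll over to January of the next year.
            year, month = year + 1, 1


pairs = list(month_pairs())
print(pairs[0], pairs[-1], len(pairs))
```

The conversion loop could then be written as `for year, month in month_pairs(): read_and_write(year, month)`.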

## Stopping Spark 

In [28]:
spark.stop()