In [8]:
df = spark.read.parquet("/home/mutwiri/hadoop/spark/data/yellow_tripdata_2021-01.parquet")

In [9]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [10]:
df.head(2)

[Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 30, 10), tpep_dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 36, 12), passenger_count=1.0, trip_distance=2.1, RatecodeID=1.0, store_and_fwd_flag='N', PULocationID=142, DOLocationID=43, payment_type=2, fare_amount=8.0, extra=3.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=11.8, congestion_surcharge=2.5, airport_fee=None),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2021, 1, 1, 0, 51, 20), tpep_dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 52, 19), passenger_count=1.0, trip_distance=0.2, RatecodeID=1.0, store_and_fwd_flag='N', PULocationID=238, DOLocationID=151, payment_type=2, fare_amount=3.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=4.3, congestion_surcharge=0.0, airport_fee=None)]

In [11]:
taxi = df.select("VendorID", "passenger_count", "trip_distance", "tip_amount", "total_amount" ).show()

+--------+---------------+-------------+----------+------------+
|VendorID|passenger_count|trip_distance|tip_amount|total_amount|
+--------+---------------+-------------+----------+------------+
|       1|            1.0|          2.1|       0.0|        11.8|
|       1|            1.0|          0.2|       0.0|         4.3|
|       1|            1.0|         14.7|      8.65|       51.95|
|       1|            0.0|         10.6|      6.05|       36.35|
|       2|            1.0|         4.94|      4.06|       24.36|
|       1|            1.0|          1.6|      2.35|       14.15|
|       1|            1.0|          4.1|       0.0|        17.3|
|       1|            1.0|          5.7|       0.0|        21.8|
|       1|            1.0|          9.1|       0.0|        28.8|
|       1|            2.0|          2.7|      3.15|       18.95|
|       2|            3.0|         6.11|       0.0|        24.3|
|       2|            2.0|         1.21|      2.49|       10.79|
|       1|            2.0

In [16]:
df.select("VendorID").distinct().count()

3

In [28]:
best_paid_vendor_1_trips = df.filter(df["VendorID"]==1).\
                        filter(df["passenger_count"]> 0).\
                        orderBy(df["total_amount"].desc()).\
                        select("VendorID", "passenger_count", "trip_distance", "tip_amount", "total_amount" ). \
                        limit(5)


In [29]:
best_paid_vendor_1_trips.show()

+--------+---------------+-------------+----------+------------+
|VendorID|passenger_count|trip_distance|tip_amount|total_amount|
+--------+---------------+-------------+----------+------------+
|       1|            1.0|        427.7|   1140.44|      2292.4|
|       1|            1.0|        267.7|     369.4|      1108.2|
|       1|            1.0|          0.0|      0.05|      900.35|
|       1|            1.0|        260.5|    149.03|       894.2|
|       1|            1.0|        271.4|       0.0|      872.54|
+--------+---------------+-------------+----------+------------+



### Using UDF

In [55]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType

In [56]:
taxi2 = df.select("VendorID", "passenger_count", "trip_distance", "tip_amount", "total_amount", "store_and_fwd_flag" ).show()

+--------+---------------+-------------+----------+------------+------------------+
|VendorID|passenger_count|trip_distance|tip_amount|total_amount|store_and_fwd_flag|
+--------+---------------+-------------+----------+------------+------------------+
|       1|            1.0|          2.1|       0.0|        11.8|                 N|
|       1|            1.0|          0.2|       0.0|         4.3|                 N|
|       1|            1.0|         14.7|      8.65|       51.95|                 N|
|       1|            0.0|         10.6|      6.05|       36.35|                 N|
|       2|            1.0|         4.94|      4.06|       24.36|                 N|
|       1|            1.0|          1.6|      2.35|       14.15|                 N|
|       1|            1.0|          4.1|       0.0|        17.3|                 N|
|       1|            1.0|          5.7|       0.0|        21.8|                 N|
|       1|            1.0|          9.1|       0.0|        28.8|            

In [57]:
lower = udf(lambda word: word.lower())

24/01/05 11:37:07 WARN SimpleFunctionRegistry: The function tip_percentage replaced a previously registered function.


<function __main__.tip_percentage(tip, total)>

In [72]:
def tip_percentage(tip, total):
    return total/tip

spark.udf.register("tip_percentage", tip_percentage, FloatType())

taxi_df_with_udf = df.select("VendorID", "passenger_count", "trip_distance", "tip_amount", "total_amount",  \
                                lower("store_and_fwd_flag").alias("lower_flag"), \
                                 tip_percentage(col("tip_amount"), col("total_amount") ).alias("new"))

24/01/05 11:53:27 WARN SimpleFunctionRegistry: The function tip_percentage replaced a previously registered function.


In [73]:
taxi_df_with_udf.show()

+--------+---------------+-------------+----------+------------+----------+------------------+
|VendorID|passenger_count|trip_distance|tip_amount|total_amount|lower_flag|               new|
+--------+---------------+-------------+----------+------------+----------+------------------+
|       1|            1.0|          2.1|       0.0|        11.8|         n|              NULL|
|       1|            1.0|          0.2|       0.0|         4.3|         n|              NULL|
|       1|            1.0|         14.7|      8.65|       51.95|         n| 6.005780346820809|
|       1|            0.0|         10.6|      6.05|       36.35|         n| 6.008264462809918|
|       2|            1.0|         4.94|      4.06|       24.36|         n|               6.0|
|       1|            1.0|          1.6|      2.35|       14.15|         n|6.0212765957446805|
|       1|            1.0|          4.1|       0.0|        17.3|         n|              NULL|
|       1|            1.0|          5.7|       0.0