In [24]:
import os

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

In [2]:
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

In [3]:
spark = (
    SparkSession
    .builder
    .appName('Yello Taxi')
    .master('local[*]')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/06 19:24:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

## Q1. Read the data for January. How many columns are there?

In [5]:
df = spark.read.parquet('data/yellow_tripdata_2023-01.parquet')

                                                                                

In [6]:
len(df.columns)

19

## Answer: 19

## Q2. What's the standard deviation of the trips duration in January?

In [7]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [8]:
df.select('tpep_pickup_datetime', 'tpep_dropoff_datetime').show()

+--------------------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|
+--------------------+---------------------+
| 2023-01-01 00:32:10|  2023-01-01 00:40:36|
| 2023-01-01 00:55:08|  2023-01-01 01:01:27|
| 2023-01-01 00:25:04|  2023-01-01 00:37:49|
| 2023-01-01 00:03:48|  2023-01-01 00:13:25|
| 2023-01-01 00:10:29|  2023-01-01 00:21:19|
| 2023-01-01 00:50:34|  2023-01-01 01:02:52|
| 2023-01-01 00:09:22|  2023-01-01 00:19:49|
| 2023-01-01 00:27:12|  2023-01-01 00:49:56|
| 2023-01-01 00:21:44|  2023-01-01 00:36:40|
| 2023-01-01 00:39:42|  2023-01-01 00:50:36|
| 2023-01-01 00:53:01|  2023-01-01 01:01:45|
| 2023-01-01 00:43:37|  2023-01-01 01:17:18|
| 2023-01-01 00:34:44|  2023-01-01 01:04:25|
| 2023-01-01 00:09:29|  2023-01-01 00:29:23|
| 2023-01-01 00:33:53|  2023-01-01 00:49:15|
| 2023-01-01 00:13:04|  2023-01-01 00:22:10|
| 2023-01-01 00:45:11|  2023-01-01 01:07:39|
| 2023-01-01 00:04:33|  2023-01-01 00:19:22|
| 2023-01-01 00:03:36|  2023-01-01 00:09:36|
| 2023-01-

In [9]:
df_with_duration = df.select(
    *df.columns,
    ((
        F.col('tpep_dropoff_datetime') - F.col('tpep_pickup_datetime')
    ).cast('long') / 60).alias('duration')
)

In [10]:
df_with_duration.select(F.stddev('duration')).show()



+------------------+
|  stddev(duration)|
+------------------+
|42.594351241955756|
+------------------+



                                                                                

## Answer: 42.59

## Q3. Dropping outliers (keep only the records where the duration was between 1 and 60 minutes (inclusive))

## What fraction of the records left after you dropped the outliers?

In [11]:
df_with_duration.filter((F.col('duration') > 1) & (F.col('duration') <= 60)).count() / df_with_duration.count() * 100

                                                                                

98.11286547457485

## ANSWER: 98%

## Q4. One-hot encoding

In [12]:
class DataPrep(Transformer):
    def __init__(self) -> None:
        super().__init__()

    def _transform(self, dataset: DataFrame) -> DataFrame:
        return (
            dataset
            .select(
                *dataset.columns,
                (
                    (
                        F.col('tpep_dropoff_datetime') - F.col('tpep_pickup_datetime')
                    )
                    .cast('long') / 60
                )
                .alias('duration')
            )
            .filter((F.col('duration') > 1) & (F.col('duration') <= 60))
        )

In [13]:
data_prep = DataPrep()
ohe = OneHotEncoder(inputCols=["PULocationID", "DOLocationID"], outputCols=["PU_OHE", "DO_OHE"])
assembler = VectorAssembler(inputCols=["PU_OHE", "DO_OHE"], outputCol="features")
data_prep_pipeline = Pipeline(stages=[data_prep, ohe, assembler])

In [14]:
data_prep_model = data_prep_pipeline.fit(df)

                                                                                

In [15]:
train_df = data_prep_model.transform(df)

In [16]:
train_df.select("features", "duration").show(truncate=False)

+-------------------------+------------------+
|features                 |duration          |
+-------------------------+------------------+
|(530,[161,406],[1.0,1.0])|8.433333333333334 |
|(530,[43,502],[1.0,1.0]) |6.316666666666666 |
|(530,[48,503],[1.0,1.0]) |12.75             |
|(530,[138,272],[1.0,1.0])|9.616666666666667 |
|(530,[107,344],[1.0,1.0])|10.833333333333334|
|(530,[161,402],[1.0,1.0])|12.3              |
|(530,[239,408],[1.0,1.0])|10.45             |
|(530,[142,465],[1.0,1.0])|22.733333333333334|
|(530,[164,501],[1.0,1.0])|14.933333333333334|
|(530,[141,372],[1.0,1.0])|10.9              |
|(530,[234,333],[1.0,1.0])|8.733333333333333 |
|(530,[79,529],[1.0,1.0]) |33.68333333333333 |
|(530,[164,408],[1.0,1.0])|29.683333333333334|
|(530,[138,298],[1.0,1.0])|19.9              |
|(530,[33,326],[1.0,1.0]) |15.366666666666667|
|(530,[79,451],[1.0,1.0]) |9.1               |
|(530,[90,313],[1.0,1.0]) |22.466666666666665|
|(530,[113,520],[1.0,1.0])|14.816666666666666|
|(530,[237,50

## ANSWER: 530 (close to 515)

## Q5. Training a model

In [17]:
lr = LinearRegression(featuresCol="features", labelCol="duration")
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="duration", metricName="rmse")
train_pipeline = Pipeline(stages=[data_prep_pipeline, lr])

In [26]:
lr_model = train_pipeline.fit(df)

24/06/06 19:27:53 WARN Instrumentation: [4b2fb51f] regParam is zero, which might cause numerical instability and overfitting.
24/06/06 19:28:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/06 19:28:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/06/06 19:28:01 WARN Instrumentation: [4b2fb51f] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [27]:
evaluator.evaluate(lr_model.transform(df))

                                                                                

7.64823034523142

## ANSWER: 7.64

## Q6. Evaluating the model

In [29]:
test_df = spark.read.parquet('data/yellow_tripdata_2023-02.parquet')

In [30]:
evaluator.evaluate(lr_model.transform(test_df))

                                                                                

7.810105882139033

## ANSWER: 7.81

In [31]:
spark.stop()