Use PySpark MLlib to train a simple regression model.
- Load a dataset for regression analysis.
- Preprocess the data by selecting relevant features.
- Use VectorAssembler to create feature vectors.
- Train a regression model using LinearRegression from MLlib.
- Evaluate model performance using RMSE and R-squared values.

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('Assessment5').getOrCreate()

In [None]:
data = spark.read.csv('/content/taxi_trip_pricing.csv', header=True, inferSchema=True)

In [None]:
data.show(5)

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+------------------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|        Trip_Price|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+------------------+
|           19.35|    Morning|    Weekday|              3|               Low|  Clear|     3.56|        0.8|           0.32|                53.82|           36.2624|
|           47.59|  Afternoon|    Weekday|              1|              High|  Clear|     NULL|       0.62|           0.43|                40.57|              NULL|
|           36.87|    Evening|    Weekend|              1|              High|  Clear|      2.7|       1.21|           0.15|                37.27|           52.9032|
|         

MLlib (Machine Learning Library) is **Spark's scalable ML library, designed for machine learning on big data**. PySpark exposes it through the `pyspark.ml` package.

MLlib has 4 key components:

- Feature Engineering (transform raw data into a machine-readable format)
- Supervised Learning (Regression & Classification)
- Unsupervised Learning (Clustering & Recommendation)
- Model Evaluation & Tuning

**Feature Engineering**
- VectorAssembler : Combines multiple columns into a single feature vector.
- StringIndexer : Converts categorical string values into numerical indices.
- OneHotEncoder : Converts categorical features into binary sparse vectors.
- StandardScaler : Scales features to have unit variance.
- MinMaxScaler : Scales features to a given range (e.g., [0,1]).

**Supervised Learning**
- LinearRegression : Fits a linear model to predict a continuous target.
- LogisticRegression : Binary classification using logistic regression.
- DecisionTreeClassifier : Tree-based classification model.
- RandomForestClassifier : Ensemble of decision trees for better performance.
- GBTClassifier : Gradient-Boosted Trees for classification.



**Unsupervised Learning**
- KMeans : Partitions data into K clusters.
- GaussianMixture : Probabilistic clustering using Gaussian Mixtures.
- LDA (Latent Dirichlet Allocation) : Topic modeling algorithm for text data.

**Model Selection & Tuning**
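- CrossValidator : Selects the best model by k-fold cross-validation over a parameter grid.
- TrainValidationSplit : Selects the best model using a single train/validation split (cheaper than cross-validation).
- ParamGridBuilder : Builds the grid of hyperparameter combinations to search.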

In [None]:
data.printSchema()

root
 |-- Trip_Distance_km: double (nullable = true)
 |-- Time_of_Day: string (nullable = true)
 |-- Day_of_Week: string (nullable = true)
 |-- Passenger_Count: integer (nullable = true)
 |-- Traffic_Conditions: string (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Base_Fare: double (nullable = true)
 |-- Per_Km_Rate: double (nullable = true)
 |-- Per_Minute_Rate: double (nullable = true)
 |-- Trip_Duration_Minutes: double (nullable = true)
 |-- Trip_Price: double (nullable = true)



In [None]:
# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum
null_counts = data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

null_counts.show()

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|Trip_Price|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+
|              50|         50|         50|             50|                50|     50|       50|         50|             50|                   50|        49|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+



In [18]:
numerical_cols = ['Trip_Distance_km', 'Passenger_Count', 'Base_Fare',
                  'Per_Km_Rate', 'Per_Minute_Rate', 'Trip_Duration_Minutes',
                  'Trip_Price']
categorical_cols = ['Time_of_Day', 'Day_of_Week', 'Traffic_Conditions', 'Weather']

In [19]:
def median_values_for_numerical_col(df, column):
    # relativeError=0.0 makes approxQuantile return the exact median (costlier on large data)
    median_value = df.approxQuantile(column, [0.5], 0.0)[0]
    return median_value

In [22]:
for col_name in numerical_cols:
    median_value = median_values_for_numerical_col(data, col_name)
    data = data.fillna({col_name: median_value})

In [23]:
# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum
null_counts_after_num_col = data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

null_counts_after_num_col.show()

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|Trip_Price|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+
|               0|         50|         50|              0|                50|     50|        0|          0|              0|                    0|         0|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+



In [26]:
from pyspark.sql.functions import desc
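def compute_mode(df, column):
    # Drop nulls first so the mode is always an observed category, then take the most frequent value
    return df.dropna(subset=[column]).groupBy(column).count().orderBy(desc("count")).first()[0]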

In [27]:
for col_name in categorical_cols:
    mode_value = compute_mode(data, col_name)
    data = data.fillna({col_name: mode_value})
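
The two imputation loops above follow the same pattern: compute one statistic per column from the non-missing values, then use it to fill the gaps. A minimal plain-Python sketch of that idea is shown here. The helper name `impute` and the sample values are made up for illustration; this is not how Spark implements `fillna`.

```python
from collections import Counter
from statistics import median

def impute(values, strategy):
    """Replace None entries with the median or the mode of the observed values."""
    observed = [v for v in values if v is not None]
    if strategy == "median":
        fill = median(observed)
    elif strategy == "mode":
        # most_common(1) returns the single most frequent (value, count) pair
        fill = Counter(observed).most_common(1)[0][0]
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return [fill if v is None else v for v in values]

base_fare = [3.56, None, 2.70, 3.48]         # hypothetical numeric column
weather = ["Clear", None, "Clear", "Rain"]   # hypothetical categorical column

filled_fare = impute(base_fare, "median")
filled_weather = impute(weather, "mode")
print(filled_fare)
print(filled_weather)
```

Both statistics are computed from observed values only, which is why the missing entries never influence the fill value.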

In [28]:
data.show()

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+------------------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|        Trip_Price|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+------------------+
|           19.35|    Morning|    Weekday|              3|               Low|  Clear|     3.56|        0.8|           0.32|                53.82|           36.2624|
|           47.59|  Afternoon|    Weekday|              1|              High|  Clear|     3.52|       0.62|           0.43|                40.57| 50.07450000000001|
|           36.87|    Evening|    Weekend|              1|              High|  Clear|      2.7|       1.21|           0.15|                37.27|           52.9032|
|         

In [29]:
# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum
null_counts_after_allCols = data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

null_counts_after_allCols.show()

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|Trip_Price|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+
|               0|          0|          0|              0|                 0|      0|        0|          0|              0|                    0|         0|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+----------+



Since the data contains categorical columns, we'll encode them with StringIndexer and OneHotEncoder.

- **StringIndexer** maps each categorical value to a numerical index, assigning a unique number to each category based on its frequency.

**How does it assign numbers?**
- The most frequent category gets index 0
- The second most frequent gets 1, and so on...
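
The frequency-based numbering can be illustrated with a small plain-Python sketch. The function name `fit_string_index` and the sample labels are made up for illustration, and breaking ties alphabetically is an assumption of this sketch rather than something the notebook demonstrates.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (assumption of this sketch)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up sample column
time_of_day = ["Afternoon", "Morning", "Afternoon", "Evening", "Morning", "Afternoon"]

mapping = fit_string_index(time_of_day)       # the "fit" step: learn label -> index
indexed = [mapping[v] for v in time_of_day]   # the "transform" step: apply the mapping
print(mapping)
print(indexed)
```

Learning the mapping and applying it are kept separate on purpose, mirroring the fit/transform split used by the Spark transformers.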

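**OneHotEncoder** converts indexed categories into binary vectors, which are more suitable for ML models that treat their inputs as numeric quantities.

- Stores the result as a **SparseVector**, displayed as *(size, [indices], [values])*, so only the non-zero position is kept.
- By default (`dropLast=True`) the last category is dropped and encoded as an all-zero vector, so a column with *k* categories yields a vector of size *k - 1*.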
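
A rough plain-Python sketch of this encoding is given below. It assumes the default behaviour in which the last category is dropped, so a column with k categories produces a vector of size k - 1; the function name `one_hot_sparse` is hypothetical and the tuple only mimics the *(size, [indices], [values])* display.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values) for a one-hot encoded category index."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    # The dropped (last) category is represented by the all-zero vector
    return (size, [], [])

# A column with 4 categories, indexed 0.0 to 3.0
second_category = one_hot_sparse(1.0, 4)
last_category = one_hot_sparse(3.0, 4)
print(second_category)
print(last_category)
```

Dropping one category avoids a redundant column: when all the other slots are zero, the row must belong to the dropped category.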



In [31]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [33]:
indexers = [StringIndexer(inputCol=col, outputCol=col + "_Index").fit(data) for col in categorical_cols]
for indexer in indexers:
    data = indexer.transform(data)


Here the input columns are the ones listed in `categorical_cols`.

- Each output column keeps the same name with an "_Index" suffix

- **fit(data)** learns the label-to-index mapping from the category frequencies

- **transform(data)** applies that mapping, converting the categorical values into numerical indices

In [35]:
data.columns

['Trip_Distance_km',
 'Time_of_Day',
 'Day_of_Week',
 'Passenger_Count',
 'Traffic_Conditions',
 'Weather',
 'Base_Fare',
 'Per_Km_Rate',
 'Per_Minute_Rate',
 'Trip_Duration_Minutes',
 'Trip_Price',
 'Time_of_Day_Index',
 'Day_of_Week_Index',
 'Traffic_Conditions_Index',
 'Weather_Index']

In [36]:
data.show(2)

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+-----------------+-----------------+-----------------+------------------------+-------------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|       Trip_Price|Time_of_Day_Index|Day_of_Week_Index|Traffic_Conditions_Index|Weather_Index|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+-----------------+-----------------+-----------------+------------------------+-------------+
|           19.35|    Morning|    Weekday|              3|               Low|  Clear|     3.56|        0.8|           0.32|                53.82|          36.2624|              1.0|              0.0|                     0.0|          0.0|
|           47.59|  Afternoon|    Weekday|  

In [37]:
# now apply one-hot encoding to the StringIndexer output columns

encoder = OneHotEncoder(inputCols=[col + "_Index" for col in categorical_cols],
                        outputCols=[col + "_OneHot" for col in categorical_cols])
model = encoder.fit(data)
data = model.transform(data)

**Can't we train the model using only StringIndexer?**

- Yes, but it depends on the algorithm. Tree-based models such as decision trees and random forests can work with the indices directly. Linear models such as linear regression treat the indices as ordered numeric quantities, so for them the categories should be one-hot encoded.
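
To make the difference concrete, here is a small plain-Python sketch with a single categorical feature. The prices and the index order are invented for illustration. Fitting a straight line to the index forces equally spaced predictions, whereas one-hot features with an intercept let unregularized least squares give each category its own mean.

```python
from statistics import mean

# Hypothetical prices per traffic level; the index order is made up
index_of = {"Low": 0.0, "High": 1.0, "Medium": 2.0}
rows = [("Low", 20.0), ("Low", 22.0), ("High", 40.0),
        ("High", 42.0), ("Medium", 30.0), ("Medium", 32.0)]

x = [index_of[c] for c, _ in rows]
y = [p for _, p in rows]

# Ordinary least squares for y = a + b * index (closed form for one feature)
x_bar, y_bar = mean(x), mean(y)
b = sum((xi - x_bar) * (yi - y_bar) for xi, yi in zip(x, y)) / sum((xi - x_bar) ** 2 for xi in x)
a = y_bar - b * x_bar
index_pred = {c: a + b * i for c, i in index_of.items()}

# With one-hot features, the least-squares prediction for each category is its mean
one_hot_pred = {c: mean(p for cat, p in rows if cat == c) for c in index_of}

print(index_pred)
print(one_hot_pred)
```

In this made-up data the index-based line cannot place "High" above both other levels, because the index puts it in the middle of the scale.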

In [38]:
data.columns

['Trip_Distance_km',
 'Time_of_Day',
 'Day_of_Week',
 'Passenger_Count',
 'Traffic_Conditions',
 'Weather',
 'Base_Fare',
 'Per_Km_Rate',
 'Per_Minute_Rate',
 'Trip_Duration_Minutes',
 'Trip_Price',
 'Time_of_Day_Index',
 'Day_of_Week_Index',
 'Traffic_Conditions_Index',
 'Weather_Index',
 'Time_of_Day_OneHot',
 'Day_of_Week_OneHot',
 'Traffic_Conditions_OneHot',
 'Weather_OneHot']

In [39]:
data.show(2)

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+-----------------+-----------------+-----------------+------------------------+-------------+------------------+------------------+-------------------------+--------------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|       Trip_Price|Time_of_Day_Index|Day_of_Week_Index|Traffic_Conditions_Index|Weather_Index|Time_of_Day_OneHot|Day_of_Week_OneHot|Traffic_Conditions_OneHot|Weather_OneHot|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+-----------------+-----------------+-----------------+------------------------+-------------+------------------+------------------+-------------------------+--------------+
|           19.35|    Morning|    Weekday|    

In [41]:
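# Combine the numerical columns with the one-hot encoded categorical columns.
# Note: numerical_cols includes Trip_Price (the target), so the target ends up in the feature list.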

all_features = numerical_cols + [col + "_OneHot" for col in categorical_cols]
all_features

['Trip_Distance_km',
 'Passenger_Count',
 'Base_Fare',
 'Per_Km_Rate',
 'Per_Minute_Rate',
 'Trip_Duration_Minutes',
 'Trip_Price',
 'Time_of_Day_OneHot',
 'Day_of_Week_OneHot',
 'Traffic_Conditions_OneHot',
 'Weather_OneHot']

In [44]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=all_features, outputCol="features")


data = assembler.transform(data)

In [45]:
data.show()

+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+------------------+-----------------+-----------------+------------------------+-------------+------------------+------------------+-------------------------+--------------+--------------------+
|Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|Base_Fare|Per_Km_Rate|Per_Minute_Rate|Trip_Duration_Minutes|        Trip_Price|Time_of_Day_Index|Day_of_Week_Index|Traffic_Conditions_Index|Weather_Index|Time_of_Day_OneHot|Day_of_Week_OneHot|Traffic_Conditions_OneHot|Weather_OneHot|            features|
+----------------+-----------+-----------+---------------+------------------+-------+---------+-----------+---------------+---------------------+------------------+-----------------+-----------------+------------------------+-------------+------------------+------------------+-------------------------+--------------+--

The new `features` column appears at the end of the table. Each vector lists the values of the numerical columns first, followed by the one-hot encoded values of the categorical columns, separated by commas.

**Why Vector Assembly?**

- Machine learning models in PySpark require all features in a single column of vector type.
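
Conceptually, assembling is a concatenation of scalar columns and vector columns into one flat vector per row. The sketch below is plain Python, with a hypothetical helper named `assemble`. It uses the first row of the dataset, with the one-hot columns written out as dense lists inferred from the notebook's printed output, and it mirrors the notebook's `all_features` order.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)   # vector column: append every slot
        else:
            features.append(float(value))              # scalar column: append one value
    return features

row = {
    "Trip_Distance_km": 19.35, "Passenger_Count": 3, "Base_Fare": 3.56,
    "Per_Km_Rate": 0.8, "Per_Minute_Rate": 0.32, "Trip_Duration_Minutes": 53.82,
    "Trip_Price": 36.2624,
    "Time_of_Day_OneHot": [0.0, 1.0, 0.0], "Day_of_Week_OneHot": [1.0],
    "Traffic_Conditions_OneHot": [1.0, 0.0], "Weather_OneHot": [1.0, 0.0],
}

features = assemble(row, list(row))
print(features)
```

The result has 15 entries: 7 numerical values followed by 3 + 1 + 2 + 2 one-hot slots.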

In [48]:
data.select("features").show(truncate=False)

+-----------------------------------------------------------------------------------+
|features                                                                           |
+-----------------------------------------------------------------------------------+
|[19.35,3.0,3.56,0.8,0.32,53.82,36.2624,0.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0]            |
|[47.59,1.0,3.52,0.62,0.43,40.57,50.07450000000001,1.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0] |
|[36.87,1.0,2.7,1.21,0.15,37.27,52.9032,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0]            |
|[30.33,4.0,3.48,0.51,0.15,116.81,36.4698,0.0,0.0,1.0,1.0,1.0,0.0,1.0,0.0]          |
|[25.79,3.0,2.93,0.63,0.32,22.64,15.618000000000002,0.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0]|
|[8.64,2.0,2.55,1.71,0.48,89.33,60.202799999999996,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0] |
|[3.85,4.0,3.51,1.66,0.29,5.05,11.2645,1.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0]             |
|[43.44,3.0,2.97,1.87,0.23,61.79,101.1216,0.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0]          |
|[30.45,3.0,2.77,1.78,0.34,110.33,50.07450000000001,0.

Here:
- A vector printed in parentheses, as (size, [indices], [values]), is in sparse representation, which is used when most of the values are zero
- A vector printed in square brackets, with every value listed, is in dense representation, which is used when most of the values are non-zero
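
The two representations carry the same information. A minimal plain-Python sketch of converting between them follows; the function names `to_dense` and `to_sparse` are illustrative and the tuples only mimic the printed form.

```python
def to_dense(size, indices, values):
    """Expand (size, indices, values) into a full list of floats."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

def to_sparse(dense):
    """Keep only the non-zero positions of a dense list."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return (len(dense), indices, [dense[i] for i in indices])

dense_vec = to_dense(3, [1], [1.0])
sparse_vec = to_sparse([0.0, 1.0, 0.0])
print(dense_vec)
print(sparse_vec)
```

The sparse form stores one index and one value per non-zero entry, so it only saves space when zeros dominate.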

In [49]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [54]:
target_col = 'Trip_Price'

In [55]:
final_data_model = data.select('features',target_col)

In [57]:
final_data_model.show(3)

+--------------------+-----------------+
|            features|       Trip_Price|
+--------------------+-----------------+
|[19.35,3.0,3.56,0...|          36.2624|
|[47.59,1.0,3.52,0...|50.07450000000001|
|[36.87,1.0,2.7,1....|          52.9032|
+--------------------+-----------------+
only showing top 3 rows



In [60]:
train_df, test_df = final_data_model.randomSplit([0.7, 0.3], seed=42)


In [62]:
lr = LinearRegression(featuresCol="features", labelCol=target_col)
lr_model = lr.fit(train_df)


In [64]:
predict_the_tripPrice = lr_model.transform(test_df)

In [65]:
evaluate_rmse = RegressionEvaluator(labelCol=target_col, metricName="rmse")
evaluate_r2 = RegressionEvaluator(labelCol=target_col, metricName="r2")

In [67]:
evaluate_rmse , evaluate_r2

(RegressionEvaluator_0dc0b663b674, RegressionEvaluator_f9542dae1b05)

In [70]:
rmse = evaluate_rmse.evaluate(predict_the_tripPrice)
r2 = evaluate_r2.evaluate(predict_the_tripPrice)
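
For reference, both metrics follow standard definitions: RMSE is the square root of the mean squared error, and R-squared is one minus the ratio of the residual sum of squares to the total sum of squares. A plain-Python sketch with made-up labels and predictions (not values from this dataset):

```python
from math import sqrt

def rmse(labels, predictions):
    """Root mean squared error."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return sqrt(sum(squared_errors) / len(labels))

def r_squared(labels, predictions):
    """1 - SS_res / SS_tot."""
    y_bar = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - y_bar) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

labels = [10.0, 20.0, 30.0, 40.0]        # made-up true values
predictions = [12.0, 18.0, 33.0, 37.0]   # made-up model outputs

example_rmse = rmse(labels, predictions)
example_r2 = r_squared(labels, predictions)
print(example_rmse, example_r2)
```

RMSE is in the same units as the label, while R-squared is unitless and equals 1.0 only when every prediction matches its label.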

In [72]:
rmse,r2

(1.8876695734522429e-13, 1.0)
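
An RMSE of about 1.9e-13 with an R-squared of exactly 1.0 is what one would expect when the label is among the features: `numerical_cols` contains `Trip_Price`, so `all_features` does too, and the model can read the target directly from its input. Below is a hedged sketch of building the feature list without the label. `feature_cols` is a name introduced here, and no metrics are shown because the notebook was not re-run with this change.

```python
numerical_cols = ['Trip_Distance_km', 'Passenger_Count', 'Base_Fare',
                  'Per_Km_Rate', 'Per_Minute_Rate', 'Trip_Duration_Minutes',
                  'Trip_Price']
categorical_cols = ['Time_of_Day', 'Day_of_Week', 'Traffic_Conditions', 'Weather']
target_col = 'Trip_Price'

# Keep every numerical column except the label, then add the one-hot columns
feature_cols = [c for c in numerical_cols if c != target_col] \
    + [c + "_OneHot" for c in categorical_cols]
print(feature_cols)
```

Passing `feature_cols` as `inputCols` to `VectorAssembler` would keep `Trip_Price` out of `features`, so the resulting RMSE and R-squared would reflect how well the remaining columns predict the price.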