## **Master in Applied Artificial Intelligence**
### **Course: Artificial Intelligence and Automatic Learning**
### Prof. Iván Olmos
#### Tecnológico de Monterrey

### **Supervised and Unsupervised Learning**
##### **Name and enrollment number:**
* Mario Guillen de la Torre - A01796701

## **I. Introduction**

Machine learning algorithms are mainly divided into two categories: supervised and unsupervised algorithms. Each one expects a different data structure and is used to solve problems of different natures.

##### **1.1 Supervised Algorithms**
Supervised algorithms expect a dependent or “target” variable whose relationship with the independent variables can be analyzed to find patterns and generate a predictive model. The data type of the dependent variable, whether categorical or continuous, will determine the type of problem to address: classification or regression.

In classification problems, the goal is to predict the discrete category to which a new record belongs. Classic algorithms include:

- Naive Bayes classifier
- Vector support machines (SVMs)
- Logistics regression
- Decision trees (and its derived algorithms as decision forests)

The PySpark MLlib library provides implementations of all these algorithms.

In regression problems, the goal is to predict a continuous value based on the independent variables. PySpark includes implementations of the following algorithms (among others):

- Decision tree returns.
- GBT Rots (gradient boosted trees)
- FM Rots (Machines Factor)

##### **1.2 Unsupervised Algorithms**
Unsupervised algorithms do not have a “target” variable. Instead, they aim to find patterns, structures, or groupings that are intrinsically present in the analyzed data. These patterns can be used for decision-making and dimensionality reduction.

A common use case is the search for frequent patterns in the data, meaning identifying recurring relationships among variables. PySpark implements two different algorithms for this purpose:

- FPGrowth
- Prefixspan

Another approach is clustering analysis, where the goal is to assign each record to a group and determine which group a new record belongs to. Common clustering algorithms implemented by PySpark include:

- LDA
- Gaussianmixture
- Kmeans
- Bisectingkmeans
- PoweriteratiClustering

#### **II. Data Selection**
As mentioned in previous notebooks, partitioning rules were established based on three characterization variables, selected for their relevance in identifying behavioral patterns:

- Payment_Group: Groups payment methods into Credit Card, Cash, Mobile, and Other. _This variable is fundamental, as there is empirical evidence that passengers who pay with a card tend to leave tips more frequently than those who pay with cash._

- Pickup_zone_group: Corresponds to the starting area of the trip. The most representative community areas were grouped as: 76, 8, 32, 28, and Other (any other area). _This variable is used as a proxy for urban and socioeconomic context, since different areas can reflect different passenger profiles._

- Duration_group: Built from the duration_minutes variable by applying percentile-based binning with the following ranges: Flash Riders ≤10 min, Urban Cruisers between 10 and 23.2 min, and Long-Haul Nomads >23.2 min. _This grouping reflects different types of trips, from short rides typical of urban centers to long journeys, each with different expectations and behaviors associated with the service._

These three variables define the partitioning space, generating combinations that capture different passenger profiles. In total, the result is:

- 4 (payment_group) × 5 (pickup_zone_group) × 3 (duration_group) = **60 partition combinations**

All these combinations describe a wide range of travelers, which inevitably leads to combinations that occur only rarely. To reduce the complexity of our problem, partitions that occur less than 2% of the time within our dataset are merged, reducing the number of combinations to 21 (including the newly merged group).

Each of these partitions represents a distinct travel profile (for example, long trips paid with a card and starting in tourist areas). It is important to emphasize that not all profiles have the same proportion of data, which could introduce bias if the sampling technique is not properly defined. For this reason, stratified sampling was chosen, as it allows extracting a balanced proportion of records across all groups, preventing the model from learning patterns solely from the most frequent profiles.

#### **References**
Smartcitiesworld. (2022). Predictive Analytics Key to Easing Traffic Congestion.  
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; https: //www.smartcitiesworld.net/news/news/predictive-analytics-key-to-aeasing-traffic-congestion-7502

Guo, Y., Liu, Y., Wang, J., & Chen, H. (2023). Urban Mobility Hotspots and Their Implications for Resilient City Planning.  
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; journal of transport geography, 108, 103567. https://www.sciencedirect.com/science/article/pii/s096669232300039x?via%3dihub

LE, James. (2019, July 23). Using Ant Colony and Genetic Evolution to Optimize Ride-Sharing Trip Duration.  
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; medium. https://medium.com/data-science/using-ant-colony-and-genetic-evolution-to-optimize-ride-sharing-trip-duration-56194215923F

City of Chicago. (2024). Taxi Trips (2024-) [Data set].  
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; https: //data.cityofchicago.org/transportation/taxi-trips-2024-/ajtu-isnz/about_data

Ahmed, S. K. (2024). Research Methodology Simplified: How to Choose The Right Sample Technique and Determine The Approprot Sample Size for Research.  
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; oral oncology reports, 12, 100662. https://doi.org/10.1016/J.OOR.2024.100662

Pok, A. (2023). Scaling Machine Learning with Spark: Distributed ml with mllib, tensorflow, and pytorch. O’Reilly Media.

---

## **Implementation**

#### **Library Import**

As a first step, we import the libraries required for the execution of our code.

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer,StringIndexer,OneHotEncoder,StandardScaler,VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, isnan, when, count, percentile_approx, min, max, mean, stddev, approx_count_distinct, expr,  concat_ws, lit
from pyspark.sql.functions import hour, dayofweek, unix_timestamp, when, month,to_timestamp,  dayofweek
from pyspark.sql import functions as F

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator, ClusteringEvaluator
from pyspark.ml.clustering import KMeans

import scipy.stats as stats
import pandas as pd
import os
from IPython.display import display, HTML

#### **Creating the Spark Session**

Next, we create our Spark session and define a function that will allow us to display Spark DataFrames in a more user-friendly way. To do this, the DataFrame is converted into a pandas DataFrame and rendered using HTML.

In [2]:
spark = SparkSession.builder \
    .appName("ChicagoTaxyTripsAnalysis") \
     .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.python.worker.timeout", "600") \
    .config("spark.python.worker.retries", "3") \
    .getOrCreate()

In [3]:
def pretty_display(df, limit=100):
    pdf = df.limit(limit).toPandas()
    display(HTML(pdf.to_html(notebook=True)))

#### **Loading the Dataset**

We now load our dataset and check the number of rows and columns, giving us an idea of the dataset’s size and dimensions.

In [4]:
filename = "Taxi_Trips__2024-__20250426.csv"
# Local route
local_path = f"{filename}"

dftaxytrips = spark.read.csv(local_path, header=True, inferSchema=True)

In [5]:
print("Number of records:", dftaxytrips.count())
print("Number of columns:", len(dftaxytrips.columns))

Number of records: 7917844
Number of columns: 23


#### **Data Exploration**

Next, we examine the schema of our dataset, paying attention to columns that are of textual type, as these will require different handling in later sections.

In [6]:
dftaxytrips.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Taxi ID: string (nullable = true)
 |-- Trip Start Timestamp: string (nullable = true)
 |-- Trip End Timestamp: string (nullable = true)
 |-- Trip Seconds: integer (nullable = true)
 |-- Trip Miles: double (nullable = true)
 |-- Pickup Census Tract: long (nullable = true)
 |-- Dropoff Census Tract: long (nullable = true)
 |-- Pickup Community Area: integer (nullable = true)
 |-- Dropoff Community Area: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tips: double (nullable = true)
 |-- Tolls: double (nullable = true)
 |-- Extras: double (nullable = true)
 |-- Trip Total: double (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Pickup Centroid Latitude: double (nullable = true)
 |-- Pickup Centroid Longitude: double (nullable = true)
 |-- Pickup Centroid Location: string (nullable = true)
 |-- Dropoff Centroid Latitude: double (nullable = true)
 |-- Dropoff Centroid 

#### **Characterization Variables**

As mentioned in previous deliveries, we decided to apply transformations to our data to generate new columns that provide meaningful information for our analysis. A clear example of this is the creation of the columns "trip_hour", "trip_day_of_week", and "trip_month", which extract components from the trip’s start date and time. These columns can be very important in identifying patterns in our data.

In addition, some columns are grouped into categories, and binning is applied to others to reduce the complexity of the data.

In [7]:
# Trip Start Timestamp is Timestamp type
dftaxytrips = dftaxytrips.withColumn(
    "trip_start_ts",
    to_timestamp(col("Trip Start Timestamp"), "MM/dd/yyyy hh:mm:ss a")
)

# Time of day
dftaxytrips = dftaxytrips.withColumn("trip_hour", hour(col("trip_start_ts")))

# Week day (1 = Sunday, 7 = Saturday)
dftaxytrips = dftaxytrips.withColumn("trip_day_of_week", dayofweek(col("trip_start_ts")))

# Month of the year (January 1, 12 = December)
dftaxytrips = dftaxytrips.withColumn("trip_month", month(col("trip_start_ts")))

# Trip duration in minutes
dftaxytrips = dftaxytrips.withColumn("duration_minutes", col("Trip Seconds") / 60)

# Tip/Fare ratio
dftaxytrips = dftaxytrips.withColumn("tip_ratio",
    when(col("Fare") > 0, col("Tips") / col("Fare")).otherwise(0))

# Tip/Trip Miles Ratio
dftaxytrips = dftaxytrips.withColumn("tip_per_mile",
    when(col("Trip Miles") > 0, col("Tips") / col("Trip Miles")).otherwise(0))

# Payment method group
dftaxytrips = dftaxytrips.withColumn("payment_group",
    when(col("Payment Type") == "Credit Card", "Credit Card")
    .when(col("Payment Type") == "Cash", "Cash")
    .when(col("Payment Type") == "Mobile", "Mobile")
    .otherwise("Other"))

# Company group
dftaxytrips = dftaxytrips.withColumn("company_group",
    when(col("Company") == "Flash Cab", "Flash Cab")
    .when(col("Company") == "Taxi Affiliation Services", "Taxi Affiliation")
    .when(col("Company") == "Taxicab Insurance Agency Llc", "Insurance Agency")
    .when(col("Company") == "Sun Taxi", "Sun Taxi")
    .when(col("Company") == "City Service", "City Service")
    .when(col("Company") == "Chicago Independents", "Chicago Independents")
    .otherwise("Other"))

# Origin Zone Group
dftaxytrips = dftaxytrips.withColumn("pickup_zone_group",
    when(col("Pickup Community Area") == 76, 76)
    .when(col("Pickup Community Area") == 8, 8)
    .when(col("Pickup Community Area") == 32, 32)
    .when(col("Pickup Community Area") == 28, 28)
    .otherwise("Other"))

# Group Destination Zone
dftaxytrips = dftaxytrips.withColumn("dropoff_zone_group",
    when(col("Dropoff Community Area") == 8, 8)
    .when(col("Dropoff Community Area") == 32, 32)
    .when(col("Dropoff Community Area") == 28, 28)
    .when(col("Dropoff Community Area") == 76, 76)
    .otherwise("Other"))

# Rename certain columns
dftaxytrips = dftaxytrips.withColumnRenamed("Trip ID", "trip_id")
dftaxytrips = dftaxytrips.withColumnRenamed("Trip Miles", "trip_miles")

In [8]:
# Trip duration (in minutes)
dftaxytrips = dftaxytrips.withColumn(
    "duration_group",
    (
        when(col("duration_minutes") <= 10.0, "Flash Riders")           # Very short, high -rotation trips
        .when(col("duration_minutes") <= 23.2, "Urban Cruisers")        # Typical paths within the city
        .otherwise("Long-Haul Nomads")                                  # long journeys, possibly between distant districts
    )
)

### **Data Selection**

#### **Partitioning Rules**

Since the main goal of the project is to analyze tips in Chicago taxi trips and predict relevant patterns, we have chosen the following three variables for partitioning:


* **`Payment_group`:** Groups payment methods into Credit Card, Cash, Mobile, and Other. This variable is considered key due to the strong relationship between card payments and tipping behavior.
* **`Pickup_zone_group`:** Groups pickup zones into specific areas (76, 8, 32, 28) and an “Other” group that includes all remaining zones. It serves as a proxy for socioeconomic or commercial location.
* **`duration_group`:** Classifies trip duration into Flash Riders (≤10 min), Urban Cruisers (10–23.2 min), and Long-Haul Nomads (>23.2 min). This grouping captures the intensity and context of the trip.

These variables allow us to capture key behavioral factors related to the passenger’s decision to leave a tip.

We then partition the dataset using these variables, which will show us the proportion of each segment and help us make informed decisions for our sampling process.

In [9]:
partition_counts = dftaxytrips.groupBy(
    "payment_group", "pickup_zone_group", "duration_group"
).agg(count("*").alias("count"))

# Calculate total general
total_count = dftaxytrips.count()

# Add proportion by combination
partition_counts = partition_counts.withColumn(
    "proportion", col("count") / total_count
)

# Orderly
partition_counts.orderBy(col("proportion").desc()).show(60)


+-------------+-----------------+----------------+------+--------------------+
|payment_group|pickup_zone_group|  duration_group| count|          proportion|
+-------------+-----------------+----------------+------+--------------------+
|  Credit Card|               76|Long-Haul Nomads|932993| 0.11783422355883748|
|        Other|            Other|  Urban Cruisers|447407| 0.05650616506210529|
|        Other|            Other|Long-Haul Nomads|430338| 0.05435040144766681|
|         Cash|            Other|    Flash Riders|310727| 0.03924389012968682|
|         Cash|                8|    Flash Riders|306484| 0.03870801192849973|
|  Credit Card|               32|    Flash Riders|291907|0.036866980455790746|
|  Credit Card|                8|    Flash Riders|276498| 0.03492086987316244|
|         Cash|               32|    Flash Riders|255828| 0.03231031073610442|
|       Mobile|                8|    Flash Riders|225234|0.028446380100441485|
|  Credit Card|            Other|Long-Haul Nomads|21

As we can see, there are a total of 60 segments in our population. Many of these have a very small number of records, so including them in our sampling techniques could lead to errors and unnecessarily complicate the process with segments that are not representative of the overall population. For these reasons, we chose to group all segments that represent less than 2% of the population into a new segment labeled "Other."

To achieve this, the following steps are carried out:
- A dataset is created containing only the segments with more than 2% of the population, using a filter on the "proportion" column.
- A new column, "StrataGrouping", is added to this dataset. It serves as an identifier by concatenating the values of "payment_group", "pickup_zone_group", and "duration_group".
- This dataset is then joined with the original dataset using the three aforementioned columns. As a result, the "StrataGrouping" column will have values only for the segments with more than 2% of the population.
- For the remaining segments, the value "Other" is imputed.

In [10]:
significant_combinations = partition_counts.filter(col("proportion") > 0.02)

significant_combinations = significant_combinations.withColumn(
    "StrataGrouping", concat_ws("_", "payment_group", "pickup_zone_group", "duration_group")
)

dftaxytrips_with_strata = dftaxytrips.join(
    significant_combinations.select(
        "payment_group", "pickup_zone_group", "duration_group", "StrataGrouping"
    ),
    on=["payment_group", "pickup_zone_group", "duration_group"],
    how="left"
)

dftaxytrips_with_strata = dftaxytrips_with_strata.withColumn(
    "StrataGrouping",
    when(col("StrataGrouping").isNull(), lit("Other")).otherwise(col("StrataGrouping"))
)

In [11]:
grouped_partition_counts = dftaxytrips_with_strata.groupBy(
    "StrataGrouping"
).agg(count("*").alias("count"))

# Add proportion by combination
grouped_partition_counts = grouped_partition_counts.withColumn(
    "proportion", col("count") / total_count
)

# Orderly
grouped_partition_counts.orderBy(col("proportion").desc()).show(60)


+--------------------+-------+--------------------+
|      StrataGrouping|  count|          proportion|
+--------------------+-------+--------------------+
|               Other|2377353|  0.3002525687548277|
|Credit Card_76_Lo...| 932993| 0.11783422355883748|
|Other_Other_Urban...| 447407| 0.05650616506210529|
|Other_Other_Long-...| 430338| 0.05435040144766681|
|Cash_Other_Flash ...| 310727| 0.03924389012968682|
| Cash_8_Flash Riders| 306484| 0.03870801192849973|
|Credit Card_32_Fl...| 291907|0.036866980455790746|
|Credit Card_8_Fla...| 276498| 0.03492086987316244|
|Cash_32_Flash Riders| 255828| 0.03231031073610442|
|Mobile_8_Flash Ri...| 225234|0.028446380100441485|
|Credit Card_Other...| 211508|0.026712827380786994|
|Cash_76_Long-Haul...| 206361|0.026062776685168335|
|Cash_Other_Long-H...| 205220| 0.02591867180005062|
|Mobile_Other_Urba...| 203128|0.025654458461166953|
|Credit Card_76_Ur...| 199533|  0.0252004207205901|
|Cash_Other_Urban ...| 194488| 0.02456325231969713|
|Credit Card

#### **Sampling Technique**

Now that we have the "StrataGrouping" column, we can use stratified sampling.
To do this, we first create a dictionary that takes the unique values of "StrataGrouping" and assigns them the percentage of values that will be taken from the population of that segment. Then, the sampleBy function is used to extract the data.

In our case, we chose to use 15% of the data from each stratum. This allows us to save a considerable amount of processing and storage resources, while preserving the data distribution without affecting any stratum, maintaining the integrity of our population within the sample.

In [12]:
fractions_df = dftaxytrips_with_strata.select("StrataGrouping").distinct().withColumn("fraction",lit(0.15))
fractions_dict = fractions_df.rdd.collectAsMap()

In [13]:
sampled_df = dftaxytrips_with_strata.stat.sampleBy("StrataGrouping", fractions_dict, seed=42)

In [14]:
sampled_total_count = sampled_df.count()

In [15]:
sampled_partition_counts = sampled_df.groupBy(
    "StrataGrouping"
).agg(count("*").alias("count"))

# Add proportion per combination
sampled_partition_counts = sampled_partition_counts.withColumn(
    "proportion", col("count") / sampled_total_count
)

# Sort by the most representative
sampled_partition_counts.orderBy(col("proportion").desc()).show(60)

+--------------------+------+--------------------+
|      StrataGrouping| count|          proportion|
+--------------------+------+--------------------+
|               Other|356854| 0.30022976553146846|
|Credit Card_76_Lo...|140422| 0.11814037151176633|
|Other_Other_Urban...| 67046| 0.05640739590931539|
|Other_Other_Long-...| 64443| 0.05421743004182221|
|Cash_Other_Flash ...| 46794| 0.03936890618650635|
| Cash_8_Flash Riders| 46027| 0.03872361082716433|
|Credit Card_32_Fl...| 44029|0.037042645862411586|
|Credit Card_8_Fla...| 41543| 0.03495111488024176|
|Cash_32_Flash Riders| 38282| 0.03220755794828046|
|Mobile_8_Flash Ri...| 33494|  0.0281792995642784|
|Credit Card_Other...| 31932|0.026865151779021254|
|Cash_Other_Long-H...| 30787| 0.02590183602094223|
|Mobile_Other_Urba...| 30682|0.025813497021293066|
|Cash_76_Long-Haul...| 30650|0.025786574659495222|
|Credit Card_76_Ur...| 29751|0.025030224557737107|
|Cash_Other_Urban ...| 29284|0.024637326340249857|
|Credit Card_8_Urb...| 27575|0.

In [16]:
print(sampled_total_count)

1188603


In [17]:
print(total_count)

7917844


### **Data Preparation**

##### **Elimination of non-important columns**

In [18]:
# DATASET structure
sampled_df.printSchema()

root
 |-- payment_group: string (nullable = false)
 |-- pickup_zone_group: string (nullable = false)
 |-- duration_group: string (nullable = false)
 |-- trip_id: string (nullable = true)
 |-- Taxi ID: string (nullable = true)
 |-- Trip Start Timestamp: string (nullable = true)
 |-- Trip End Timestamp: string (nullable = true)
 |-- Trip Seconds: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- Pickup Census Tract: long (nullable = true)
 |-- Dropoff Census Tract: long (nullable = true)
 |-- Pickup Community Area: integer (nullable = true)
 |-- Dropoff Community Area: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tips: double (nullable = true)
 |-- Tolls: double (nullable = true)
 |-- Extras: double (nullable = true)
 |-- Trip Total: double (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Pickup Centroid Latitude: double (nullable = true)
 |-- Pickup Centroid Longitude: double (nullable 

In our dataset we can observe that there are multiple columns dedicated to representing location data, this complicates our model without providing significant information, so we will eliminate all except 'pickup_zone_group' and 'Drugff_zone_group', which we build previously for this same reason.

In addition, the 'Trip Minute' column was calculated directly from the 'Trip Seconds' column, so we eliminate the latter to avoid collinearity problems. The 'trip_id' and 'taxi_id' columns do not present important values for our analysis, so they can be eliminated. And the columns 'Trip Start Timestamp', 'Trip End Timestamp' and 'Trip_start_ts' present a problem similar to the previous ones, where their values are represented in other columns or do not contribute to important information for our analysis, so we can eliminate them.

In [19]:
sampled_df = sampled_df.drop('Pickup Census Tract','Dropoff Census Tract','Pickup Community Area','Dropoff Community Area','Pickup Centroid Longitude','Pickup Centroid Latitude','Dropoff Centroid Latitude','Pickup Centroid Location','Dropoff Centroid Longitude','Dropoff Centroid  Location','Taxi ID','trip_id','Trip Seconds','Trip Start Timestamp', 'Trip End Timestamp','trip_start_ts','Payment Type','Company','StrataGrouping')

##### **Handling Missing Data**

In [20]:
missing_taxytrips = sampled_df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in sampled_df.columns
])

In [21]:
print("Missing values in our sample:")
pretty_display(missing_taxytrips)

Missing values in our sample:


Unnamed: 0,payment_group,pickup_zone_group,duration_group,trip_miles,Fare,Tips,Tolls,Extras,Trip Total,trip_hour,trip_day_of_week,trip_month,duration_minutes,tip_ratio,tip_per_mile,company_group,dropoff_zone_group
0,0,0,0,8,3049,3049,3049,3049,3049,0,0,0,244,0,2790,0,0


Because our goal is to predict the TIP values, it is important to eliminate those cases in which this column has null values.

In [22]:
sampled_df = sampled_df.where(sampled_df.Tips != 0)

For the remaining numerical columns we can define a simple imputator that uses the average of each column.

In [23]:
# List of numerical variables to impute
vars_a_imputar = ["duration_minutes", "trip_miles", "Fare","Tolls","Extras","Trip Total","tip_ratio", "tip_per_mile"]

# We apply imputation with median
for var in vars_a_imputar:
    mediana = sampled_df.approxQuantile(var, [0.5], 0.01)[0]
    sampled_df = sampled_df.withColumn(
        var, when(col(var).isNull(), mediana).otherwise(col(var))
    )

For categorical variables, we define an imputer that uses a placeholder value to ensure that no null values remain in the future.

Meanwhile, the variables "trip_hour", "trip_day_of_week", and "trip_month" are numeric types, but they represent categorical values (hour, day of the week, and month). Therefore, I will treat them as categorical variables and assign them a separate imputer.

In [24]:
# List of numerical variables to impute
vars_a_imputar = ["payment_group", "pickup_zone_group", "duration_group","company_group","dropoff_zone_group"]

# We apply imputation with median
for var in vars_a_imputar:
    sampled_df = sampled_df.withColumn(
        var, when(col(var).isNull(), "NA").otherwise(col(var))
    )
vars_a_imputar = ["trip_hour","trip_day_of_week","trip_month"]

# We apply imputation with median
for var in vars_a_imputar:
    sampled_df = sampled_df.withColumn(
        var, when(col(var).isNull(), 0).otherwise(col(var))
    )

In [25]:
# Analysis of missing values in 'dftaxytrips_selected'
missing_taxytrips = sampled_df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in sampled_df.columns
])

In [26]:
print("Missing values in Chicago Taxi Trips Dataset (CSV):")
pretty_display(missing_taxytrips)

Missing values in Chicago Taxi Trips Dataset (CSV):


Unnamed: 0,payment_group,pickup_zone_group,duration_group,trip_miles,Fare,Tips,Tolls,Extras,Trip Total,trip_hour,trip_day_of_week,trip_month,duration_minutes,tip_ratio,tip_per_mile,company_group,dropoff_zone_group
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [27]:
sampled_df.printSchema()

root
 |-- payment_group: string (nullable = false)
 |-- pickup_zone_group: string (nullable = false)
 |-- duration_group: string (nullable = false)
 |-- trip_miles: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tips: double (nullable = true)
 |-- Tolls: double (nullable = true)
 |-- Extras: double (nullable = true)
 |-- Trip Total: double (nullable = true)
 |-- trip_hour: integer (nullable = true)
 |-- trip_day_of_week: integer (nullable = true)
 |-- trip_month: integer (nullable = true)
 |-- duration_minutes: double (nullable = true)
 |-- tip_ratio: double (nullable = true)
 |-- tip_per_mile: double (nullable = true)
 |-- company_group: string (nullable = false)
 |-- dropoff_zone_group: string (nullable = false)



##### **Handling Atypical Data**

First we define auxiliary functions that will help us detect and eliminate atypical values. It is important to consider that even if atypical values are not detected in the current sample, we have to include code to deal with them when they occur.

In [28]:
def count_outliers(df, column):
    percentiles = df.approxQuantile(column, [0.25, 0.75], 0.05)
    Q1 = percentiles[0]
    Q3 = percentiles[1]

    IQR = Q3 - Q1

    lower_limit = Q1 - 1.5 * IQR
    upper_limit = Q3 + 1.5 * IQR

    return df.filter((col(column) < lower_limit) & (col(column) > upper_limit)).count()

def remove_outliers_inplace(df, column):

    percentiles = df.approxQuantile(column, [0.25, 0.75], 0.05)
    Q1 = percentiles[0]
    Q3 = percentiles[1]

    IQR = Q3 - Q1

    lower_limit = Q1 - 1.5 * IQR
    upper_limit = Q3 + 1.5 * IQR

    df = df.filter((col(column) >= lower_limit) & (col(column) <= upper_limit))

    return df

In [29]:
NumVar = ["duration_minutes", "trip_miles", "Fare","Tips","Tolls","Extras","Trip Total","tip_ratio", "tip_per_mile"]
for i in NumVar:
    outlier_count = count_outliers(sampled_df, i)
    print(f"Valores atípicos para {i}: {outlier_count}")

Valores atípicos para duration_minutes: 0
Valores atípicos para trip_miles: 0
Valores atípicos para Fare: 0
Valores atípicos para Tips: 0
Valores atípicos para Tolls: 0
Valores atípicos para Extras: 0
Valores atípicos para Trip Total: 0
Valores atípicos para tip_ratio: 0
Valores atípicos para tip_per_mile: 0


We add code that manages atypical values in case they occur

In [30]:
sampled_no_outlier_df = sampled_df
for i in NumVar:
    sampled_no_outlier_df = remove_outliers_inplace(sampled_df, i)

### **Preparation of the training and test sets**

To avoid issues related to bias injection, the dataset is split into training and testing sets.

In [31]:
train_df, test_df = sampled_no_outlier_df.randomSplit([0.7, 0.3], seed=42)

##### **Categorical data transformation**

PySpark models do not allow the use of non-numeric columns, so the following columns must be transformed using a StringIndexer. In addition, one-hot encoding is applied to prevent numeric relationships from being inferred where none exist.

In [32]:
indexer = StringIndexer(
            inputCols=["payment_group","company_group","pickup_zone_group","dropoff_zone_group","duration_group"],
            outputCols= ["payment_group_cat","company_group_cat","pickup_zone_group_cat","dropoff_zone_group_cat","duration_group_cat"])
indexerFit = indexer.fit(train_df)
train_indexed_df = indexerFit.transform(train_df)

In [33]:
train_indexed_df = train_indexed_df.select(
 'trip_miles',
 'Fare',
 'Tips',
 'Tolls',
 'Extras',
 'Trip Total',
 'trip_hour',
 'trip_day_of_week',
 'trip_month',
 'duration_minutes',
 'tip_ratio',
 'tip_per_mile',
 'payment_group_cat',
 'company_group_cat',
 'pickup_zone_group_cat',
 'dropoff_zone_group_cat',
 'duration_group_cat')

In [34]:
encoder = OneHotEncoder(
            inputCols=["payment_group_cat","company_group_cat","pickup_zone_group_cat","dropoff_zone_group_cat","duration_group_cat","trip_hour", "trip_day_of_week", "trip_month"],
            outputCols= ["payment_group_ohe","company_group_ohe","pickup_zone_group_ohe","dropoff_zone_group_ohe","duration_group_ohe","trip_hour_ohe", "trip_day_of_week_ohe", "trip_month_ohe"])
encoderFit = encoder.fit(train_indexed_df)
train_encoded_df = encoderFit.transform(train_indexed_df)

In [35]:
train_encoded_df = train_encoded_df.select(
 'trip_miles',
 'Fare',
 'Tips',
 'Tolls',
 'Extras',
 'Trip Total',
 'trip_hour_ohe',
 'trip_day_of_week_ohe',
 'trip_month_ohe',
 'duration_minutes',
 'tip_ratio',
 'tip_per_mile',
 'payment_group_ohe',
 'company_group_ohe',
 'pickup_zone_group_ohe',
 'dropoff_zone_group_ohe',
 'duration_group_ohe')

##### **Numerical data transformation for supervised learning**

Since the numerical data are on different scales and there are no outliers (these were handled in earlier steps), we opted to use a StandardScaler to transform these columns.

It is important to note that the Tip column is not modified, as it is our target column for the supervised learning algorithm.

In [36]:
numeric_cols = ["duration_minutes", "trip_miles", "Fare", "Tolls", "Extras", "Trip Total", "tip_ratio", "tip_per_mile"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features_vec")
train_encoded_num_vectorized_df = assembler.transform(train_encoded_df)

In [37]:
scaler = StandardScaler(
    inputCol="numeric_features_vec",
    outputCol="numeric_features_scaled",
    withMean=True,
    withStd=True
)

scalerFit = scaler.fit(train_encoded_num_vectorized_df)
train_encoded_num_scaled_df = scalerFit.transform(train_encoded_num_vectorized_df)

##### **Final DataFrame for the supervised learning model**

Finally, the DataFrame for the supervised learning model is created. To achieve this, a VectorAssembler is used to generate the features column, which contains vectors with all the previously transformed columns.

In [38]:
final_features = ["numeric_features_scaled", 'payment_group_ohe',
 'company_group_ohe',
 'pickup_zone_group_ohe',
 'dropoff_zone_group_ohe',
 'duration_group_ohe','trip_hour_ohe',
 'trip_day_of_week_ohe']
assembler_final = VectorAssembler(inputCols=final_features, outputCol="features")
train_final_df = assembler_final.transform(train_encoded_num_scaled_df )

In [39]:
train_final_df = train_final_df.select('features','tips')
train_final_df = train_final_df.withColumnRenamed("tips", "label")

##### **Transforming the testing set for supervised learning**

We also transform our test dataset, ensuring that we do not use the fit function on any of our transformers for this set.

In [40]:
test_indexed_df = indexerFit.transform(test_df)
test_encoded_df = encoderFit.transform(test_indexed_df)
test_vectorized_df = assembler.transform(test_encoded_df)
test_scaled_df = scalerFit.transform(test_vectorized_df)
test_final_df = assembler_final.transform(test_scaled_df)
test_final_df = test_final_df.select('features','tips')

In [41]:
test_final_df  = test_final_df .withColumnRenamed("tips", "label")

##### **Numerical data transformation for unsupervised learning**

The same steps are then repeated for the dataset used in the unsupervised learning model. The main difference here is that this dataset includes the tips column.

In [42]:
numeric_cols_nonSup = ["duration_minutes", "trip_miles", "Fare", "Tolls", "Extras", "Trip Total", "tip_ratio", "tip_per_mile","tips"]
assembler_nonSup = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features_vec")
train_vectorized_df_nonSup = assembler.transform(train_encoded_df)

In [43]:
scaler_nonSup = StandardScaler(
    inputCol="numeric_features_vec",
    outputCol="numeric_features_scaled",
    withMean=True,
    withStd=True
)

scalerFit_nonSup = scaler.fit(train_vectorized_df_nonSup)
train_scaled_df_nonSup = scalerFit.transform(train_vectorized_df_nonSup)

##### **Final DataFrame for the unsupervised learning model**

In [44]:
train_final_df_nonSup = assembler_final.transform(train_scaled_df_nonSup )

In [45]:
train_final_df_nonSup = train_final_df.select('features')

##### **Transforming the testing set for the unsupervised learning model**

In [46]:
test_vectorized_nonSup_df = assembler_nonSup.transform(test_encoded_df)
test_scaled_nonSup_df = scalerFit_nonSup.transform(test_vectorized_df)
test_final_nonSup_df = assembler_final.transform(test_scaled_df)
test_final_nonSup_df = test_final_nonSup_df .select('features')

### **Building supervised and unsupervised learning models**

#### **Creating the supervised learning model**

For our supervised learning problem, we will use linear regression to predict the values of the 'tips' column.

We define our model and the parameters to be used in the grid search.

In [47]:
lr = LinearRegression()
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.01, 0.1]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()
evaluator = RegressionEvaluator(metricName="rmse")
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4
)

We train our grid search and obtain the best model.

In [48]:
cv_model = cv.fit(train_final_df)
best_model = cv_model.bestModel

We print the parameters of our model and evaluate its performance on our test dataset.

In [49]:
predictions = best_model.transform(test_final_df)
rmse = evaluator.evaluate(predictions)

print("Best model parameters:")
print(f"  regParam: {best_model._java_obj.getRegParam()}")
print(f"  elasticNetParam: {best_model._java_obj.getElasticNetParam()}")
print(f"RMSE en set de prueba: {rmse}")

Best model parameters:
  regParam: 0.0
  elasticNetParam: 0.5
RMSE en set de prueba: 0.3389000723663502


In [50]:
predictions.select("label", "prediction").show(5)

+-----+------------------+
|label|        prediction|
+-----+------------------+
|  2.0|2.1460191983007038|
|  3.0| 3.113884371945795|
|  1.0|1.1283108188735556|
|  2.0| 2.128203154836537|
|  9.5| 9.448396205437547|
+-----+------------------+
only showing top 5 rows



We display metrics for our target column to put our results into context.

In [51]:
train_final_df.selectExpr("mean(label)", "stddev(label)", "min(label)", "max(label)").show()

+------------------+-----------------+----------+----------+
|       mean(label)|    stddev(label)|min(label)|max(label)|
+------------------+-----------------+----------+----------+
|6.0354008949752345|4.440883954488067|      0.01|     150.0|
+------------------+-----------------+----------+----------+



From this, we can conclude that our model produces strong results that differ only by a few cents when predicting tip amounts, thus achieving our objective.

#### **Creating the unsupervised learning model**

For the unsupervised learning model, we chose to use KMeans to perform clustering on our dataset. This can be very valuable from a business perspective to better understand consumer behavior patterns.

In [52]:
k_values = [2, 4, 6, 8]
max_iter_values = [10, 20]
evaluator = ClusteringEvaluator()

We define the values to be used in our GridSearch and the evaluator.
We then run the GridSearch; since PySpark does not provide a native implementation, we use two nested for loops.

In [53]:
best_model = None
best_score = float('-inf')
best_params = {}

for k in k_values:
    for max_iter in max_iter_values:
        kmeans = KMeans(k=k, maxIter=max_iter, seed=1)
        model = kmeans.fit(train_final_df_nonSup)
        predictions = model.transform(train_final_df_nonSup)

        score = evaluator.evaluate(predictions)

        if score > best_score:
            best_score = score
            best_model = model
            best_params = {"k": k, "maxIter": max_iter}

In [54]:
print(f"Best Parameters={best_params}, Best Result={best_score}")

Best Parameters={'k': 2, 'maxIter': 10}, Best Result=0.4181930478893609


We take the best model and try it with our test data

In [55]:
testResults = evaluator.evaluate(best_model.transform(test_final_nonSup_df))
print(f"Model results with test data = {testResults:.4f}")

Model results with test data = 0.4313


The default evaluation method for the clustering evaluator uses the silhouette score, which ranges from [+1, -1], where 1 indicates an ideal clustering and -1 indicates incorrect clusters.

The final evaluation with the test set, resulting in a score of 0.41, indicates that while the clustering is positive, it is not perfect and there is ample room for improvement. Next steps could include a broader hyperparameter search or the use of other algorithms.
