# Installing and Setting up PySpark and Kaggle

In [1]:
# Install necessary libraries
!pip install pyspark
!pip install kaggle

# Setup Kaggle API to download dataset
!mkdir ~/.kaggle
!kaggle datasets download -d austinreese/craigslist-carstrucks-data

# Unzip the dataset
!unzip craigslist-carstrucks-data.zip

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=fec71519ff718236666304e613745221889a6156a77361f425b6533bed4c9535
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2
Dataset URL: https://www.kaggle.com/datasets/austinreese/craigslist-carstrucks-data
License(s): CC0-1.0
Downloading craigslist-carstrucks-data.zip to /content
100% 262M/262M [00:01<00:00, 203MB/s]
100% 262M/262M [00:01<00:00, 183MB/s]
Archive:  craigslist-carstrucks-data.zip

# Reading the Data

In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time
from pyspark.sql.functions import col, lit

# Create SparkSession (required to work with PySpark)
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Start timer and read the CSV file into a PySpark DataFrame
start_time = time.time()
df_spark = spark.read.csv("/content/vehicles.csv", header=True, inferSchema=True)
end_time = time.time()

# Display time taken to read the file
print(f"PySpark Reading Time: {end_time - start_time} seconds")

# Display schema of the DataFrame
df_spark.printSchema()

# Show the first 5 rows of the dataset
df_spark.show(5)

PySpark Reading Time: 35.66996765136719 seconds
root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string

# Data Preprocessing

In [3]:
# Drop columns that are not useful for our analysis or model
columns_to_drop = ['url', 'region', 'region_url', 'title_status', 'VIN', 'size', 'image_url', 'lat', 'long', 'county', 'description']
df_spark = df_spark.drop(*columns_to_drop)

In [4]:
# Convert columns to appropriate data types for analysis and modeling
df_spark = df_spark.withColumn("price", col("price").cast("double"))  # Price should be numeric
df_spark = df_spark.withColumn("year", col("year").cast("int"))  # Year should be an integer
df_spark = df_spark.withColumn("odometer", col("odometer").cast("double"))  # Odometer should be numeric

# Print the updated schema to verify data types
df_spark.printSchema()

root
 |-- id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: double (nullable = true)
 |-- transmission: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- description: string (nullable = true)
 |-- state: string (nullable = true)
 |-- posting_date: string (nullable = true)



In [5]:
# Fill missing categorical values with 'unknown' for consistency
df_spark = df_spark.fillna({
    "cylinders": "unknown",
    "fuel": "unknown",
    "transmission": "unknown",
    "drive": "unknown",
    "paint_color": "unknown",
    "type": "unknown"
})


In [6]:
# Filter record with invalid year
df_spark = df_spark.filter((col("year") > 1900) & (col("year") <= 2024))

df_spark.select("year").distinct().orderBy("year").show()

+----+
|year|
+----+
|1901|
|1902|
|1903|
|1905|
|1909|
|1910|
|1913|
|1915|
|1916|
|1918|
|1920|
|1921|
|1922|
|1923|
|1924|
|1925|
|1926|
|1927|
|1928|
|1929|
+----+
only showing top 20 rows



In [7]:
from pyspark.sql.functions import lit

# Calculate age based on the current year
current_year = 2024
df_spark = df_spark.withColumn("age", lit(current_year) - col("year"))

# Show result
df_spark.select("year", "age").show(5)

+----+---+
|year|age|
+----+---+
|2014| 10|
|2010| 14|
|2020|  4|
|2017|  7|
|2013| 11|
+----+---+
only showing top 5 rows



# EDA (Exploratory Data Analysis)

In [8]:
# Number of vehicles per year
df_spark.groupBy("year").count().orderBy("year").show(10)

+----+-----+
|year|count|
+----+-----+
|1901|    3|
|1902|    1|
|1903|   12|
|1905|    1|
|1909|    1|
|1910|    2|
|1913|    2|
|1915|    1|
|1916|    2|
|1918|    1|
+----+-----+
only showing top 10 rows



In [9]:
# Descriptive statistics for numeric columns
df_spark.describe(["price", "odometer", "age"]).show()

+-------+--------------------+------------------+-----------------+
|summary|               price|          odometer|              age|
+-------+--------------------+------------------+-----------------+
|  count|              425827|            421332|           425929|
|   mean|   75279.43337552574| 98224.43318807971|12.76114563694888|
| stddev|1.2197335159309383E7|214118.86783963133|9.431055628948746|
|    min|                 0.0|               0.0|                2|
|    max|       3.736928711E9|             1.0E7|              123|
+-------+--------------------+------------------+-----------------+



In [10]:
# Filter cars priced under $100,000 and manufactured 1990 or later
df_filtered = df_spark.filter((col("price") < 100000) & (col("year") >= 1990))
df_filtered.select("year", "price", "manufacturer", "model").show(5)


+----+-------+------------+--------------------+
|year|  price|manufacturer|               model|
+----+-------+------------+--------------------+
|2014|33590.0|         gmc|sierra 1500 crew ...|
|2010|22590.0|   chevrolet|      silverado 1500|
|2020|39590.0|   chevrolet| silverado 1500 crew|
|2017|30990.0|      toyota|tundra double cab sr|
|2013|15000.0|        ford|           f-150 xlt|
+----+-------+------------+--------------------+
only showing top 5 rows



In [11]:
# Use PySpark SQL
df_spark.createOrReplaceTempView("vehicles")
spark.sql("SELECT year, price, manufacturer, model FROM vehicles WHERE price < 100000 AND year >= 1990").show(5)

+----+-------+------------+--------------------+
|year|  price|manufacturer|               model|
+----+-------+------------+--------------------+
|2014|33590.0|         gmc|sierra 1500 crew ...|
|2010|22590.0|   chevrolet|      silverado 1500|
|2020|39590.0|   chevrolet| silverado 1500 crew|
|2017|30990.0|      toyota|tundra double cab sr|
|2013|15000.0|        ford|           f-150 xlt|
+----+-------+------------+--------------------+
only showing top 5 rows



In [12]:
# Filter car odometer > 100,000
df_filtered = df_spark.filter(col("odometer") > 100000)
df_filtered.select("year", "odometer", "price").show(5)

+----+--------+-------+
|year|odometer|  price|
+----+--------+-------+
|2013|128000.0|15000.0|
|1992|192000.0| 4500.0|
|2001|144700.0|22500.0|
|2004|176144.0| 3000.0|
|2008|201300.0|17500.0|
+----+--------+-------+
only showing top 5 rows



In [13]:
# Use PySpark SQL
spark.sql("SELECT year, odometer, price FROM vehicles WHERE odometer > 100000").show(5)

+----+--------+-------+
|year|odometer|  price|
+----+--------+-------+
|2013|128000.0|15000.0|
|1992|192000.0| 4500.0|
|2001|144700.0|22500.0|
|2004|176144.0| 3000.0|
|2008|201300.0|17500.0|
+----+--------+-------+
only showing top 5 rows



## Comparing to PySpark and Pandas

## PySpark version

In [14]:
# Read data again
start_time = time.time()
df_spark = spark.read.csv("/content/vehicles.csv", header=True, inferSchema=True)
end_time = time.time()

# Display time taken to read the file
print(f"PySpark Reading Time: {end_time - start_time} seconds")

# Set the current year for age calculation
current_year = 2024

start_time = time.time()

# Drop unused columns
columns_to_drop =['url', 'region', 'region_url', 'title_status', 'VIN', 'size', 'image_url', 'lat', 'long', 'county', 'description']
df_spark = df_spark.drop(*columns_to_drop)

# Calculate age based on the current year
df_spark = df_spark.withColumn("age", lit(current_year) - col("year"))

# Convert columns to appropriate data types
df_spark = df_spark.withColumn("price", col("price").cast("double"))
df_spark = df_spark.withColumn("year", col("year").cast("int"))
df_spark = df_spark.withColumn("odometer", col("odometer").cast("double"))

# Filter record with invalid year
df_spark = df_spark.filter((col("year") > 1900) & (col("year") <= 2024))

# Fill missing values with default values
df_spark = df_spark.fillna({
    "cylinders": "unknown",
    "fuel": "unknown",
    "transmission": "unknown",
    "drive": "unknown",
    "paint_color": "unknown",
    "type": "unknown"
})

end_time = time.time()
print(f"PySpark Processing Time: {end_time - start_time} seconds")

PySpark Reading Time: 18.788039207458496 seconds
PySpark Processing Time: 0.2909984588623047 seconds


## Pandas version

In [57]:
import pandas as pd
import time

current_year = 2024

# Start timer for reading the data
start_time_read = time.time()

# Read the CSV file into a pandas DataFrame
df_pandas = pd.read_csv("/content/vehicles.csv")

end_time_read = time.time()
print(f"Pandas Reading Time: {end_time_read - start_time_read} seconds")

# Start timer for data processing
start_time_process = time.time()

# Drop unused columns
columns_to_drop = ['url', 'region', 'region_url', 'title_status', 'VIN', 'size', 'image_url', 'lat', 'long', 'county', 'description']
df_pandas = df_pandas.drop(columns=columns_to_drop)

# Calculate age based on the current year
df_pandas['age'] = current_year - df_pandas['year']

# Convert columns to appropriate data types
df_pandas['price'] = pd.to_numeric(df_pandas['price'], errors='coerce')
df_pandas['year'] = pd.to_numeric(df_pandas['year'], errors='coerce')
df_pandas['odometer'] = pd.to_numeric(df_pandas['odometer'], errors='coerce')

# Filter records with invalid year
df_pandas = df_pandas[(df_pandas['year'] > 1900) & (df_pandas['year'] <= 2024)]

# Fill missing values with default values
df_pandas.loc['cylinders']=df_pandas['cylinders'].fillna('unknown')
df_pandas.loc['fuel']=df_pandas['fuel'].fillna('unknown')
df_pandas.loc['transmission']=df_pandas['transmission'].fillna('unknown')
df_pandas.loc['drive']=df_pandas['drive'].fillna('unknown')
df_pandas.loc['paint_color']=df_pandas['paint_color'].fillna('unknown')
df_pandas.loc['type']=df_pandas['type'].fillna('unknown')

# End timer for data processing
end_time_process = time.time()

# Print processing time
print(f"Pandas Processing Time: {end_time_process - start_time_process} seconds")

Pandas Reading Time: 33.5674831867218 seconds
Pandas Processing Time: 2.1332733631134033 seconds


# Modelling

## Step 1: Data Cleaning


### 1.1 Drop rows with missing values in important columns

In [24]:
# We remove rows that have missing values in the 'price', 'odometer', 'year', or 'manufacturer' columns.
df_spark = df_spark.dropna(subset=['price', 'odometer', 'year', 'manufacturer'])

### 1.2 Drop outliers using Z-score for numeric columns
Calculate Z-scores and filter out records where Z-score > 2 (approximately 95% confidence interval)

In [25]:
from pyspark.sql.functions import mean, stddev

# Calculate mean and standard deviation for the numeric columns
mean_price, stddev_price = df_spark.select(mean('price'), stddev('price')).first()
mean_odometer, stddev_odometer = df_spark.select(mean('odometer'), stddev('odometer')).first()

# Filter out outliers based on Z-score > 2
df_spark = df_spark.filter(((df_spark['price'] - mean_price) / stddev_price).between(-2, 2))
df_spark = df_spark.filter(((df_spark['odometer'] - mean_odometer) / stddev_odometer).between(-2, 2))

## Step 2: Feature Engineering
Convert categorical columns to numeric using StringIndexer

In [26]:
from pyspark.ml.feature import StringIndexer

# Convert 'manufacturer' to numeric
indexer_manufacturer = StringIndexer(inputCol="manufacturer", outputCol="manufacturer_index")
df_spark = indexer_manufacturer.fit(df_spark).transform(df_spark)

# Convert 'fuel' to numeric
indexer_fuel = StringIndexer(inputCol="fuel", outputCol="fuel_index")
df_spark = indexer_fuel.fit(df_spark).transform(df_spark)

# Convert 'transmission' to numeric
indexer_transmission = StringIndexer(inputCol="transmission", outputCol="transmission_index")
df_spark = indexer_transmission.fit(df_spark).transform(df_spark)

## Step 3: Prepare Features


### 3.1 Assemble features into a single vector

In [27]:
from pyspark.ml.feature import VectorAssembler

# List of features to be used in the model
features = ['year', 'odometer', 'manufacturer_index', 'fuel_index', 'transmission_index']

# Assemble these features into a single feature vector
assembler = VectorAssembler(inputCols=features, outputCol="features")
df_with_features = assembler.transform(df_spark)

### 3.2 Normalize the features

In [28]:
from pyspark.ml.feature import StandardScaler

# Apply standard scaling to normalize the feature vector (zero mean, unit variance)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_with_features)
df_with_scaled_features = scaler_model.transform(df_with_features)

## Step 4: Train-Test Split

### Split data into training and testing sets

In [38]:
# We use an 80-20 split for training and testing the model.
train_data, test_data = df_with_scaled_features.randomSplit([0.8, 0.2], seed=42)

## Step 5: Model Training

In [39]:
from pyspark.ml.regression import DecisionTreeRegressor

# Initialize the Decision Tree model
dt = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="price")

# Train the model on the training data
model = dt.fit(train_data)

## Step 6: Model Prediction & Evaluation

In [40]:
# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model using Mean Squared Error (MSE)
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the evaluator with MSE as the metric
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")

# Calculate MSE on the test data
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error (MSE): {mse}")

Mean Squared Error (MSE): 3224698814.7563186


In [42]:
# Print some predictions and true price
predictions.select("prediction", "price").show(20)

+------------------+-------+
|        prediction|  price|
+------------------+-------+
|28139.301985584116|29590.0|
|32122.851198990324|22990.0|
|15693.980246567666|22590.0|
| 27222.18423206329|38990.0|
|15766.982151244716|30990.0|
|32122.851198990324|21990.0|
|32122.851198990324|38990.0|
|12823.033226081818|27990.0|
|14503.288756167229|18500.0|
|  6743.16269446042|12977.0|
|15693.980246567666|20977.0|
|19544.021471873202|40590.0|
|32122.851198990324|22488.0|
| 26951.87902542373|19995.0|
|28139.301985584116|30990.0|
|  6743.16269446042| 5000.0|
|23136.404829488667|27590.0|
| 27222.18423206329|32590.0|
|28139.301985584116|30990.0|
|28139.301985584116|30990.0|
+------------------+-------+
only showing top 20 rows

