In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, StorageLevel
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, sum, concat_ws, count, desc

In [2]:
# Build the Spark configuration for a local session and start (or reuse) it.
# NOTE(review): with a local[*] master the executor runs inside the driver JVM,
# so the executor.* and *.memoryOverhead settings below likely have no effect —
# confirm before relying on them. spark.driver.memory is only honoured when this
# cell launches a new JVM (fresh kernel); getOrCreate() reuses an existing one.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName("CarsData")
    .setAll([
        ("spark.driver.memory", "10g"),
        ("spark.executor.memory", "12g"),
        ("spark.driver.memoryOverhead", "512m"),
        ("spark.executor.memoryOverhead", "512m"),
    ])
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext


In [3]:
# Location of the raw listings CSV on HDFS.
# Mo's path: 'hdfs://localhost:9000/user/mhhel/used_cars_data/used_cars_data.csv'
USED_CARS_CSV_PATH = 'hdfs://localhost:9000/cars/used_cars_data.csv'

# multiline=True lets quoted fields span several physical lines, and escape='"'
# treats a doubled quote inside a quoted field as a literal quote character.
data = spark.read.options(
    header=True,
    escape='"',
    multiline=True,
).csv(USED_CARS_CSV_PATH)

In [None]:
# Report the size of the raw dataset; count() runs a Spark job over the whole file.
column_count = len(data.columns)
row_count = data.count()

print(f"Rows: {row_count}, Columns: {column_count}")

In [None]:
# Print the column names and types of the raw data. The reader options do not
# set inferSchema, so the columns are expected to all come through as strings.
data.printSchema()

In [None]:
# Peek at the first five raw listings.
data.show(n=5)

Based on our dataset, we have decided to explore and analyze the following:

* Find which make and model (make_name & model_name) is the most frequently listed car in a given city.
* Determine whether certain types of cars are more popular in particular regions of the US (e.g., pick-up trucks in the South).
* Identify the 5 cities with the most car listings.

We keep only the following columns, based on what we think is relevant to our goals:

- city: City where the car is listed
- make_name: Make (brand) of the listed car
- model_name: Model of the listed car
- dealer_zip: ZIP code of the dealer
- engine_cylinders: Engine cylinder type of the listed car (used for the engine type analysis)

In [19]:
# Columns needed for the analyses below; everything else is dropped up front.
columns_to_keep = ["city", "make_name", "model_name", "dealer_zip", "engine_cylinders"]

# Without caching, every action below (distinct, groupBy, ...) re-reads and
# re-parses the multiline CSV from HDFS. Persisting the narrow projection lets
# those cells reuse it; MEMORY_AND_DISK spills to disk instead of failing if it
# does not fit in memory. Call cleaned_data.unpersist() when finished.
cleaned_data = data.select(*columns_to_keep).persist(StorageLevel.MEMORY_AND_DISK)

Verify the new DataFrame and its columns:

In [None]:
# Confirm that only the selected columns survived, then peek at five rows.
cleaned_data.printSchema()
cleaned_data.show(n=5)

In [9]:

import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

def plot_null_values(df, total_rows=None):
    """
    Plot, for each column of a PySpark DataFrame, the fraction of rows that are null.

    Args:
    df (pyspark.sql.DataFrame): The PySpark DataFrame to analyze.
    total_rows (int, optional): Number of rows in ``df``. When omitted it is
        computed with ``df.count()``; pass it in to skip that extra Spark job
        when the row count is already known.

    Returns:
    None: Displays a horizontal bar chart of null fractions per column.
    """

    # Step 1: Count the null values for every column in a single aggregation.
    # `sum` here is pyspark.sql.functions.sum (imported at notebook level), not the builtin.
    null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

    # Step 2: Collect the single result row into {column name: null count}.
    # Aggregating an empty DataFrame yields None for each sum, so treat that as 0.
    null_counts_dict = {
        name: (n_null or 0)
        for name, n_null in null_counts.collect()[0].asDict().items()
    }

    # Step 3: Normalise by the DataFrame's real row count rather than a hard-coded
    # dataset size, so the plot is correct for any DataFrame (including subsets).
    if total_rows is None:
        total_rows = df.count()
    columns = list(null_counts_dict.keys())
    null_values = [
        (n_null / total_rows) if total_rows else 0.0
        for n_null in null_counts_dict.values()
    ]

    # Step 4: Plot the data using Matplotlib's explicit figure/axes interface.
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.barh(columns, null_values, color='skyblue', height=0.6)

    ax.set_xlabel('Fraction of Null Values (null count / total rows)', fontsize=12)
    ax.set_ylabel('Columns', fontsize=12)
    ax.set_title('Fraction of Null Values Per Column', fontsize=14)

    # Annotate each bar with its fraction, just past the end of the bar.
    for i, val in enumerate(null_values):
        ax.text(val + 0.01, i, f'{val:.4f}', ha='left', va='center', fontsize=10)

    fig.tight_layout()  # Ensure the layout fits well
    plt.show()  # Display the plot



In [None]:
# NOTE(review): the null-value plot is currently disabled. Uncomment the call
# below to plot the per-column null values of cleaned_data; it runs a Spark job
# (collect) over the data before drawing.
# plot_null_values(cleaned_data)

Now let's check the unique values of our features to see if there are any typos:

In [None]:
# distinct() returns values in an arbitrary, run-dependent order; sorting them
# puts near-duplicate spellings next to each other so typos are easy to spot.
[
    row['make_name']
    for row in cleaned_data.select('make_name').distinct().orderBy('make_name').collect()
]

In [None]:
# distinct() returns values in an arbitrary, run-dependent order; sorting them
# puts near-duplicate spellings next to each other so typos are easy to spot.
[
    row['model_name']
    for row in cleaned_data.select('model_name').distinct().orderBy('model_name').collect()
]

In [None]:
# distinct() returns values in an arbitrary, run-dependent order; sorting them
# puts near-duplicate spellings next to each other so typos are easy to spot.
[
    row['city']
    for row in cleaned_data.select('city').distinct().orderBy('city').collect()
]

In [None]:
# distinct() returns values in an arbitrary, run-dependent order; sorting the
# (string) ZIP codes makes malformed entries easy to spot at either end.
[
    row['dealer_zip']
    for row in cleaned_data.select('dealer_zip').distinct().orderBy('dealer_zip').collect()
]

A note on data types: we keep **dealer_zip** as a string because this preserves leading zeros in ZIP codes. That matters here because the first digit of a ZIP code represents a broad region of the US.

# Analysis

## Most listed car (make + model) in Houston

In [None]:
# City to analyse; change it to find the most listed car in another city.
TARGET_CITY = "Houston"

# Keep only listings in the target city and build a combined "Make Model" label.
# concat_ws skips nulls, so a listing with a missing model is labelled by make only.
houston_sales = (
    cleaned_data
    .filter(col("city") == TARGET_CITY)
    .withColumn("Make_Model", concat_ws(" ", col("make_name"), col("model_name")))
)

# Count listings per make/model and keep the top one. Ties on the count are
# broken alphabetically so limit(1) returns the same row on every run.
most_sold_houston = (
    houston_sales
    .groupBy("Make_Model")
    .agg(count("*").alias("count"))
    .orderBy(desc("count"), col("Make_Model"))
    .limit(1)
)

# Show the result
most_sold_houston.show()

## Cities with the most listings

In [None]:
# Count listings per city.
city_sales_count = cleaned_data.groupBy("city").agg(count("*").alias("sales_count"))

# Order by count (descending) and keep the top 5. The secondary sort on the city
# name makes ties at the cut-off resolve the same way on every run.
top_5_cities = city_sales_count.orderBy(desc("sales_count"), col("city")).limit(5)

# Show the result
top_5_cities.show()

## Most common engine type

In [None]:
# Count listings per engine_cylinders value.
cylinder_count = cleaned_data.groupBy("engine_cylinders").agg(count("*").alias("count"))

# Order by count (descending) and keep the top 5. The secondary sort on the
# cylinder value makes ties at the cut-off resolve the same way on every run.
top_5_cylinders = cylinder_count.orderBy(desc("count"), col("engine_cylinders")).limit(5)

# Show the result
top_5_cylinders.show()