PySpark Assignment: Data Cleaning, Transformation, Analysis, and Prediction

Student: Angel Ivan Reyes Torresa ID: C0053883

Dataset Info:

Brewery Operations and Market Analysis Dataset

File size: 2.6 GB

Total rows in dataset: 10,000,000


https://www.kaggle.com/datasets/ankurnapa/brewery-operations-and-market-analysis-dataset?select=brewery_data_complete_extended.csv

This dataset presents an extensive collection of data from a craft beer brewery, spanning from January 2020 to January 2024. It encapsulates a rich blend of brewing parameters, sales data, and quality assessments, providing a holistic view of the brewing process and its market implications.

The dataset contains 20 columns.

In [1]:
# Standard library and plotting
from functools import reduce

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# scikit-learn (OneHotEncoder and StandardScaler are not imported from here,
# because they would collide with the pyspark.ml.feature classes of the same name)
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType, DoubleType, IntegerType
from pyspark.sql.functions import col, count, when, isnan, to_date, year, month, dayofweek, dayofyear, avg
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LinearSVC
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator, RegressionEvaluator

1 Data Loading and Initial Exploration

In [2]:
# Start a Spark session with optimized configurations for handling large datasets

spark = SparkSession.builder \
    .appName("BreweryDataAnalysis") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.sql.shuffle.partitions", "100") \
    .getOrCreate()

In [3]:
# Load the CSV file, using the first row as the header and inferring column types
file_path = "C:/Users/angel/00Angel/brewery_data.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [4]:
# Check the schema to see data types and structure
df.printSchema()

root
 |-- Batch_ID: integer (nullable = true)
 |-- Brew_Date: timestamp (nullable = true)
 |-- Beer_Style: string (nullable = true)
 |-- SKU: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Fermentation_Time: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- pH_Level: double (nullable = true)
 |-- Gravity: double (nullable = true)
 |-- Alcohol_Content: double (nullable = true)
 |-- Bitterness: integer (nullable = true)
 |-- Color: integer (nullable = true)
 |-- Ingredient_Ratio: string (nullable = true)
 |-- Volume_Produced: integer (nullable = true)
 |-- Total_Sales: double (nullable = true)
 |-- Quality_Score: double (nullable = true)
 |-- Brewhouse_Efficiency: double (nullable = true)
 |-- Loss_During_Brewing: double (nullable = true)
 |-- Loss_During_Fermentation: double (nullable = true)
 |-- Loss_During_Bottling_Kegging: double (nullable = true)



In [5]:
# Show the first 10 rows to inspect data quality and format
df.show(10, truncate=False)

+--------+-------------------+----------+-------+---------------+-----------------+------------------+------------------+------------------+-----------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+----------------------------+
|Batch_ID|Brew_Date          |Beer_Style|SKU    |Location       |Fermentation_Time|Temperature       |pH_Level          |Gravity           |Alcohol_Content  |Bitterness|Color|Ingredient_Ratio|Volume_Produced|Total_Sales       |Quality_Score    |Brewhouse_Efficiency|Loss_During_Brewing|Loss_During_Fermentation|Loss_During_Bottling_Kegging|
+--------+-------------------+----------+-------+---------------+-----------------+------------------+------------------+------------------+-----------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+-------

In [6]:
# Count the total number of rows in the dataset
row_count = df.count()
print(f"Total rows in dataset: {row_count}")

Total rows in dataset: 10000000


2 Data Cleaning and Transformation

2.1. Handling Missing Values

In [7]:
# Count missing values (nulls) for each column
missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# Show the result
missing_values.show()

+--------+---------+----------+---+--------+-----------------+-----------+--------+-------+---------------+----------+-----+----------------+---------------+-----------+-------------+--------------------+-------------------+------------------------+----------------------------+
|Batch_ID|Brew_Date|Beer_Style|SKU|Location|Fermentation_Time|Temperature|pH_Level|Gravity|Alcohol_Content|Bitterness|Color|Ingredient_Ratio|Volume_Produced|Total_Sales|Quality_Score|Brewhouse_Efficiency|Loss_During_Brewing|Loss_During_Fermentation|Loss_During_Bottling_Kegging|
+--------+---------+----------+---+--------+-----------------+-----------+--------+-------+---------------+----------+-----+----------------+---------------+-----------+-------------+--------------------+-------------------+------------------------+----------------------------+
|       0|        0|         0|  0|       0|                0|          0|       0|      0|              0|         0|    0|               0|              0|      
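The check above counts SQL nulls only. In floating-point columns, NaN is a separate value that `isNull()` does not match; in PySpark, the already imported `isnan` function can be combined with `isNull()` to count both. The plain-Python sketch below only illustrates the distinction; the sample list is hypothetical and not taken from the dataset.

```python
import math

# Hypothetical sample of a numeric column:
# None models a SQL null, float("nan") models a NaN reading.
temperatures = [18.5, None, float("nan"), 21.0, None]

def count_missing(values):
    """Return (null_count, nan_count) for a list of optional floats."""
    nulls = sum(1 for v in values if v is None)
    nans = sum(1 for v in values if v is not None and math.isnan(v))
    return nulls, nans

null_count, nan_count = count_missing(temperatures)
print(f"nulls={null_count}, NaNs={nan_count}")
```

If the double columns of the brewery data could contain NaN, counting both kinds of missing value would give a more complete picture than the null count alone.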

In [8]:
df_cleaned = df.dropna()  # Removes rows with any null value

In [9]:
row_count = df_cleaned.count()
print(f"Total rows in dataset: {row_count}")

Total rows in dataset: 10000000


2.2. Checking for Duplicates

In [10]:
# Show the duplicate records (based on all columns)
duplicates_df = df.groupBy(df.columns).count().filter("count > 1")
duplicates_df.show()

+--------+---------+----------+---+--------+-----------------+-----------+--------+-------+---------------+----------+-----+----------------+---------------+-----------+-------------+--------------------+-------------------+------------------------+----------------------------+-----+
|Batch_ID|Brew_Date|Beer_Style|SKU|Location|Fermentation_Time|Temperature|pH_Level|Gravity|Alcohol_Content|Bitterness|Color|Ingredient_Ratio|Volume_Produced|Total_Sales|Quality_Score|Brewhouse_Efficiency|Loss_During_Brewing|Loss_During_Fermentation|Loss_During_Bottling_Kegging|count|
+--------+---------+----------+---+--------+-----------------+-----------+--------+-------+---------------+----------+-----+----------------+---------------+-----------+-------------+--------------------+-------------------+------------------------+----------------------------+-----+
+--------+---------+----------+---+--------+-----------------+-----------+--------+-------+---------------+----------+-----+----------------+----

2.3. Data Type Conversion

In [11]:
# Convert 'Brew_Date' column to date only (removes time part)
df = df.withColumn('Brew_Date', to_date(df['Brew_Date']))

# Show the result to confirm the changes
df.select('Brew_Date').show(5, truncate=False)

+----------+
|Brew_Date |
+----------+
|2020-01-01|
|2020-01-01|
|2020-01-01|
|2020-01-01|
|2020-01-01|
+----------+
only showing top 5 rows



In [12]:
# Chain the casts; reassigning from `df` on each line kept only the last one.
# Brew_Date is already a date (previous cell), so it is not cast back to timestamp.
df_cleaned = df.dropna() \
    .withColumn('Fermentation_Time', col('Fermentation_Time').cast('integer')) \
    .withColumn('Temperature', col('Temperature').cast('double'))

2.4. Filtering Out Invalid Rows

In [13]:
# Filter out rows with invalid or impossible values
df_cleaned = df_cleaned.filter((df_cleaned['Volume_Produced'] >= 0) & 
                               (df_cleaned['Temperature'] >= 0) & (df_cleaned['Temperature'] <= 100) & 
                               (df_cleaned['pH_Level'] >= 0) & (df_cleaned['pH_Level'] <= 14))

1  Volume_Produced >= 0

Reason: The volume of beer produced (Volume_Produced) cannot be negative because it represents the actual quantity of beer brewed. Negative values would indicate an impossible or erroneous record. So, filtering out values less than zero is essential to ensure data validity.

2  Temperature >= 0 and Temperature <= 100

Reason: Minimum Temperature (0°C): 0°C is the freezing point of water, so wort or beer should not be below it during brewing or fermentation. Readings below 0°C most likely indicate data errors.

Maximum Temperature (100°C): 100°C is the boiling point of water at sea level. Brewing temperatures are typically below 100°C, so readings above it are treated as outliers or incorrect measurements.

Brewing Temperature: Mashing, for example, is generally done between 60°C and 75°C, while fermentation runs much cooler. The filter range is deliberately broader (0–100°C) so that it removes only obviously invalid or extreme data points.

3 pH_Level >= 0 and pH_Level <= 14

Reason: Minimum pH (0): The conventional pH scale runs from 0 (extremely acidic) to 14 (extremely alkaline). A value below 0 is not plausible for beer or wort, so such values are filtered out as errors.

Maximum pH (14): Similarly, a pH above 14 is not plausible in beer brewing. Typical beer pH values range between 4.0 and 5.5, with some variation depending on the type of beer and fermentation conditions. Filtering values greater than 14 removes erroneous readings beyond the top of the scale.
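The three rules above can be sketched as a single predicate. This is a plain-Python illustration on made-up rows, not the Spark filter itself; the function name `is_valid_batch` and the sample values are assumptions introduced for the example.

```python
def is_valid_batch(row):
    """Mirror the filter: non-negative volume, 0-100 °C, pH within 0-14."""
    return (
        row["Volume_Produced"] >= 0
        and 0 <= row["Temperature"] <= 100
        and 0 <= row["pH_Level"] <= 14
    )

# Toy rows (hypothetical values) covering each rule
sample_rows = [
    {"Volume_Produced": 3173, "Temperature": 18.2, "pH_Level": 4.8},   # plausible
    {"Volume_Produced": -5, "Temperature": 20.0, "pH_Level": 5.0},     # negative volume
    {"Volume_Produced": 1200, "Temperature": 130.0, "pH_Level": 5.1},  # above boiling
    {"Volume_Produced": 900, "Temperature": 19.0, "pH_Level": 15.2},   # off the pH scale
]

valid_rows = [row for row in sample_rows if is_valid_batch(row)]
print(f"{len(valid_rows)} of {len(sample_rows)} rows pass the validity checks")
```

Comparing the row count before and after the real filter would show how many records, if any, these bounds actually remove.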

3 Data Analysis Using Spark SQL

3.1.1) Aggregation: Calculate summary statistics (e.g., mean, median, standard deviation) of the numerical columns.

In [14]:
df_final = df_cleaned
df_final.createOrReplaceTempView("brewing_data")

In [15]:

# Performing a SQL query to select all data from the brewing_data view
cleaned_data_query = "SELECT * FROM brewing_data"
final_df = spark.sql(cleaned_data_query)

# Showing the first few rows of the cleaned data
final_df.show(5)

+--------+----------+----------+----+------------+-----------------+------------------+------------------+------------------+-----------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+----------------------------+
|Batch_ID| Brew_Date|Beer_Style| SKU|    Location|Fermentation_Time|       Temperature|          pH_Level|           Gravity|  Alcohol_Content|Bitterness|Color|Ingredient_Ratio|Volume_Produced|       Total_Sales|    Quality_Score|Brewhouse_Efficiency|Loss_During_Brewing|Loss_During_Fermentation|Loss_During_Bottling_Kegging|
+--------+----------+----------+----+------------+-----------------+------------------+------------------+------------------+-----------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+----------------------------+
| 7870796|2020-01-01|W

In [16]:
# Identifying numeric columns in the cleaned data
numeric_columns = [field.name for field in final_df.schema.fields 
                   if isinstance(field.dataType, (FloatType, DoubleType, IntegerType))]

# Generating SQL queries for summary statistics (mean, median, standard deviation)
summary_query = f"""
SELECT 
    {', '.join([f'AVG({col}) AS avg_{col}' for col in numeric_columns])},
    {', '.join([f'PERCENTILE_APPROX({col}, 0.5) AS median_{col}' for col in numeric_columns])},
    {', '.join([f'STDDEV({col}) AS stddev_{col}' for col in numeric_columns])}
FROM brewing_data
"""

# Executing the SQL query to calculate summary statistics
summary_df = spark.sql(summary_query)

# Showing the summary statistics DataFrame
summary_df.show(truncate=False)

+------------+---------------------+------------------+-----------------+------------------+-------------------+--------------+----------+-------------------+------------------+-----------------+------------------------+-----------------------+----------------------------+--------------------------------+---------------+------------------------+------------------+-----------------+------------------+----------------------+-----------------+------------+----------------------+------------------+--------------------+---------------------------+--------------------------+-------------------------------+-----------------------------------+------------------+------------------------+------------------+-------------------+--------------------+----------------------+------------------+-----------------+----------------------+------------------+--------------------+---------------------------+--------------------------+-------------------------------+-----------------------------------+
|avg_B
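The query string above is assembled from the list of numeric columns. A minimal sketch of the same idea as a reusable helper is shown below; `build_summary_query` is a hypothetical name, and unlike the cell above it groups the three statistics per column so that related output columns sit next to each other. In Spark SQL, `STDDEV` is the sample standard deviation and `PERCENTILE_APPROX` returns an approximate median.

```python
def build_summary_query(columns, table):
    """Build one SELECT with mean, approximate median, and std. dev. per column."""
    parts = []
    for name in columns:
        parts.append(f"AVG({name}) AS avg_{name}")
        parts.append(f"PERCENTILE_APPROX({name}, 0.5) AS median_{name}")
        parts.append(f"STDDEV({name}) AS stddev_{name}")
    select_list = ",\n    ".join(parts)
    return f"SELECT\n    {select_list}\nFROM {table}"

# Example with two of the dataset's numeric columns
query = build_summary_query(["Temperature", "pH_Level"], "brewing_data")
print(query)
```

Passing an explicit column list also makes it easy to leave out identifier columns such as Batch_ID, whose mean and standard deviation carry no meaning.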

3.1.2) Grouping and Filtering: Group data by specific categories and calculate aggregations for each group. 

Highest Total_Sales per Brew_Date and Beer_Style

In [17]:
# SQL query to find the highest Total_Sales per Brew_Date and Beer_Style, ordered by date
aggregation_query = """
SELECT 
    Brew_Date,
    Beer_Style,
    MAX(Total_Sales) AS Highest_Total_Sales
FROM brewing_data
GROUP BY Brew_Date, Beer_Style
ORDER BY Brew_Date DESC, Highest_Total_Sales DESC
LIMIT 10
"""

# Execute the query
result_df = spark.sql(aggregation_query)

# Show the results
result_df.show(truncate=False)

+----------+----------+-------------------+
|Brew_Date |Beer_Style|Highest_Total_Sales|
+----------+----------+-------------------+
|2023-12-31|Sour      |19999.957753273946 |
|2023-12-31|Porter    |19998.713582154644 |
|2023-12-31|Pilsner   |19998.166499284678 |
|2023-12-31|Stout     |19991.554562673293 |
|2023-12-31|Wheat Beer|19988.27588543136  |
|2023-12-31|Lager     |19980.77426642725  |
|2023-12-31|Ale       |19965.82561229111  |
|2023-12-31|IPA       |19956.92365618578  |
|2023-12-30|Ale       |19996.39620222792  |
|2023-12-30|IPA       |19992.75046454753  |
+----------+----------+-------------------+
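Note that `MAX(Total_Sales)` reports the best single batch in each group, which can rank groups differently from `SUM(Total_Sales)`. The plain-Python sketch below illustrates this on made-up batches; the values are hypothetical and chosen only to show the difference.

```python
from collections import defaultdict

# Toy batches: (Brew_Date, Beer_Style, Total_Sales). Values are made up.
batches = [
    ("2023-12-31", "Sour", 19000.0),
    ("2023-12-31", "Sour", 500.0),
    ("2023-12-31", "Porter", 12000.0),
    ("2023-12-31", "Porter", 11000.0),
]

max_sales = defaultdict(float)
total_sales = defaultdict(float)
for brew_date, style, sales in batches:
    key = (brew_date, style)
    max_sales[key] = max(max_sales[key], sales)  # what MAX(Total_Sales) reports
    total_sales[key] += sales                    # what SUM(Total_Sales) would report

top_by_max = max(max_sales, key=max_sales.get)
top_by_sum = max(total_sales, key=total_sales.get)
print("Top group by MAX:", top_by_max)
print("Top group by SUM:", top_by_sum)
```

In this toy data Sour has the best single batch while Porter has the larger total, so the choice of aggregate depends on which question is being asked.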



Highest Quality_Score per Brew_Date and Beer_Style

In [18]:
# SQL query to find the highest Quality_Score per Brew_Date and Beer_Style, ordered by date
aggregation_query = """
SELECT 
    Brew_Date,
    Beer_Style,
    MAX(Quality_Score) AS Highest_Quality_Score
FROM brewing_data
GROUP BY Brew_Date, Beer_Style
ORDER BY Brew_Date DESC, Highest_Quality_Score DESC
LIMIT 10
"""

# Execute the query
result_df = spark.sql(aggregation_query)

# Show the results
result_df.show(truncate=False)

+----------+----------+---------------------+
|Brew_Date |Beer_Style|Highest_Quality_Score|
+----------+----------+---------------------+
|2023-12-31|Sour      |9.999161865781595    |
|2023-12-31|Pilsner   |9.998151783823628    |
|2023-12-31|Ale       |9.9972227757805      |
|2023-12-31|Porter    |9.99697217893943     |
|2023-12-31|IPA       |9.996293456793538    |
|2023-12-31|Wheat Beer|9.994572202670671    |
|2023-12-31|Stout     |9.990243632639132    |
|2023-12-31|Lager     |9.984752221429648    |
|2023-12-30|Ale       |9.999722934959543    |
|2023-12-30|IPA       |9.999592070600983    |
+----------+----------+---------------------+



Highest Volume_Produced per Brew_Date and Beer_Style

In [19]:
# SQL query to find the highest Volume_Produced per Brew_Date and Beer_Style, ordered by date
aggregation_query = """
SELECT 
    Brew_Date,
    Beer_Style,
    MAX(Volume_Produced) AS Highest_Volume_Produced
FROM brewing_data
GROUP BY Brew_Date, Beer_Style
ORDER BY Brew_Date DESC, Highest_Volume_Produced DESC
LIMIT 10
"""

# Execute the query
result_df = spark.sql(aggregation_query)

# Show the results
result_df.show(truncate=False)

+----------+----------+-----------------------+
|Brew_Date |Beer_Style|Highest_Volume_Produced|
+----------+----------+-----------------------+
|2023-12-31|Ale       |4999                   |
|2023-12-31|Pilsner   |4999                   |
|2023-12-31|Stout     |4998                   |
|2023-12-31|IPA       |4996                   |
|2023-12-31|Sour      |4996                   |
|2023-12-31|Lager     |4993                   |
|2023-12-31|Wheat Beer|4990                   |
|2023-12-31|Porter    |4986                   |
|2023-12-30|Wheat Beer|4999                   |
|2023-12-30|Lager     |4997                   |
+----------+----------+-----------------------+



3.1.3) Joins: If applicable, perform a join between two tables

Joins are not applicable here, because the dataset consists of a single table.

3.1.4) Time-based analysis: If your dataset contains a timestamp column, analyze trends over time.

SQL query for number of batches and total sales per Brew_Date

In [20]:
# SQL query to find the number of batches and total sales per Brew_Date
time_based_query = """
SELECT 
    Brew_Date,
    COUNT(Batch_ID) AS Number_of_Batches,
    SUM(Total_Sales) AS Total_Sales_Per_Day
FROM brewing_data
GROUP BY Brew_Date
ORDER BY Brew_Date DESC
LIMIT 10
"""

# Execute the query
result_time_based_df = spark.sql(time_based_query)

# Show the results
result_time_based_df.show(truncate=False)


+----------+-----------------+-------------------+
|Brew_Date |Number_of_Batches|Total_Sales_Per_Day|
+----------+-----------------+-------------------+
|2023-12-31|6951             |7.402164022621739E7|
|2023-12-30|6875             |7.277619057643037E7|
|2023-12-29|6801             |7.101619643852998E7|
|2023-12-28|6853             |7.161239863457432E7|
|2023-12-27|6823             |7.105070785238072E7|
|2023-12-26|6778             |7.173696041901506E7|
|2023-12-25|6975             |7.328681953301111E7|
|2023-12-24|6945             |7.398418149452E7   |
|2023-12-23|6809             |7.171651056100419E7|
|2023-12-22|6787             |7.072098217357387E7|
+----------+-----------------+-------------------+



SQL Query for Average Quality Score and Alcohol Content per Brew Date

In [21]:
# SQL query to find the average Quality Score and Alcohol Content per Brew_Date
average_metrics_query = """
SELECT 
    Brew_Date,
    AVG(Quality_Score) AS Average_Quality_Score,
    AVG(Alcohol_Content) AS Average_Alcohol_Content
FROM brewing_data
GROUP BY Brew_Date
ORDER BY Brew_Date DESC
LIMIT 10
"""

# Execute the query
result_average_metrics_df = spark.sql(average_metrics_query)

# Show the results
result_average_metrics_df.show(truncate=False)


+----------+---------------------+-----------------------+
|Brew_Date |Average_Quality_Score|Average_Alcohol_Content|
+----------+---------------------+-----------------------+
|2023-12-31|7.972432599968671    |5.247159001244369      |
|2023-12-30|7.972838103070529    |5.246944176486663      |
|2023-12-29|8.000243096635971    |5.2470258847132065     |
|2023-12-28|8.01174732752383     |5.2542033652894995     |
|2023-12-27|7.994104801119074    |5.239622772825028      |
|2023-12-26|7.998919681970579    |5.245127095144804      |
|2023-12-25|8.01056465475819     |5.244991263699129      |
|2023-12-24|7.993671636436315    |5.253078226738263      |
|2023-12-23|7.992270025832132    |5.248499193071808      |
|2023-12-22|7.99786223144857     |5.257680459953336      |
+----------+---------------------+-----------------------+



SQL Query for Beer Style Count per Brew Date

In [22]:
# SQL query to count the number of each Beer Style produced per Brew_Date
beer_style_count_query = """
SELECT 
    Brew_Date,
    Beer_Style,
    COUNT(*) AS Beer_Style_Count
FROM brewing_data
GROUP BY Brew_Date, Beer_Style
ORDER BY Brew_Date DESC, Beer_Style_Count DESC
LIMIT 10
"""

# Execute the query
result_beer_style_count_df = spark.sql(beer_style_count_query)

# Show the results
result_beer_style_count_df.show(truncate=False)

+----------+----------+----------------+
|Brew_Date |Beer_Style|Beer_Style_Count|
+----------+----------+----------------+
|2023-12-31|Lager     |892             |
|2023-12-31|Ale       |879             |
|2023-12-31|Wheat Beer|877             |
|2023-12-31|IPA       |876             |
|2023-12-31|Stout     |876             |
|2023-12-31|Pilsner   |864             |
|2023-12-31|Sour      |853             |
|2023-12-31|Porter    |834             |
|2023-12-30|Sour      |905             |
|2023-12-30|Stout     |901             |
+----------+----------+----------------+



Insights:

1 Highest Total Sales
   
Top Performers: On 2023-12-31, a Sour batch recorded the highest single-batch Total_Sales (about 19,999.96), followed closely by Porter, Pilsner, and Stout.
Beer Style Performance: Every beer style had at least one batch above 19,950 in sales on 2023-12-31. Because the query reports the per-group maximum and lists only the most recent dates, this does not by itself show that 2023-12-31 was a peak day for sales overall.
Sales Trend: On 2023-12-30, Ale and IPA had the highest single-batch sales among the rows shown (about 19,996 and 19,993).

2 Highest Quality Score

Top Performers: Sour had the highest maximum Quality_Score on 2023-12-31 (about 9.999), followed by Pilsner and Ale. These are per-group maxima, so they describe the best batch of each style rather than the style's typical quality.
Quality Consistency: The maximum quality score is above 9.98 for every beer style on 2023-12-31, and on 2023-12-30 the best Ale batch reached 9.9997, almost a perfect score.

3 Highest Volume Produced

Volume Consistency: Ale and Pilsner had the largest single batches on 2023-12-31, both reaching 4999 units.
Production Trends: Stout (4998) and IPA (4996) were close behind, and the smallest per-style maximum was Porter at 4986, so the largest batch size is nearly identical across styles.

4 Number of Batches and Sales per Day

Batches and Sales Volume: Among the ten most recent days, 2023-12-31 had the highest sales per day (about 74.0 million) with 6951 batches; the highest batch count was on 2023-12-25 (6975).
Sales Correlation: Total sales per day fluctuate between roughly 70.7 and 74.0 million over these ten days, with the highest values on 2023-12-24, 2023-12-25, and 2023-12-31, possibly reflecting the holiday season.

5 Average Quality Score and Alcohol Content

Consistency in Quality Score: The average quality score stays close to 8.0 on all ten days, and is slightly lower on 2023-12-30 and 2023-12-31 (about 7.97).
Alcohol Content: Average alcohol content remains stable at about 5.24–5.26%, with 2023-12-31 averaging about 5.25%. This suggests no major variation in the types of beer produced during this period.

6 Beer Style Count

Lager and Ale Dominance: Lager and Ale had the most batches on 2023-12-31, with Lager topping the list at 892 batches.
Beer Style Spread: Sour (853) and Porter (834) had slightly fewer batches that day, but the spread is small, and on 2023-12-30 Sour had the most batches (905), so these counts do not indicate a stable difference in production or popularity.

Key Observations:

Sales and Production Peaks: On 2023-12-31, Sour and Pilsner rank near the top for both maximum Total_Sales and maximum Quality_Score, and the day also has the highest total sales among the ten most recent days.

Quality and Volume Correlation: Beer styles with the highest single-batch sales on 2023-12-31 (like Sour and Pilsner) also had some of the highest maximum quality scores. This is suggestive only: per-group maxima cannot establish a correlation between quality and sales, which would have to be measured on the batch-level data.

Beer Preferences: Batch counts are similar across styles on the days shown (834 to 892 on 2023-12-31), with Lager and Ale slightly ahead and Sour and Porter slightly behind on that day.
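A relationship between quality and sales can be measured directly on batch-level values with the Pearson correlation coefficient. In PySpark, `df_cleaned.stat.corr("Quality_Score", "Total_Sales")` computes it on the full data; the plain-Python sketch below shows the same calculation on a few made-up pairs (the sample numbers are hypothetical).

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical batch-level values, for illustration only
quality = [7.1, 8.4, 9.0, 6.5, 7.8]
sales = [9000.0, 4000.0, 12000.0, 11000.0, 5000.0]

r = pearson(quality, sales)
print(f"Pearson r = {r:.3f}")
```

A value near 0 would indicate no linear relationship, while values near +1 or -1 would indicate a strong positive or negative one.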

4 Machine Learning Model (Regression/Classification)

4.1 Choose the appropriate ML problem based on the dataset:

Justification for Regression Model Selection
I'm selecting a regression model to predict Total Sales because:

Nature of the Target Variable (Total Sales):

Total Sales is a continuous numeric value, making it a natural fit for regression. The goal is to predict a specific value that can take on any numeric value within a range, which is a hallmark of regression problems.

The primary objective is to predict Total Sales based on various influencing factors such as Beer Style, Number of Batches, and Average Quality Score. This prediction can provide valuable insights for forecasting, budget allocation, and sales strategy, making it a key decision-making tool.
Using regression allows us to generate continuous predictions for total sales, which can be leveraged to adjust strategies in real-time based on predicted values.

Predictive Power:

Since Total Sales may be influenced by several features (e.g., Beer Style Count, Average Alcohol Content, and date-derived features from Brew_Date), regression models are well suited to capture these relationships and produce numerical predictions. This can help in resource planning, inventory management, and understanding demand trends.
Given this, a regression approach fits our dataset, where we want to predict the Total Sales based on a variety of features.

Model: Random Forest Regressor

I chose the Random Forest Regressor for the following reasons:

Handles Non-linearity: The relationships between Total Sales and features like Beer Style Count, Average Quality Score, and Number of Batches may be complex and non-linear. Random Forest can capture these complex relationships, unlike linear models.

Robustness: It is less prone to overfitting compared to a single decision tree, as it uses an ensemble of trees. This makes it a more robust choice when dealing with real-world, noisy data.

Feature Importance: Random Forest provides insights into feature importance, which can help identify which features contribute most to predicting Total Sales, aiding in business decisions.

Accuracy: It often provides better predictive accuracy than simpler models like linear regression, especially for more complex problems with multiple interacting features.

In [23]:
print(type(df_cleaned))

<class 'pyspark.sql.dataframe.DataFrame'>


In [24]:
df_cleaned.show(5)

+--------+----------+----------+----+------------+-----------------+------------------+------------------+------------------+-----------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+----------------------------+
|Batch_ID| Brew_Date|Beer_Style| SKU|    Location|Fermentation_Time|       Temperature|          pH_Level|           Gravity|  Alcohol_Content|Bitterness|Color|Ingredient_Ratio|Volume_Produced|       Total_Sales|    Quality_Score|Brewhouse_Efficiency|Loss_During_Brewing|Loss_During_Fermentation|Loss_During_Bottling_Kegging|
+--------+----------+----------+----+------------+-----------------+------------------+------------------+------------------+-----------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+----------------------------+
| 7870796|2020-01-01|W

Feature Engineering

In [25]:
# Extracting time-related features from Brew_Date
df_cleaned = df_cleaned.withColumn('Brew_Year', year(col('Brew_Date')))
df_cleaned = df_cleaned.withColumn('Brew_Month', month(col('Brew_Date')))
df_cleaned = df_cleaned.withColumn('Brew_Day_of_Week', dayofweek(col('Brew_Date')))
df_cleaned = df_cleaned.withColumn('Brew_Day_of_Year', dayofyear(col('Brew_Date')))
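One detail worth knowing about these features: Spark's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday), which differs from Python's conventions. The sketch below reproduces the four features with the standard library for a single date; the helper name `spark_style_date_features` is hypothetical.

```python
from datetime import date

def spark_style_date_features(d):
    """Return year, month, day of week (1 = Sunday ... 7 = Saturday), day of year."""
    return {
        "Brew_Year": d.year,
        "Brew_Month": d.month,
        # isoweekday() is 1 = Monday ... 7 = Sunday; shift to Spark's numbering
        "Brew_Day_of_Week": d.isoweekday() % 7 + 1,
        "Brew_Day_of_Year": d.timetuple().tm_yday,
    }

# First Brew_Date in the dataset preview (a Wednesday)
features = spark_style_date_features(date(2020, 1, 1))
print(features)
```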

In [26]:
# Aggregating data by Beer Style to create new features
beer_style_agg = df_cleaned.groupBy('Beer_Style').agg(
    count('Batch_ID').alias('Number_of_Batches'),
    avg('Quality_Score').alias('Average_Quality_Score'),
    avg('Alcohol_Content').alias('Average_Alcohol_Content'),
    avg('Volume_Produced').alias('Average_Volume_Produced'),
    avg('Total_Sales').alias('Average_Sales')
)

In [27]:
# Joining the aggregated data back to the main dataframe
df_cleaned = df_cleaned.join(beer_style_agg, on='Beer_Style', how='left')

In [28]:
# Select the relevant columns to create the new dataframe
columns_to_keep = [
    'Brew_Year', 'Brew_Month', 'Volume_Produced',
    'Number_of_Batches', 'Total_Sales'
]

In [29]:
# Creating a new DataFrame with the selected columns
selected_df = df_cleaned.select(*columns_to_keep)

In [30]:
# Show the new DataFrame with the added features
selected_df.show(5)

+---------+----------+---------------+-----------------+------------------+
|Brew_Year|Brew_Month|Volume_Produced|Number_of_Batches|       Total_Sales|
+---------+----------+---------------+-----------------+------------------+
|     2020|         1|           3173|          1251002|12050.177463190277|
|     2020|         1|           4449|          1250296|5515.0774647529615|
|     2020|         1|           3752|          1251002| 6278.389850288936|
|     2020|         1|            832|          1250307| 9758.801062471319|
|     2020|         1|           4666|          1249023|2664.7593448382822|
+---------+----------+---------------+-----------------+------------------+
only showing top 5 rows



Feature Engineering 

In [31]:
# Step 1: Group by Brew_Year and calculate total sales per year
df_yearly_sales = selected_df.groupBy("Brew_Year").agg(F.sum("Total_Sales").alias("Yearly_Sales"))

# Step 2: Create a window specification to calculate the previous year's sales
window_spec = Window.orderBy("Brew_Year")

# Step 3: Add a column for the previous year's sales using the lag function
df_yearly_sales = df_yearly_sales.withColumn("Previous_Year_Sales", F.lag("Yearly_Sales", 1).over(window_spec))

# Step 4: Calculate the sales growth rate: (Current Year Sales - Previous Year Sales) / Previous Year Sales
df_yearly_sales = df_yearly_sales.withColumn("Sales_Growth",
                                             (df_yearly_sales["Yearly_Sales"] - df_yearly_sales["Previous_Year_Sales"]) 
                                             / df_yearly_sales["Previous_Year_Sales"])

# Step 5: Handle edge case where there's no previous year (e.g., set Sales_Growth to null for the first year)
df_yearly_sales = df_yearly_sales.withColumn("Sales_Growth",
                                             F.when(F.col("Previous_Year_Sales").isNull(), None)
                                             .otherwise(df_yearly_sales["Sales_Growth"]))

# Format columns to display with fewer decimal places for readability
df_yearly_sales = df_yearly_sales.select(
    "Brew_Year",
    F.format_number("Yearly_Sales", 2).alias("Yearly_Sales"),
    F.format_number("Previous_Year_Sales", 2).alias("Previous_Year_Sales"),
    F.format_number("Sales_Growth", 4).alias("Sales_Growth")
)

# Show the result
df_yearly_sales.show()

+---------+-----------------+-------------------+------------+
|Brew_Year|     Yearly_Sales|Previous_Year_Sales|Sales_Growth|
+---------+-----------------+-------------------+------------+
|     2020|26,307,637,143.99|               NULL|        NULL|
|     2021|26,234,643,027.70|  26,307,637,143.99|     -0.0028|
|     2022|26,215,935,205.80|  26,234,643,027.70|     -0.0007|
|     2023|26,219,638,061.92|  26,215,935,205.80|      0.0001|
+---------+-----------------+-------------------+------------+



Year-over-year sales growth measures how total sales changed from one year to the next, which gives a compact view of the performance and trend of the business. This step matters for several reasons:

Trend Analysis: The sign of the growth rate shows whether sales rose or fell relative to the previous year. In the table above the rates are -0.0028, -0.0007, and 0.0001 (that is, -0.28%, -0.07%, and +0.01%), so yearly sales were essentially flat at roughly 26.2 billion from 2020 to 2023. Knowing this is useful for decisions about production, marketing, and strategy.

Business Health: Monitoring the growth rate over several years helps evaluate the overall health of the business. Consistent growth is a sign of success, while a sustained decline points to issues that need to be addressed.

Planning and Forecasting: Knowing whether sales are growing or shrinking makes planning more effective. A consistent upward trend may justify increasing production, investing in new markets, or expanding operations. A decline may prompt a reevaluation of business strategies or product offerings.

Comparison Across Years: The lag function places each year's sales next to the previous year's sales, so the change can be computed row by row. Because the data is aggregated by year, this comparison shows annual changes only; seasonal patterns would require a monthly breakdown. The first year (2020) has no previous year, so its growth is NULL.

In summary, this analysis tracks sales performance over time, which supports strategic decisions, budgeting, and forecasting.
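
As a quick check on the table above, the growth rates can be recomputed by hand. The sketch below is plain Python rather than Spark, and it assumes growth is defined as (current - previous) / previous, which is consistent with the Sales_Growth values shown. The helper name yoy_growth is hypothetical.

```python
# Yearly totals copied from the output table above.
yearly_sales = {
    2020: 26_307_637_143.99,
    2021: 26_234_643_027.70,
    2022: 26_215_935_205.80,
    2023: 26_219_638_061.92,
}


def yoy_growth(sales_by_year):
    """Return {year: growth}, with growth = (current - previous) / previous.

    The first year has no previous value, so its growth is None
    (the counterpart of NULL in the Spark output).
    """
    growth = {}
    previous = None
    for year in sorted(sales_by_year):
        current = sales_by_year[year]
        growth[year] = None if previous is None else (current - previous) / previous
        previous = current
    return growth


growth = yoy_growth(yearly_sales)
for year, rate in growth.items():
    print(year, "NULL" if rate is None else f"{rate:.4f}")
```

Rounded to four decimals, the printed rates match the Sales_Growth column, which supports the assumed definition.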

VectorAssembler: Combine all features into a single vector.

In [32]:
from pyspark.ml.feature import VectorAssembler

# Define the feature columns you want to combine into the vector
feature_columns = [
    'Brew_Month', 'Volume_Produced', 'Number_of_Batches'
]

# Initialize VectorAssembler with the feature columns and the output vector column name
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform the DataFrame to add the "features" column
assembled_df = assembler.transform(selected_df)

# Show the DataFrame with the new "features" column
assembled_df.select("Brew_Year", "features").show(5)

+---------+--------------------+
|Brew_Year|            features|
+---------+--------------------+
|     2020|[1.0,1475.0,12507...|
|     2020|[1.0,1459.0,12507...|
|     2020|[1.0,3341.0,12507...|
|     2020|[1.0,4642.0,12507...|
|     2020|[1.0,3073.0,12507...|
+---------+--------------------+
only showing top 5 rows



The VectorAssembler combines multiple individual feature columns into a single vector column. This is necessary because Spark's machine learning algorithms expect the features as one column of vectors, where each vector holds the feature values for one row.

Reasons for using VectorAssembler:
Required Input Format: Machine learning algorithms in Spark expect a single column containing a feature vector, rather than multiple columns. The VectorAssembler consolidates the feature columns into that vector. It accepts numeric, boolean, and vector columns; categorical string columns must first be encoded (for example with StringIndexer and OneHotEncoder).

Simplicity and Efficiency: It simplifies data manipulation, especially when working with many features. Rather than manually combining each feature, VectorAssembler handles this in one step, saving time and reducing errors.

Facilitates Model Training: Once the features are assembled into a vector, they can be fed directly into Spark's regression, classification, and clustering models. Without this step, these models cannot accept the input data.

In essence, the VectorAssembler prepares the data for machine learning algorithms by transforming it into the required vector format, which makes it a standard step in most Spark ML workflows.
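
Conceptually, assembling amounts to concatenating the chosen columns, in order, into one numeric vector per row. The sketch below illustrates this in plain Python. It is not Spark's implementation: the rows are hypothetical values, not taken from the dataset, and the helper name assemble is made up for illustration.

```python
# Hypothetical rows; the values are illustrative, not taken from the dataset.
rows = [
    {"Brew_Year": 2020, "Brew_Month": 1, "Volume_Produced": 1475, "Number_of_Batches": 3},
    {"Brew_Year": 2020, "Brew_Month": 2, "Volume_Produced": 3341, "Number_of_Batches": 5},
]

# Same column names as in the notebook cell above.
feature_columns = ["Brew_Month", "Volume_Produced", "Number_of_Batches"]


def assemble(row, columns):
    """Concatenate the chosen columns, in order, into one list of floats."""
    return [float(row[name]) for name in columns]


# Add a "features" entry to every row, leaving the original columns in place.
assembled = [{**row, "features": assemble(row, feature_columns)} for row in rows]

for row in assembled:
    print(row["Brew_Year"], row["features"])
```

The order of feature_columns fixes the position of each value in the vector, so the same list has to be used when preparing training data and new data.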

Model evaluation: Evaluate the model using metrics such as RMSE for regression or accuracy/F1-score for classification

In [33]:
# Feature Scaling: Scale the "features" column and output to "scaled_features"
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaled_df = scaler.fit(assembled_df).transform(assembled_df)

# Retain 'Brew_Year' and 'Total_Sales' columns in scaled_df for reference in predictions
scaled_df = scaled_df.select("Brew_Year", "Total_Sales", "scaled_features")

# Show the DataFrame with the "scaled_features" column for verification
scaled_df.show(5, truncate=False)

# Split the data into training and test sets
train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=42)

# Initialize and train the RandomForest model
rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="Total_Sales")
rf_model = rf.fit(train_data)

# Make predictions on the test data
predictions = rf_model.transform(test_data)

# Show some of the predictions with 'Brew_Year' included
predictions.select("Brew_Year", "prediction", "Total_Sales").show(5)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="Total_Sales", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

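# Optionally, evaluate the model using R2 (separate evaluator, so the RMSE evaluator is not mutated)
r2_evaluator = RegressionEvaluator(labelCol="Total_Sales", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R2 on test data: {r2}")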

+---------+------------------+------------------------------------------------------------+
|Brew_Year|Total_Sales       |scaled_features                                             |
+---------+------------------+------------------------------------------------------------+
|2020     |17473.8825235854  |[-1.6011815790750519,-0.9807192706752805,1.1834451215253379]|
|2020     |6824.2626567713705|[-1.6011815790750519,-0.9930356969089057,1.1834451215253379]|
|2020     |15380.29733887055 |[-1.6011815790750519,0.4556839388212486,1.1834451215253379] |
|2020     |2880.503193567716 |[-1.6011815790750519,1.4571633469428908,1.1834451215253379] |
|2020     |17806.932180755524|[-1.6011815790750519,0.2493837994080279,1.1834451215253379] |
+---------+------------------+------------------------------------------------------------+
only showing top 5 rows

+---------+------------------+------------------+
|Brew_Year|        prediction|       Total_Sales|
+---------+------------------+-----------------

Cross-validation and hyperparameter tuning to optimize the model

In [34]:
# Define the RandomForestRegressor
rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="Total_Sales")

In [35]:
# Create a parameter grid for tuning
param_grid = (ParamGridBuilder()
              .addGrid(rf.numTrees, [50, 100, 200])  # Number of trees
              .addGrid(rf.maxDepth, [5, 10, 15])    # Maximum depth of trees
              .addGrid(rf.minInstancesPerNode, [1, 5, 10])  # Minimum number of instances per node
              .build())

In [36]:
# Set up the evaluator
evaluator = RegressionEvaluator(labelCol="Total_Sales", predictionCol="prediction", metricName="rmse")

# Set up the CrossValidator
crossval = CrossValidator(estimator=rf, 
                          estimatorParamMaps=param_grid, 
                          evaluator=evaluator, 
                          numFolds=5,  # 5-fold cross-validation
                          seed=42)
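
Before running the search, it helps to estimate how many models it will train. The sketch below is plain Python and does not call Spark. It counts the combinations in the grid defined above and multiplies by the number of folds, and it adds one more fit because CrossValidator retrains the best parameter combination on the full training set. The variable names are chosen for illustration.

```python
from itertools import product

# Same values as the parameter grid and CrossValidator defined above.
num_trees = [50, 100, 200]
max_depth = [5, 10, 15]
min_instances_per_node = [1, 5, 10]
num_folds = 5

# Every combination of the three hyperparameters is one candidate model.
combinations = list(product(num_trees, max_depth, min_instances_per_node))

# Each candidate is trained once per fold during cross-validation.
fits_during_cv = len(combinations) * num_folds

# The best candidate is then refit once on the whole training set.
total_fits = fits_during_cv + 1

print(f"{len(combinations)} combinations, {fits_during_cv} fits in CV, {total_fits} fits in total")
```

On a dataset of this size each fit is expensive, so a smaller grid or fewer folds may be a reasonable first pass.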

In [37]:
# Perform cross-validation and get the best model
cv_model = crossval.fit(train_data)

In [38]:
# Get the best model from cross-validation
best_rf_model = cv_model.bestModel

In [39]:
# Make predictions using the best model
predictions = best_rf_model.transform(test_data)

In [40]:
# Show some of the predictions with 'Brew_Year' included
predictions.select("Brew_Year", "prediction", "Total_Sales").show(5)

+---------+------------------+------------------+
|Brew_Year|        prediction|       Total_Sales|
+---------+------------------+------------------+
|     2020|10501.475343170161|1000.0134352860266|
|     2020|10502.991538466043|1000.0383832416472|
|     2020|10501.262749325395|1000.0549495561056|
|     2020|10497.836105780634|1000.1234760987548|
|     2020|10516.656545008076|1000.2090171673718|
+---------+------------------+------------------+
only showing top 5 rows



In [41]:
# Evaluate the model using RMSE
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

Root Mean Squared Error (RMSE) on test data: 5485.390862021014


In [42]:
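# Optionally, evaluate the model using R2 (separate evaluator, so the RMSE evaluator used above is not mutated)
r2_evaluator = RegressionEvaluator(labelCol="Total_Sales", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R2 on test data: {r2}")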

R2 on test data: -5.4867644938561e-07


The R² value close to zero (and slightly negative) suggests that the model explains almost none of the variance in Total_Sales. In the sample of predictions above, every predicted value is close to 10,500 even though the actual values are close to 1,000, which suggests the model outputs nearly the same value for every row.

An R² near zero can occur when the features used in the model don't have a strong relationship with the target variable (Total_Sales). This could imply that the features used here (Brew_Month, Volume_Produced, and Number_of_Batches) aren't sufficient to predict Total_Sales accurately.
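
To see why a near-constant prediction yields an R² near zero, recall that R² = 1 - SS_res / SS_tot, where SS_res is the sum of squared prediction errors and SS_tot is the sum of squared deviations of the labels from their mean. The sketch below is plain Python with hypothetical labels (not taken from the dataset). It shows that always predicting the mean gives R² = 0 and an RMSE equal to the standard deviation of the labels, and that a constant slightly off the mean gives a slightly negative R².

```python
from math import sqrt
from statistics import pstdev


def rmse(actual, predicted):
    """Root mean squared error."""
    return sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))


def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean) ** 2 for a in actual)
    return 1 - ss_res / ss_tot


# Hypothetical labels, chosen only for illustration.
actual = [1000.0, 5000.0, 10000.0, 15000.0, 20000.0]
mean = sum(actual) / len(actual)

# A model that always predicts the mean of the labels.
mean_predictions = [mean] * len(actual)
r2_mean = r2(actual, mean_predictions)
rmse_mean = rmse(actual, mean_predictions)

# A constant that is slightly off the mean, e.g. a mean learned on other data.
shifted_predictions = [mean + 5.0] * len(actual)
r2_shifted = r2(actual, shifted_predictions)

print(f"R2 (mean predictor):    {r2_mean}")
print(f"RMSE (mean predictor):  {rmse_mean:.2f}  vs. std of labels: {pstdev(actual):.2f}")
print(f"R2 (shifted constant):  {r2_shifted:.2e}")
```

Under this reading, an RMSE of about 5,485 would be roughly the spread of Total_Sales itself. That is an interpretation of the reported metrics rather than something verified against the data.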

Further feature exploration and model improvements are needed to obtain a better score.