# Data Analysis (Using the PySpark Library)

- **The notebook with this analysis done in pandas can be found here: https://github.com/AVC-prog/Data_Science_and_Analysis_with_Python_and_SQL/blob/main/Project%206%3A%20Online%20Retail%20Analysis/Project%206%20-%20Online%20Retail%20Analysis.ipynb**

# Prefatory Remarks

In [None]:
# Optional, but a virtual environment keeps this project's dependencies isolated.

# venv ships with Python 3; installing virtualenv is only needed if you prefer it:

# !pip install virtualenv

# Create a virtual environment named "myenv":

# !python -m venv myenv

# Activate the virtual environment:

# myenv\Scripts\activate (Windows)
# source myenv/bin/activate (macOS/Linux)

# Upgrade pip and install the libraries used in this notebook inside the virtual environment
# (on Windows, replace myenv/bin/python with myenv\Scripts\python):

# !myenv/bin/python -m pip install --upgrade pip
# !myenv/bin/python -m pip install numpy pandas matplotlib seaborn scikit-learn scipy statsmodels jupyterlab plotly openpyxl xlrd tensorflow keras torch torchvision pyspark ipykernel mlxtend

# Add the virtual environment as a Jupyter kernel:

# !myenv/bin/python -m ipykernel install --user --name=myenv --display-name "Python (myenv)"

# Deactivate the virtual environment (run this in the terminal):

# deactivate

- [x] **Necessary packages to use**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, when
from pyspark.sql.types import IntegerType, StringType, BooleanType
from pyspark.ml.feature import VectorAssembler, StringIndexer, PCA
from pyspark.ml.classification import (DecisionTreeClassifier, RandomForestClassifier,
    MultilayerPerceptronClassifier, GBTClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import TrainValidationSplit, CrossValidator, ParamGridBuilder
from pyspark.sql import functions as F
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns


## 1. Visualize the data

- [x] **Start the PySpark session and view the data.**

In [None]:
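from pyspark.sql import SparkSession
# Explicit imports instead of "import *", which silently shadows Python builtins such as sum, min, max and round.
# Spark's abs is imported on purpose for the cleaning step; Spark's sum is aliased as _sum.
from pyspark.sql.functions import (abs, col, collect_set, countDistinct, date_format, dayofweek,
                                   hour, month, to_date, to_timestamp, year, sum as _sum)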

spark = SparkSession.builder.appName("Test_OR").getOrCreate()


df = spark.read.csv("/kaggle/input/online-retail/online_retail.csv", header=True, inferSchema=True)

df.show(10)

## 2. Clean the Data

- [x] **Count the null values in each column; these are the candidates for imputation.**

In [None]:
# Let's put them in a dictionary, just like we've done in the pandas notebook

not_numbers = {}

for column in df.columns:
    not_numbers[column] = df.filter(df[column].isNull()).count()

print(not_numbers)

- [x] **Count the empty strings (empty slots) in each column; these are also candidates for imputation.**

In [None]:
# Again, let's print the values out in a dictionary, just like in the pandas notebook

empty_values = {}

for column in df.columns:
    empty_values[column] = df.filter(df[column] == "").count()

print(empty_values)

- [x] **Check for erroneous values, if there are any.**

In [None]:
# Negative quantities typically represent returns and negative prices represent refunds; rather than modelling them, we simply take absolute values below.

df.groupBy("Country").count().show()
df.groupBy("Price").count().show()
df.groupBy("Quantity").count().show()

df = df.withColumn("Quantity", abs(col("Quantity")))   

df.groupBy("Quantity").count().show()

df = df.withColumn("Price", abs(col("Price")))

df.groupBy("Price").count().show()

- [x] **Fix the invoice date column by separating it into date and time.**

In [None]:
df = df.withColumn('Invoice_Date', to_timestamp(col('InvoiceDate'), 'MM/dd/yyyy HH:mm'))

df = df.withColumn('Date', date_format(col('Invoice_Date'), 'yyyy-MM-dd')) 
df = df.withColumn('Time', date_format(col('Invoice_Date'), 'HH:mm:ss'))  

df = df.withColumn('Date', to_date(col('Date'), 'yyyy-MM-dd'))

df.printSchema()
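For comparison with plain Python: Spark's pattern `MM/dd/yyyy HH:mm` corresponds to `%m/%d/%Y %H:%M` in `datetime.strptime`. The standalone sketch below (no Spark; the sample timestamps are made up for illustration) performs the same split into a date part and a time part. `strptime` also accepts non-zero-padded months, days and hours such as `1/5/2011 9:03`.

```python
from datetime import datetime


def split_invoice_date(raw: str):
    """Parse an 'MM/dd/yyyy HH:mm' string and return ('yyyy-MM-dd', 'HH:mm:ss')."""
    ts = datetime.strptime(raw, "%m/%d/%Y %H:%M")
    return ts.strftime("%Y-%m-%d"), ts.strftime("%H:%M:%S")


# Hypothetical InvoiceDate value
date_part, time_part = split_invoice_date("12/01/2010 08:26")
print(date_part, time_part)  # 2010-12-01 08:26:00
```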

- [x] **Save the clean file.**

In [None]:
# Spark writes a directory of part files at this path, not a single CSV file
output_path = "/kaggle/working/cleaned_online_retail.csv"
df.write.mode("overwrite").option("header", "true").csv(output_path)

## 3. String Manipulation

- [x] **Select the stock codes that consist of exactly five digits followed by the letter "A".**

In [None]:
# Raw string, so the regex engine receives "\d" unchanged
stock_df_vA = df.filter(col("StockCode").rlike(r"^\d{5}A$"))

stock_df_vA.show()

- [ ] **Now select the stock codes with five digits followed by the letter "B".**

In [None]:
# Raw string, so the regex engine receives "\d" unchanged
stock_df_vB = df.filter(col("StockCode").rlike(r"^\d{5}B$"))

stock_df_vB.show()
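The two `rlike` patterns can be checked against a few sample codes in plain Python before running them on the full DataFrame. The codes below are hypothetical strings chosen to hit the edge cases (wrong letter, lowercase letter, six digits). One caveat: in Python, `\d` on a `str` also matches non-ASCII digits, while Java's regex engine, which Spark's `rlike` uses, matches only `0-9` by default; `[0-9]` (as in the SQL queries further down) is unambiguous in both.

```python
import re

# Same patterns as the rlike() filters above
VERSION_A = re.compile(r"^\d{5}A$")
VERSION_B = re.compile(r"^\d{5}B$")

# Hypothetical stock codes covering the edge cases
codes = ["12345A", "67890B", "11111", "22222E", "123456A", "12345a"]

version_a = [code for code in codes if VERSION_A.match(code)]
version_b = [code for code in codes if VERSION_B.match(code)]
print(version_a)  # ['12345A']
print(version_b)  # ['67890B']
```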

## 4. Exploratory Data Analysis (EDA) and Visualizations

- **Most of the visualizations were already made in the pandas version of this notebook, so only a few are reproduced here.**

- [ ] **Sales trends over time.**

In [None]:
import matplotlib.animation as animation
import pandas as pd
from pyspark.sql.functions import sum as _sum

# Reuse the timestamp parsed in the cleaning step, so both sections rely on the same format string
df = df.withColumn("InvoiceTimestamp", col("Invoice_Date"))

df = df.withColumn("Year", year("InvoiceTimestamp")) \
       .withColumn("Month", month("InvoiceTimestamp")) \
       .withColumn("Weekday", dayofweek("InvoiceTimestamp")) \
       .withColumn("Hour", hour("InvoiceTimestamp"))

df = df.withColumn("Revenue", col("Quantity") * col("Price"))

monthly_sales = df.groupBy("Year", "Month").agg(_sum("Revenue").alias("TotalRevenue")).orderBy("Year", "Month")
monthly_sales.show()

monthly_sales_pd = monthly_sales.toPandas()
monthly_sales_pd["Date"] = pd.to_datetime(dict(year=monthly_sales_pd.Year, month=monthly_sales_pd.Month, day=1))


monthly_sales_pd = monthly_sales_pd.sort_values("Date").reset_index(drop=True)

fig, ax = plt.subplots(figsize=(12,6))
line, = ax.plot([], [], lw=2)
ax.set_xlim(monthly_sales_pd["Date"].min(), monthly_sales_pd["Date"].max())
ax.set_ylim(0, monthly_sales_pd["TotalRevenue"].max() * 1.1)
ax.set_title("Animated Monthly Sales Revenue")
ax.set_xlabel("Date")
ax.set_ylabel("Revenue")

def init():
    line.set_data([], [])
    return line,

def animate(i):
    x = monthly_sales_pd["Date"][:i+1]
    y = monthly_sales_pd["TotalRevenue"][:i+1]
    line.set_data(x, y)
    return line,

ani = animation.FuncAnimation(fig, animate, init_func=init, frames=len(monthly_sales_pd), interval=300, blit=True)

from IPython.display import HTML
HTML(ani.to_jshtml())

# To save:
# ani.save("monthly_sales_animation.mp4", writer='ffmpeg')




- [ ] **Perform a country-level analysis.**

In [None]:
from pyspark.sql.functions import sum as _sum, countDistinct

country_sales = df.groupBy("Country").agg(
    _sum("Revenue").alias("TotalRevenue"),
    countDistinct("Customer ID").alias("NumCustomers"),
    countDistinct("Invoice").alias("NumOrders")
).orderBy(col("TotalRevenue").desc())

country_sales.show(10)
country_sales_pd = country_sales.toPandas()

top_countries = country_sales_pd.head(10)
plt.figure(figsize=(10,6))
plt.barh(top_countries['Country'], top_countries['TotalRevenue'])
plt.title("Top Countries by Revenue")
plt.gca().invert_yaxis()
plt.show()
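Conceptually, the `groupBy("Country").agg(...)` call above performs the aggregation sketched below in plain Python, on a handful of hypothetical rows. One detail worth mirroring: Spark's `countDistinct` ignores nulls, so if `Customer ID` contains nulls they are not counted as a distinct customer, and the sketch skips `None` in the same way.

```python
from collections import defaultdict

# Hypothetical rows: (Country, Customer ID, Invoice, Quantity, Price)
rows = [
    ("United Kingdom", "cust-1", "inv-1", 12, 2.0),
    ("United Kingdom", "cust-1", "inv-1", 48, 0.5),
    ("United Kingdom", None, "inv-2", 10, 1.0),
    ("Germany", "cust-2", "inv-3", 6, 3.0),
]

revenue = defaultdict(float)
customers = defaultdict(set)
orders = defaultdict(set)
for country, customer, invoice, quantity, price in rows:
    revenue[country] += quantity * price      # _sum("Revenue")
    if customer is not None:                  # countDistinct skips nulls
        customers[country].add(customer)
    orders[country].add(invoice)              # countDistinct("Invoice")

# (Country, TotalRevenue, NumCustomers, NumOrders), highest revenue first
country_sales = sorted(
    ((c, revenue[c], len(customers[c]), len(orders[c])) for c in revenue),
    key=lambda row: row[1],
    reverse=True,
)
for row in country_sales:
    print(row)
```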


- [x] **Perform a basket analysis: find common product pairs or groups (product bundling opportunities) and potential cross-sell/upsell opportunities.**

In [None]:
basket_df = df.select("Invoice", "StockCode").dropna().dropDuplicates()

basket_items = basket_df.groupBy("Invoice").agg(collect_set("StockCode").alias("Items"))
basket_items.show(5, truncate=False)

basket_items_pd = basket_items.toPandas()

import pandas as pd
from mlxtend.preprocessing import TransactionEncoder
te = TransactionEncoder()
te_ary = te.fit(basket_items_pd["Items"]).transform(basket_items_pd["Items"])
basket_encoded = pd.DataFrame(te_ary, columns=te.columns_)

from mlxtend.frequent_patterns import apriori, association_rules
frequent_itemsets = apriori(basket_encoded, min_support=0.01, use_colnames=True)
rules = association_rules(frequent_itemsets, metric="lift", min_threshold=1)

rules.sort_values(by="lift", ascending=False).head(10)
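The rules are ranked by lift. For a rule A → B, support is the fraction of baskets containing both items, confidence is support(A, B) / support(A), and lift is confidence / support(B); a lift above 1 means the items appear together more often than they would if they were independent, which is what `min_threshold=1` filters for. A minimal sketch of these three metrics for item pairs, on hypothetical baskets:

```python
from collections import Counter
from itertools import combinations

# Hypothetical baskets: the set of stock codes on each invoice
baskets = [{"A", "B", "C"}, {"A", "B"}, {"A", "C"}, {"B", "D"}]
n = len(baskets)

item_counts = Counter(item for basket in baskets for item in basket)
pair_counts = Counter(
    pair for basket in baskets for pair in combinations(sorted(basket), 2)
)


def rule_metrics(antecedent, consequent):
    """Support, confidence and lift of the rule antecedent -> consequent."""
    support_ab = pair_counts[tuple(sorted((antecedent, consequent)))] / n
    support_a = item_counts[antecedent] / n
    support_b = item_counts[consequent] / n
    confidence = support_ab / support_a
    lift = confidence / support_b  # equals support_ab / (support_a * support_b)
    return support_ab, confidence, lift


print(rule_metrics("C", "A"))  # lift > 1: C and A appear together more than chance
print(rule_metrics("A", "B"))  # lift < 1: A and B appear together less than chance
```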


# Data Science (Using the PySpark Library)

## 5. Inferential Statistics

- [ ] **Store the Price outliers of the version A and version B stock DataFrames, using the standard 1.5 × IQR (interquartile range) rule.**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("OutliersDetection").getOrCreate()


Q1_A, Q3_A = stock_df_vA.approxQuantile("Price", [0.25, 0.75], 0.01)
IQR_A = Q3_A - Q1_A

out_A_df = stock_df_vA.filter((col("Price") < (Q1_A - 1.5 * IQR_A)) | (col("Price") > (Q3_A + 1.5 * IQR_A)))

Q1_B, Q3_B = stock_df_vB.approxQuantile("Price", [0.25, 0.75], 0.01)
IQR_B = Q3_B - Q1_B

out_B_df = stock_df_vB.filter((col("Price") < (Q1_B - 1.5 * IQR_B)) | (col("Price") > (Q3_B + 1.5 * IQR_B)))

out_A_df.describe().show()
out_B_df.describe().show()
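The 1.5 × IQR rule used above flags values below Q1 - 1.5·IQR or above Q3 + 1.5·IQR. A standalone sketch with Python's `statistics.quantiles` on hypothetical prices; since `approxQuantile` with a relative error of 0.01 returns approximate quantiles taken from the data rather than interpolated ones, the fences Spark computes may differ slightly.

```python
import statistics


def iqr_bounds(values, k=1.5):
    """Tukey fences: (Q1 - k * IQR, Q3 + k * IQR)."""
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Hypothetical unit prices
prices = [1.25, 1.65, 2.10, 2.55, 2.95, 3.75, 4.25, 4.95, 35.0]
low, high = iqr_bounds(prices)
outliers = [p for p in prices if p < low or p > high]
print(low, high, outliers)
```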


- [ ] **Use a $\chi^2$ test of independence to check whether Country and Revenue Class (high/low revenue) are related.**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
import pandas as pd
from scipy.stats import chi2_contingency

df = df.withColumn("Revenue", col("Quantity") * col("Price"))

invoice_revenue_df = df.groupBy("Invoice", "Country").agg(
    _sum("Revenue").alias("TotalInvoiceRevenue")
)

invoice_pd = invoice_revenue_df.toPandas()
median_revenue = invoice_pd["TotalInvoiceRevenue"].median()

invoice_pd["RevenueClass"] = invoice_pd["TotalInvoiceRevenue"].apply(
    lambda x: "High" if x >= median_revenue else "Low"
)

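contingency_table = pd.crosstab(invoice_pd["Country"], invoice_pd["RevenueClass"])
print("Contingency table (first 5 rows):")
print(contingency_table.head())

# The chi-squared approximation is unreliable when many expected counts are below 5,
# which is likely for countries with only a handful of invoices.
chi2, p, dof, expected = chi2_contingency(contingency_table)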

print(f"\nChi-squared statistic: {chi2:.2f}")
print(f"Degrees of freedom: {dof}")
print(f"P-value: {p:.4f}")

alpha = 0.05
if p < alpha:
    print("Statistically significant relationship between Country and Revenue Class.")
else:
    print("No statistically significant relationship between Country and Revenue Class.")
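To make the statistic less of a black box: under independence, the expected count of cell $(i, j)$ is $E_{ij} = R_i C_j / N$ (row total times column total over the grand total), the statistic is $\chi^2 = \sum_{i,j} (O_{ij} - E_{ij})^2 / E_{ij}$, and it has $(r-1)(c-1)$ degrees of freedom. A plain-Python sketch on a hypothetical 2 × 2 table; for one degree of freedom the p-value has the closed form $\operatorname{erfc}(\sqrt{\chi^2/2})$. Note that `scipy.stats.chi2_contingency` applies Yates' continuity correction by default when there is exactly one degree of freedom, so on a 2 × 2 table its statistic would be smaller than this one; with many countries, as in the notebook, no correction is applied.

```python
import math


def chi2_independence(table):
    """Pearson chi-squared statistic (no continuity correction) and degrees of freedom."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(column) for column in zip(*table)]
    grand_total = sum(row_totals)
    chi2 = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            expected = row_totals[i] * col_totals[j] / grand_total
            chi2 += (observed - expected) ** 2 / expected
    dof = (len(table) - 1) * (len(table[0]) - 1)
    return chi2, dof


# Hypothetical counts: rows are two countries, columns are (High, Low)
table = [[30, 10],
         [20, 40]]
chi2, dof = chi2_independence(table)
# Closed-form upper-tail p-value, valid only when dof == 1
p_value = math.erfc(math.sqrt(chi2 / 2))
print(f"chi2 = {chi2:.3f}, dof = {dof}, p = {p_value:.2e}")
```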


- [ ] **Use Pearson and Spearman correlation tests to check whether the Quantity and Price columns are related.**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pandas as pd
from scipy.stats import pearsonr, spearmanr


filtered_df = df.filter(
    (col("Quantity").isNotNull()) & 
    (col("Price").isNotNull()) &
    (col("Quantity") > 0) &
    (col("Price") > 0)
)

filtered_pd = filtered_df.select("Quantity", "Price").toPandas()

pearson_corr, pearson_p = pearsonr(filtered_pd["Quantity"], filtered_pd["Price"])

spearman_corr, spearman_p = spearmanr(filtered_pd["Quantity"], filtered_pd["Price"])

print("Pearson Correlation:")
print(f"  Correlation Coefficient: {pearson_corr:.4f}")
print(f"  P-value: {pearson_p:.4f}")
print("  → Linear relationship" if pearson_p < 0.05 else "  → Not statistically significant")

print("Spearman Correlation:")
print(f"  Correlation Coefficient: {spearman_corr:.4f}")
print(f"  P-value: {spearman_p:.4f}")
print("  → Monotonic relationship" if spearman_p < 0.05 else "  → Not statistically significant")
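Spearman's coefficient is Pearson's coefficient computed on ranks, which is why it picks up any monotonic relationship and is less sensitive to extreme quantities and prices. A standalone sketch (tied values get the average of their ranks); the sample values are made up so that the relationship is monotonic but not linear:

```python
import math


def pearson(x, y):
    """Pearson correlation coefficient."""
    mx, my = sum(x) / len(x), sum(y) / len(y)
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sx = math.sqrt(sum((a - mx) ** 2 for a in x))
    sy = math.sqrt(sum((b - my) ** 2 for b in y))
    return cov / (sx * sy)


def average_ranks(values):
    """1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    start = 0
    while start < len(order):
        end = start
        while end + 1 < len(order) and values[order[end + 1]] == values[order[start]]:
            end += 1
        for k in range(start, end + 1):
            ranks[order[k]] = (start + end) / 2 + 1
        start = end + 1
    return ranks


def spearman(x, y):
    """Spearman correlation: Pearson correlation of the ranks."""
    return pearson(average_ranks(x), average_ranks(y))


# Hypothetical values: monotonic but not linear
quantity = [1, 2, 3, 4, 5]
price = [1, 4, 9, 16, 25]
print(f"Pearson:  {pearson(quantity, price):.4f}")
print(f"Spearman: {spearman(quantity, price):.4f}")
```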


- [ ] **Use a t-test to compare the average purchase price between two specific countries.**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pandas as pd
from scipy.stats import ttest_ind


country_a = "United Kingdom"
country_b = "Germany"

filtered_df = df.filter(
    (col("Country").isin([country_a, country_b])) &
    (col("Price").isNotNull()) &
    (col("Price") > 0)
)

price_pd = filtered_df.select("Country", "Price").toPandas()

group_a = price_pd[price_pd["Country"] == country_a]["Price"]
group_b = price_pd[price_pd["Country"] == country_b]["Price"]

t_stat, p_value = ttest_ind(group_a, group_b, equal_var=False)

print(f"🎯 Comparing average prices between '{country_a}' and '{country_b}'\n")
print(f"T-statistic: {t_stat:.4f}")
print(f"P-value: {p_value:.4f}")

alpha = 0.05
if p_value < alpha:
    print("Result: Statistically significant difference in average prices between the two countries.")
else:
    print("Result: No statistically significant difference in average prices between the two countries.")
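`equal_var=False` makes `ttest_ind` run Welch's t-test, which does not assume the two countries have the same price variance. Its statistic is $t = (\bar{a} - \bar{b}) / \sqrt{s_a^2/n_a + s_b^2/n_b}$, with Welch-Satterthwaite degrees of freedom; both are written out in this sketch on hypothetical samples:

```python
import math
import statistics


def welch_t(a, b):
    """Welch's t statistic and Welch-Satterthwaite degrees of freedom."""
    se2_a = statistics.variance(a) / len(a)  # sample variance uses n - 1
    se2_b = statistics.variance(b) / len(b)
    t = (statistics.mean(a) - statistics.mean(b)) / math.sqrt(se2_a + se2_b)
    df = (se2_a + se2_b) ** 2 / (se2_a ** 2 / (len(a) - 1) + se2_b ** 2 / (len(b) - 1))
    return t, df


# Hypothetical prices for two countries
prices_a = [1, 2, 3, 4, 5]
prices_b = [2, 4, 6, 8, 10]
t_stat, dof = welch_t(prices_a, prices_b)
print(f"t = {t_stat:.4f}, df = {dof:.2f}")
```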


- [ ] **Use an ANOVA test to check whether the average quantity differs across multiple countries.**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pandas as pd
from scipy.stats import f_oneway


filtered_df = df.filter(
    (col("Country").isNotNull()) &
    (col("Quantity").isNotNull()) &
    (col("Quantity") > 0)
)

top_countries = (filtered_df.groupBy("Country").count()
    .orderBy("count", ascending=False)
    .limit(5).toPandas()["Country"].tolist())

filtered_df = filtered_df.filter(col("Country").isin(top_countries))

quantity_pd = filtered_df.select("Country", "Quantity").toPandas()

groups = [quantity_pd[quantity_pd["Country"] == country]["Quantity"] for country in top_countries]

f_stat, p_value = f_oneway(*groups)

print("ANOVA: Does mean quantity differ by country?\n")
print(f"F-statistic: {f_stat:.4f}")
print(f"P-value: {p_value:.4f}")

alpha = 0.05
if p_value < alpha:
    print("There is a statistically significant difference in quantity between at least two countries.")
else:
    print("No statistically significant difference in quantity across countries.")
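The F statistic returned by `f_oneway` is the ratio of the between-group mean square to the within-group mean square; a large F means the country means are spread out relative to the variation inside each country. A sketch with hypothetical groups follows. Since retail quantities tend to be heavily right-skewed, a rank-based alternative such as `scipy.stats.kruskal` (Kruskal-Wallis) may be worth running alongside it.

```python
def one_way_anova_f(*groups):
    """F = between-group mean square / within-group mean square."""
    values = [v for group in groups for v in group]
    grand_mean = sum(values) / len(values)
    means = [sum(group) / len(group) for group in groups]
    ss_between = sum(len(g) * (m - grand_mean) ** 2 for g, m in zip(groups, means))
    ss_within = sum((v - m) ** 2 for g, m in zip(groups, means) for v in g)
    df_between = len(groups) - 1
    df_within = len(values) - len(groups)
    return (ss_between / df_between) / (ss_within / df_within)


# Hypothetical quantities for three countries
f_stat = one_way_anova_f([1, 2, 3], [4, 5, 6], [7, 8, 9])
print(f"F = {f_stat:.1f}")  # F = 27.0
```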


## 6. Linear Regression 

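- [ ] **Fit a linear regression that predicts Price from Quantity and evaluate it with R², MAE, MSE and RMSE.**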

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col
import numpy as np

# Work on a separate DataFrame so the full df is still available to later cells
lr_df = df.select("Quantity", "Price")

assembler = VectorAssembler(inputCols=["Quantity"], outputCol="features")
lr_df = assembler.transform(lr_df)

train_data, test_data = lr_df.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="Price")

lr_model = lr.fit(train_data)

predictions = lr_model.transform(test_data)

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Price", metricName="r2")
r2 = evaluator.evaluate(predictions)

evaluator.setMetricName("mae")
mae = evaluator.evaluate(predictions)

evaluator.setMetricName("mse")
mse = evaluator.evaluate(predictions)

rmse = np.sqrt(mse)

print("R2 Score:", r2)
print("MAE:", mae)
print("MSE:", mse)
print("RMSE:", rmse)
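With a single feature and Spark's default of no regularization, the least-squares fit has a closed form: slope = Σ(x - x̄)(y - ȳ) / Σ(x - x̄)² and intercept = ȳ - slope · x̄. The printed metrics then follow from the residuals, with R² = 1 - SS_res / SS_tot. A standalone sketch on hypothetical values:

```python
import math


def fit_simple_ols(x, y):
    """Least-squares intercept and slope for y = intercept + slope * x."""
    mx, my = sum(x) / len(x), sum(y) / len(y)
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    slope = sxy / sxx
    return my - slope * mx, slope


def regression_metrics(y_true, y_pred):
    """R2, MAE, MSE and RMSE computed from the residuals."""
    n = len(y_true)
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(math.fabs(r) for r in residuals) / n
    mse = sum(r * r for r in residuals) / n
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    return {"r2": 1 - mse * n / ss_tot, "mae": mae, "mse": mse, "rmse": math.sqrt(mse)}


# Hypothetical (Quantity, Price) pairs
quantity = [1, 2, 3, 4]
price = [3, 5, 7, 9]
intercept, slope = fit_simple_ols(quantity, price)
predicted = [intercept + slope * q for q in quantity]
print(intercept, slope, regression_metrics(price, predicted))
```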


## 7. Logistic Regression

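- [ ] **Fit a logistic regression that classifies transactions as high or low revenue from Country, Price and Quantity, and evaluate it with accuracy and F1.**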

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("OnlineRetailData").getOrCreate()

df = spark.read.csv("/kaggle/input/online-retail/online_retail.csv", header=True, inferSchema=True)

df = df.withColumn("Revenue Class", (col("Price") * col("Quantity") > 100).cast("int"))  # Example rule for classification

indexer = StringIndexer(inputCol="Country", outputCol="Country_encoded")
df = indexer.fit(df).transform(df)

assembler = VectorAssembler(inputCols=["Country_encoded", "Price", "Quantity"], outputCol="features")
df = assembler.transform(df)

train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
lr = LogisticRegression(featuresCol="features", labelCol="Revenue Class")
lr_model = lr.fit(train_data)

predictions = lr_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="Revenue Class", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

predictions.select("Revenue Class", "prediction").show(10)

print(f"Accuracy: {accuracy}")

# With metricName="f1", the evaluator reports F1 averaged over both classes, weighted by class frequency
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="Revenue Class", metricName="f1")
f1 = evaluator.evaluate(predictions)
print(f"Weighted F1 Score: {f1}")
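For `metricName="f1"`, `MulticlassClassificationEvaluator` reports the weighted F1: the F1 of each class, averaged with weights equal to each class's share of the true labels. When high-revenue transactions are the minority, this average can look good while the F1 of the high-revenue class itself is low. A sketch of both numbers on hypothetical labels (recent Spark versions can also return a single class's F1 via `metricName="fMeasureByLabel"` together with `metricLabel`):

```python
from collections import Counter


def f1_scores(y_true, y_pred):
    """Per-class F1 and their average weighted by each class's share of the true labels."""
    support = Counter(y_true)
    pairs = list(zip(y_true, y_pred))
    per_class = {}
    for label in sorted(set(y_true) | set(y_pred)):
        tp = sum(1 for t, p in pairs if t == label and p == label)
        fp = sum(1 for t, p in pairs if t != label and p == label)
        fn = sum(1 for t, p in pairs if t == label and p != label)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        per_class[label] = (2 * precision * recall / (precision + recall)
                            if precision + recall else 0.0)
    weighted = sum(per_class[label] * support[label] / len(y_true) for label in per_class)
    return per_class, weighted


# Hypothetical labels: 1 = high revenue (the minority class)
y_true = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
y_pred = [1, 0, 1, 0, 0, 0, 0, 0, 0, 0]
per_class, weighted = f1_scores(y_true, y_pred)
print(f"F1 (high revenue): {per_class[1]:.3f}, weighted F1: {weighted:.3f}")
```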


## 8. KMeans Clustering

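- [ ] **Cluster the combined version A and B price outliers on Price and Quantity with K-Means, using the elbow method to choose k.**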

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

combined_outliers = out_A_df.union(out_B_df)

# Separate variable so the full df is still available for the SQL queries below
km_df = combined_outliers.select("Price", "Quantity")

assembler = VectorAssembler(inputCols=["Price", "Quantity"], outputCol="features")
km_df = assembler.transform(km_df)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
km_df = scaler.fit(km_df).transform(km_df)

kmeans = KMeans(k=6, featuresCol="scaled_features", predictionCol="k_means")
model = kmeans.fit(km_df)
predictions = model.transform(km_df)

predictions_pd = predictions.select("Price", "Quantity", "k_means").toPandas()

# Elbow method: Spark's KMeans requires k >= 2, and the cost to track is the
# within-cluster sum of squared distances (trainingCost), not the row count.
# The loop uses its own variable names so `model` and `predictions` stay at k=6.
k_values = range(2, 11)
inertia = []
for k in k_values:
    km_model = KMeans(k=k, featuresCol="scaled_features", predictionCol="k_means").fit(km_df)
    inertia.append(km_model.summary.trainingCost)

plt.figure(figsize=(8, 6))
plt.plot(list(k_values), inertia, marker='o')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Inertia')
plt.title('Elbow Method to Find Optimal k')
plt.show()

plt.figure(figsize=(8, 6))
sns.scatterplot(data=predictions_pd, x='Price', y='Quantity', hue='k_means', palette='viridis')
plt.xlabel('Price')
plt.ylabel('Quantity')
plt.title('K-Means Clustering of Outliers')
plt.show()
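The quantity an elbow plot tracks is the within-cluster sum of squared distances from each point to its assigned centre, which Spark exposes as `model.summary.trainingCost` (Spark 3.0+). The "elbow" is the k after which adding clusters stops reducing this cost much. A sketch with hypothetical points shows how the cost drops when a second centre is added:

```python
import math


def wssse(points, centers, assignments):
    """Within-cluster sum of squared Euclidean distances to the assigned centres."""
    return math.fsum(math.dist(p, centers[c]) ** 2 for p, c in zip(points, assignments))


# Hypothetical 2-D points forming two obvious groups
points = [(0, 0), (0, 2), (10, 0), (10, 2)]
cost_k1 = wssse(points, [(5, 1)], [0, 0, 0, 0])           # one centre at the overall mean
cost_k2 = wssse(points, [(0, 1), (10, 1)], [0, 0, 1, 1])  # one centre per group
print(cost_k1, cost_k2)
```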


- [ ] **Use the silhouette score to check whether the clusters are well separated.**

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import seaborn as sns


evaluator = ClusteringEvaluator(predictionCol="k_means", featuresCol="scaled_features", metricName="silhouette")
silhouette_avg = evaluator.evaluate(predictions)

print(f"Silhouette Score (k=6): {silhouette_avg}")

predictions_pd = predictions.select("Price", "Quantity", "k_means").toPandas()

plt.figure(figsize=(8, 6))
sns.scatterplot(data=predictions_pd, x='Price', y='Quantity', hue='k_means', palette='viridis')
plt.xlabel('Price')
plt.ylabel('Quantity')
plt.title('K-Means Clustering of Outliers')
plt.show()
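For a single point, the silhouette is (b - a) / max(a, b), where a is its mean distance to the other points in its own cluster and b is the smallest mean distance to the points of another cluster; the score is the average over all points and is close to 1 for well-separated clusters. `ClusteringEvaluator` uses squared Euclidean distance by default (`distanceMeasure="squaredEuclidean"`), so the sketch below does the same, with plain Euclidean as an option. It is O(n²) and only meant for small, hypothetical inputs.

```python
import math


def squared_euclidean(p, q):
    return math.dist(p, q) ** 2


def mean_silhouette(points, labels, distance=squared_euclidean):
    """Average silhouette (b - a) / max(a, b) over all points; needs at least two clusters."""
    scores = []
    for i, p in enumerate(points):
        by_cluster = {}
        for j, q in enumerate(points):
            if i != j:
                by_cluster.setdefault(labels[j], []).append(distance(p, q))
        own = by_cluster.get(labels[i])
        if not own:  # a point alone in its cluster gets a silhouette of 0
            scores.append(0.0)
            continue
        a = sum(own) / len(own)
        b = min(sum(d) / len(d) for c, d in by_cluster.items() if c != labels[i])
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


# Hypothetical, well-separated clusters
points = [(0, 0), (0, 1), (10, 0), (10, 1)]
labels = [0, 0, 1, 1]
print(mean_silhouette(points, labels))             # squared Euclidean
print(mean_silhouette(points, labels, math.dist))  # plain Euclidean
```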


# SQL Queries (Using the PySpark Library)

- Running the SQL queries through Spark SQL is more practical here, since importing over half a million rows into MySQL takes a long time.

- [x] **Create a SQL query that retrieves the top 10 products by total revenue.**

In [None]:

df.createOrReplaceTempView("retail_data")

top_products_query = """
SELECT
    Description,
    StockCode,
    SUM(Quantity * Price) AS TotalRevenue,
    SUM(Quantity) AS TotalUnitsSold
FROM retail_data
WHERE Quantity > 0 AND Price > 0
GROUP BY Description, StockCode
ORDER BY TotalRevenue DESC
LIMIT 10
"""

top_products_df = spark.sql(top_products_query)
top_products_df.show(truncate=False)
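The same query can be tried outside Spark with Python's built-in `sqlite3` module, which supports the `SUM`, `GROUP BY`, `ORDER BY` and `LIMIT` clauses used here. The table below is an in-memory stand-in with hypothetical rows, only meant to check the query's logic (note how the negative-quantity return is excluded by the `WHERE` clause):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE retail_data (StockCode TEXT, Description TEXT, Quantity INTEGER, Price REAL)"
)
# Hypothetical rows
conn.executemany(
    "INSERT INTO retail_data VALUES (?, ?, ?, ?)",
    [
        ("P1", "Product one", 10, 2.0),
        ("P1", "Product one", -2, 2.0),  # a return: filtered out by the WHERE clause
        ("P2", "Product two", 3, 5.0),
        ("P3", "Product three", 100, 0.5),
    ],
)
top_products = conn.execute("""
    SELECT Description, StockCode,
           SUM(Quantity * Price) AS TotalRevenue,
           SUM(Quantity) AS TotalUnitsSold
    FROM retail_data
    WHERE Quantity > 0 AND Price > 0
    GROUP BY Description, StockCode
    ORDER BY TotalRevenue DESC
    LIMIT 10
""").fetchall()
conn.close()

for row in top_products:
    print(row)
```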


- [ ] **Create a SQL query that retrieves all of the version A products (StockCode has five digits followed by the letter "A").**

In [None]:


df.createOrReplaceTempView("retail_data")

version_a_query = """
SELECT *
FROM retail_data
WHERE StockCode RLIKE '^[0-9]{5}A$'
"""

version_a_df = spark.sql(version_a_query)

version_a_df.show(10) 

print(f"Version A product count: {version_a_df.count()}")


- [ ] **Create a SQL query that retrieves all of the version B products (StockCode has five digits followed by the letter "B").**

In [None]:
df.createOrReplaceTempView("retail_data")

version_b_query = """
SELECT *
FROM retail_data
WHERE StockCode RLIKE '^[0-9]{5}B$'
"""

version_b_df = spark.sql(version_b_query)

version_b_df.show(10) 

print(f"Version B product count: {version_b_df.count()}")


- [x] **Don't forget to stop the Spark session.**

In [None]:
# spark.stop()