## **AIM:**
Carry out the following analysis tasks with Spark SQL, then integrate the data with a machine learning model.
* Identify the country with the highest purchase.
* Count the number of unique customers from each country.
* Calculate the maximum quantity purchased by each customer.
* Compute the maximum and minimum sales within a specified date range (01.10.24 to 15.10.24).
* Calculate the total sales across all countries.

Machine Learning Integration:

* Use ML algorithms to predict future sales or customer purchasing behavior based on historical data.
* Build a regression model (e.g., linear regression) to forecast total sales.


**1. Spark SQL for Analysis**

In [None]:
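import pandas as pd
from pyspark.sql import SparkSession
# Import only the functions used below. Importing sum, max, and min from
# pyspark.sql.functions would shadow the Python built-ins of the same name.
from pyspark.sql.functions import col, to_date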


In [None]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("E-commerce Analysis with Spark SQL") \
    .getOrCreate()

In [None]:
# Load the dataset into a pandas DataFrame
file_path = "/content/Online Retail.xlsx"
pdf = pd.read_excel(file_path) # Read the Excel file into a pandas DataFrame 'pdf'

In [None]:
# Create a Spark DataFrame from the pandas DataFrame
data = spark.createDataFrame(pdf)


In [None]:
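# Preprocessing: add a TotalSales column and convert InvoiceDate to DateType.
# Assuming pandas parsed InvoiceDate as a datetime column, it reaches Spark as
# a timestamp, so to_date needs no string format here.
data = data.withColumn("TotalSales", col("Quantity") * col("UnitPrice")) \
           .withColumn("InvoiceDate", to_date(col("InvoiceDate")))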

data.createOrReplaceTempView("sales_data")


In [None]:
# 1. Country with the highest purchase
highest_purchase = spark.sql("""
    SELECT Country, SUM(TotalSales) AS TotalPurchase
    FROM sales_data
    GROUP BY Country
    ORDER BY TotalPurchase DESC
    LIMIT 1
""")
highest_purchase.show()

+--------------+-----------------+
|       Country|    TotalPurchase|
+--------------+-----------------+
|United Kingdom|8187806.363998706|
+--------------+-----------------+



In [None]:
# 2. Number of customers from each country
customers_per_country = spark.sql("""
    SELECT Country, COUNT(DISTINCT CustomerID) AS TotalCustomers
    FROM sales_data
    GROUP BY Country
    ORDER BY TotalCustomers DESC
""")
customers_per_country.show()

+---------------+--------------+
|        Country|TotalCustomers|
+---------------+--------------+
| United Kingdom|          3951|
|        Germany|            95|
|         France|            88|
|          Spain|            31|
|        Belgium|            25|
|    Switzerland|            22|
|       Portugal|            20|
|          Italy|            15|
|        Finland|            12|
|        Austria|            11|
|         Norway|            10|
|        Denmark|             9|
|Channel Islands|             9|
|      Australia|             9|
|    Netherlands|             9|
|         Sweden|             8|
|         Cyprus|             8|
|          Japan|             8|
|         Poland|             6|
|    Unspecified|             5|
+---------------+--------------+
only showing top 20 rows



In [None]:
# 3. Maximum quantity purchased by each customer
max_quantity_per_customer = spark.sql("""
    SELECT CustomerID, MAX(Quantity) AS MaxQuantity
    FROM sales_data
    GROUP BY CustomerID
    ORDER BY MaxQuantity DESC
""")
max_quantity_per_customer.show()

+----------+-----------+
|CustomerID|MaxQuantity|
+----------+-----------+
|   16446.0|      80995|
|   12346.0|      74215|
|   13256.0|      12540|
|       NaN|       5568|
|   12901.0|       4800|
|   13135.0|       4300|
|   18087.0|       3906|
|   14609.0|       3186|
|   15749.0|       3114|
|   16308.0|       3000|
|   12931.0|       2880|
|   16754.0|       2880|
|   16333.0|       2592|
|   16029.0|       2400|
|   14646.0|       2400|
|   14101.0|       2160|
|   12798.0|       2040|
|   17949.0|       1992|
|   17450.0|       1944|
|   15299.0|       1824|
+----------+-----------+
only showing top 20 rows
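
The `NaN` row in the output above comes from rows with a missing CustomerID, which pandas represents as NaN and which then form a group of their own. A minimal plain-Python sketch of the same per-customer maximum, skipping those rows, is shown below. The sample rows and the helper name `max_quantity_per_customer` are invented for illustration. In Spark SQL the equivalent filter would likely be `WHERE NOT isnan(CustomerID)`, assuming the column is a floating-point type, as the `16446.0`-style values suggest.

```python
import math

# Hypothetical (CustomerID, Quantity) rows; NaN marks a missing customer ID.
rows = [
    (16446.0, 80995),
    (16446.0, 1),
    (12346.0, 74215),
    (float("nan"), 5568),
    (12346.0, -74215),
]

def max_quantity_per_customer(rows):
    """Return {CustomerID: largest Quantity}, ignoring rows with a missing ID."""
    result = {}
    for customer_id, quantity in rows:
        if math.isnan(customer_id):
            continue  # skip rows that have no customer attached
        if customer_id not in result or quantity > result[customer_id]:
            result[customer_id] = quantity
    return result

max_quantities = max_quantity_per_customer(rows)
print(max_quantities)
```

The same missing IDs also count as one extra "customer" in a `COUNT(DISTINCT CustomerID)` query if they are not filtered out first.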



In [None]:
# 4. Max, min, and total sales between 2024-10-01 and 2024-10-15 (BETWEEN is inclusive)
date_filtered_sales = spark.sql("""
    SELECT
        MAX(TotalSales) AS MaxSale,
        MIN(TotalSales) AS MinSale,
        SUM(TotalSales) AS TotalSale
    FROM sales_data
    WHERE InvoiceDate BETWEEN '2024-10-01' AND '2024-10-15'
""")
date_filtered_sales.show()

+-------+-------+---------+
|MaxSale|MinSale|TotalSale|
+-------+-------+---------+
|   NULL|   NULL|     NULL|
+-------+-------+---------+
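
The all-NULL row above is what SQL aggregates return when the WHERE clause matches no rows. The sketch below reproduces that behaviour and shows a quick way to check which dates a table actually covers. It uses Python's built-in `sqlite3` module as a lightweight stand-in for Spark SQL, and the three sample rows are invented for illustration; they are not taken from the dataset.

```python
import sqlite3

# In-memory table standing in for the sales_data view (hypothetical rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales_data (InvoiceDate TEXT, TotalSales REAL)")
conn.executemany(
    "INSERT INTO sales_data VALUES (?, ?)",
    [("2010-12-01", 15.30), ("2011-06-15", 20.00), ("2011-12-09", -4.25)],
)

# Aggregates over a date range that matches no rows come back as NULL (None).
empty_range = conn.execute(
    """
    SELECT MAX(TotalSales), MIN(TotalSales), SUM(TotalSales)
    FROM sales_data
    WHERE InvoiceDate BETWEEN '2024-10-01' AND '2024-10-15'
    """
).fetchone()

# Checking the span of dates present shows which ranges can return data.
date_span = conn.execute(
    "SELECT MIN(InvoiceDate), MAX(InvoiceDate) FROM sales_data"
).fetchone()

print(empty_range)
print(date_span)
conn.close()
```

The same `SELECT MIN(InvoiceDate), MAX(InvoiceDate) FROM sales_data` query can be passed to `spark.sql` against the temporary view to see the range the loaded data covers.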



**2. Machine Learning with MLlib**

Build a linear regression model that predicts TotalSales from Quantity and UnitPrice.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
# Feature Engineering: Prepare data for ML
ml_data = data.select("Quantity", "UnitPrice", "TotalSales")
assembler = VectorAssembler(inputCols=["Quantity", "UnitPrice"], outputCol="features")
ml_data = assembler.transform(ml_data).select("features", "TotalSales")


In [None]:

# Split data into training and test sets
train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)


In [None]:
# Train Linear Regression Model
lr = LinearRegression(featuresCol="features", labelCol="TotalSales")
lr_model = lr.fit(train_data)

In [None]:
# Evaluate the model: generate predictions for the held-out test set
predictions = lr_model.transform(test_data)
predictions.select("features", "TotalSales", "prediction").show()

+--------------+-------------------+-------------------+
|      features|         TotalSales|         prediction|
+--------------+-------------------+-------------------+
| [-9600.0,0.0]|               -0.0|-13222.143988828493|
| [-3114.0,2.1]| -6539.400000000001|-4284.8792177248215|
| [-3000.0,0.0]|               -0.0| -4126.463404989572|
|[-2000.0,1.85]|            -3700.0| -2749.482799816703|
| [-1440.0,0.0]|               -0.0|-1976.5752669912815|
| [-1440.0,0.0]|               -0.0|-1976.5752669912815|
|[-1300.0,2.55]|-3314.9999999999995|-1785.2256054446457|
| [-1092.0,0.0]|               -0.0|-1496.9848362070475|
|  [-690.0,0.0]|               -0.0| -942.9752006459496|
|  [-675.0,0.0]|               -0.0|  -922.303199319043|
| [-670.0,6.75]|            -4522.5|  -919.618755016084|
| [-624.0,2.55]|-1591.1999999999998|  -853.607412312053|
|  [-618.0,0.0]|               -0.0| -843.7495942767978|
|  [-590.0,0.0]|               -0.0| -805.1618584665721|
|  [-472.0,0.0]|               

In [None]:
# Print model metrics (lr_model.summary holds training-set metrics, not test-set metrics)
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")
print(f"RMSE: {lr_model.summary.rootMeanSquaredError}")
print(f"R2 Score: {lr_model.summary.r2}")

Coefficients: [1.378133421793776,-0.6231441194088747]
Intercept: 7.936860391755732
RMSE: 159.25807731171892
R2 Score: 0.773210198440154


### **Output**

Spark SQL Results:

* Country with the Highest Purchase: Displays the country with the highest TotalSales.
* Number of Customers per Country: Displays a breakdown of unique customers by country.
* Maximum Quantity Purchased by Customers: Lists the maximum quantity purchased by each customer.
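* Sales Metrics within Date Range: Displays max, min, and total sales for the specified date range. In the run shown, all three values are NULL, which indicates that no rows with a TotalSales value fall between 2024-10-01 and 2024-10-15.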

Machine Learning Results:

* Fits a linear regression of TotalSales on Quantity and UnitPrice and generates predictions for the held-out test rows.
* Prints metrics such as Root Mean Squared Error (RMSE) and R² score to evaluate the model's performance.
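
For reference, RMSE and R² can be computed directly from labels and predictions. The sketch below does so in plain Python on a small invented sample. It follows the usual definitions (RMSE as the square root of the mean squared residual, R² as one minus the ratio of the residual sum of squares to the total sum of squares) and is not a replacement for the values MLlib reports. The function names `rmse` and `r2_score` are chosen here for illustration.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(math.fsum(squared_errors) / len(squared_errors))

def r2_score(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = math.fsum(labels) / len(labels)
    ss_res = math.fsum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = math.fsum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Invented values for illustration, not taken from the dataset.
labels = [1.0, 2.0, 3.0, 4.0]
predictions = [1.0, 2.0, 3.0, 6.0]

sample_rmse = rmse(labels, predictions)
sample_r2 = r2_score(labels, predictions)
print(sample_rmse, sample_r2)
```

Since `lr_model.summary` describes the training data, one option for test-set metrics is to apply `pyspark.ml.evaluation.RegressionEvaluator` to the `predictions` DataFrame with `labelCol="TotalSales"` and `metricName` set to `"rmse"` or `"r2"`.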