In [18]:
!pip install pyspark



In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg, max, min

In [20]:
# Create the Spark session used by every cell below, or reuse one that is
# already running.
spark = (
    SparkSession.builder
    .appName("SalesAnalytics")
    .getOrCreate()
)

In [21]:
# Read sales_data.csv into a DataFrame: the first line supplies the column
# names and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("sales_data.csv")
)

In [22]:
# Preview the loaded data: print up to the first 20 rows, truncating long cell values
df.show(n=20, truncate=True)

+-------+---------+-----------+--------+-----+----------+
|OrderID|ProductID|ProductName|Quantity|Price|      Date|
+-------+---------+-----------+--------+-----+----------+
|   1001|    P1001|  Product A|       5| 25.0|2023-10-01|
|   1002|    P1002|  Product B|       3| 12.5|2023-10-02|
|   1003|    P1001|  Product A|       2| 25.0|2023-10-03|
|   1004|    P1003|  Product C|       4| 10.0|2023-10-04|
|   1005|    P1002|  Product B|       1| 12.5|2023-10-05|
|   1006|    P1001|  Product A|       5| 25.0|2023-10-06|
|   1007|    P1003|  Product C|       2| 10.0|2023-10-07|
|   1008|    P1004|  Product D|       3| 30.0|2023-10-08|
|   1009|    P1002|  Product B|       4| 12.5|2023-10-09|
|   1010|    P1001|  Product A|       2| 25.0|2023-10-10|
|   1011|    P1003|  Product C|       5| 10.0|2023-10-11|
|   1012|    P1004|  Product D|       2| 30.0|2023-10-12|
|   1013|    P1002|  Product B|       6| 12.5|2023-10-13|
|   1014|    P1001|  Product A|       3| 25.0|2023-10-14|
|   1015|    P

In [23]:
# Add a per-row line total (Quantity * Price), then sum it over the whole
# DataFrame to get the overall revenue.
df = df.withColumn("TotalPrice", df.Quantity * df.Price)
total_revenue = (
    df.selectExpr("sum(TotalPrice) as TotalRevenue")
    .first()["TotalRevenue"]
)
print(f"Total Revenue: ${total_revenue:.2f}")

Total Revenue: $2685.00


In [24]:
# Mean of the Price column across all rows (a single-row, single-column result)
average_price = df.agg(avg("Price")).first()[0]
print(f"Average Price: ${average_price:.2f}")

Average Price: $19.11


In [25]:
# Total quantity sold per product: sum Quantity within each
# (ProductID, ProductName) group.
product_sales = (
    df.groupBy("ProductID", "ProductName")
    .agg(sum("Quantity").alias("TotalQuantity"))
)
product_sales.show()

+---------+-----------+-------------+
|ProductID|ProductName|TotalQuantity|
+---------+-----------+-------------+
|    P1004|  Product D|           26|
|    P1003|  Product C|           38|
|    P1002|  Product B|           42|
|    P1001|  Product A|           40|
+---------+-----------+-------------+



In [26]:
# All three summaries below share the same (ProductID, ProductName) grouping
by_product = df.groupBy("ProductID", "ProductName")

# Sum of TotalPrice for each product
total_sales = by_product.agg(sum("TotalPrice").alias("TotalSales"))
total_sales.show()

# Average Quantity for each product
avg_quantity = by_product.agg(avg("Quantity").alias("AvgQuantity"))
avg_quantity.show()

# Highest and lowest Price observed for each product
max_min_price = by_product.agg(
    max("Price").alias("MaxPrice"),
    min("Price").alias("MinPrice"),
)
max_min_price.show()

+---------+-----------+----------+
|ProductID|ProductName|TotalSales|
+---------+-----------+----------+
|    P1004|  Product D|     780.0|
|    P1003|  Product C|     380.0|
|    P1002|  Product B|     525.0|
|    P1001|  Product A|    1000.0|
+---------+-----------+----------+

+---------+-----------+------------------+
|ProductID|ProductName|       AvgQuantity|
+---------+-----------+------------------+
|    P1004|  Product D|               2.6|
|    P1003|  Product C|3.4545454545454546|
|    P1002|  Product B|               3.5|
|    P1001|  Product A|3.3333333333333335|
+---------+-----------+------------------+

+---------+-----------+--------+--------+
|ProductID|ProductName|MaxPrice|MinPrice|
+---------+-----------+--------+--------+
|    P1004|  Product D|    30.0|    30.0|
|    P1003|  Product C|    10.0|    10.0|
|    P1002|  Product B|    12.5|    12.5|
|    P1001|  Product A|    25.0|    25.0|
+---------+-----------+--------+--------+



PREDICTION USING LINEAR REGRESSION (LR)

In [27]:
# Fit a linear regression that predicts "TotalPrice" from "Quantity" and
# "Price", then score it on a held-out split using RMSE.
# NOTE(review): TotalPrice is computed earlier as Quantity * Price, i.e. a
# product of the two features, so a purely linear model can only approximate it.

# Fixed seed so the train/test split (and therefore the reported RMSE) is
# repeatable across runs on the same input data.
SPLIT_SEED = 42

# Combine "Quantity" and "Price" into a single vector column. The result is
# stored under a new name instead of overwriting `df`, so this cell can be
# re-run without failing because the "features" column already exists.
assembler = VectorAssembler(inputCols=["Quantity", "Price"], outputCol="features")
features_df = assembler.transform(df)

# Split the dataset into training (70%) and testing (30%) sets
train_data, test_data = features_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Initialize and fit the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="TotalPrice")
lr_model = lr.fit(train_data)

# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Show the predictions next to the actual values
predictions.select("Quantity", "Price", "TotalPrice", "prediction").show()

# Evaluate the model with root mean squared error; other metrics can be
# selected through `metricName`.
evaluator = RegressionEvaluator(labelCol="TotalPrice", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")

+--------+-----+----------+------------------+
|Quantity|Price|TotalPrice|        prediction|
+--------+-----+----------+------------------+
|       5| 25.0|     125.0|102.23337572674414|
|       4| 12.5|      50.0| 50.82599927325582|
|       6| 12.5|      75.0| 79.80863008720927|
|       2| 30.0|      60.0|  73.5258539244186|
|       4| 25.0|     100.0| 87.74206031976742|
|       3| 30.0|      90.0| 88.01716933139532|
|       4| 30.0|     120.0|102.50848473837208|
|       2| 10.0|      20.0|14.460156250000033|
|       4| 12.5|      50.0| 50.82599927325582|
|       1| 30.0|      30.0|59.034538517441874|
|       5| 12.5|      62.5| 65.31731468023254|
|       3| 30.0|      90.0| 88.01716933139532|
|       3| 25.0|      75.0|  73.2507449127907|
+--------+-----+----------+------------------+

Root Mean Squared Error (RMSE): 12.63


In [28]:
# Stop the Spark session created at the top of the notebook; cells that use
# `spark` or DataFrames derived from it should not be run after this point.
spark.stop()