In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt
import seaborn as sns

ModuleNotFoundError: No module named 'numpy'

In [5]:
# Initialize Spark Session
# getOrCreate() hands back the session built from this configuration; every
# DataFrame in the notebook is read through `spark`.
session_builder = SparkSession.builder.appName("BigDataProcessing")
spark = session_builder.getOrCreate()


In [3]:
# Load datasets
def _read_csv(path):
    """Read one CSV file into a Spark DataFrame, using its header row and inferring column types."""
    return spark.read.csv(path, header=True, inferSchema=True)


# One fact table plus the four dimension tables that later cells join against or inspect.
fact_sales = _read_csv("FactInternetSales.csv")
dim_customer = _read_csv("DimCustomer.csv")
dim_product = _read_csv("DimProduct.csv")
dim_date = _read_csv("DimDate.csv")
dim_sales_territory = _read_csv("DimSalesTerritory.csv")

25/01/03 22:25:00 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+---------+---+
|     Name|Age|
+---------+---+
|    Alice| 34|
|      Bob| 45|
|Catherine| 29|
+---------+---+



In [None]:
# Cache datasets for performance
# Mark every loaded table for caching, in the same order as they were loaded.
for _frame in (fact_sales, dim_customer, dim_product, dim_date):
    _frame.cache()
# Kept as the cell's final expression so the cell's displayed value is unchanged.
dim_sales_territory.cache()

In [None]:
# Step 2: Descriptive Statistics
# Summary statistics for the sales fact table.
summary_stats = fact_sales.describe()
summary_stats.show()

# Fraction of NULL values per column: (rows where the column is NULL) / (all rows).
# NOTE: `count`, `when` and `col` are the pyspark.sql.functions versions pulled in
# by the star import, not Python builtins.
null_fractions = []
for column_name in fact_sales.columns:
    # when(...) yields a non-NULL marker only for NULL cells, so count() tallies exactly those rows.
    null_rows = count(when(col(column_name).isNull(), column_name))
    null_fractions.append((null_rows / count("*")).alias(column_name))
fact_sales.select(null_fractions).show()


In [None]:
# Step 3: Data Distributions
# Aggregate in Spark so that only one row per distinct SalesAmount is collected:
# columns are "SalesAmount" (the distinct value) and "count" (rows with that value).
sales_amount_distribution = fact_sales.groupBy("SalesAmount").count().toPandas()

# The frame is pre-aggregated, so the histogram must bin SalesAmount and weight each
# value by its row count. Calling DataFrame.plot(kind="hist") on the whole frame would
# instead histogram both columns and ignore how often each amount occurs.
# The NULL SalesAmount group (if any) is dropped because it cannot be binned.
_observed = sales_amount_distribution.dropna(subset=["SalesAmount"])
plt.hist(
    _observed["SalesAmount"].astype(float),
    bins=50,
    weights=_observed["count"],
    color="blue",
)
plt.title("SalesAmount Distribution")
plt.xlabel("SalesAmount")
plt.ylabel("Frequency")
plt.show()

In [None]:
# Step 4: Correlation Analysis
# Collect only the three columns being compared, then let pandas compute
# the pairwise correlations with its default corr() settings.
correlated_columns = ["SalesAmount", "UnitPrice", "DiscountAmount"]
sales_numeric = fact_sales.select(*correlated_columns).toPandas()
correlation_matrix = sales_numeric.corr()

# sns.heatmap returns the Axes it drew on; the title is set on that Axes.
heatmap_ax = sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm")
heatmap_ax.set_title("Correlation Heatmap")
plt.show()

In [None]:
# Step 5: Relationship Analysis
# Collect the two columns to the driver and plot SalesAmount (y) against UnitPrice (x).
scatter_data = fact_sales.select("SalesAmount", "UnitPrice").toPandas()

fig, ax = plt.subplots()
# alpha=0.5 keeps overlapping points distinguishable.
ax.scatter(scatter_data["UnitPrice"], scatter_data["SalesAmount"], alpha=0.5)
ax.set_title("SalesAmount vs UnitPrice")
ax.set_xlabel("UnitPrice")
ax.set_ylabel("SalesAmount")
plt.show()

In [None]:
# Step 6: Time Series Analysis
# OrderDateKey is cast to string and parsed with the "yyyyMMdd" pattern to get a real date.
# NOTE: this rebinds `fact_sales` to a frame that carries the extra "OrderDate" column.
order_date = to_date(col("OrderDateKey").cast("string"), "yyyyMMdd")
fact_sales = fact_sales.withColumn("OrderDate", order_date)

# Total SalesAmount per order date, sorted chronologically before collecting to pandas.
totals_by_date = fact_sales.groupBy("OrderDate").sum("SalesAmount")
sales_trend = totals_by_date.orderBy("OrderDate").toPandas()

fig, ax = plt.subplots()
ax.plot(sales_trend["OrderDate"], sales_trend["sum(SalesAmount)"])
ax.set_title("Sales Over Time")
ax.set_xlabel("Date")
ax.set_ylabel("Total Sales")
plt.show()

In [None]:
# Step 7: Geographical Analysis
# Attach territory attributes to each sale via the shared SalesTerritoryKey column,
# then total SalesAmount per region.
sales_with_territory = fact_sales.join(dim_sales_territory, "SalesTerritoryKey")
totals_by_region = sales_with_territory.groupBy("SalesTerritoryRegion").sum("SalesAmount")
geo_sales = totals_by_region.toPandas()

geo_sales.plot.bar(
    x="SalesTerritoryRegion",
    y="sum(SalesAmount)",
    title="Sales by Region",
    color="green",
)
plt.show()

In [None]:
# Step 8: Key Performance Indicators
# Total SalesAmount per product name, keeping the ten largest totals.
sales_with_product = fact_sales.join(dim_product, "ProductKey")
totals_by_product = sales_with_product.groupBy("EnglishProductName").sum("SalesAmount")
ranked_products = totals_by_product.orderBy(col("sum(SalesAmount)").desc())
top_products = ranked_products.limit(10).toPandas()

top_products.plot.bar(
    x="EnglishProductName",
    y="sum(SalesAmount)",
    title="Top 10 Products by Sales",
)
plt.show()

In [None]:
# Step 9: Outlier Detection
# Collect the SalesAmount column and draw a box plot of it.
sales_data = fact_sales.select("SalesAmount").toPandas()

# sns.boxplot returns the Axes it drew on; the title is set on that Axes.
boxplot_ax = sns.boxplot(x=sales_data["SalesAmount"])
boxplot_ax.set_title("SalesAmount Outliers")
plt.show()

In [None]:
# Forecasting
# Cast the date key to int and keep only the feature/target columns.
# NOTE: `sales_data` is rebound here to a Spark DataFrame (Step 9 used the same name
# for a pandas frame).
sales_data = fact_sales.withColumn("OrderDateKey", col("OrderDateKey").cast("int"))
forecast_columns = sales_data.select("OrderDateKey", "SalesAmount")

# Pack the single input column into the "features" vector column.
assembler = VectorAssembler(inputCols=["OrderDateKey"], outputCol="features")
sales_vector = assembler.transform(forecast_columns)
