In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=540b6dd8231032696168185b453d159f48bde0795bacd9244f98d3c4980f0cbb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName('SalesAnalysis').getOrCreate()

In [3]:
# Load dataset
data = spark.read.csv('Sales Dataset.csv', header=True, inferSchema=True)

In [4]:
# Show the first few rows of the dataset
data.show(5)

# Print the schema of the dataset
data.printSchema()

# Describe the dataset to get summary statistics
data.describe().show()

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|    NULL|                NULL|            NULL|      NULL|          NULL|                NULL|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 5 rows

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: int

In [5]:
# Drop rows with missing values
data = data.dropna()

# Convert necessary columns to appropriate data types
from pyspark.sql.functions import col

data = data.withColumn("Quantity Ordered", col("Quantity Ordered").cast("integer"))
data = data.withColumn("Price Each", col("Price Each").cast("float"))
data = data.withColumn("Order Date", col("Order Date").cast("timestamp"))

In [6]:
from pyspark.sql.functions import year, month, dayofmonth

# Extract year, month, and day from Order Date
data = data.withColumn("Year", year("Order Date"))
data = data.withColumn("Month", month("Order Date"))
data = data.withColumn("Day", dayofmonth("Order Date"))

# Calculate total sales for each order
data = data.withColumn("Total Sales", col("Quantity Ordered") * col("Price Each"))

In [7]:
# Group by year and month to calculate monthly sales
monthly_sales = data.groupBy("Year", "Month").sum("Total Sales").orderBy("Year", "Month")
monthly_sales.show()

+----+-----+-------------------+
|Year|Month|   sum(Total Sales)|
+----+-----+-------------------+
|NULL| NULL|3.449203579634309E7|
+----+-----+-------------------+



In [8]:
# Group by product to calculate total quantity sold
top_products = data.groupBy("Product").sum("Quantity Ordered").orderBy(col("sum(Quantity Ordered)").desc())
top_products.show()

+--------------------+---------------------+
|             Product|sum(Quantity Ordered)|
+--------------------+---------------------+
|AAA Batteries (4-...|                31017|
|AA Batteries (4-p...|                27635|
|USB-C Charging Cable|                23975|
|Lightning Chargin...|                23217|
|    Wired Headphones|                20557|
|Apple Airpods Hea...|                15661|
|Bose SoundSport H...|                13457|
|    27in FHD Monitor|                 7550|
|              iPhone|                 6849|
|27in 4K Gaming Mo...|                 6244|
|34in Ultrawide Mo...|                 6199|
|        Google Phone|                 5532|
|       Flatscreen TV|                 4819|
|  Macbook Pro Laptop|                 4728|
|     ThinkPad Laptop|                 4130|
|        20in Monitor|                 4129|
|     Vareebadd Phone|                 2068|
|  LG Washing Machine|                  666|
|            LG Dryer|                  646|
+---------

In [9]:
from pyspark.sql.functions import split

# Extract city from Purchase Address
data = data.withColumn("City", split(col("Purchase Address"), ",")[1])

# Group by city to calculate total sales
sales_by_city = data.groupBy("City").sum("Total Sales").orderBy(col("sum(Total Sales)").desc())
sales_by_city.show()

+--------------+------------------+
|          City|  sum(Total Sales)|
+--------------+------------------+
| San Francisco| 8262203.869155407|
|   Los Angeles| 5452570.772869825|
| New York City| 4664317.406611681|
|        Boston|3661641.9913027287|
|       Atlanta| 2795498.565934658|
|        Dallas| 2767975.385901928|
|       Seattle| 2747755.465265751|
|      Portland|2320490.5985319614|
|        Austin|1819581.7407691479|
+--------------+------------------+



In [10]:
# Group by year, month, and day to calculate daily sales
daily_sales = data.groupBy("Year", "Month", "Day").sum("Total Sales").orderBy("Year", "Month", "Day")
daily_sales.show()

+----+-----+----+-------------------+
|Year|Month| Day|   sum(Total Sales)|
+----+-----+----+-------------------+
|NULL| NULL|NULL|3.449203579634309E7|
+----+-----+----+-------------------+



In [11]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Create a feature vector
assembler = VectorAssembler(inputCols=["Quantity Ordered", "Price Each"], outputCol="features")
feature_vector = assembler.transform(data)

# Calculate correlation
correlation_matrix = Correlation.corr(feature_vector, "features").head()[0]
print("Correlation matrix:\n", correlation_matrix)

Correlation matrix:
 DenseMatrix([[ 1.        , -0.14827234],
             [-0.14827234,  1.        ]])


In [12]:
# Stop Spark session
spark.stop()