# Retail Data Analysis with PySpark
This analysis explores retail data, including customer orders, product details, and sales categories. The primary goal is to analyze order data, identify distinct orders, and observe relationships among various datasets.

### Dataset Descriptions
- `orders.csv`: Contains customer orders with order details and timestamps.
- `order_items.csv`: Includes item-level details for each order.
- `products.csv`: Lists product details available in the store.
- `customers.csv`: Contains customer data.
- `categories.csv`: Provides categories for each product.
- `departments.csv`: Department data for organizing products.

The following analysis is performed using PySpark to demonstrate data processing, aggregation, and summary statistics.


In [6]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

In [7]:
# Create (or reuse) the Spark session for this notebook, running locally
# with two worker threads.
session_builder = SparkSession.builder.master("local[2]").appName("RetailDBAnalysis")
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/31 15:00:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Load the datasets using Spark's CSV reader with schema inference. Each dataset is loaded into a separate DataFrame for easier manipulation and analysis.


In [8]:
# Read each CSV file into its own DataFrame. All files share the same reader
# options: the first line holds the column names, and column types are
# inferred from the data.
CSV_OPTIONS = {"header": True, "inferSchema": True}

orders_df = spark.read.csv("/opt/examples/orders.csv", **CSV_OPTIONS)
order_items_df = spark.read.csv("/opt/examples/order_items.csv", **CSV_OPTIONS)
products_df = spark.read.csv("/opt/examples/products.csv", **CSV_OPTIONS)
customers_df = spark.read.csv("/opt/examples/customers.csv", **CSV_OPTIONS)
categories_df = spark.read.csv("/opt/examples/categories.csv", **CSV_OPTIONS)
departments_df = spark.read.csv("/opt/examples/departments.csv", **CSV_OPTIONS)

                                                                                

In [9]:
# Count how many different order IDs are referenced by the order items.
# The figure is derived from order_items_df alone, so an order only
# contributes to it if at least one item row points at it.
order_id_column = order_items_df.select("orderItemOrderId")
distinct_order_ids = order_id_column.distinct().count()

print("Distinct orderItemOrderId count:", distinct_order_ids)



Distinct orderItemOrderId count: 57431


                                                                                

In [10]:
# Row counts of the orders and order_items tables, as a quick overview of
# how much data each dataset holds.
orders_count, order_items_count = orders_df.count(), order_items_df.count()

print("Orders row count:", orders_count)
print("Order Items row count:", order_items_count)

Orders row count: 68883
Order Items row count: 172198


In [11]:
# Keep only the orders whose status is exactly "CANCELED".
canceled_orders = orders_df.filter(F.col("orderStatus") == "CANCELED")


In [12]:
# Preview the first ten canceled orders.
canceled_orders.show(n=10)

+-------+-------------------+---------------+-----------+
|orderId|          orderDate|orderCustomerId|orderStatus|
+-------+-------------------+---------------+-----------+
|     50|2013-07-25 00:00:00|           5225|   CANCELED|
|    112|2013-07-26 00:00:00|           5375|   CANCELED|
|    527|2013-07-28 00:00:00|           5426|   CANCELED|
|    552|2013-07-28 00:00:00|           1445|   CANCELED|
|    564|2013-07-28 00:00:00|           2216|   CANCELED|
|    607|2013-07-28 00:00:00|           6376|   CANCELED|
|    649|2013-07-28 00:00:00|           7261|   CANCELED|
|    667|2013-07-28 00:00:00|           4726|   CANCELED|
|    716|2013-07-29 00:00:00|           2581|   CANCELED|
|    717|2013-07-29 00:00:00|           8208|   CANCELED|
+-------+-------------------+---------------+-----------+
only showing top 10 rows



In [13]:
# Total sub-total value of the items in canceled orders, per product name,
# rounded to two decimals and sorted from largest to smallest.
# Both joins use the default (inner) join type.
canceled_product_sales = (
    canceled_orders
    .join(order_items_df, on=canceled_orders.orderId == order_items_df.orderItemOrderId)
    .join(products_df, on=order_items_df.orderItemProductId == products_df.productId)
    .groupBy("productName")
    .agg(F.round(F.sum("orderItemSubTotal"), 2).alias("total_sales"))
    .orderBy(F.col("total_sales").desc())
)

In [14]:
# Show the ten product names with the highest canceled sales totals.
canceled_product_sales.show(n=10)

+--------------------+-----------+
|         productName|total_sales|
+--------------------+-----------+
|Field & Stream Sp...|  134393.28|
|Perfect Fitness P...|    85785.7|
|Nike Men's Free 5...|   80691.93|
|Diamondback Women...|   80094.66|
|Pelican Sunstream...|   66196.69|
|Nike Men's Dri-FI...|    65750.0|
|Nike Men's CJ Eli...|   60705.33|
|O'Brien Men's Neo...|   58126.74|
|Under Armour Girl...|   26153.46|
|LIJA Women's Eyel...|     2145.0|
+--------------------+-----------+
only showing top 10 rows



In [15]:
# Save the per-product totals as Parquet, replacing any existing output
# at the same path.
canceled_product_sales.write.mode("overwrite").parquet(
    "output/most_canceled_products.parquet"
)


                                                                                

In [16]:
# Order items that belong to a canceled order (inner join on the order ID).
canceled_order_items = order_items_df.join(
    canceled_orders,
    on=order_items_df.orderItemOrderId == canceled_orders.orderId,
)

In [24]:
# Total sub-total value of canceled order items, per product category,
# rounded to two decimals and sorted from largest to smallest.
canceled_categories_sales = (
    canceled_order_items
    .join(products_df, on=canceled_order_items.orderItemProductId == products_df.productId)
    .join(categories_df, on=products_df.productCategoryId == categories_df.categoryId)
    .groupBy("categoryName")
    .agg(F.round(F.sum("orderItemSubTotal"), 2).alias("total_sales"))
    .orderBy(F.col("total_sales").desc())
)

In [25]:
# Show the ten categories with the highest canceled sales totals.
canceled_categories_sales.show(n=10)

+--------------------+-----------+
|        categoryName|total_sales|
+--------------------+-----------+
|             Fishing|  134393.28|
|              Cleats|    85785.7|
|    Cardio Equipment|   81351.93|
|    Camping & Hiking|   80094.66|
|        Water Sports|   66196.69|
|     Women's Apparel|    65750.0|
|      Men's Footwear|   60705.33|
|Indoor/Outdoor Games|   58126.74|
|       Shop By Sport|   27423.44|
|         Electronics|     5685.5|
+--------------------+-----------+
only showing top 10 rows



In [26]:
# Sales total per calendar month across all orders. Each order item is tagged
# with a "month_year" label built from the order date using the "MMMM-yyyy"
# pattern, then the sub-totals are summed per label and sorted descending.
monthly_sales = (
    orders_df
    .join(order_items_df, on=orders_df.orderId == order_items_df.orderItemOrderId)
    .withColumn("month_year", F.date_format("orderDate", "MMMM-yyyy"))
    .groupBy("month_year")
    .agg(F.round(F.sum("orderItemSubTotal"), 2).alias("total_sales"))
    .orderBy(F.col("total_sales").desc())
)

In [28]:
# English month name -> Turkish month name, used to localise the report line.
months_tr = {
    "January": "Ocak",
    "February": "Şubat",
    "March": "Mart",
    "April": "Nisan",
    "May": "Mayıs",
    "June": "Haziran",
    "July": "Temmuz",
    "August": "Ağustos",
    "September": "Eylül",
    "October": "Ekim",
    "November": "Kasım",
    "December": "Aralık",
}

# Sales total per (month name, year), highest total first.
monthly_sales = (
    orders_df
    .join(order_items_df, on=orders_df["orderId"] == order_items_df["orderItemOrderId"])
    .withColumn("month", F.date_format("orderDate", "MMMM"))
    .withColumn("year", F.year("orderDate"))
    .groupBy("month", "year")
    .agg(F.round(F.sum("orderItemSubTotal"), 2).alias("total_sales"))
    .orderBy(F.col("total_sales").desc())
)

# The frame is sorted descending, so its first row is the best-selling month.
top_rows = monthly_sales.limit(1).collect()
top_monthly_sales = top_rows[0]

year = top_monthly_sales["year"]
month_tr = months_tr[top_monthly_sales["month"]]

# Turkish report line: "In <month> of <year> the most sales occurred".
output = f"{year} yılının {month_tr} ayında en çok satış olmuş"
print(output)

2013 yılının Kasım ayında en çok satış olmuş


In [29]:
# English weekday name -> Turkish weekday name, used to localise the report line.
days_tr = {
    "Monday": "Pazartesi",
    "Tuesday": "Salı",
    "Wednesday": "Çarşamba",
    "Thursday": "Perşembe",
    "Friday": "Cuma",
    "Saturday": "Cumartesi",
    "Sunday": "Pazar",
}

# Sales total per weekday name (derived from the order date with the "EEEE"
# pattern), highest total first.
weekly_sales = (
    orders_df
    .join(order_items_df, on=orders_df["orderId"] == order_items_df["orderItemOrderId"])
    .withColumn("day_of_week", F.date_format("orderDate", "EEEE"))
    .groupBy("day_of_week")
    .agg(F.round(F.sum("orderItemSubTotal"), 2).alias("total_sales"))
    .orderBy(F.col("total_sales").desc())
)

# The frame is sorted descending, so its first row is the best-selling weekday.
top_rows = weekly_sales.limit(1).collect()
top_day_sales = top_rows[0]

# Translate the English day name into Turkish.
day_of_week_tr = days_tr[top_day_sales["day_of_week"]]

# Turkish report line: "Day with the most sales: <day>".
output = f"En çok satış yapılan gün: {day_of_week_tr}"
print(output)


En çok satış yapılan gün: Cuma


In [30]:
# Columns of the combined table, in output order, grouped by source table.
# NOTE(review): customerPassword is carried into the combined table — confirm
# that this is intended before the table is shared or exported.
retail_all_column_names = (
    # order items
    "orderItemName",
    "orderItemOrderId",
    "orderItemProductId",
    "orderItemQuantity",
    "orderItemSubTotal",
    "orderItemProductPrice",
    # orders
    "orderId",
    "orderDate",
    "orderCustomerId",
    "orderStatus",
    # customers
    "customerId",
    "customerFName",
    "customerLName",
    "customerEmail",
    "customerPassword",
    "customerStreet",
    "customerCity",
    "customerState",
    "customerZipcode",
    # products
    "productId",
    "productCategoryId",
    "productName",
    "productDescription",
    "productPrice",
    "productImage",
    # categories
    "categoryId",
    "categoryDepartmentId",
    "categoryName",
    # departments
    "departmentId",
    "departmentName",
)

# orderDate is explicitly cast to timestamp; every other column is passed
# through unchanged.
retail_all_columns = [
    F.col(name).cast("timestamp") if name == "orderDate" else F.col(name)
    for name in retail_all_column_names
]

# One wide table starting from orders. Every join is a left join, so rows
# from the left side are kept even when the right side has no match.
retail_all = (
    orders_df
    .join(order_items_df, on=orders_df.orderId == order_items_df.orderItemOrderId, how="left")
    .join(customers_df, on=orders_df.orderCustomerId == customers_df.customerId, how="left")
    .join(products_df, on=order_items_df.orderItemProductId == products_df.productId, how="left")
    .join(categories_df, on=products_df.productCategoryId == categories_df.categoryId, how="left")
    .join(departments_df, on=categories_df.categoryDepartmentId == departments_df.departmentId, how="left")
    .select(*retail_all_columns)
)

In [31]:
# Print the column names, types and nullability of the combined table.
retail_all.printSchema()

root
 |-- orderItemName: integer (nullable = true)
 |-- orderItemOrderId: integer (nullable = true)
 |-- orderItemProductId: integer (nullable = true)
 |-- orderItemQuantity: integer (nullable = true)
 |-- orderItemSubTotal: double (nullable = true)
 |-- orderItemProductPrice: double (nullable = true)
 |-- orderId: integer (nullable = true)
 |-- orderDate: timestamp (nullable = true)
 |-- orderCustomerId: integer (nullable = true)
 |-- orderStatus: string (nullable = true)
 |-- customerId: integer (nullable = true)
 |-- customerFName: string (nullable = true)
 |-- customerLName: string (nullable = true)
 |-- customerEmail: string (nullable = true)
 |-- customerPassword: string (nullable = true)
 |-- customerStreet: string (nullable = true)
 |-- customerCity: string (nullable = true)
 |-- customerState: string (nullable = true)
 |-- customerZipcode: integer (nullable = true)
 |-- productId: integer (nullable = true)
 |-- productCategoryId: integer (nullable = true)
 |-- productName: str

In [39]:
import getpass
import os

# Connection settings are taken from the environment so that no credential is
# stored in the notebook. Host and user fall back to the values this notebook
# used before; the password has no fallback and is prompted for when the
# RETAIL_DB_PASSWORD variable is not set.
db_ip = os.environ.get("RETAIL_DB_HOST", "172.19.0.2")
user_name = os.environ.get("RETAIL_DB_USER", "postgres")
password = os.environ.get("RETAIL_DB_PASSWORD") or getpass.getpass("PostgreSQL password: ")

# The URL carries only host, port and database. User and password are passed
# as connection properties instead of URL query parameters, so they are not
# exposed wherever the URL is printed or logged and do not need URL-encoding
# when they contain reserved characters.
jdbcUrl = f"jdbc:postgresql://{db_ip}:5432/test1"

# Write the combined retail table to PostgreSQL, replacing the target table
# if it already exists.
retail_all.write.jdbc(
    url=jdbcUrl,
    table="churn_spark",
    mode="overwrite",
    properties={
        "user": user_name,
        "password": password,
        "driver": "org.postgresql.Driver",
    },
)

                                                                                