In [2]:
pip install pyspark==3.5.2

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark==3.5.2
  Using cached pyspark-3.5.2.tar.gz (317.3 MB)
  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
[33m  DEPRECATION: Building 'pyspark' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'pyspark'. Discussion can be found at https://github.com/pypa/pip/issues/6334[0m[33m
[0m  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812388 sha256=9fff3b35ad735b49842de44f03d63aa449e1bab726f78e9a3d0273f33c90fd8c
  Stored in directory: /home/user/.cache/pip/wheels/34/34

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Create (or reuse) a Hive-enabled Spark session named "RetailInventory".
spark = (
    SparkSession.builder
    .appName("RetailInventory")
    .enableHiveSupport()
    .getOrCreate()
)

# Explicit column layouts for the three raw sources (every field is nullable).
def _nullable_schema(columns):
    """Build a StructType from (name, data_type) pairs, marking each field nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


transactions_schema = _nullable_schema([
    ("transaction_id", IntegerType()),
    ("item_id", IntegerType()),
    ("city_id", IntegerType()),
    ("quantity", IntegerType()),
    ("price_usd", DoubleType()),
    ("discount_percent", DoubleType()),
    ("payment_method", StringType()),
    ("order_status", StringType()),
    ("customer_age", IntegerType()),
    ("transaction_date", StringType()),
])

items_schema = _nullable_schema([
    ("item_id", IntegerType()),
    ("item_name", StringType()),
    ("category", StringType()),
    ("supplier", StringType()),
    ("price_usd", DoubleType()),
    ("weight_kg", DoubleType()),
    ("color", StringType()),
    ("warranty_years", IntegerType()),
    ("release_year", IntegerType()),
    ("rating", DoubleType()),
])

cities_schema = _nullable_schema([
    ("city_id", IntegerType()),
    ("city_name", StringType()),
    ("state", StringType()),
    ("country", StringType()),
    ("population", IntegerType()),
    ("area_sq_km", DoubleType()),
    ("average_income_usd", IntegerType()),
    ("founded_year", IntegerType()),
    ("time_zone", StringType()),
    ("climate", StringType()),
])

# Load the raw sources (two CSV files and one JSON file).
# The base directory is defined once so the notebook can be pointed at a
# different copy of the raw data by editing a single line.
RAW_DATA_DIR = "file:///home/user/Documents/retail inventory optimisation/Raw_Data"

# Transactions: comma-separated CSV with a header row, read with the explicit schema.
transactions_df = spark.read.csv(
    f"{RAW_DATA_DIR}/Transaction_raw_data/transactions.csv",
    schema=transactions_schema,
    header=True,
    sep=",",
)

# Items: comma-separated CSV with a header row, read with the explicit schema.
items_df = spark.read.csv(
    f"{RAW_DATA_DIR}/items.csv",
    schema=items_schema,
    header=True,
    sep=",",
)

# Cities: read as multi-line JSON (assuming JSON for cities).
cities_df = spark.read.json(
    f"{RAW_DATA_DIR}/cities.json",
    schema=cities_schema,
    multiLine=True,
)

# Register Hive external tables (currently disabled).
# NOTE: all three writes below use mode("overwrite") with the same "write_folder/" path,
# so each write would replace the data written by the previous one. Give every table
# its own sub-directory before re-enabling this code.
# transactions_df.write.mode("overwrite").option("path", "/home/user/Documents/retail inventory optimisation/write_folder/") \
#     .saveAsTable("default.transactions_raw")

# items_df.write.mode("overwrite").option("path", "/home/user/Documents/retail inventory optimisation/write_folder/") \
#     .saveAsTable("default.items_raw")


# cities_df.write.mode("overwrite").option("path", "/home/user/Documents/retail inventory optimisation/write_folder/") \
#     .saveAsTable("default.cities_raw")

# Schema validation report: for each source, print a banner, the schema that
# was applied, and the summary statistics produced by describe().
report_sources = (
    ("Transactions", transactions_df),
    ("Items", items_df),
    ("Cities", cities_df),
)

for source_label, source_df in report_sources:
    print(f"===== {source_label} Schema =====")
    source_df.printSchema()
    source_df.describe().show()


25/07/22 09:52:51 WARN Utils: Your hostname, user-virtual-machine resolves to a loopback address: 127.0.1.1; using 10.33.60.39 instead (on interface ens160)
25/07/22 09:52:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/22 09:52:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


===== Transactions Schema =====
root
 |-- transaction_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price_usd: double (nullable = true)
 |-- discount_percent: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- transaction_date: string (nullable = true)



25/07/22 09:52:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+------------------+------------------+------------------+------------------+------------------+--------------+------------+------------------+----------------+
|summary|    transaction_id|           item_id|           city_id|          quantity|         price_usd|  discount_percent|payment_method|order_status|      customer_age|transaction_date|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+--------------+------------+------------------+----------------+
|  count|            224400|            224400|            224400|            219902|            219924|            219927|        224400|      224400|            224400|          224400|
|   mean|109971.21411319073|1251.3774286987523|149.53861408199643|1.9613191330683668|247.33349166075595|14.988835840983576|          null|        null| 48.49874331550802|            null|
| stddev| 63512.13953268465|144.89527851151345|29.0034356720

In [2]:
from pyspark.sql.functions import *

# Row count of the raw items frame, before any cleaning is applied.
items_total = items_df.count()
print("Total records before cleaning:", items_total)

# Build one aggregate per source column that counts the rows where that column
# is null, then show them together as a single-row table.
null_count_exprs = []
for column_name in items_df.columns:
    is_missing = col(column_name).isNull()
    null_count_exprs.append(count(when(is_missing, column_name)).alias(column_name))

items_df.select(null_count_exprs).show()

Total records before cleaning: 510
+-------+---------+--------+--------+---------+---------+-----+--------------+------------+------+
|item_id|item_name|category|supplier|price_usd|weight_kg|color|warranty_years|release_year|rating|
+-------+---------+--------+--------+---------+---------+-----+--------------+------------+------+
|     26|       25|      25|      25|       25|       26|   27|            25|          25|    27|
+-------+---------+--------+--------+---------+---------+-----+--------------+------------+------+



In [3]:
import datetime

# Items cleaning.
#
# Rows are validated BEFORE de-duplicating on item_id. dropDuplicates keeps an
# arbitrary row per key, so de-duplicating first could retain an invalid copy
# of an item; the filters below would then remove it and the item would be lost
# even though a valid duplicate row existed.

# Columns that must be present for a row to be usable.
required_columns = ["item_id", "item_name", "price_usd", "weight_kg", "category", "release_year"]

# Upper bound for release_year and the reference point for item_age.
# This depends on the date the notebook is run.
current_year = datetime.datetime.now().year

valid_items_df = (
    items_df
    # Drop rows with nulls in critical columns
    .dropna(subset=required_columns)
    # Keep only strictly positive prices and weights
    .filter((col("price_usd") > 0) & (col("weight_kg") > 0))
    # Keep warranties in [0, 5]. A comparison against null is not true, so rows
    # with a null warranty_years are removed here as well, even though the
    # column is not listed in required_columns.
    .filter((col("warranty_years") >= 0) & (col("warranty_years") <= 5))
    # Keep release years from 2000 up to the current year
    .filter((col("release_year") >= 2000) & (col("release_year") <= current_year))
)

cleaned_items_df = (
    valid_items_df
    # One row per item_id; which of several valid duplicates survives is arbitrary.
    .dropDuplicates(["item_id"])
    # Derived column: item_age in years
    .withColumn("item_age", lit(current_year) - col("release_year"))
)

print("Total records after cleaning:", cleaned_items_df.count())
cleaned_items_df.describe(["price_usd", "weight_kg", "item_age", "rating"]).show()

Total records after cleaning: 354
+-------+------------------+-----------------+-----------------+------------------+
|summary|         price_usd|        weight_kg|         item_age|            rating|
+-------+------------------+-----------------+-----------------+------------------+
|  count|               354|              354|              354|               340|
|   mean| 499.6977118644069|5.052802259887001|14.17231638418079| 2.976764705882352|
| stddev|287.11154156305804|2.940548365809729|7.082374223909156|1.1737121118334402|
|    min|              2.04|            0.067|                2|               1.0|
|    max|             999.6|            9.971|               25|               5.0|
+-------+------------------+-----------------+-----------------+------------------+



In [5]:


from pyspark.sql.functions import *

# Cities cleaning.
#
# Order matters:
#   1. remove invalid rows,
#   2. normalise text,
#   3. de-duplicate on city_id (so the surviving row is always a valid one),
#   4. impute missing incomes from the mean of the rows that are actually kept.
cleaned_df = cities_df

# Remove rows where 'population' is negative. A null population also fails
# this comparison, so those rows are removed too.
cleaned_df = cleaned_df.filter(col('population') >= 0)

# Keep only reasonable 'founded_year' values (between 1000 and 2025 inclusive).
cleaned_df = cleaned_df.filter((col('founded_year') >= 1000) & (col('founded_year') <= 2025))

# Remove rows where 'state' or 'country' is missing (null).
cleaned_df = cleaned_df.filter(col('state').isNotNull() & col('country').isNotNull())

# Standardize 'city_name' by removing leading/trailing spaces.
cleaned_df = cleaned_df.withColumn('city_name', trim(col('city_name')))

# Remove duplicate rows based on 'city_id'. Which duplicate is kept is
# arbitrary, but every candidate has already passed the filters above.
cleaned_df = cleaned_df.dropDuplicates(['city_id'])

# Impute missing 'average_income_usd' with the mean of the retained rows.
# The mean aggregate ignores nulls and is None when no row has an income;
# in that case there is nothing to impute with, so the column is left as is.
mean_income = cleaned_df.agg(mean('average_income_usd')).first()[0]
if mean_income is not None:
    # 'average_income_usd' is declared IntegerType in cities_schema, so Spark
    # casts the fill value to that type and the imputed value is a whole number.
    cleaned_df = cleaned_df.fillna({'average_income_usd': mean_income})