In [0]:
# Authenticate Spark to the ADLS Gen2 storage account with its account key.
# The key is read from the environment rather than hardcoded: a key committed in a
# notebook is visible to anyone who can read the notebook, its history, or exports.
# NOTE(review): the key that was previously hardcoded here must be treated as leaked
# and rotated. On Databricks, prefer dbutils.secrets.get(scope=..., key=...) backed
# by a secret scope over an environment variable.
import os

spark.conf.set(
    "fs.azure.account.key.adlsprojct22.dfs.core.windows.net",
    os.environ["ADLS_ACCOUNT_KEY"],
)

In [0]:
# List the root of the "raw" container, both to confirm the account-key setup works
# and to see which source files are available. The listing is the cell's output.
raw_container_root = "abfss://raw@adlsprojct22.dfs.core.windows.net/"
dbutils.fs.ls(raw_container_root)

[FileInfo(path='abfss://raw@adlsprojct22.dfs.core.windows.net/dbo.online_retail_sales_dataset.txt', name='dbo.online_retail_sales_dataset.txt', size=125017551, modificationTime=1724425156000)]

In [0]:
from pyspark.sql import SparkSession
from datetime import datetime

# getOrCreate() hands back the already-active session if there is one.
spark = SparkSession.builder.appName("ReadData").getOrCreate()

# Where the raw extract lives in ADLS Gen2.
storage_account_name = "adlsprojct22"
src_container = "raw"
file_path = "dbo.online_retail_sales_dataset.txt"
# Container that the cleaned data is written to later in the notebook.
target_container = "preprocessed"

adls_url = (
    f"abfss://{src_container}@{storage_account_name}.dfs.core.windows.net/{file_path}"
)

# First look at the file. No schema is given, so every column comes back as a string.
df = spark.read.csv(adls_url, header=True, sep=',')
df.show()


+--------------+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
|transaction_id|          timestamp|customer_id|product_id|    product_category|quantity| price|discount|payment_method|customer_age|customer_gender|customer_location|total_amount|
+--------------+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
|         21606|2023-01-16 00:05:00|       2231|       759|               Books|       4|177.62|    0.05|     Gift Card|          24|         Female|    North America|      674.96|
|         21607|2023-01-16 00:06:00|       1542|       331|Beauty & Personal...|       7|395.48|    0.07|        PayPal|          64|          Other|           Africa|     2574.57|
|         21608|2023-01-16 00:07:00|       2133|       797|               Books|       7|145.02

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# Explicit schema for the retail extract, listed in the file's header order.
# Every field is nullable (the third StructField argument is True).
retail_columns = [
    ("transaction_id", IntegerType()),
    ("timestamp", TimestampType()),
    ("customer_id", IntegerType()),
    ("product_id", IntegerType()),
    ("product_category", StringType()),
    ("quantity", IntegerType()),
    ("price", DoubleType()),
    ("discount", DoubleType()),
    ("payment_method", StringType()),
    ("customer_age", IntegerType()),
    ("customer_gender", StringType()),
    ("customer_location", StringType()),
    ("total_amount", DoubleType()),
]

schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in retail_columns]
)


In [0]:
# Re-read the raw file with the explicit schema so numeric and timestamp columns
# get real types instead of strings.
# inferSchema is deliberately not passed. Spark ignores it when a schema is supplied,
# so passing both only misleadingly suggests an inference pass.
df = spark.read.csv(adls_url, schema=schema, header=True)
df.show()


+--------------+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
|transaction_id|          timestamp|customer_id|product_id|    product_category|quantity| price|discount|payment_method|customer_age|customer_gender|customer_location|total_amount|
+--------------+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
|         21606|2023-01-16 00:05:00|       2231|       759|               Books|       4|177.62|    0.05|     Gift Card|          24|         Female|    North America|      674.96|
|         21607|2023-01-16 00:06:00|       1542|       331|Beauty & Personal...|       7|395.48|    0.07|        PayPal|          64|          Other|           Africa|     2574.57|
|         21608|2023-01-16 00:07:00|       2133|       797|               Books|       7|145.02

In [0]:
# Rename the raw snake_case columns.
# transaction_id becomes "id"; every other column gets an upper-case first letter.
# Renames are applied one by one, in the order listed below.
column_renames = {
    "transaction_id": "id",
    "timestamp": "Timestamp",
    "customer_id": "Customer_id",
    "product_id": "Product_id",
    "product_category": "Product_category",
    "quantity": "Quantity",
    "price": "Price",
    "discount": "Discount",
    "payment_method": "Payment_method",
    "customer_age": "Customer_age",
    "customer_gender": "Customer_gender",
    "customer_location": "Customer_location",
    "total_amount": "Total_amount",
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)




In [0]:
# Preview the first rows after renaming (20 rows, long values truncated).
df.show(n=20, truncate=True)

+-----+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
|   id|          Timestamp|Customer_id|Product_id|    Product_category|Quantity| Price|Discount|Payment_method|Customer_age|Customer_gender|Customer_location|Total_amount|
+-----+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
|21606|2023-01-16 00:05:00|       2231|       759|               Books|       4|177.62|    0.05|     Gift Card|          24|         Female|    North America|      674.96|
|21607|2023-01-16 00:06:00|       1542|       331|Beauty & Personal...|       7|395.48|    0.07|        PayPal|          64|          Other|           Africa|     2574.57|
|21608|2023-01-16 00:07:00|       2133|       797|               Books|       7|145.02|    0.15|     Gift Card|          21|         Female|

In [0]:
# Checking for missing values.
# Nulls are counted for every column in a single aggregation pass: count() skips
# nulls, and when(...) yields null unless the column value is null.
# The total across all columns is reported, followed by the per-column breakdown.
# A loop that reassigns one counter per column would only report the last column.
from pyspark.sql import functions as F

null_counts = (
    df.select([F.count(F.when(df[c].isNull(), 1)).alias(c) for c in df.columns])
    .first()
    .asDict()
)
null_count = sum(null_counts.values())
print('The number of Null values are : ', null_count)
print('Null values per column : ', null_counts)

The number of Null values are :  0


In [0]:
# Detect exact duplicate rows. dropDuplicates() with no subset compares every column.
# NOTE(review): `id` takes part in the comparison, so the same sale recorded under two
# different transaction ids is NOT treated as a duplicate. Confirm that is intended.
df_no_duplicates = df.dropDuplicates()

# Any difference between the row counts means at least one row was dropped as a duplicate.
rows_before = df.count()
rows_after = df_no_duplicates.count()
if rows_before == rows_after:
    print("No duplicates found.")
else:
    print("There are duplicates in the DataFrame.")


No duplicates found.


In [0]:
from pyspark.sql.functions import col

# Remove Price outliers with the IQR rule. Keep rows where:
#   Q1 - IQR_MULTIPLIER * IQR <= Price <= Q3 + IQR_MULTIPLIER * IQR
# Rows whose Price is null are also dropped, because the comparison evaluates to null.
IQR_RELATIVE_ERROR = 0.05  # approxQuantile relative error; 0.0 gives exact (slower) quantiles
IQR_MULTIPLIER = 1.5

# Both quartiles come from one approxQuantile call, i.e. one pass over the data instead of two.
# The column was renamed to "Price"; "price" only resolved because Spark SQL is
# case-insensitive by default.
quartiles = df.approxQuantile("Price", [0.25, 0.75], IQR_RELATIVE_ERROR)
if len(quartiles) != 2:
    # approxQuantile returns an empty list when the column holds only nulls.
    raise ValueError("Cannot compute Price quartiles: the column has no non-null values.")
q1, q3 = quartiles

iqr = q3 - q1
lower_bound = q1 - IQR_MULTIPLIER * iqr
upper_bound = q3 + IQR_MULTIPLIER * iqr

# NOTE(review): later cells keep working from `df`, not `newdf`, so this filter does not
# affect what gets written. Confirm whether the filtered frame should be persisted.
newdf = df.filter((col('Price') >= lower_bound) & (col('Price') <= upper_bound))

print(df.count())
print(newdf.count())



1000000
1000000


In [0]:
# Sort the dataset by transaction id using Spark SQL over a temp view.
# checking_df is the frame that the following cell writes to the preprocessed container.
checking = """
                SELECT * FROM view_retaildataset
                ORDER BY id
              """
df.createOrReplaceTempView("view_retaildataset")

checking_df = spark.sql(checking)
checking_df.show(n=10)


+---+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
| id|          Timestamp|Customer_id|Product_id|    Product_category|Quantity| Price|Discount|Payment_method|Customer_age|Customer_gender|Customer_location|Total_amount|
+---+-------------------+-----------+----------+--------------------+--------+------+--------+--------------+------------+---------------+-----------------+------------+
|  1|2023-01-01 00:00:00|       1993|       915|      Home & Kitchen|       8| 103.3|    0.23|     Gift Card|          27|         Female|    North America|      636.33|
|  2|2023-01-01 00:01:00|       3474|       553|            Clothing|       9|180.28|    0.31|     Gift Card|          53|          Other|    South America|     1119.54|
|  3|2023-01-01 00:02:00|       4564|       248|Beauty & Personal...|       7| 81.58|    0.27|    Debit Card|          34|          Other|    North Am

In [0]:
# Persist the sorted dataset to the preprocessed container as Parquet.
# "overwrite" keeps the cell idempotent. With "append", every re-run of the notebook
# would add another full copy of the dataset, and the next cell reads the whole folder back.
preprocessed_url = (
    f"abfss://{target_container}@{storage_account_name}.dfs.core.windows.net/retail_dataset"
)
checking_df.write.mode("overwrite").parquet(preprocessed_url)

In [0]:
from pyspark.sql import SparkSession
from datetime import datetime

# getOrCreate() hands back the already-active session if there is one.
spark = SparkSession.builder.appName("ReadData").getOrCreate()

# Source is the preprocessed output; results of this stage go to "processed".
storage_account_name = "adlsprojct22"
src_container = "preprocessed"
file_path = "retail_dataset/"
target_container = "processed"

adls_url = f"abfss://{src_container}@{storage_account_name}.dfs.core.windows.net/{file_path}"

# The preprocessed folder contains Parquet files (written with `.parquet(...)` earlier in
# this notebook). It must be read as Parquet, which also restores the column types.
# Reading it as CSV would parse the binary Parquet bytes as text rows.
df = spark.read.parquet(adls_url)

df.show()
