In [0]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise create a session
# whose application name is "silverlayer".
spark = (
    SparkSession.builder
    .appName("silverlayer")
    .getOrCreate()
)

### Extraction

Mounting the `bronzelayer` container.

In [0]:
# Mount the bronze-layer blob container at /mnt/bronze.
# dbutils.fs.mount fails when the mount point is already in use, so the call is
# skipped if /mnt/bronze is already mounted; this keeps the cell re-runnable.
if not any(m.mountPoint == "/mnt/bronze" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://bronzelayer@anshstorageacc1.blob.core.windows.net/",
        mount_point="/mnt/bronze",
        extra_configs={
            # Read the storage account key from a Databricks secret scope rather
            # than pasting it into the notebook. Replace the scope / key
            # placeholders with the names configured in your workspace.
            "fs.azure.account.key.anshstorageacc1.blob.core.windows.net":
                dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key-name>"),
        },
    )

Reading the raw data.

In [0]:
# Load the raw CSV from the bronze mount: the first row supplies the column
# names and Spark infers each column's type from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/mnt/bronze/Online Retail.csv")
)

In [0]:
# Preview the first 10 rows of the raw data.
df.show(n=10)

In [0]:
# Print the column names and the types Spark inferred for them.
df.printSchema()

#### Transformation

Converting the `InvoiceDate` column into a timestamp (date and time) column.

In [0]:
from pyspark.sql.functions import to_timestamp

# Parse InvoiceDate into a timestamp; the values are expected to follow the
# "dd-MM-yyyy HH:mm" pattern.
invoice_ts = to_timestamp(df["InvoiceDate"], "dd-MM-yyyy HH:mm")
df = df.withColumn("InvoiceDate", invoice_ts)

# Confirm the new column type and preview a few untruncated values.
df.printSchema()
df.select("InvoiceDate").show(n=5, truncate=False)

In [0]:
# How many distinct InvoiceNo values does df contain?
invoice_numbers = df.select("InvoiceNo").distinct()
unique_InvoiceNo = invoice_numbers.count()
print(f"Number of unique InvoiceNo in df: {unique_InvoiceNo}")

Taking the combination of `InvoiceNo` and `StockCode` as the primary key (unique identifier) of a row.

In [0]:
# Number of distinct (InvoiceNo, StockCode) pairs in df.
invoice_stock_pairs = df.select("InvoiceNo", "StockCode").distinct()
unique_count = invoice_stock_pairs.count()
print(f"Number of unique rows with distinct combinations of InvoiceNo and StockCode: {unique_count}")

Checking for null values in our dataset.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

# Count the NULLs in every column with a single select.
# Spark's `sum` is reached through the `F` alias instead of being imported by
# name, so Python's built-in `sum` is not shadowed for the rest of the notebook.
null_counts = df.select(
    [
        # 1 for a NULL cell, 0 otherwise; the column-wise sum is the NULL count.
        F.sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df.columns
    ]
)

null_counts.show()

In [0]:
# Discard every row that has a NULL in at least one column.
df = df.na.drop(how="any")

In [0]:
# Number of distinct (InvoiceNo, StockCode) pairs left once rows containing
# NULLs have been dropped.
remaining_pairs = df.select("InvoiceNo", "StockCode").distinct()
unique_count = remaining_pairs.count()
print(f"Number of unique rows with distinct combinations of InvoiceNo and StockCode: {unique_count}")

##### Filtering the dataset

In [0]:
from pyspark.sql.functions import col

# Keep only the rows whose Quantity is strictly positive.
positive_quantity = col("Quantity") > 0
df_filtered = df.where(positive_quantity)

In [0]:
# Keep only the rows whose UnitPrice is strictly positive.
positive_price = col("UnitPrice") > 0
df_filtered = df_filtered.where(positive_price)

In [0]:
# Keep only the rows whose Quantity is a whole number (remainder 0 when
# divided by 1).
whole_quantity = (col("Quantity") % 1) == 0
df_filtered = df_filtered.where(whole_quantity)

In [0]:
# Preview the first 5 rows that passed the filters.
df_filtered.show(n=5)

In [0]:
# Count the distinct (InvoiceNo, StockCode) combinations that remain after the
# Quantity / UnitPrice filters. The count is taken from df_filtered: counting
# on df would only repeat the post-dropna figure and ignore the filters.
unique_count = df_filtered.select("InvoiceNo", "StockCode").distinct().count()
print(f"Number of unique rows with distinct combinations of InvoiceNo and StockCode: {unique_count}")

Keeping only one record per customer per `InvoiceDate` value, so that we can later find which customers have reordered.

In [0]:
# Keep a single row for each (CustomerID, InvoiceDate) pair.
# NOTE(review): InvoiceDate is a timestamp after the to_timestamp conversion, so
# this deduplicates per exact timestamp rather than per calendar day — confirm
# that this is the intended granularity.
dedup_keys = ["CustomerID", "InvoiceDate"]
df_unique = df_filtered.dropDuplicates(subset=dedup_keys)
df_unique.orderBy("InvoiceNo").show()

In [0]:
# Number of rows left in df_unique after dropping duplicates.
df_unique_count = df_unique.count()
print(df_unique_count)

Counting how many distinct orders each customer has placed.

In [0]:
from pyspark.sql.functions import count

# One row per CustomerID, with customer_count holding the number of df_unique
# rows that belong to that customer.
orders_per_customer = count("CustomerID").alias("customer_count")
df_customer_multiple = df_unique.groupBy("CustomerID").agg(orders_per_customer)

In [0]:
# Number of rows in df_customer_multiple (one per CustomerID group).
abc = df_customer_multiple.count()
print(abc)

Adding a flag column to `df_filtered` based on each customer's order count in `df_customer_multiple`.

In other words, adding a flag column to `df_filtered` that marks whether the customer has reordered.

In [0]:
# Left-join the per-customer counts onto the filtered rows: every row of
# df_filtered is kept and gains the customer_count column.
df_filtered_with_flag = df_filtered.join(
    other=df_customer_multiple,
    on=["CustomerID"],
    how="left",
)

In [0]:
# Preview the joined rows (20 is the default row count of show()).
df_filtered_with_flag.show(n=20)

In [0]:
# Rows that belong to customers whose customer_count is exactly 1.
single_order = col("customer_count") == 1
count_flag_1 = df_filtered_with_flag.where(single_order).count()

# Rows that belong to customers whose customer_count is greater than 1.
repeat_order = col("customer_count") > 1
count_flag_greater_than_1 = df_filtered_with_flag.where(repeat_order).count()

print(f"Rows where customer_count = 1: {count_flag_1}")
print(f"Rows where customer_count > 1: {count_flag_greater_than_1}")

In [0]:
from pyspark.sql.functions import when

# Flag_reorder is 0 when customer_count equals 1 and 1 for every other row.
is_single_order = col("customer_count") == 1
reorder_flag = when(is_single_order, 0).otherwise(1)
df_filtered_with_flag = df_filtered_with_flag.withColumn("Flag_reorder", reorder_flag)

df_filtered_with_flag.show()

#### Load

Saving the processed data into the `silverlayer` container.


In [0]:
# Mount the silver-layer blob container at /mnt/silver so the processed data
# can be written to it.
# dbutils.fs.mount fails when the mount point is already in use, so the call is
# skipped if /mnt/silver is already mounted; this keeps the cell re-runnable.
if not any(m.mountPoint == "/mnt/silver" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://silverlayer@anshstorageacc1.blob.core.windows.net/",
        mount_point="/mnt/silver",
        extra_configs={
            # Read the storage account key from a Databricks secret scope rather
            # than pasting it into the notebook. Replace the scope / key
            # placeholders with the names configured in your workspace.
            "fs.azure.account.key.anshstorageacc1.blob.core.windows.net":
                dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key-name>"),
        },
    )

In [0]:
# Persist the flagged dataset to the silver mount as Parquet, replacing any
# output that already exists at that path.
silver_writer = df_filtered_with_flag.write.mode("overwrite")
silver_writer.parquet("/mnt/silver/online_retail_parquet")