Generate a dummy dataset containing the following data:

Contact Dataset
CustomerId, Name, DoB, Phone Number, Email

Sales Dataset
SaleID, CustomerID, Array of ProductIDs, Purchase Date

Product Dataset
ProductID, Item, cost

Potential Calculations:


Calculate best selling item
Highest Value Customer
Date with most sales
Day of week with most sales

Ways of testing them:

We don't define schemas for every dataset; we intentionally omit one where it will cause an issue, e.g. the contact dataset

Misuse of joins on a count to give an incorrect total, e.g. counts for a customer buying particular items

How could you write a test to prevent a faulty join from happening?

Not using explode and instead accessing the first item of an array





In [68]:
customerData = [
    ("1","James Smith", "07123572134", "james.smith1@gmail.com"),
    ("2","Michael Rose", "", ""),
    ("3","Robert Williams", "", ""),
    ("4","Maria Jones", "", ""),
]

In [69]:
# Each row must be its own tuple, matching the customerData layout above
salesDataset = [
    ("1", "1", [2, 3], "15/05/2025"),
]

In [70]:
# Each row must be its own tuple, matching the customerData layout above
productDataset = [
    ("1", "Tennis Ball", "25"),
]

In [71]:
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, StringType, DateType
from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = (
    SparkSession.builder.appName("Spark Example").getOrCreate()
)

In [72]:
import csv
import random
from datetime import datetime, timedelta

# Call random.seed(); assigning to it would replace the function and seed nothing
random.seed(123456)
def generate_random_digits(length=11):
    return ''.join(str(random.randint(0, 9)) for _ in range(length))

# Generate a list of 500 random strings of 11 digits
random_digit_strings = [generate_random_digits() for _ in range(500)]
# Generate Customer Dataset
customer_data = []
for i in range(1, 501):
    customer_data.append([
        f"{i:03}",
        f"Customer{i}",
        f"{random.randint(1970, 2000)}-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}",
        f"{generate_random_digits()}",
        f"customer{i}@example.com"
    ])

columns = ["CustomerId", "Name", "DoB", "Phone Number", "Email"]
customer_data = spark.createDataFrame(customer_data, columns)
# Write to the same .parquet path that is read back later in the notebook
customer_data.write.parquet('customer_dataset.parquet', mode='overwrite')

# Generate Product Dataset
product_data = []
for i in range(1, 101):
    product_data.append([
        i,
        f"Product{i}",
        round(random.uniform(10, 100), 2)
    ])

columns = ["ProductID", "Item", "cost"]
product_data = spark.createDataFrame(product_data, columns)
product_data.write.parquet('product_dataset.parquet', mode='overwrite')

# Generate Sales Dataset
sales_data = []
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)

def random_date(start, end):
    return start + timedelta(days=random.randint(0, (end - start).days))

for i in range(1, 1001):
    customer_id = f"{random.randint(1, 500):03}"
    num_products = random.randint(1, 5)
    product_ids = random.sample(range(1, 101), num_products)
    purchase_date = random_date(start_date, end_date).strftime("%Y-%m-%d")
    sales_data.append([
        i,
        customer_id,
        product_ids,
        purchase_date
    ])

# Define the schema
sales_schema = StructType([
    StructField("SaleID", IntegerType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("ProductIDs", ArrayType(IntegerType()), True),
    StructField("Purchase Date", StringType(), True)  # Using StringType for simplicity
])

salesDF = spark.createDataFrame(data=sales_data,schema=sales_schema)
salesDF.write.parquet('sales_dataset.parquet', mode = 'overwrite')

print("Parquet files generated successfully.")


Parquet files generated successfully.


In [73]:
# header is a CSV option and does not apply to Parquet
customer = spark.read.parquet('customer_dataset.parquet')

In [74]:
customer.show()

+----------+-----------+----------+------------+--------------------+
|CustomerId|       Name|       DoB|Phone Number|               Email|
+----------+-----------+----------+------------+--------------------+
|       251|Customer251|1996-07-06| 96307466380|customer251@examp...|
|       252|Customer252|1995-06-11| 04539879250|customer252@examp...|
|       253|Customer253|1993-05-20| 67408513309|customer253@examp...|
|       254|Customer254|1972-05-17| 94396710154|customer254@examp...|
|       255|Customer255|2000-09-08| 48565135257|customer255@examp...|
|       256|Customer256|1970-08-28| 18181205096|customer256@examp...|
|       257|Customer257|1999-04-14| 11835025584|customer257@examp...|
|       258|Customer258|1978-09-25| 14860273878|customer258@examp...|
|       259|Customer259|1973-06-27| 27300722658|customer259@examp...|
|       260|Customer260|1970-10-27| 82224980518|customer260@examp...|
|       261|Customer261|1986-07-25| 65328607930|customer261@examp...|
|       262|Customer

In [75]:
# header is a CSV option and does not apply to Parquet
product = spark.read.parquet('product_dataset.parquet')

In [76]:
product.show()

+---------+---------+-----+
|ProductID|     Item| cost|
+---------+---------+-----+
|       51|Product51|50.22|
|       52|Product52|43.18|
|       53|Product53|12.89|
|       54|Product54|41.46|
|       55|Product55|44.12|
|       56|Product56|92.95|
|       57|Product57|90.84|
|       58|Product58| 86.0|
|       59|Product59|79.44|
|       60|Product60|71.98|
|       61|Product61|85.33|
|       62|Product62| 50.5|
|       63|Product63|65.69|
|       64|Product64|95.54|
|       65|Product65| 36.5|
|       66|Product66|74.77|
|       67|Product67|10.08|
|       68|Product68|70.27|
|       69|Product69|16.64|
|       70|Product70|96.85|
+---------+---------+-----+
only showing top 20 rows



In [77]:

# Define the schema
# CustomerID was written as a zero-padded string, so it is read back as StringType
sales_schema = StructType([
    StructField("SaleID", IntegerType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("ProductIDs", ArrayType(IntegerType()), True),
    StructField("Purchase Date", StringType(), True)  # Using StringType for simplicity
])

# The schema is set with .schema(); header is a CSV option and does not apply to Parquet
sales = spark.read.schema(sales_schema).parquet('sales_dataset.parquet')
sales.show()

+------+----------+--------------------+-------------+
|SaleID|CustomerID|          ProductIDs|Purchase Date|
+------+----------+--------------------+-------------+
|     1|       011|        [45, 66, 13]|   2023-02-05|
|     2|       203|        [19, 29, 98]|   2023-07-10|
|     3|       201|[30, 71, 87, 85, 65]|   2023-11-05|
|     4|       044| [5, 70, 88, 81, 46]|   2023-01-27|
|     5|       423|    [17, 26, 37, 48]|   2023-12-25|
|     6|       130|[52, 56, 25, 93, 84]|   2023-07-27|
|     7|       067|            [10, 75]|   2023-05-11|
|     8|       245|                [68]|   2023-09-06|
|     9|       373|                [89]|   2023-03-25|
|    10|       055|                [41]|   2023-07-13|
|    11|       137|[32, 86, 40, 45, 72]|   2023-05-15|
|    12|       155|                [76]|   2023-09-25|
|    13|       237|        [60, 39, 27]|   2023-10-13|
|    14|       164|        [17, 90, 93]|   2023-11-01|
|    15|       142|            [24, 33]|   2023-12-05|
|    16|  

Read in Sales Dataset using a Schema

Misuse of joins on a count to give an incorrect total, e.g. counts for a customer buying particular items

How could you write a test to prevent a faulty join from happening?

In [78]:
# Register the DataFrame as a SQL temporary view
customer.createOrReplaceTempView("tmp_customer")
sales.createOrReplaceTempView("tmp_sales")
product.createOrReplaceTempView("tmp_product")

# Calculate the number of customers who have signed up to the website and have made a purchase on it

# The left join is left in deliberately: it keeps every customer, so the count overstates the number of customers with a purchase

spark.sql("""select count(distinct c.customerid) from tmp_customer as c left join tmp_sales as s on s.customerID = c.customerID""").show()

+--------------------------+
|count(DISTINCT customerid)|
+--------------------------+
|                       500|
+--------------------------+



In [79]:
spark.sql("""select min(customerID) from tmp_customer""").show()

+---------------+
|min(customerID)|
+---------------+
|            001|
+---------------+



Follow-up question: we want to find all the customers who haven't made a purchase yet, as part of a marketing campaign. Please write some code to generate that list.

In [80]:

# Here we have intentionally joined on only the first item in the array, which gives the wrong value; we expect candidates to explode the array to get the correct answer
spark.sql('select sum(cost) from tmp_product as p join tmp_sales as s on element_at(s.productIDs, 1) = p.productID group by p.productID').show()


+------------------+
|         sum(cost)|
+------------------+
|225.35000000000002|
| 661.8600000000001|
|             474.5|
|           1050.48|
|            165.84|
| 463.4699999999999|
|            686.48|
|            369.24|
|349.71999999999997|
| 706.6800000000001|
|             263.5|
|           1180.92|
|443.34000000000003|
| 94.32000000000001|
|            476.64|
|              94.4|
|1645.4299999999998|
| 918.9000000000001|
|            104.37|
|            758.88|
+------------------+
only showing top 20 rows

