In [1]:
import pyspark

# Initialize Spark with 4 local worker threads.
# getOrCreate() returns the already-running SparkContext if there is one, so
# re-running this cell does not raise a ValueError about multiple
# SparkContexts. Note: when a context already exists, `conf` is not re-applied.
conf = pyspark.SparkConf().setAppName("RetailRDD").setMaster("local[4]")
sc = pyspark.SparkContext.getOrCreate(conf=conf)

In [2]:
# Load the data.
# NOTE(review): lines are split on tabs below despite the .csv extension —
# confirm the file is tab-delimited (a comma-delimited export would need a real
# CSV parser, since product descriptions may contain commas).
data = sc.textFile("data/OnlineRetail.csv")

# Skip header: drop every line that is identical to the first line.
header = data.first()

# Field positions, as used by the analysis cells:
#   0 InvoiceNo, 1 StockCode, 2 Description, 3 Quantity, 4 InvoiceDate,
#   5 UnitPrice, 6 CustomerID, 7 Country
# cache(): nearly every cell runs at least one action on `rdd`; without it each
# action re-reads and re-splits the whole file.
rdd = (
    data.filter(lambda row: row != header)
        .map(lambda line: line.split("\t"))
        .cache()
)

In [3]:
# 1) Number of sales transactions — every non-header line counts as one
#    transaction (invoice line), cancellations included.
num_transactions = rdd.count()
print(f"Number of sales transactions: {num_transactions}")

Number of sales transactions: 541909


In [None]:
# 2) Percentage of cancelled transactions (InvoiceNo starts with 'C').
#    Share of invoice lines whose InvoiceNo has the cancellation prefix,
#    relative to all lines counted in cell 1.
cancelled_lines = rdd.filter(lambda fields: fields[0].startswith("C"))
percentage_cancelled = (cancelled_lines.count() / num_transactions) * 100
print(f"Percentage of cancelled transactions: {percentage_cancelled:.2f}%")

In [None]:
# 3) Number of customers and products.
# Lines with a blank CustomerID (if any) are excluded; otherwise the empty
# string would be counted as one extra distinct "customer".
num_customers = (
    rdd.map(lambda x: x[6].strip())
       .filter(lambda customer_id: customer_id != "")
       .distinct()
       .count()
)
# Products are identified by StockCode.
num_products = rdd.map(lambda x: x[1]).distinct().count()
print(f"Number of customers: {num_customers}")
print(f"Number of products: {num_products}")

In [None]:
# 4) Number of customers per country.
# Count distinct (Country, CustomerID) pairs per country. Lines without a
# CustomerID are skipped so a blank ID is not counted as a customer.
# collect() order is arbitrary, so the result is sorted (most customers first,
# then by country name) to keep the printed output reproducible.
customers_per_country = sorted(
    rdd.filter(lambda x: x[6].strip() != "")
       .map(lambda x: (x[7], x[6].strip()))
       .distinct()
       .map(lambda x: (x[0], 1))
       .reduceByKey(lambda a, b: a + b)
       .collect(),
    key=lambda kv: (-kv[1], kv[0]),
)
print("Number of customers per country:")
for country, count in customers_per_country:
    print(f"{country}: {count}")

In [None]:
# 5) Top 10 products (keyed by StockCode) by quantity / sales volume.
# Each line contributes (Quantity, Quantity * UnitPrice). Lines with negative
# quantities (presumably cancellations — confirm) reduce the totals, so these
# are net figures.
# cache(): product_sales feeds two separate takeOrdered actions below.
product_sales = (
    rdd.map(lambda x: (x[1], (int(x[3]), float(x[3]) * float(x[5]))))
       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
       .cache()
)

# Ties are broken by StockCode so the top-10 lists are deterministic.
top10_by_quantity = product_sales.map(lambda x: (x[0], x[1][0])) \
                                 .takeOrdered(10, key=lambda x: (-x[1], x[0]))
top10_by_volume = product_sales.map(lambda x: (x[0], x[1][1])) \
                               .takeOrdered(10, key=lambda x: (-x[1], x[0]))

print("Top 10 products by quantity:")
for p, q in top10_by_quantity:
    print(f"{p}: {q}")

print("Top 10 products by sales volume:")
for p, v in top10_by_volume:
    print(f"{p}: {v:.2f}")

In [None]:
# 6) Number of products with Union Jack branding.
# Match "union jack" case-insensitively in the Description field: an exact-case
# "Union Jack" test misses descriptions written in another case
# (e.g. all upper-case "UNION JACK ...").
union_jack_products = (
    rdd.filter(lambda x: "union jack" in x[2].lower())
       .map(lambda x: x[1])
       .distinct()
       .count()
)
print(f"Number of products with Union Jack branding: {union_jack_products}")

In [None]:
# 7) Average number of distinct products per sale.
# Deduplicate (InvoiceNo, StockCode) pairs, count the remaining pairs per
# invoice, then take the mean of those per-invoice counts.
invoice_product_pairs = rdd.map(lambda fields: (fields[0], fields[1])).distinct()
products_per_invoice = (
    invoice_product_pairs
    .mapValues(lambda _stock_code: 1)
    .reduceByKey(lambda left, right: left + right)
    .values()
)
avg_products_per_invoice = products_per_invoice.mean()
print(f"Average number of distinct products per sale: {avg_products_per_invoice:.2f}")

In [None]:
# 8) Sales volume per country in December 2010.
# NOTE(review): InvoiceDate is assumed to look like "M/D/YYYY H:MM" (month in
# the first "/" field, four-digit year in the third) — confirm against the data.
def _is_december_2010(invoice_date):
    """Return True if a month-first "M/D/YYYY ..." date string is in December 2010."""
    month, _day, year = invoice_date.split(" ")[0].split("/")[:3]
    return year == "2010" and month == "12"


# Net volume (Quantity * UnitPrice) per country. collect() order is arbitrary,
# so the result is sorted by volume (largest first, then by country name).
sales_dec_2010 = sorted(
    rdd.filter(lambda x: _is_december_2010(x[4]))
       .map(lambda x: (x[7], float(x[3]) * float(x[5])))
       .reduceByKey(lambda a, b: a + b)
       .collect(),
    key=lambda kv: (-kv[1], kv[0]),
)

print("Sales volume per country in December 2010:")
for country, volume in sales_dec_2010:
    print(f"{country}: {volume:.2f}")