In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar -xzf spark-3.1.2-bin-hadoop2.7.tgz
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import os

# Point SPARK_HOME at the distribution unpacked by the download cell; findspark.init()
# is expected to pick it up from the environment (TODO confirm against findspark docs).
os.environ.update(SPARK_HOME="/content/spark-3.1.2-bin-hadoop2.7")

import findspark

findspark.init()

In [23]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, ArrayType

# Local Spark session using every available core, under a descriptive application name.
conf_obj = (
    SparkConf()
    .set("spark.app.name", "RevolveSolutionsAssignment")
    .set("spark.master", "local[*]")
)

spark = SparkSession.builder.config(conf=conf_obj).getOrCreate()

In [115]:
import os
import numpy as np
import csv
import json
import math
import random
from datetime import datetime, timedelta
# Seed BOTH generators: loyalty scores come from numpy, but transactions and baskets are
# drawn with the stdlib `random` module, which np.random.seed does not affect.
np.random.seed(seed=42)
random.seed(42)

class Customer:
    """A generated customer: an id such as ``"C1"`` plus a loyalty score."""

    def __init__(self, customer_id, loyalty_score):
        self.customer_id = customer_id
        # Named after the CSV column it mirrors ("loyalty_score").
        self.loyalty_score = loyalty_score

    @property
    def value_score(self):
        """Backward-compatible alias for ``loyalty_score``."""
        return self.loyalty_score

    @value_score.setter
    def value_score(self, score):
        self.loyalty_score = score


def generate_customers(output_location_root, number_of_customers, return_data=True):
    """Write ``customers.csv`` and optionally return the generated customers.

    Customers get ids ``C1`` .. ``C<number_of_customers>`` and a loyalty score drawn
    from ``np.random`` uniformly in 1..10.

    Args:
        output_location_root: existing directory that receives ``customers.csv``.
        number_of_customers: how many customers to generate.
        return_data: when True, also build ``Customer`` objects for the caller.

    Returns:
        list of ``Customer`` when ``return_data`` is True, otherwise None.
    """
    customers = []
    # newline='' lets the csv module control line endings itself; without it rows end in
    # "\r\r\n" on platforms that translate "\n" on write (e.g. Windows).
    with open(f'{output_location_root}/customers.csv', mode='w', newline='') as customers_file:
        csv_writer = csv.writer(customers_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(["customer_id", "loyalty_score"])
        for cid in range(1, number_of_customers + 1):
            score = np.random.randint(low=1, high=11)  # high is exclusive -> 1..10
            customer_id = f"C{cid}"
            csv_writer.writerow([customer_id, score])
            if return_data:
                customers.append(Customer(customer_id, score))
    return customers if return_data else None


def generate_products(output_location_root, products_to_generate):
    """Write ``products.csv`` and return a category -> item -> product id lookup.

    Products are numbered sequentially across all categories (in mapping order) as ``P``
    followed by a zero-padded index. The padding width is the number of digits in the
    total product count, so every id has the same length.

    Args:
        output_location_root: existing directory that receives ``products.csv``.
        products_to_generate: mapping of category name -> list of item descriptions.

    Returns:
        dict mapping each category to a dict of item description -> product id.
        An empty mapping writes a header-only CSV and returns ``{}``.
    """
    # Count items directly instead of concatenating every list (quadratic) just to take len().
    total_products = sum(len(items) for items in products_to_generate.values())
    # Digits of the largest index; max(..., 1) avoids the log10(0) failure on an empty catalogue.
    product_count_digits = len(str(max(total_products, 1)))

    product_id_lookup = {category: {} for category in products_to_generate}
    # newline='' lets the csv module control line endings itself.
    with open(f'{output_location_root}/products.csv', mode='w', newline='') as products_file:
        csv_writer = csv.writer(products_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(["product_id", "product_description", "product_category"])
        item_index = 1
        for category, items in products_to_generate.items():
            for item in items:
                product_id = f"P{item_index:0{product_count_digits}d}"
                csv_writer.writerow([product_id, item, category])
                product_id_lookup[category][item] = product_id
                item_index += 1
    return product_id_lookup


def generate_transactions(output_location_root, customers, products, product_id_lookup, products_cats_frequency,
                          start_datetime, end_datetime):
    """Write one JSON-lines transactions file per day for every customer's shopping days.

    For each customer, pick how many distinct days they shop on (one of several fractions
    of the date range) and how many category draws they make. Then sample those days and
    categories and write one basket per shopping day to that day's file.

    Args:
        output_location_root: root directory; files are opened by ``open_transaction_sinks``.
        customers: iterable of objects exposing ``customer_id``.
        products: mapping of category -> list of item descriptions.
        product_id_lookup: mapping of category -> item description -> product id.
        products_cats_frequency: list of categories whose repeats act as sampling weights.
        start_datetime, end_datetime: day range, both ends inclusive.
    """
    open_files = open_transaction_sinks(output_location_root, start_datetime, end_datetime)
    try:
        product_cats_count = len(products)
        num_days = (end_datetime - start_datetime).days
        all_days = [start_datetime + timedelta(days=d) for d in range(num_days + 1)]
        # Shopping-frequency profiles: roughly once every 14, 10, 7, 5, 4 or 3 days.
        customer_frequency_type = [int(num_days / period) for period in (14, 10, 7, 5, 4, 3)]

        for customer in customers:
            num_transaction_days = random.choice(customer_frequency_type)
            num_cats = random.randint(1, product_cats_count)
            customer_transaction_days = sorted(random.sample(all_days, num_transaction_days))
            # The weighted list contains repeats, so the same category may be drawn more than
            # once, making it proportionally more likely to be picked per basket.
            cats = random.sample(products_cats_frequency, num_cats)
            for day in customer_transaction_days:
                transaction = {
                    "customer_id": customer.customer_id,
                    "basket": generate_basket(products, product_id_lookup, cats),
                    # Random time-of-day offset: 168..1439 minutes (02:48..23:59 from midnight).
                    "date_of_purchase": str(day + timedelta(minutes=random.randint(168, 1439)))
                }
                open_files[to_canonical_date_str(day)].write(json.dumps(transaction) + "\n")
    finally:
        # Close every per-day file even if generation fails part-way.
        for f in open_files.values():
            f.close()


def to_canonical_date_str(date_to_transform):
    """Render a date/datetime as its calendar date, e.g. ``2018-12-01``; any time part is dropped."""
    return f"{date_to_transform:%Y-%m-%d}"


def open_transaction_sinks(output_location_root, start_datetime, end_datetime):
    """Create ``transactions/d=YYYY-MM-DD/`` directories and open one writable file per day.

    Days run from ``start_datetime`` through ``end_datetime`` inclusive. Existing
    ``transactions.json`` files are truncated (mode ``'w'``).

    Returns:
        dict mapping the canonical ``YYYY-MM-DD`` string to an open text file. The caller
        must close the handles. If creating a directory or opening a file fails part-way,
        the handles opened so far are closed before the error propagates.
    """
    # os.path.join avoids the "transactions//d=..." double slash of the old string concat.
    root_transactions_dir = os.path.join(output_location_root, "transactions")
    open_files = {}
    days_to_generate = (end_datetime - start_datetime).days
    try:
        for next_day_offset in range(days_to_generate + 1):
            next_day = to_canonical_date_str(start_datetime + timedelta(days=next_day_offset))
            day_directory = os.path.join(root_transactions_dir, f"d={next_day}")
            os.makedirs(day_directory, exist_ok=True)
            open_files[next_day] = open(os.path.join(day_directory, "transactions.json"), mode='w')
    except BaseException:
        # Don't leak the handles already opened when a later day fails.
        for handle in open_files.values():
            handle.close()
        raise
    return open_files


def generate_basket(products, product_id_lookup, cats):
    """Return a random basket of 1-3 items, all taken from one category picked out of ``cats``.

    Items are drawn with replacement, so a product may appear more than once. Each entry is
    ``{"product_id": ..., "price": ...}`` with an integer price between 1 and 2000.
    """
    basket_size = random.randint(1, 3)
    chosen_category = random.choice(cats)
    picked_items = [random.choice(products[chosen_category]) for _ in range(basket_size)]
    ids_for_category = product_id_lookup[chosen_category]
    return [
        {"product_id": ids_for_category[picked], "price": random.randint(1, 2000)}
        for picked in picked_items
    ]


# Product catalogue used for data generation: category -> item descriptions.
products_data = {
    "house": ["detergent", "kitchen roll", "bin liners", "shower gel", "scented candles", "fabric softener",
                "cling film", "aluminium foil", "toilet paper", "kitchen knife", "dishwasher tablets", "ice pack"],
    "clothes": ["men's dark green trousers", "women's shoes", "jumper", "men's belt", "women's black socks",
                "men's striped socks", "men's trainers", "women's blouse", "women's red dress"],
    "fruit_veg": ["avocado", "cherries", "scotch bonnets", "peppers", "broccoli", "potatoes", "grapes",
                    "easy peeler", "mango", "lemon grass", "onions", "apples", "raspberries"],
    "sweets": ["carrot cake", "salted caramel dark chocolate", "gummy bears", "kombucha", "ice cream", "irn bru"],
    "food": ["steak", "chicken", "mince beef", "milk", "hummus", "activated charcoal croissant", "whole chicken",
                "tuna", "smoked salmon", "camembert", "pizza", "oats", "peanut butter", "almond milk", "lentil soup",
                "greek yoghurt", "parmesan", "coconut water", "chicken stock",  "water"],
    "bws": ["red wine", "gin", "cognac", "cigarettes"]
}
# Category sampling weights expressed as repeats (15 + 5 + 25 + 20 + 25 + 10 = 100 entries):
# sampling from this list makes heavier-weighted categories proportionally more likely.
products_cats_frequency = ["house"]*15 + ["clothes"]*5 + ["fruit_veg"]*25 + ["sweets"] * 20 + ["food"] * 25 + \
                            ["bws"] * 10

# All generated inputs go under an absolute /content path (Colab-style) in a per-run folder.
gen_id = "starter"
output_location = f"/content/input_data/{gen_id}"
os.makedirs(output_location, exist_ok=True)

# generate_customers draws loyalty scores from np.random; generate_transactions (and
# generate_basket) draw from the stdlib random module. Keep this call order so the
# generated data stays the same for a given seed.
gen_customers = generate_customers(output_location, 137)
product_id_lookup = generate_products(output_location, products_data)

# Every calendar day from 2018-12-01 through 2019-03-01 (inclusive) gets a transactions file.
start_date = datetime(2018, 12, 1, 0, 0, 0)
end_date = datetime(2019, 3, 1, 23, 59, 59)
generate_transactions(output_location, gen_customers, products_data, product_id_lookup, products_cats_frequency,
                        start_date, end_date)

In [101]:
# Explicit schemas: without one, spark.read.csv reads every column as a string, so
# loyalty_score would end up in the output JSON as "7" instead of 7. With header=True
# the header row is skipped and the schema is applied to the columns in file order.
customers_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("loyalty_score", IntegerType(), True),
])
products_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("product_description", StringType(), True),
    StructField("product_category", StringType(), True),
])
customers_df = spark.read.csv("/content/input_data/starter/customers.csv", schema=customers_schema,
                              header=True, multiLine=True)
products_df = spark.read.csv("/content/input_data/starter/products.csv", schema=products_schema,
                             header=True, multiLine=True)

# Each line of a d=YYYY-MM-DD/transactions.json file is one JSON transaction.
transaction_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("basket", ArrayType(StructType([
        StructField("product_id", StringType(), True),
        StructField("price", IntegerType(), True),
    ]))),
    StructField("date_of_purchase", StringType(), True),
])
transactions_raw_df = spark.read.text("/content/input_data/starter/transactions/d*/*")
transactions_df = transactions_raw_df.select(
    F.from_json(F.col("value"), transaction_schema).alias("map_cols")
).select(
    "map_cols.customer_id",
    "map_cols.basket",
    "map_cols.date_of_purchase",
)

In [112]:
# One row per purchased item: explode each basket array.
transactionsdf1 = transactions_df.select(
    "customer_id",
    "date_of_purchase",
    F.explode("basket").alias("basket_exp"),
)
transactionsdf2 = transactionsdf1.select(
    "customer_id",
    "date_of_purchase",
    F.col("basket_exp.product_id").alias("product_id"),
    F.col("basket_exp.price").alias("price"),
)

# Attach each item's category. The left join keeps items whose product_id is missing from
# the catalogue (category null); joining on the column name leaves a single product_id column.
trans_prod_df = transactionsdf2.join(products_df, on="product_id", how="left").select(
    "customer_id",
    "product_id",
    "product_category",
    # date_of_purchase looks like "yyyy-MM-dd HH:mm:ss"; keep only the calendar date.
    F.to_date(F.substring("date_of_purchase", 1, 10), "yyyy-MM-dd").alias("date_of_purchase"),
)

# How many times each customer bought each product on each day.
rolled_up_df = trans_prod_df.groupby(
    "date_of_purchase",
    "customer_id",
    "product_category",
    "product_id",
).count()

output_df = rolled_up_df.join(customers_df, on="customer_id", how="left").select(
    "date_of_purchase",
    "customer_id",
    "loyalty_score",
    "product_category",
    "product_id",
    F.col("count").alias("purchase_count"),
)

# Sort once, after the last join: an orderBy placed before a join is not guaranteed to
# survive it and only adds a shuffle. After coalesce(1) there is a single partition, so
# sortWithinPartitions orders the whole output.
(output_df
    .coalesce(1)
    .sortWithinPartitions("date_of_purchase", "customer_id", "product_category", "product_id")
    .write
    .partitionBy("date_of_purchase")
    .mode("overwrite")
    .json("/content/output_data"))

In [113]:
!rm -r input_data

In [114]:
# Cleanup of a leftover directory: nothing visible in this notebook creates
# "inputs_data_generator" (relative to the cwd) — presumably from an earlier session; TODO confirm still needed.
!rm -r inputs_data_generator

In [107]:
# Cleanup of a leftover directory: the current pipeline writes to /content/output_data, and
# nothing visible here creates "output_data_1" — presumably an older run's output; TODO confirm.
!rm -r output_data_1

In [104]:
# Spot-check one day of raw transactions: parse each JSON line with transaction_schema
# and show the flattened columns.
a = spark.read.text("/content/input_data/starter/transactions/d=2018-12-01/transactions.json")
a_df = a.select(
    F.from_json(F.col("value"), transaction_schema).alias("map_cols")
)
(
    a_df
    .select("map_cols.customer_id", "map_cols.basket", "map_cols.date_of_purchase")
    .show()
)

+-----------+--------------------+-------------------+
|customer_id|              basket|   date_of_purchase|
+-----------+--------------------+-------------------+
|         C1|        [{P29, 464}]|2018-12-01 04:11:00|
|         C6|        [{P47, 279}]|2018-12-01 11:09:00|
|        C10|        [{P55, 413}]|2018-12-01 19:38:00|
|        C12|[{P62, 58}, {P62,...|2018-12-01 15:34:00|
|        C13|[{P63, 340}, {P63...|2018-12-01 22:15:00|
|        C22|        [{P43, 601}]|2018-12-01 05:36:00|
|        C28|[{P11, 1839}, {P0...|2018-12-01 09:48:00|
|        C30|[{P01, 1296}, {P0...|2018-12-01 23:20:00|
|        C31|[{P43, 1375}, {P5...|2018-12-01 04:21:00|
|        C39|[{P27, 1628}, {P2...|2018-12-01 21:35:00|
|        C44|       [{P24, 1332}]|2018-12-01 22:59:00|
|        C50|[{P26, 1848}, {P2...|2018-12-01 19:36:00|
|        C55|[{P01, 1707}, {P0...|2018-12-01 13:46:00|
|        C77|       [{P31, 1034}]|2018-12-01 07:23:00|
|        C78|[{P30, 1740}, {P2...|2018-12-01 22:37:00|
|        C