In [None]:
import pyspark 
from pyspark import SparkConf, SparkContext

# Smoke Test

In [None]:
# Spark configuration for a local 4-core run. Input splits are capped at 200 KB,
# so even small CSV inputs are read as several partitions (more tasks to inspect).
conf = SparkConf()
conf.setMaster('local[4]')
conf.set("spark.files.maxPartitionBytes", "200K")
conf.set("spark.sql.files.maxPartitionBytes", "200K")
# NOTE(review): only the core `spark.files.openCostInBytes` is set here; DataFrame
# file sources read `spark.sql.files.openCostInBytes` instead. Add that key too if
# the intent was to mirror the two maxPartitionBytes settings above.
conf.set("spark.files.openCostInBytes", "1000")

# Exercise 2 switch: True turns off broadcast joins, adaptive query execution,
# bucket coalescing in joins, CBO, CSV filter pushdown, dynamic partition pruning,
# and effectively the bypass-merge-sort shuffle path (threshold of 1 partition).
disable_optimizations = True

if disable_optimizations:
    conf.set("spark.shuffle.sort.bypassMergeThreshold", "1")
    conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
    conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
    conf.set("spark.sql.adaptive.enabled", "false")
    conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "false")
    conf.set("spark.sql.cbo.enabled", "false")
    conf.set("spark.sql.csv.filterPushdown.enabled", "false")
    conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")


# getOrCreate() keeps this cell re-runnable: constructing SparkContext(conf=conf)
# a second time fails because only one SparkContext may be active at once.
# If a session already exists, context-level settings (e.g. the master) are NOT
# re-applied -- restart the kernel after editing the configuration above.
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('INFO')
sc

In [None]:
# Environment check: the host running the kernel, a blank separator line,
# then the contents of the /ext mount that holds the dataset.
!hostname
!echo
!ls /ext

In [None]:
# Peek at the dataset layout: 10 randomly chosen files (shuf | head), printed sorted.
!ls /ext/dataset/*/* | shuf | head -n10  | sort

In [None]:
# Smoke test: distribute 0..999, keep the multiples of 10 and draw 10 of them
# without replacement. The fixed seed makes the sample identical on every
# "Restart Kernel and Run All Cells".
SMOKE_TEST_SEED = 42
rdd = sc.parallelize(range(1000))
rdd \
    .filter(lambda number: number % 10 == 0) \
    .takeSample(False, 10, seed=SMOKE_TEST_SEED)

# Reading data

## Customers

In [None]:
# Load all customers CSV parts; the first line of each file is the header.
# No schema inference is requested, so columns are read as strings.
customers = spark.read.csv("/ext/dataset/customers/*.csv", header=True)

In [None]:
# Tabular preview of the first five customers.
customers.show(n=5)

In [None]:
# The same first rows seen through the RDD API, as a list of Row objects.
customers.rdd.take(num=5)

In [None]:
# Number of partitions backing the customers data; see the *.maxPartitionBytes
# settings in the configuration cell.
customers.rdd.getNumPartitions()

In [None]:
# Round trip: turn the RDD of Rows back into a DataFrame and preview it.
customers.rdd.toDF().show(n=5)

In [None]:
# Total number of customer rows.
customers.count()

In [None]:
# Python type of the first few customer_zip_code_prefix values
# (the CSV was read without schema inference).
customers.rdd.map(lambda r: type(r["customer_zip_code_prefix"])).take(num=5)

## Products, Orders, Order-Items

In [None]:
# Products: load all CSV parts (header row), preview, and count the rows.
products = spark.read.csv("/ext/dataset/products/*.csv", header=True)
products.show(n=5)
products.count()

In [None]:
# Orders: load all CSV parts (header row), preview, and count the rows.
orders = spark.read.csv("/ext/dataset/orders/*.csv", header=True)
orders.show(n=5)
orders.count()

In [None]:
# Order items: load all CSV parts (header row), preview, and count the rows.
order_items = spark.read.csv("/ext/dataset/order_items/*.csv", header=True)
order_items.show(n=5)
order_items.count()

# Count unique customers

In [None]:
# Baseline for the distinct counts below: total number of customer rows.
customers.count()

In [None]:
# Number of distinct customer_id values.
customers.select(customers["customer_id"]).distinct().count()

In [None]:
# Number of distinct customer_unique_id values.
customers.select(customers["customer_unique_id"]).distinct().count()

# Spark API examples

## RDD API

In [None]:
# RDD API: count customers rows per customer_unique_id and keep the 10 largest.
# NOTE(review): reading this count as "orders" assumes one customers row per order
# (customer_id being per-order) -- confirm against the dataset description.
customers_with_most_orders = (
    customers.rdd
    .map(lambda row: (row.customer_unique_id, 1))
    .reduceByKey(lambda count_a, count_b: count_a + count_b)
    .sortBy(lambda uid_and_count: -uid_and_count[1])
    .take(10)
)
customers_with_most_orders

## DataFrame API

In [None]:
# DataFrame API: 'cama_mesa_banho' products heavier than 10000 g, plus a derived
# weight in kilograms.
# NOTE(review): product_weight_g is read as a string; the comparison and division
# rely on Spark's implicit casting.
home_products = (
    products
    .filter(products["product_category_name"] == 'cama_mesa_banho')
    .filter(products["product_weight_g"] > 10000)
    .select(
        "product_id",
        "product_category_name",
        "product_weight_g",
        (products["product_weight_g"] / 1000).alias('weight_kg'),
    )
)
home_products.show()

## SQL API

In [None]:
# SQL API: order items of the heavy home products that cost less than 200.
home_products.createOrReplaceTempView("table_home_products")
order_items.createOrReplaceTempView("table_order_items")

# The price filter rejects the NULL rows a LEFT JOIN would add for never-ordered
# products, so the join is an INNER JOIN in effect; state it explicitly.
# The result holds home products (not perfumes), hence the variable name.
ordered_home_products_df = spark.sql("""
    SELECT 
        p.product_id product_id,
        o.order_id order_id,
        CAST (o.price AS float) price,
        p.weight_kg weight_kg
    FROM
        table_home_products p
        INNER JOIN table_order_items o ON (p.product_id = o.product_id)    
    WHERE
        CAST (o.price AS float) < 200
    ORDER BY
        product_id
""")

# take() brings only the first 15 sorted rows to the driver, unlike collect()[:15].
ordered_home_products_df.take(15)

# Exercise 1

Find customers who ordered at least 3 products from the *perfumaria* category in delivered orders (`order_status == "delivered"`), and return the top 5 customers who spent the most money in that category.

**Please use (mix) all three (RDD, DataFrame, SQL) interfaces to compute the result.**

# Exercise 2

- Duplicate this notebook.
- In the copy, set `disable_optimizations` to `False`.
- Use "Restart Kernel and Run All Cells" in both notebooks.
- Compare the execution plans (e.g. `DataFrame.explain()`) and the computed DAGs (Spark UI).

# Hints and references

- https://spark.apache.org/docs/latest/sql-getting-started.html - Spark SQL Guide
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html - PySpark SQL API reference
- https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations - Spark actions and transformations
- https://spark.apache.org/docs/latest/configuration.html#available-properties - Spark Configuration

# Solution

In [None]:
from typing import NamedTuple
from collections import namedtuple

# (order_id, customer_id) record for one order. The typename matches the variable
# name, so reprs show `OrderAndCustomer(...)` rather than an unrelated name.
OrderAndCustomer = namedtuple("OrderAndCustomer", "order_id customer_id")

# Delivered orders only, keyed by customer_id for the join with customers:
# RDD of (customer_id, OrderAndCustomer).
delivered_orders = orders.rdd \
    .filter(lambda row: row.order_status == "delivered") \
    .map(lambda row: OrderAndCustomer(row.order_id, row.customer_id)) \
    .keyBy(lambda o: o.customer_id)

In [None]:
# One Customer record per customers row, keyed by customer_id and inner-joined
# (RDD.join) with the delivered orders: customer_id -> (Customer, order record).
Customer = namedtuple("Customer", "customer_id customer_unique_id")

customers_with_orders = (
    customers.rdd
    .map(lambda r: Customer(r.customer_id, r.customer_unique_id))
    .keyBy(lambda customer: customer.customer_id)
    .join(delivered_orders)
)

In [None]:
# Optional inspection: uncomment to peek at joined (customer_id, (Customer, order)) pairs.
#customers_with_orders.take(10)

In [None]:
# Products in the 'perfumaria' category (DataFrame API).
perfumes = products.where(products["product_category_name"] == 'perfumaria')

In [None]:
# Optional inspection: uncomment to display the perfume products.
#perfumes.show()

In [None]:
# SQL API: every order item of a perfume product, with its price cast to float.
perfumes.createOrReplaceTempView("table_perfumes")
# Same view name as in the SQL API example: this view holds order ITEMS, not orders.
order_items.createOrReplaceTempView("table_order_items")

# INNER JOIN: a never-ordered perfume would only produce a row with NULL order_id,
# which can never match an order in the later RDD join on order_id.
ordered_perfumes_df = spark.sql("""
    select 
        p.product_id product_id,
        o.order_id order_id,
        cast (o.price as float) price
    from 
        table_perfumes p
        inner join table_order_items o on (p.product_id = o.product_id)    
""")

In [None]:
# Optional inspection: uncomment to preview the perfume order items.
#ordered_perfumes_df.show(5)

In [None]:
# Perfume order items as an RDD of (order_id, Row) pairs.
ordered_perfumes = ordered_perfumes_df.rdd.map(lambda item: (item.order_id, item))

In [None]:
# Drop the customer_id key, re-key each (Customer, order) pair by its order_id and
# join with the perfume items: order_id -> ((Customer, order), perfume Row).
customers_orders_perfumes = (
    customers_with_orders
    .map(lambda key_and_pair: (key_and_pair[1][1].order_id, key_and_pair[1]))
    .join(ordered_perfumes)
)

In [None]:
# Optional inspection: uncomment to look at the nested value of one joined record,
# i.e. ((Customer, order), perfume Row).
# tmp = customers_orders_perfumes.take(2)
# v = tmp[0][1]
# v

In [None]:
Stats = namedtuple("Stats", "price num_items")
CustomerWithStats = namedtuple("CustomerWithStats", "customer_id stats")

# "At least 3 products" from the exercise statement.
MIN_PERFUME_ITEMS = 3

# (order_id, ((Customer, order), perfume Row)) => (Customer, order, perfume Row)
flat_customers_orders_perfumes = (
    customers_orders_perfumes
    .values()
    .map(lambda nested: (nested[0][0], nested[0][1], nested[1]))
)

# Per customer_unique_id: total perfume spend and number of perfume items.
# Keep customers with at least MIN_PERFUME_ITEMS items, biggest spenders first.
result_rdd = (
    flat_customers_orders_perfumes
    .map(lambda customer_order_product: CustomerWithStats(
        customer_order_product[0].customer_unique_id,
        Stats(customer_order_product[2].price, 1),
    ))
    .reduceByKey(lambda stats1, stats2: Stats(
        stats1.price + stats2.price,
        stats1.num_items + stats2.num_items,
    ))
    # reduceByKey yields plain (key, value) tuples; restore the named record.
    .map(lambda record: CustomerWithStats(*record))
    .filter(lambda customer_with_stats: customer_with_stats.stats.num_items >= MIN_PERFUME_ITEMS)
    .sortBy(lambda customer_with_stats: -customer_with_stats.stats.price)
)

# All qualifying customers, sorted by total perfume spend (descending).
result = result_rdd.collect()

In [None]:
# Answer: the 5 qualifying customers with the highest perfume spend.
result[0:5]