# Ejercicios

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

In [0]:
# Root under which the storage container is mounted.
# NOTE(review): "[container]" looks like a placeholder that has to be replaced
# with the real mount name before the notebook can run — confirm.
MOUNTPOINT = "/mnt/[container]"

# Location of each retail dataset, all under "<MOUNTPOINT>/retail/".
CUSTOMERS_DATA =   f'{MOUNTPOINT}/retail/customers'
DEPARTMENTS_DATA = f'{MOUNTPOINT}/retail/departments'
CATEGORIES_DATA =  f'{MOUNTPOINT}/retail/categories'
PRODUCTS_DATA =    f'{MOUNTPOINT}/retail/products'
ORDERS_DATA =      f'{MOUNTPOINT}/retail/orders'
ORDER_ITEMS_DATA = f'{MOUNTPOINT}/retail/order_items'

In [0]:
# Explicit column layout for one record of the customers CSV data.
# Only the id is numeric; every other column (zipcode included) is kept as a
# string, and all columns accept nulls.
customers_schema = (
    StructType()
    .add('customer_id', IntegerType(), nullable=True)
    .add('customer_fname', StringType(), nullable=True)
    .add('customer_lname', StringType(), nullable=True)
    .add('customer_email', StringType(), nullable=True)
    .add('customer_password', StringType(), nullable=True)
    .add('customer_street', StringType(), nullable=True)
    .add('customer_city', StringType(), nullable=True)
    .add('customer_state', StringType(), nullable=True)
    .add('customer_zipcode', StringType(), nullable=True)
)

In [0]:
# Column layout of the departments data: a numeric id plus a name, both nullable.
departments_schema = (
    StructType()
    .add('department_id', IntegerType(), nullable=True)
    .add('department_name', StringType(), nullable=True)
)

In [0]:
# Column layout of the categories data. category_department_id is the column
# later joined against departments.department_id.
categories_schema = (
    StructType()
    .add('category_id', IntegerType(), nullable=True)
    .add('category_department_id', IntegerType(), nullable=True)
    .add('category_name', StringType(), nullable=True)
)

In [0]:
# Column layout of the products data. product_category_id is the column later
# joined against categories.category_id.
# NOTE(review): product_price is a 32-bit FloatType; consider DoubleType or
# DecimalType if exact monetary values matter — confirm.
products_schema = (
    StructType()
    .add('product_id', IntegerType(), nullable=True)
    .add('product_category_id', IntegerType(), nullable=True)
    .add('product_name', StringType(), nullable=True)
    .add('product_description', StringType(), nullable=True)
    .add('product_price', FloatType(), nullable=True)
    .add('product_image', StringType(), nullable=True)
)

In [0]:
# Column layout of the orders data. order_date is read as a plain string; the
# date functions used later (year/month) are applied to that string column.
orders_schema = (
    StructType()
    .add('order_id', IntegerType(), nullable=True)
    .add('order_date', StringType(), nullable=True)
    .add('order_customer_id', IntegerType(), nullable=True)
    .add('order_status', StringType(), nullable=True)
)

In [0]:
# Column layout of the order_items data: one row per product line of an order.
# NOTE(review): the two money columns are 32-bit FloatType; consider DoubleType
# or DecimalType if exact monetary values matter — confirm.
order_items_schema = (
    StructType()
    .add('order_item_id', IntegerType(), nullable=True)
    .add('order_item_order_id', IntegerType(), nullable=True)
    .add('order_item_product_id', IntegerType(), nullable=True)
    .add('order_item_quantity', IntegerType(), nullable=True)
    .add('order_item_subtotal', FloatType(), nullable=True)
    .add('order_item_product_price', FloatType(), nullable=True)
)

In [0]:
# Quick look at the customers data: read the pipe-delimited files with the
# explicit schema and render the result.
customers_df = (
    spark.read
    .schema(customers_schema)
    .option('sep', '|')
    .csv(CUSTOMERS_DATA)
)
display(customers_df)

In [0]:
def _load_cached(path, schema):
    """Read a pipe-delimited CSV dataset with an explicit schema and mark it cached.

    Args:
        path: Location of the dataset to read.
        schema: StructType describing the dataset's columns.

    Returns:
        The DataFrame returned by ``cache()``.
    """
    return spark.read.csv(path=path, schema=schema, sep='|').cache()


# Load data
# Every table is read the same way, so the read + cache pair lives in one
# helper instead of being repeated six times.
customers_df = _load_cached(CUSTOMERS_DATA, customers_schema)
departments_df = _load_cached(DEPARTMENTS_DATA, departments_schema)
categories_df = _load_cached(CATEGORIES_DATA, categories_schema)
products_df = _load_cached(PRODUCTS_DATA, products_schema)
orders_df = _load_cached(ORDERS_DATA, orders_schema)
order_items_df = _load_cached(ORDER_ITEMS_DATA, order_items_schema)

In [0]:
# Expose every DataFrame to Spark SQL under the table name the queries use.
for view_name, frame in (
        ("customers", customers_df),
        ("orders", orders_df),
        ("order_items", order_items_df),
        ("products", products_df),
        ("categories", categories_df),
        ("departments", departments_df)):
    frame.createOrReplaceTempView(view_name)


In [0]:
# Print the first five rows of each table as a sanity check.
# Customers are shown untruncated; the other tables use the default truncation.
# NOTE(review): the customers preview includes email and password columns —
# consider clearing this output before sharing the notebook.
customers_df.show(n=5, truncate=False)
for preview_df in (orders_df, order_items_df, products_df, categories_df, departments_df):
    preview_df.show(n=5)

In [0]:
# For every order date: the summed item subtotals divided by the number of
# distinct orders on that date, i.e. the average revenue per order for the day.
# Rows are returned in order_date order.
query = """
          SELECT o.order_date, sum(oi.order_item_subtotal) / COUNT(DISTINCT oi.order_item_order_id) as avg_rev_per_day
          FROM orders o JOIN order_items oi 
              ON o.order_id = oi.order_item_order_id
          GROUP BY o.order_date 
          ORDER BY o.order_date
          """

display(spark.sql(query))

In [0]:
# DataFrame version of the per-day figure: the day's summed item subtotals
# divided by the number of distinct orders placed that day.
_orders_with_items = orders_df.join(
    order_items_df,
    on=orders_df.order_id == order_items_df.order_item_order_id,
)
_revenue_per_order = F.sum('order_item_subtotal') / F.countDistinct('order_item_order_id')

avg_rev_per_day = (
    _orders_with_items
    .select('order_date', 'order_item_subtotal', 'order_item_order_id')
    .groupBy('order_date')
    .agg(_revenue_per_order.alias('avg_rev_per_day'))
    .orderBy('order_date')
)

# Cached because the monthly roll-up is derived from this result.
avg_rev_per_day.cache()

avg_rev_per_day.show()

In [0]:
# Roll the daily figures up by calendar month number (mean of the daily values).
# NOTE(review): the grouping key is the month only, so the same month of
# different years is averaged together — confirm that this is intended.
_month_col = F.month('order_date').alias('month')
_monthly_mean = F.avg('avg_rev_per_day').alias('avg_rev_per_month')

avg_rev_per_month = (
    avg_rev_per_day
    .select(_month_col, 'avg_rev_per_day')
    .groupBy('month')
    .agg(_monthly_mean)
    .orderBy('month')
)

avg_rev_per_month.cache()

In [0]:
# Plain-text dump of the monthly averages with column truncation disabled.
avg_rev_per_month.show(truncate=False)

In [0]:
# Same monthly averages, rendered through the notebook's display() helper.
display(avg_rev_per_month)


In [0]:
%sql
-- Average revenue per order for each order date: the day's summed item
-- subtotals divided by the number of distinct orders on that date.
SELECT
    o.order_date,
    SUM(oi.order_item_subtotal) / COUNT(DISTINCT oi.order_item_order_id) AS avg_rev_per_day
FROM orders AS o
INNER JOIN order_items AS oi
    ON o.order_id = oi.order_item_order_id
GROUP BY o.order_date
ORDER BY o.order_date

In [0]:
# Drop the cached monthly and daily aggregates; neither is referenced again
# in the cells that follow.
avg_rev_per_month.unpersist()
avg_rev_per_day.unpersist()


In [0]:
# Total revenue (sum of item subtotals) per calendar year and month, over all
# orders regardless of status, returned in chronological order.
query = """
            SELECT YEAR(o.order_date) as order_year, MONTH(o.order_date) as order_month, SUM(oi.order_item_subtotal) tot_revenue 
          FROM orders o JOIN order_items oi 
              ON o.order_id = oi.order_item_order_id
          GROUP BY order_year, order_month 
          ORDER BY order_year, order_month
"""

display(spark.sql(query))

In [0]:
# DataFrame version of total revenue per (year, month): join orders to their
# items, derive the year and month from order_date, and sum the subtotals.
_order_lines = orders_df.join(
    order_items_df,
    on=orders_df.order_id == order_items_df.order_item_order_id,
)

tot_rev_per_month_per_year = (
    _order_lines
    .select(
        F.year('order_date').alias('order_year'),
        F.month('order_date').alias('order_month'),
        'order_item_subtotal',
    )
    .groupBy('order_year', 'order_month')
    .agg(F.sum('order_item_subtotal').alias('tot_revenue'))
    .orderBy('order_year', 'order_month')
)


In [0]:
# Print the leading rows of the per-(year, month) revenue totals.
tot_rev_per_month_per_year.show()

In [0]:
%sql
-- Total revenue (sum of item subtotals) per calendar year and month,
-- returned in chronological order.
SELECT
    YEAR(o.order_date)          AS order_year,
    MONTH(o.order_date)         AS order_month,
    SUM(oi.order_item_subtotal) AS tot_revenue
FROM orders AS o
INNER JOIN order_items AS oi
    ON o.order_id = oi.order_item_order_id
GROUP BY order_year, order_month
ORDER BY order_year, order_month

In [0]:
# Revenue per department and year. Orders are followed through order_items,
# products and categories to reach the department; orders whose status is
# CANCELED or SUSPECTED_FRAUD are left out.
query = """
           SELECT d.department_name, YEAR(o.order_date) as order_year, SUM(oi.order_item_subtotal) as tot_revenue
           FROM orders o 
              INNER JOIN order_items oi 
                  ON o.order_id = oi.order_item_order_id
              INNER JOIN products p
                  ON oi.order_item_product_id = p.product_id
              INNER JOIN categories c
                  ON c.category_id = p.product_category_id
              INNER JOIN departments d
                  ON c.category_department_id = d.department_id
          WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD'
          GROUP BY d.department_name, order_year
          ORDER BY d.department_name, order_year
        """
display(spark.sql(query))

In [0]:
# DataFrame version of revenue per department and year.
# Orders with status CANCELED or SUSPECTED_FRAUD are excluded up front.
_not_canceled = orders_df.order_status != 'CANCELED'
_not_fraud = orders_df.order_status != 'SUSPECTED_FRAUD'

# Walk orders -> order_items -> products -> categories -> departments.
_valid_order_lines = (
    orders_df
    .where(_not_canceled & _not_fraud)
    .join(order_items_df,
          on=orders_df.order_id == order_items_df.order_item_order_id,
          how='inner')
    .join(products_df,
          on=order_items_df.order_item_product_id == products_df.product_id,
          how='inner')
    .join(categories_df,
          on=products_df.product_category_id == categories_df.category_id,
          how='inner')
    .join(departments_df,
          on=categories_df.category_department_id == departments_df.department_id,
          how='inner')
)

df = (
    _valid_order_lines
    .select(
        'department_name',
        F.year(orders_df.order_date).alias('order_year'),
        'order_item_subtotal',
    )
    .groupBy(departments_df.department_name, 'order_year')
    .agg(F.sum(order_items_df.order_item_subtotal).alias('tot_revenue'))
    .orderBy('department_name', 'order_year')
)
df.show()

In [0]:
# Reshape to one row per department with one revenue column per order year.
_per_department = df.groupBy("department_name")
pivotDF = _per_department.pivot("order_year").sum("tot_revenue")
display(pivotDF)

In [0]:
# Month number (1-12) -> three-letter English month abbreviation.
monthmap = {1:"Jan", 2:"Feb", 3:"Mar",  4:"Apr", 5:"May", 6:"Jun", 7:"Jul", 8:"Aug", 9:"Sep", 10:"Oct", 11:"Nov", 12:"Dec"}


def function_map(m):
    """Return the abbreviation for month number ``m``.

    Args:
        m: Month number; may be None when the source date is NULL.

    Returns:
        The three-letter abbreviation, or None when ``m`` is not a key of
        ``monthmap``. Returning None avoids a KeyError on a NULL or
        out-of-range month.
    """
    return monthmap.get(m)


# Register the named function for use from Spark SQL. Previously an identical
# lambda was registered and function_map was left unused.
spark.udf.register("udfmonTomonth", function_map, StringType())

In [0]:
# Top five products by revenue for every (year, month), excluding orders whose
# status is CANCELED or SUSPECTED_FRAUD. DENSE_RANK keeps ties, so a month can
# return more than five rows.
#
# order_month holds the month *name*, so ordering by it sorts alphabetically
# (Apr, Aug, Dec, ...). The numeric month is therefore carried along as
# order_month_num and used for the final ORDER BY. It also appears as an
# extra output column.
query = """
          SELECT q.*
          FROM (
               SELECT r.*, DENSE_RANK() OVER (PARTITION by order_year, order_month ORDER BY product_revenue DESC) as dense_rank
               FROM (
                    SELECT YEAR(o.order_date) as order_year, MONTH(o.order_date) as order_month_num, udfmonTomonth(MONTH(o.order_date)) as order_month, p.product_name, ROUND(SUM(CAST(oi.order_item_subtotal as float)), 2) as product_revenue
                    FROM order_items oi
                        INNER JOIN orders o
                            ON oi.order_item_order_id = o.order_id
                        INNER JOIN products p
                            ON oi.order_item_product_id = p.product_id
                        WHERE o.order_status <> 'CANCELED' AND o.order_status <> 'SUSPECTED_FRAUD'
                        GROUP BY order_year, order_month_num, order_month, p.product_name ) r ) q
          WHERE q.dense_rank <= 5
          ORDER BY q.order_year, q.order_month_num, q.dense_rank
        """

display(spark.sql(query))

In [0]:
# Column-level UDF mapping a month number to its abbreviation from monthmap.
# dict.get returns None (a NULL in the result) for a NULL or unknown month,
# where plain indexing would raise KeyError.
udfmonTomonth = udf(lambda m: monthmap.get(m), StringType())

In [0]:
# Revenue per (year, month name, product), excluding orders whose status is
# CANCELED or SUSPECTED_FRAUD. Revenue is the summed subtotal rounded to 2 places.
_orders_by_month = orders_df.select(
    F.year('order_date').alias('order_year'),
    udfmonTomonth(F.month('order_date')).alias('order_month'),
    'order_id',
    'order_status',
)
_is_valid_order = (col('order_status') != 'CANCELED') & (col('order_status') != 'SUSPECTED_FRAUD')

rev_per_month_per_year_per_product = (
    _orders_by_month
    .where(_is_valid_order)
    .join(order_items_df,
          on=orders_df.order_id == order_items_df.order_item_order_id,
          how='inner')
    .join(products_df,
          on=order_items_df.order_item_product_id == products_df.product_id,
          how='inner')
    .select('order_year', 'order_month', 'product_name', 'order_item_subtotal')
    .groupBy('order_year', 'order_month', 'product_name')
    .agg(F.round(F.sum('order_item_subtotal'), 2).alias('product_revenue'))
)
rev_per_month_per_year_per_product.show()

In [0]:
# Ten categories with the most order-item rows. COUNT(order_item_quantity)
# counts rows whose quantity is not NULL; it does not add up the quantities.
query = """
          SELECT c.category_name, COUNT(order_item_quantity) as order_count 
          FROM order_items oi 
          INNER JOIN products p on oi.order_item_product_id = p.product_id 
          INNER JOIN categories c on c.category_id = p.product_category_id 
          GROUP BY c.category_name 
          ORDER BY order_count DESC 
          LIMIT 10 
        """
display(spark.sql(query))

In [0]:
# Ten most popular categories, ranked by the number of order-item rows.
# The aggregate is F.count so that `order_count` matches the SQL version of
# this query (COUNT(order_item_quantity)). The previous F.sum added up the
# quantities, so the two cells reported different figures under the same
# column name.
pop_cat = (order_items_df
 .join(products_df, order_items_df.order_item_product_id == products_df.product_id, how='inner')
 .join(categories_df, categories_df.category_id == products_df.product_category_id, how='inner')
 .groupBy('category_name')
 .agg(F.count('order_item_quantity').alias('order_count'))
 .orderBy('order_count', ascending=False)
 .limit(10))
pop_cat.show()

In [0]:
# Twenty customers with the highest total spend, with their total item
# quantity. Only orders whose status is CANCELED are excluded.
# NOTE(review): other queries in this notebook also exclude SUSPECTED_FRAUD —
# confirm whether that status should be filtered here as well.
q = """
        SELECT customer_id, customer_fname, customer_lname, customer_email, sum(order_item_quantity) as quantity_item_total, sum(order_item_subtotal)as total
        FROM
            customers as c
        INNER JOIN
            orders as o
            ON c.customer_id = o.order_customer_id
        INNER JOIN
           order_items as oi
            ON o.order_id = oi.order_item_order_id
        WHERE order_status <> 'CANCELED'
        GROUP BY customer_id, customer_fname, customer_lname, customer_email
        ORDER BY  total DESC
        LIMIT 20
    """
display(spark.sql(q))

In [0]:
# Per category: total item quantity and total revenue, with the revenue cast
# to INT. No order status filter is applied, and there is no ORDER BY, so the
# row order of the result is not defined by the query.
q = """
        SELECT
            ca.category_name, sum(order_item_quantity) as item_quantity, cast(sum(order_item_subtotal) AS INT )as total
        FROM order_items as oi
        INNER JOIN
            products as p
            ON oi.order_item_product_id = p.product_id
        INNER JOIN
            categories as ca
            ON p.product_category_id = ca.category_id
        GROUP BY ca.category_name
      """
display(spark.sql(q))

In [0]:
# Most frequently ordered category for each customer city. The inner query
# counts order-item rows per (city, category) and ranks them within the city;
# the outer query keeps rank 1. DENSE_RANK means tied categories are all
# returned. There is no outer ORDER BY, so the row order is not defined.
q = """
        SELECT
            customer_city, category_name
        FROM (SELECT
            customer_city, category_name, count(category_name) as quantity, DENSE_RANK () OVER ( 
                        PARTITION BY customer_city 
                        ORDER BY count(category_name) DESC
                    ) rank
            FROM
                customers as c
            INNER JOIN
                orders as o
                ON c.customer_id = o.order_customer_id
            INNER JOIN
                order_items as oi
                ON o.order_id = oi.order_item_order_id
            INNER JOIN
                products as p
                ON oi.order_item_product_id = p.product_id
            INNER JOIN
                categories as ca
                ON p.product_category_id = ca.category_id
            GROUP BY customer_city, category_name
            ) t
            WHERE rank = 1
       """

display(spark.sql(q))

In [0]:
# Top products by quantity sold within each customer city: the inner query
# sums quantity and subtotal per (city, product) and ranks products inside
# the city; the outer query keeps ranks 1-5 (rank < 6). DENSE_RANK keeps ties,
# so a city can return more than five rows. The final ORDER BY sorts all
# returned rows by quantity, not city by city.
# NOTE(review): no column of `categories` is selected; the inner join only
# drops products without a matching category — confirm that it is needed.
q = """
        SELECT
            customer_city, product_name, quantity, total
            FROM (SELECT
                customer_city, product_name,sum(order_item_quantity) as quantity,sum(order_item_subtotal) as total, DENSE_RANK () OVER ( 
                            PARTITION BY customer_city 
                            ORDER BY sum(order_item_quantity) DESC
                        ) rank
                FROM
                    customers as c
                INNER JOIN
                    orders as o
                    ON c.customer_id = o.order_customer_id
                INNER JOIN
                    order_items as oi
                    ON o.order_id = oi.order_item_order_id
                INNER JOIN
                    products as p
                    ON oi.order_item_product_id = p.product_id
                INNER JOIN
                    categories as ca
                    ON p.product_category_id = ca.category_id
                GROUP BY customer_city, product_name
                ) t
        WHERE rank < 6
        ORDER BY quantity DESC
     """
display(spark.sql(q))