In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import *

In [0]:
# Root paths of the storage layers used by this notebook (landing -> bronze ->
# silver -> gold), all under the same mount point. Each path keeps its trailing
# slash.
landing_path, bronze_path, silver_path, gold_path = (
    f'/mnt/proyect/{layer}/' for layer in ('landing', 'bronze', 'silver', 'gold')
)

In [0]:
# Top 20 customers by total spend, excluding CANCELED orders.
# Joins the bronze customers -> orders -> order_items Delta tables and, per
# customer, sums the item quantity and the item subtotal.
# The secondary sort key on customer_id makes the LIMIT 20 cut deterministic
# when two customers have the same total; without it Spark may pick either one.
query = f'''
SELECT
    customer_id, customer_fname, customer_lname, customer_email,
    sum(order_item_quantity) as quantity_item_total,
    sum(order_item_subtotal) as total
FROM
    delta.`{bronze_path}/customers` as c
INNER JOIN
    delta.`{bronze_path}/orders` as o
    ON c.customer_id = o.order_customer_id
INNER JOIN
    delta.`{bronze_path}/order_items` as oi
    ON o.order_id = oi.order_item_order_id
WHERE
    order_status <> 'CANCELED'
GROUP BY
    customer_id, customer_fname, customer_lname, customer_email
ORDER BY
    total DESC,
    customer_id
LIMIT 20
'''

statement1 = spark.sql(query)


In [0]:
# Render the top-20 customer DataFrame (statement1) as a table for inspection.
display(statement1)

customer_id,customer_fname,customer_lname,customer_email,quantity_item_total,total
791,Mary,Smith,XXXXXXXXX,82,10524.170177459717
8766,Mary,Duncan,XXXXXXXXX,87,9296.140186309814
1657,Betty,Phillips,XXXXXXXXX,111,9223.710151672363
2641,Betty,Spears,XXXXXXXXX,90,9130.920223236084
1288,Evelyn,Thompson,XXXXXXXXX,89,9019.11019897461
3710,Ashley,Smith,XXXXXXXXX,89,9019.10020828247
5654,Jerry,Smith,XXXXXXXXX,103,8904.950210571289
5624,Mary,Mata,XXXXXXXXX,89,8761.980182647705
5715,Kelly,Smith,XXXXXXXXX,99,8595.130157470703
664,Bobby,Jimenez,XXXXXXXXX,76,8394.260208129883


In [0]:
# Persist statement1 to the silver layer as a Delta table, replacing any
# previous output at that path.
# NOTE(review): silver_path already ends in '/', so the target is
# '.../silver//statement1'; Hadoop-style paths normally collapse the double
# slash — confirm.
(
    statement1.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_path}/statement1")
)

In [0]:
# Units sold and revenue per product category, built from the bronze
# order_items, products and categories Delta tables.
# NOTE(review): CAST(... AS INT) truncates the fractional part of the revenue
# total. Unlike statement1, this query does not join orders and so does not
# exclude CANCELED orders. Confirm both are intended.
query = f'''
    SELECT
                    ca.category_name, sum(order_item_quantity) as item_quantity, cast(sum(order_item_subtotal) AS INT )as total
                FROM delta.`{bronze_path}/order_items` as oi
                INNER JOIN
                    delta.`{bronze_path}/products` as p
                    ON oi.order_item_product_id = p.product_id
                INNER JOIN
                    delta.`{bronze_path}/categories` as ca
                    ON p.product_category_id = ca.category_id
                GROUP BY ca.category_name
'''

statement2 = spark.sql(query)

In [0]:
# Render the per-category quantity/revenue DataFrame (statement2) as a table.
display(statement2)

category_name,item_quantity,total
Camping & Hiking,13729,4118425
Fitness Accessories,856,35601
Golf Shoes,1444,107998
Men's Footwear,22246,2891757
Electronics,9436,371034
Women's Apparel,62956,3147800
Girls' Apparel,3615,151706
Boxing & MMA,1265,85205
Baseball & Softball,1785,94057
Hunting & Shooting,1239,56848


In [0]:
# Persist statement2 to the silver layer as a Delta table, replacing any
# previous output at that path.
(
    statement2.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_path}/statement2")
)

In [0]:
# Most-ordered product category per customer city, from the bronze customers,
# orders, order_items, products and categories Delta tables.
# Review note (translated): "reviewed, reviewed - noticed that quantity was not
# being mapped". The inner query computes `quantity` as a count of order-item
# rows per (city, category), not a sum of order_item_quantity, and the outer
# SELECT does not project it.
# NOTE(review): projecting `quantity` would change the schema of the silver
# Delta table this notebook overwrites later. Delta schema enforcement would
# presumably then require overwriteSchema. Confirm before changing.
# DENSE_RANK keeps ties, so a city can appear more than once (e.g. Amarillo in
# the displayed output).
query = f'''
    SELECT
                customer_city, category_name
                FROM (SELECT
                    customer_city, category_name, count(category_name) as quantity, DENSE_RANK () OVER ( 
                                PARTITION BY customer_city 
                                ORDER BY count(category_name) DESC
                            ) rank
                    FROM
                        delta.`{bronze_path}/customers` as c
                    INNER JOIN
                        delta.`{bronze_path}/orders` as o
                        ON c.customer_id = o.order_customer_id
                    INNER JOIN
                        delta.`{bronze_path}/order_items` as oi
                        ON o.order_id = oi.order_item_order_id
                    INNER JOIN
                        delta.`{bronze_path}/products` as p
                        ON oi.order_item_product_id = p.product_id
                    INNER JOIN
                        delta.`{bronze_path}/categories` as ca
                        ON p.product_category_id = ca.category_id
                    GROUP BY customer_city, category_name
                    ) t
            WHERE rank = 1
'''

statement3 = spark.sql(query)

In [0]:
# Render the top-category-per-city DataFrame (statement3) as a table.
display(statement3)

customer_city,category_name
Aguadilla,Cleats
Alameda,Cleats
Albany,Women's Apparel
Albuquerque,Cleats
Algonquin,Cleats
Alhambra,Women's Apparel
Allentown,Cleats
Alpharetta,Men's Footwear
Amarillo,Cleats
Amarillo,Indoor/Outdoor Games


In [0]:
# Persist statement3 to the silver layer as a Delta table, replacing any
# previous output at that path.
(
    statement3.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_path}/statement3")
)

In [0]:
# Best-selling products per customer city, ranked by units sold (sum of
# order_item_quantity), with their revenue. Built from the bronze Delta tables;
# the final result is ordered by quantity across all cities.
# NOTE(review): DENSE_RANK keeps ties, so `rank < 6` can return more than five
# products for a city. The categories join contributes no selected columns; as
# an INNER JOIN it only drops products without a matching category.
query = f'''
SELECT
    customer_city, product_name, quantity, total
FROM (
    SELECT
        customer_city, product_name,
        sum(order_item_quantity) as quantity,
        sum(order_item_subtotal) as total,
        DENSE_RANK() OVER (PARTITION BY customer_city ORDER BY sum(order_item_quantity) DESC) as rank
    FROM
        delta.`{bronze_path}/customers` as c
    INNER JOIN
        delta.`{bronze_path}/orders` as o
        ON c.customer_id = o.order_customer_id
    INNER JOIN
        delta.`{bronze_path}/order_items` as oi
        ON o.order_id = oi.order_item_order_id
    INNER JOIN
        delta.`{bronze_path}/products` as p
        ON oi.order_item_product_id = p.product_id
    INNER JOIN
        delta.`{bronze_path}/categories` as ca
        ON p.product_category_id = ca.category_id
    GROUP BY
        customer_city, product_name
) t
WHERE rank < 6
ORDER BY quantity DESC
'''

statement4 = spark.sql(query)

In [0]:
# Render the top-products-per-city DataFrame (statement4) as a table.
display(statement4)

In [0]:
# Persist statement4 to the silver layer as a Delta table, replacing any
# previous output at that path.
(
    statement4.write
    .format("delta")
    .mode("overwrite")
    .save(f"{silver_path}/statement4")
)
