# Data Engineer with Azure
## Datapath Project
## Author: Gael Velasquez

In [1]:
# Evaluate the Spark session object so the notebook displays it.
# NOTE(review): `spark` is not created anywhere in this file; presumably the
# notebook runtime injects it — confirm before running outside that runtime.
spark

StatementMeta(DatapathSpark, 7, 1, Finished, Available)

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, BooleanType
from pyspark.sql.functions import substring, col

StatementMeta(DatapathSpark, 7, 2, Finished, Available)

In [3]:
# Root of the storage container that holds the bronze/silver/gold layers.
# Defined once so the three layer paths below cannot drift apart if the
# storage account or container name ever changes.
lake_root = 'abfss://projectcontainer@datapathproject.dfs.core.windows.net'

# One folder per layer. The cells below read from silver and write to gold.
bronze_path = f'{lake_root}/bronze'
silver_path = f'{lake_root}/silver'
gold_path = f'{lake_root}/gold'

StatementMeta(DatapathSpark, 7, 3, Finished, Available)

## Reading Silver Layer

In [4]:
def _load_silver(folder_name):
    """Load the Delta table stored at ``<silver_path>/<folder_name>`` as a DataFrame."""
    return spark.read.format("delta").load(f"{silver_path}/{folder_name}")


# One DataFrame per silver table; the names are reused when registering temp views.
ddepartments = _load_silver("departments")
dcategories = _load_silver("categories")
dproducts = _load_silver("products")
dorder_items = _load_silver("order_items")
dorders = _load_silver("orders")
# NOTE(review): this folder is singular ("customer") while the other folders and
# the "customers" temp view are plural — confirm it matches the silver layout.
dcustomers = _load_silver("customer")

StatementMeta(DatapathSpark, 7, 4, Finished, Available)

In [5]:
# Register every silver DataFrame as a temp view so the report queries below
# can refer to it by table name in Spark SQL.
_silver_views = {
    "departments": ddepartments,
    "categories": dcategories,
    "products": dproducts,
    "order_items": dorder_items,
    "orders": dorders,
    "customers": dcustomers,
}
for _view_name, _silver_df in _silver_views.items():
    _silver_df.createOrReplaceTempView(_view_name)

StatementMeta(DatapathSpark, 7, 5, Finished, Available)

## Making Transformations

### R1: Cantidad de órdenes realizadas, agrupadas por cliente (concatenar first name y last name = full name) y ordenadas por la cantidad de órdenes de forma descendente

In [6]:
# Report R1: number of orders per customer, customers with most orders first.
#
# The aggregation is keyed on customer_id as well as the display name. Grouping
# on the concatenated name alone would collapse different customers who share a
# first and last name into one row with a combined count. The output columns are
# unchanged (customer_full_name, Total_Orders), so customers with identical
# names now appear as separate rows.
#
# The LEFT OUTER JOIN together with count(order_id) keeps customers that have no
# orders, reporting them with a count of 0.
R1 = spark.sql(
  """
  SELECT customer_full_name, count(order_id) AS Total_Orders
  FROM(
  SELECT customer_id, CONCAT(customer_fname,' ',customer_lname) AS customer_full_name, orders.order_id
  FROM customers
  LEFT OUTER JOIN orders ON orders.order_customer_id = customers.customer_id)
  GROUP BY customer_id, customer_full_name
  ORDER BY Total_Orders desc
  """)

StatementMeta(DatapathSpark, 7, 6, Finished, Available)

In [7]:
# Persist R1 to the gold layer as a Delta table, replacing any existing data there.
r1_target = f"{gold_path}/R1"
R1.write.format("delta").mode("overwrite").save(r1_target)

StatementMeta(DatapathSpark, 7, 7, Finished, Available)

### R2: Obtener la cantidad de productos vendidos agrupando por año, mes y categoría, mostrando el nombre de la categoría, la cantidad y el periodo (yyyyMM)

In [8]:
# Report R2: total item quantity per order year, order month and category name.
#
# Innermost query: joins orders to their items and drops orders whose status is
# 'CANCELED'. Middle query: attaches each item's product_category_id through a
# LEFT OUTER JOIN on products. Outer query: attaches category_name through a
# LEFT OUTER JOIN on categories, then sums quantities per group. Because both
# lookups are left joins, items without a matching product or category stay in
# the result under a NULL category_name.
#
# NOTE(review): the report description asks for a yyyyMM period, but this query
# emits separate year_order and month_order columns — confirm which is expected.
r2_query = """
  SELECT year_order, month_order, categories.category_name, SUM(order_item_quantity)AS Total_Quantity
  FROM(
  SELECT year_order, month_order, products.product_category_id, order_item_quantity
  FROM(
  SELECT year(order_date) AS year_order, month(order_date) AS month_order, order_items.order_item_product_id, order_items.order_item_quantity 
  FROM orders
  INNER JOIN order_items ON orders.order_id = order_items.order_item_order_id
  WHERE order_status <> 'CANCELED')
  LEFT OUTER JOIN products ON product_id = order_item_product_id)
  LEFT OUTER JOIN categories ON category_id = product_category_id
  GROUP BY year_order, month_order, category_name
  ORDER BY year_order, month_order, category_name
  """
R2 = spark.sql(r2_query)

StatementMeta(DatapathSpark, 7, 8, Finished, Available)

In [9]:
# Persist R2 to the gold layer as a Delta table, replacing any existing data there.
r2_target = f"{gold_path}/R2"
R2.write.format("delta").mode("overwrite").save(r2_target)

StatementMeta(DatapathSpark, 7, 9, Finished, Available)

### R3: Ingresos promedio (Día, Mes, Año)

In [10]:
def _mean_income_by(date_part):
    """Return the mean order-item subtotal of non-canceled orders per date part.

    Args:
        date_part: Name of the Spark SQL date function applied to
            ``order_date``; one of ``"year"``, ``"month"`` or ``"day"``.

    Returns:
        A DataFrame with the columns ``<date_part>_order`` and ``Income_Mean``,
        sorted by ``<date_part>_order``.

    Raises:
        ValueError: If ``date_part`` is not one of the supported names. The
            value is interpolated into the SQL text, so it is restricted to a
            fixed set.
    """
    if date_part not in ("year", "month", "day"):
        raise ValueError(f"unsupported date part: {date_part!r}")
    return spark.sql(
        f"""
        SELECT {date_part}(order_date) AS {date_part}_order, mean(order_item_subtotal) AS Income_Mean
        FROM orders
        INNER JOIN order_items ON orders.order_id = order_items.order_item_order_id
        WHERE order_status <> 'CANCELED'
        GROUP BY {date_part}_order
        ORDER BY {date_part}_order
        """)


# Report R3: mean order-item subtotal per year (R31), month (R32) and day (R33).
# NOTE(review): month() and day() return the month of the year and the day of
# the month, so R32 and R33 pool rows from every year (and, for R33, every
# month) into the same bucket — confirm this is the intended meaning of
# "average income per month/day".
R31 = _mean_income_by("year")
R32 = _mean_income_by("month")
R33 = _mean_income_by("day")

StatementMeta(DatapathSpark, 7, 10, Finished, Available)

In [11]:
# Persist the three R3 variants to the gold layer as Delta tables, each under
# its own folder, replacing any existing data there.
for _report_name, _report_df in (("R31", R31), ("R32", R32), ("R33", R33)):
    _report_df.write.mode("overwrite").format("delta").save(f"{gold_path}/{_report_name}")

StatementMeta(DatapathSpark, 7, 11, Finished, Available)

### R4: Obtenga todos los pedidos CANCELADOS con monto superior a $ 1000

In [12]:
# Report R4: canceled orders whose summed item subtotals exceed 1000.
#
# The threshold on the aggregated total is applied with HAVING rather than by
# wrapping the aggregation in a derived table, and the ORDER BY sits on the
# outermost query. A sort inside a derived table is not guaranteed to be
# preserved by the enclosing query, so the ordering has to be stated here.
# Output columns are unchanged: order_id, order_status, Total_Income.
R4 = spark.sql(
  """
  SELECT order_id, order_status, sum(order_item_subtotal) AS Total_Income
  FROM orders
  INNER JOIN order_items ON orders.order_id = order_items.order_item_order_id
  WHERE order_status = 'CANCELED'
  GROUP BY order_status, order_id
  HAVING sum(order_item_subtotal) > 1000
  ORDER BY Total_Income
  """)

StatementMeta(DatapathSpark, 7, 12, Finished, Available)

In [13]:
# Persist R4 to the gold layer as a Delta table, replacing any existing data there.
r4_target = f"{gold_path}/R4"
R4.write.format("delta").mode("overwrite").save(r4_target)

StatementMeta(DatapathSpark, 7, 13, Finished, Available)

### R5: Obtenga los ingresos de cada categoría por año y por trimestre

In [14]:
# Report R5: summed order-item subtotal per order year, quarter and category name.
#
# Innermost query: joins orders to their items and drops orders whose status is
# 'CANCELED'. Middle query: attaches each item's product_category_id through a
# LEFT OUTER JOIN on products. Outer query: attaches category_name through a
# LEFT OUTER JOIN on categories, then sums subtotals per group. Because both
# lookups are left joins, items without a matching product or category stay in
# the result under a NULL category_name.
r5_query = """
  SELECT year_order, quarter_order, category_name, SUM(order_item_subtotal) AS Total_Income
  FROM (SELECT year_order, quarter_order, order_item_subtotal, product_category_id
  FROM(SELECT year(order_date) AS year_order, quarter(order_date) AS quarter_order, order_item_product_id, order_item_subtotal
  FROM orders
  INNER JOIN order_items ON orders.order_id = order_items.order_item_order_id
  WHERE order_status <> 'CANCELED')
  LEFT OUTER JOIN products ON order_item_product_id = product_id)
  LEFT OUTER JOIN categories ON product_category_id = category_id
  GROUP BY year_order, quarter_order, category_name
  ORDER BY year_order, quarter_order, category_name
  """
R5 = spark.sql(r5_query)

StatementMeta(DatapathSpark, 7, 14, Finished, Available)

In [15]:
# Persist R5 to the gold layer as a Delta table, replacing any existing data there.
r5_target = f"{gold_path}/R5"
R5.write.format("delta").mode("overwrite").save(r5_target)

StatementMeta(DatapathSpark, 7, 15, Finished, Available)