In [0]:
import os

from pyspark.sql import SparkSession

In [0]:
# Obtain the Spark session used by every query in this notebook.
# getOrCreate() reuses an already-active session when there is one.
spark = (
    SparkSession.builder
    .appName("Vendedores Japão")
    .getOrCreate()
)

In [0]:
# Spark SQL: the single country with the largest total quantity of items
# on orders whose status is 'Cancelled'.
# Ties on the total are broken by country name so that re-running the
# notebook always picks the same row before the output is overwritten.
cancelados = """
SELECT
    c.country,
    SUM(od.quantity_Ordered) AS total_cancelled_items
FROM hive_metastore.default.orders o
JOIN hive_metastore.default.orderdetails od ON o.order_Number = od.order_Number
JOIN hive_metastore.default.customers c ON o.customer_Number = c.customer_Number
WHERE
    o.status = 'Cancelled'
GROUP BY
    c.country
ORDER BY
    total_cancelled_items DESC,
    c.country
LIMIT 1;
"""

In [0]:
# Run the cancelled-items query through the Spark session and keep the
# resulting DataFrame for the write step in the next cell.
cancelados_df = spark.sql(cancelados)

In [0]:
# Persist the cancelled-items result in Delta format at a fixed path.
# "overwrite" mode replaces whatever a previous run stored there.
cancelados_output_path = "/mnt/delta/itens_cancelados_pais"
(
    cancelados_df.write
    .format("delta")
    .mode("overwrite")
    .save(cancelados_output_path)
)

In [0]:
# ===== Top product line by revenue (shipped orders, 2005) =====

In [0]:
# Spark SQL: the single product line with the highest revenue
# (quantity ordered * unit price) over orders with status 'Shipped'
# whose order date falls in 2005.
# Every table is addressed by its full hive_metastore.default name so the
# query does not depend on the session's current catalog/schema.
# NOTE(review): product_lines is assumed to live in hive_metastore.default
# alongside the other tables -- confirm.
# Ties on revenue are broken by product line name so re-runs are deterministic.
# NOTE(review): the variable name is misspelled ("faturameto"); it is kept
# because the cell that executes the query reads this exact name.
faturameto = """
SELECT
    pl.product_line,
    SUM(od.quantity_Ordered * od.price_Each) AS total_revenue
FROM hive_metastore.default.orders o
JOIN hive_metastore.default.orderdetails od ON o.order_Number = od.order_Number
JOIN hive_metastore.default.products p ON od.product_Code = p.product_Code
JOIN hive_metastore.default.product_lines pl ON p.product_Line = pl.product_Line
WHERE
    o.status = 'Shipped'
    AND YEAR(o.order_Date) = 2005
GROUP BY
    pl.product_line
ORDER BY
    total_revenue DESC,
    pl.product_line
LIMIT 1;
"""

In [0]:
# Run the revenue-by-product-line query; the query text is held in the
# (misspelled) variable `faturameto` defined in the previous cell.
faturamento_df = spark.sql(faturameto)

In [0]:
# Persist the revenue result in Delta format at a fixed path.
# "overwrite" mode replaces whatever a previous run stored there.
faturamento_output_path = "/mnt/delta/faturamento_linha_produto"
(
    faturamento_df.write
    .format("delta")
    .mode("overwrite")
    .save(faturamento_output_path)
)

In [0]:
# ===== Employees of the Tokyo office, with masked e-mail =====

In [0]:
# Spark SQL: employees whose office code belongs to an office located in
# the city 'Tokyo'. Returns first name, last name and a masked e-mail in
# which everything before the last '@' is replaced by '*****' (the domain
# is kept). Rows are sorted by last name, then first name.
# The offices table is addressed by its full hive_metastore.default name,
# like employees, so the query does not depend on the session's current
# catalog/schema.
# NOTE(review): offices is assumed to live in hive_metastore.default
# alongside employees -- confirm.
vendedores_japao = """
SELECT
    e.first_Name AS nome,
    e.last_Name AS sobrenome,
    CONCAT('*****@', SUBSTRING_INDEX(e.email, '@', -1)) AS email_mascarado
FROM
    hive_metastore.default.employees e
WHERE
    e.office_Code IN (
        SELECT office_Code
        FROM hive_metastore.default.offices
        WHERE city = 'Tokyo'
    )
ORDER BY
    e.last_Name, e.first_Name;
"""

In [0]:
# Run the Tokyo-office employees query and keep the resulting DataFrame
# for the write step in the next cell.
vendedores_df = spark.sql(vendedores_japao)

In [0]:
# Persist the employee list in Delta format at a fixed path.
# "overwrite" mode replaces whatever a previous run stored there.
vendedores_output_path = "/mnt/delta/vendedores_japao"
(
    vendedores_df.write
    .format("delta")
    .mode("overwrite")
    .save(vendedores_output_path)
)


In [0]:
# Release the Spark session, but only when running outside Databricks.
# On a Databricks cluster the session is managed by the platform, and
# stopping it from user code can make the run fail or leave the attached
# context unusable. DATABRICKS_RUNTIME_VERSION is set by the Databricks
# runtime, so its absence means this is a session we may safely stop.
# NOTE(review): the hive_metastore catalog and /mnt paths suggest this
# notebook targets Databricks -- confirm.
if "DATABRICKS_RUNTIME_VERSION" not in os.environ:
    spark.stop()