### Importação das bibliotecas

In [0]:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, sum, regexp_replace, when, regexp_replace,  year, expr


In [0]:

spark = SparkSession.builder \
    .appName("Análise de Dados Database Type Ecommerce UI Bakery") \
    .getOrCreate()

### Funções

In [0]:
def exibir_info_df(df: DataFrame) -> None:
    """
    Imprime o número de linhas, número de colunas e o esquema do DataFrame fornecido.

    Parâmetros:
        df (DataFrame): O DataFrame a ser analisado.

    Returns:
        None
    """

    print(f'\nLinhas = {df.count()} \nColunas = {len(df.columns)}\n')
    df.printSchema()


### Perguntas

####  - Qual país possui a maior quantidade de itens cancelados?

Dataframes abordados para essa questão:
- Orders
- Orders Details
- Customers

In [0]:
df_orders = spark.read.format('delta').load('/join/silver/df_orders')
exibir_info_df(df_orders)

df_orderdetails = spark.read.format('delta').load('/join/silver/df_orderdetails')
exibir_info_df(df_orderdetails)

df_customers = spark.read.format('delta').load('/join/silver/df_customers')
exibir_info_df(df_customers)


Linhas = 329 
Colunas = 7

root
 |-- order_number: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- required_date: date (nullable = true)
 |-- shipped_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- customer_number: integer (nullable = true)


Linhas = 2997 
Colunas = 5

root
 |-- order_number: integer (nullable = true)
 |-- product_code: string (nullable = true)
 |-- quantity_ordered: integer (nullable = true)
 |-- price_each: decimal(10,2) (nullable = true)
 |-- order_line_number: short (nullable = true)


Linhas = 122 
Colunas = 12

root
 |-- customer_number: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- contact_last_name: string (nullable = true)
 |-- contact_first_name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address_line1: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: s

In [0]:
df_orders.filter('status == "Cancelled"').display()

order_number,order_date,required_date,shipped_date,status,comments,customer_number
10167,2003-10-23,2003-10-30,2023-01-01,Cancelled,Customer called to cancel. The warehouse was notified in time and the order didn't ship. They have a new VP of Sales and are shifting their sales model. Our VP of Sales should contact them.,448
10179,2003-11-11,2003-11-17,2003-11-13,Cancelled,Customer cancelled due to urgent budgeting issues. Must be cautious when dealing with them in the future. Since order shipped already we must discuss who would cover the shipping charges.,496
10248,2004-05-07,2004-05-14,2023-01-01,Cancelled,Order was mistakenly placed. The warehouse noticed the lack of documentation.,131
10253,2004-06-01,2004-06-09,2004-06-02,Cancelled,"Customer disputed the order and we agreed to cancel it. We must be more cautions with this customer going forward, since they are very hard to please. We must cover the shipping fees.",201
10260,2004-06-16,2004-06-22,2023-01-01,Cancelled,Customer heard complaints from their customers and called to cancel this order. Will notify the Sales Manager.,357
10262,2004-06-24,2004-07-01,2023-01-01,Cancelled,This customer found a better offer from one of our competitors. Will call back to renegotiate.,141


In [0]:
orders_cancelled = df_orders.filter('status == "Cancelled"')
orders_cancelled.display()

order_number,order_date,required_date,shipped_date,status,comments,customer_number
10167,2003-10-23,2003-10-30,2023-01-01,Cancelled,Customer called to cancel. The warehouse was notified in time and the order didn't ship. They have a new VP of Sales and are shifting their sales model. Our VP of Sales should contact them.,448
10179,2003-11-11,2003-11-17,2003-11-13,Cancelled,Customer cancelled due to urgent budgeting issues. Must be cautious when dealing with them in the future. Since order shipped already we must discuss who would cover the shipping charges.,496
10248,2004-05-07,2004-05-14,2023-01-01,Cancelled,Order was mistakenly placed. The warehouse noticed the lack of documentation.,131
10253,2004-06-01,2004-06-09,2004-06-02,Cancelled,"Customer disputed the order and we agreed to cancel it. We must be more cautions with this customer going forward, since they are very hard to please. We must cover the shipping fees.",201
10260,2004-06-16,2004-06-22,2023-01-01,Cancelled,Customer heard complaints from their customers and called to cancel this order. Will notify the Sales Manager.,357
10262,2004-06-24,2004-07-01,2023-01-01,Cancelled,This customer found a better offer from one of our competitors. Will call back to renegotiate.,141


- Solução utilizando SQL

Criando tabelas temporárias

In [0]:

orders_cancelled.createOrReplaceTempView('cancel')
df_orderdetails.createOrReplaceTempView('details')
df_customers.createOrReplaceTempView('customer')

In [0]:
%sql

SELECT 
  c.order_number,
  customer.customer_number,
  customer.country,
  SUM(d.quantity_ordered) AS cont_quantity_ordered
FROM details AS d
INNER JOIN cancel AS c
INNER JOIN customer
ON d.order_number = c.order_number AND customer.customer_number = c.customer_number
GROUP BY c.order_number, customer.customer_number, customer.country
ORDER BY cont_quantity_ordered DESC 

order_number,customer_number,country,cont_quantity_ordered
10262,141,Spain,605
10167,448,Sweden,550
10248,131,USA,454
10253,201,UK,429
10260,357,New Zealand,308
10179,496,New Zealand,288


In [0]:

%sql

WITH cte_tabela AS (
  SELECT 
    c.order_number,
    cl.customer_number,
    cl.country,
    SUM(d.quantity_ordered) AS cont_quantity_ordered
  FROM details AS d
  INNER JOIN cancel AS c
  INNER JOIN customer AS cl
  ON d.order_number = c.order_number AND cl.customer_number = c.customer_number
  GROUP BY c.order_number, cl.customer_number, cl.country
  ORDER BY cont_quantity_ordered DESC 
)
SELECT 
   country AS pais,
   SUM(cont_quantity_ordered) AS contagem_itens_cancelados
FROM cte_tabela
GROUP BY pais
ORDER BY contagem_itens_cancelados DESC

pais,contagem_itens_cancelados
Spain,605
New Zealand,596
Sweden,550
USA,454
UK,429


In [0]:

%sql

WITH cte_tabela AS (
  SELECT 
    c.order_number,
    cl.customer_number,
    cl.country,
    SUM(d.quantity_ordered) AS cont_quantity_ordered
  FROM details AS d
  INNER JOIN cancel AS c
  INNER JOIN customer AS cl
  ON d.order_number = c.order_number AND cl.customer_number = c.customer_number
  GROUP BY c.order_number, cl.customer_number, cl.country
  ORDER BY cont_quantity_ordered DESC 
)
SELECT 
   country AS pais,
   SUM(cont_quantity_ordered) AS contagem_itens_cancelados
FROM cte_tabela
GROUP BY pais
ORDER BY contagem_itens_cancelados DESC

pais,contagem_itens_cancelados
Spain,605
New Zealand,596
Sweden,550
USA,454
UK,429


Databricks visualization. Run in Databricks to view.

Consulta final apresentando o resultado

In [0]:
%sql

WITH cte_tabela AS (
  SELECT 
    c.order_number,
    cl.customer_number,
    cl.country,
    SUM(d.quantity_ordered) AS cont_quantity_ordered
  FROM details AS d
  INNER JOIN cancel AS c
  INNER JOIN customer AS cl
  ON d.order_number = c.order_number AND cl.customer_number = c.customer_number
  GROUP BY c.order_number, cl.customer_number, cl.country
  ORDER BY cont_quantity_ordered DESC 
)
SELECT 
   country AS pais,
   SUM(cont_quantity_ordered) AS contagem_itens_cancelados
FROM cte_tabela
GROUP BY pais
ORDER BY contagem_itens_cancelados DESC
LIMIT 1

pais,contagem_itens_cancelados
Spain,605


- Solução utilizando PySpark

In [0]:
df_q1 = orders_cancelled.join(df_orderdetails, orders_cancelled.order_number == df_orderdetails.order_number, 'INNER')\
                             .join(df_customers, df_customers.customer_number == orders_cancelled.customer_number, 'INNER')\
                             .select(df_customers.country, df_orderdetails.quantity_ordered)

In [0]:
df_q1.display()

country,quantity_ordered
Sweden,44
Sweden,43
Sweden,46
Sweden,34
Sweden,33
Sweden,21
Sweden,20
Sweden,32
Sweden,29
Sweden,43


In [0]:
df_q1.groupBy('country')\
     .agg(sum('quantity_ordered').alias('qtd_itens_devolvidos'))\
     .orderBy(col('qtd_itens_devolvidos').desc())\
     .display()

country,qtd_itens_devolvidos
Spain,605
New Zealand,596
Sweden,550
USA,454
UK,429


In [0]:
df_q1 = df_q1.groupBy('country')\
             .agg(sum('quantity_ordered').alias('qtd_itens_devolvidos'))\
             .orderBy(col('qtd_itens_devolvidos').desc())\
             .limit(1)

In [0]:
df_q1.display()

country,qtd_itens_devolvidos
Spain,605


Salvando o resultado

In [0]:
df_q1.write.mode('overwrite').format('delta').option('mergeSchema', 'True').save('/join/gold/resposta_q1')

> <b>Resposta</b>
-  O país com maior quantidade de itens cancelados é a Espanha com 605 itens devolvidos

#### - Qual o faturamento da linha de produto mais vendido, considere os itens com status 'Shipped', cujo o pedido foi realizado no ano de 2005

Dataframes abordados para essa questão:
- Orders
- Orders Details
- Products

In [0]:
df_orders = spark.read.format('delta').load('/join/silver/df_orders')
exibir_info_df(df_orders)

df_orderdetails = spark.read.format('delta').load('/join/silver/df_orderdetails')
exibir_info_df(df_orderdetails)

df_products = spark.read.format('delta').load('/join/silver/df_products')
exibir_info_df(df_products)


Linhas = 329 
Colunas = 7

root
 |-- order_number: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- required_date: date (nullable = true)
 |-- shipped_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- customer_number: integer (nullable = true)


Linhas = 2997 
Colunas = 5

root
 |-- order_number: integer (nullable = true)
 |-- product_code: string (nullable = true)
 |-- quantity_ordered: integer (nullable = true)
 |-- price_each: decimal(10,2) (nullable = true)
 |-- order_line_number: short (nullable = true)


Linhas = 110 
Colunas = 9

root
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_line: string (nullable = true)
 |-- product_scale: string (nullable = true)
 |-- product_vendor: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- quantity_in_stock: short (nullable = true)
 |-- buy_price: decimal(10,2) (nullable = tru

In [0]:
df_orders.filter(expr('status = "Shipped" AND year(shipped_date) = 2005')).display()

order_number,order_date,required_date,shipped_date,status,comments,customer_number
10362,2005-01-05,2005-01-16,2005-01-10,Shipped,No comments,161
10363,2005-01-06,2005-01-12,2005-01-10,Shipped,No comments,334
10364,2005-01-06,2005-01-17,2005-01-09,Shipped,No comments,350
10365,2005-01-07,2005-01-18,2005-01-11,Shipped,No comments,320
10366,2005-01-10,2005-01-19,2005-01-12,Shipped,No comments,381
10368,2005-01-19,2005-01-27,2005-01-24,Shipped,Can we renegotiate this one?,124
10369,2005-01-20,2005-01-28,2005-01-24,Shipped,No comments,379
10370,2005-01-20,2005-02-01,2005-01-25,Shipped,No comments,276
10371,2005-01-23,2005-02-03,2005-01-25,Shipped,No comments,124
10372,2005-01-26,2005-02-05,2005-01-28,Shipped,No comments,398


In [0]:
df_orders_q2 = df_orders.filter(expr('status = "Shipped" AND year(shipped_date) = 2005'))
df_orders_q2.display()

order_number,order_date,required_date,shipped_date,status,comments,customer_number
10362,2005-01-05,2005-01-16,2005-01-10,Shipped,No comments,161
10363,2005-01-06,2005-01-12,2005-01-10,Shipped,No comments,334
10364,2005-01-06,2005-01-17,2005-01-09,Shipped,No comments,350
10365,2005-01-07,2005-01-18,2005-01-11,Shipped,No comments,320
10366,2005-01-10,2005-01-19,2005-01-12,Shipped,No comments,381
10368,2005-01-19,2005-01-27,2005-01-24,Shipped,Can we renegotiate this one?,124
10369,2005-01-20,2005-01-28,2005-01-24,Shipped,No comments,379
10370,2005-01-20,2005-02-01,2005-01-25,Shipped,No comments,276
10371,2005-01-23,2005-02-03,2005-01-25,Shipped,No comments,124
10372,2005-01-26,2005-02-05,2005-01-28,Shipped,No comments,398


- Solução utilizando SQL

Criando tabelas temporárias

In [0]:
df_orders_q2.createOrReplaceTempView('ordersq2')
df_orderdetails.createOrReplaceTempView('details')
df_products.createOrReplaceTempView('products')

In [0]:
%sql
WITH cte_tabela AS (
  SELECT 
    * 
  FROM details AS d
  INNER JOIN ordersq2 AS o 
  ON d.order_number = o.order_number
)
SELECT 
  p.product_line,
  SUM(c.price_each) AS faturamento
FROM cte_tabela AS c
INNER JOIN products AS p
ON c.product_code = p.product_code
GROUP BY p.product_line
ORDER BY faturamento DESC
-- LIMIT(1)

product_line,faturamento
Classic Cars,15559.72
Vintage Cars,5861.56
Motorcycles,5488.5
Trucks and Buses,4073.94
Planes,3165.93
Ships,1746.82
Trains,705.81


In [0]:
%sql
WITH cte_tabela AS (
  SELECT 
    * 
  FROM details AS d
  INNER JOIN ordersq2 AS o 
  ON d.order_number = o.order_number
)
SELECT 
  p.product_line,
  SUM(c.price_each) AS faturamento
FROM cte_tabela AS c
INNER JOIN products AS p
ON c.product_code = p.product_code
GROUP BY p.product_line
ORDER BY faturamento DESC


product_line,faturamento
Classic Cars,15559.72
Vintage Cars,5861.56
Motorcycles,5488.5
Trucks and Buses,4073.94
Planes,3165.93
Ships,1746.82
Trains,705.81


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
WITH cte_tabela AS (
  SELECT 
    * 
  FROM details AS d
  INNER JOIN ordersq2 AS o 
  ON d.order_number = o.order_number
)
SELECT 
  p.product_line,
  SUM(c.price_each) AS faturamento
FROM cte_tabela AS c
INNER JOIN products AS p
ON c.product_code = p.product_code
GROUP BY p.product_line
ORDER BY faturamento DESC
LIMIT(1)

product_line,faturamento
Classic Cars,15559.72


- Solução utilizando PySpark

In [0]:
df_orderdetails.join(df_orders_q2, df_orderdetails.order_number == df_orders_q2.order_number, 'INNER')\
               .select(df_orderdetails.price_each, df_orderdetails.product_code).display()

price_each,product_code
182.04,S10_4698
131.04,S12_2823
53.91,S18_2625
91.29,S24_1578
180.95,S12_1099
106.87,S12_3380
68.63,S12_3990
103.64,S12_4675
61.6,S18_1889
69.15,S18_3278


In [0]:
df_q2 = df_orderdetails.join(df_orders_q2, df_orderdetails.order_number == df_orders_q2.order_number, 'INNER')\
                       .select(df_orderdetails.price_each, df_orderdetails.product_code)


df_q2 = df_q2.join(df_products, df_q2.product_code == df_products.product_code, 'INNER') 

In [0]:
df_q2 = df_q2.groupBy('product_line')\
             .agg(sum('price_each').alias('faturamento'))\
             .orderBy(col('faturamento').desc())\
             .limit(1)

In [0]:
df_q2.display()

product_line,faturamento
Classic Cars,15559.72


> <b>Resposta</b>
-  O faturamento da linha de produto mais vendido no ano de 2005 e com o status Shipped é $ 15.559,72

Salvando o resultado

In [0]:
df_q2.write.mode('overwrite').format('delta').option('mergeSchema', 'True').save('/join/gold/resposta_q2')

#### - Nome, sobrenome e email dos vendedores do Japão, o local-part do e-mail deve estar mascarado.

- Solução utilizando SQL

Dataframes abordados para essa questão:
- Employees
- Offices

In [0]:
df_employees = spark.read.format('delta').load('/join/silver/df_employees')
exibir_info_df(df_employees)

df_offices = spark.read.format('delta').load('/join/silver/df_offices')
exibir_info_df(df_offices)


Linhas = 23 
Colunas = 8

root
 |-- employee_number: integer (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- extension: string (nullable = true)
 |-- email: string (nullable = true)
 |-- office_code: string (nullable = true)
 |-- reports_to: integer (nullable = true)
 |-- job_Title: string (nullable = true)


Linhas = 7 
Colunas = 9

root
 |-- office_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address_line1: string (nullable = true)
 |-- address_line2: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- territory: string (nullable = true)



In [0]:
df_employees.display()

employee_number,last_name,first_name,extension,email,office_code,reports_to,job_Title
1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,0,President
1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002,VP Sales
1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002,VP Marketing
1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056,Sales Manager (APAC)
1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056,Sale Manager (EMEA)
1143,Bow,Anthony,x5428,abow@classicmodelcars.com,1,1056,Sales Manager (NA)
1165,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1,1143,Sales Rep
1166,Thompson,Leslie,x4065,lthompson@classicmodelcars.com,1,1143,Sales Rep
1188,Firrelli,Julie,x2173,jfirrelli@classicmodelcars.com,2,1143,Sales Rep
1216,Patterson,Steve,x4334,spatterson@classicmodelcars.com,2,1143,Sales Rep


In [0]:
df_offices.display()

office_code,city,phone,address_line1,address_line2,state,country,postal_code,territory
1,San Francisco,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
2,Boston,+1 215 837 0825,1550 Court Place,Suite 102,MA,USA,02107,
3,NYC,+1 212 555 3000,523 East 53rd Street,apt. 5A,NY,USA,10022,
4,Paris,+33 14 723 4404,43 Rue Jouffroy D'abbans,Uninformed,capital,France,75017,EMEA
5,Tokyo,+81 33 224 5000,4-1 Kioicho,Uninformed,Chiyoda-Ku,Japan,102-8578,Japan
6,Sydney,+61 2 9264 2451,5-11 Wentworth Avenue,Floor #2,capital,Australia,NSW 2010,APAC
7,London,+44 20 7877 2041,25 Old Broad Street,Level 7,capital,UK,EC2N 1HN,EMEA


In [0]:

df_office_japan = df_offices.filter('country = "Japan"')
df_office_japan.display()

office_code,city,phone,address_line1,address_line2,state,country,postal_code,territory
5,Tokyo,+81 33 224 5000,4-1 Kioicho,Uninformed,Chiyoda-Ku,Japan,102-8578,Japan


- Solução utilizando SQL

Criando tabelas temporárias

In [0]:
df_office_japan.createOrReplaceTempView('office')
df_employees.createOrReplaceTempView('emp')

Consulta final apresentando o resultado

In [0]:
%sql

SELECT 
  e.first_name,
  e.last_name,
  CONCAT('******', SUBSTRING(e.email, LOCATE('@', e.email))) AS email
FROM emp AS e
INNER JOIN office AS o 
ON e.office_code = o.office_code;

first_name,last_name,email
Mami,Nishi,******@classicmodelcars.com
Yoshimi,Kato,******@classicmodelcars.com


- Solução utilizando PySpark

In [0]:
df_employees.join(df_office_japan, df_employees.office_code == df_office_japan.office_code, 'INNER')\
           .select(df_employees.first_name, df_employees.last_name, df_employees.email).display()

first_name,last_name,email
Mami,Nishi,mnishi@classicmodelcars.com
Yoshimi,Kato,ykato@classicmodelcars.com


Consulta final apresentando o resultado

In [0]:
df_q3 = df_employees.join(df_office_japan, df_employees.office_code == df_office_japan.office_code, 'INNER')\
                    .select(df_employees.first_name, df_employees.last_name, df_employees.email)

df_q3 = df_q3.withColumn('email', regexp_replace('email', '[^@]+@', '*****@'))
df_q3.display()

first_name,last_name,email
Mami,Nishi,*****@classicmodelcars.com
Yoshimi,Kato,*****@classicmodelcars.com


Salvando o resultado

In [0]:
df_q3.write.mode('overwrite').format('delta').option('mergeSchema', 'True').save('/join/gold/resposta_q3')