In [0]:
#!pip install -r requirements.txt

In [0]:
import polars as pl

from pyspark.sql import SparkSession

#Cria a sessão Spark
spark = SparkSession.builder \
    .appName('Convert to PySpark DataFrame') \
    .config('spark.sql.legacy.pathOptionBehavior.enabled', True) \
    .config('spark.databricks.delta.formatCheck.enabled', False)  \
    .getOrCreate()

In [0]:
#Funções utilizadas durante o case

'''
A função abaixo retorna um dataframe a partir de uma consulta SQL.
    Params:
        query: str - Consulta SQL a ser executada      
'''
connection_string = 'postgresql://kskocgzinxdjzxbefhvzukcm%40psql-mock-database-cloud:jezjgtkxislmdytscsmfzaey@psql-mock-database-cloud.postgres.database.azure.com:5432/ecom1689961191152kwdflhebkfqsxgdn'

def query_data(query : str): 
    try:
        df = pl.read_database(query, connection_string)
        display(df)

        return df
    except Exception as e:
        print(f'Erro: {e}')

'''
A função abaixo converte um dataframe polars para pandas, 
depois pySpark e salva o dataframe em formato Delta.
    Params:
        file_name: str - Nome do arquivo onde o parquet será armazenado.
'''
def dataframe_to_delta(file_name : str):
    try:
        df_pandas = df_polars.to_pandas()
        df_pyspark = spark.createDataFrame(df_pandas)
        display(df_pyspark)
        file_path = f'/dbfs/Users/devrvegh@gmail.com/dados_delta/{file_name}'
        df_pyspark.write.format('delta').save(f'{file_path}')
    except Exception as e:
        print(f'Erro: {e}')

In [0]:
#PERGUNTA 1 - Qual país possui a maior quantidade de itens cancelados?

query_1 = '''
    SELECT C.COUNTRY,
       SUM(OD.QUANTITY_ORDERED) AS TOTAL_ITEMS_CANCELADOS
FROM CUSTOMERS C
LEFT JOIN ORDERS O
    ON O.CUSTOMER_NUMBER = C.CUSTOMER_NUMBER
LEFT JOIN ORDERDETAILS OD
    ON O.ORDER_NUMBER = OD.ORDER_NUMBER 
WHERE O.STATUS = 'Cancelled'
GROUP BY C.COUNTRY
ORDER BY TOTAL_ITEMS_CANCELADOS DESC
LIMIT 1
'''

df_polars = query_data(query=query_1)

dataframe_to_delta('Pergunta_01')

country,total_items_cancelados
str,i64
"""Spain""",605


country,total_items_cancelados
Spain,605


In [0]:
#PERGUNTA 2 - Qual o faturamento da linha de produto mais vendido, considere os itens com status 'Shipped', cujo o pedido foi realizado no ano de 2005?

query_2 = '''
    SELECT PD.PRODUCT_LINE,
	SUM(P.AMOUNT) AS FATURAMENTO
FROM PRODUCTS PD
LEFT JOIN ORDERDETAILS OD
ON OD.PRODUCT_CODE  = PD.PRODUCT_CODE 
LEFT JOIN ORDERS O
ON O.ORDER_NUMBER  = OD.ORDER_NUMBER 
LEFT JOIN PAYMENTS P 
ON P.CUSTOMER_NUMBER = O.CUSTOMER_NUMBER 
WHERE O.STATUS = 'Shipped'
AND EXTRACT(YEAR FROM O.ORDER_DATE) = 2005
GROUP BY PD.PRODUCT_LINE
ORDER BY FATURAMENTO DESC
LIMIT 1
'''

df_polars = query_data(query=query_2)

dataframe_to_delta('Pergunta_02')

product_line,faturamento
str,f64
"""Classic Cars""",47574000.0


product_line,faturamento
Classic Cars,47574393.22


In [0]:
#PERGUNTA 3 - Nome, sobrenome e email dos vendedores do Japão. O local-part do email deve estar mascarado.

query_3 = '''
    SELECT
	E.FIRST_NAME AS NOME,
	E.LAST_NAME AS SOBRENOME,
    CONCAT(REPEAT('X', POSITION('@' IN E.EMAIL) - 1), SUBSTRING(EMAIL, POSITION('@' IN E.EMAIL))) AS EMAIL
FROM EMPLOYEES E 
LEFT JOIN OFFICES O 
ON O.OFFICE_CODE = E.OFFICE_CODE 
WHERE O.COUNTRY = 'Japan'
'''

df_polars = query_data(query=query_3)

dataframe_to_delta('Pergunta_03')

nome,sobrenome,email
str,str,str
"""Mami""","""Nishi""","""XXXXXX@classic…"
"""Yoshimi""","""Kato""","""XXXXX@classicm…"


nome,sobrenome,email
Mami,Nishi,XXXXXX@classicmodelcars.com
Yoshimi,Kato,XXXXX@classicmodelcars.com
