In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import yaml

# Create (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('RNP - E-Commerce Case - Victor Hugo')
    .config('spark.sql.legacy.pathOptionBehavior.enabled', True)
    .config('spark.databricks.delta.formatCheck.enabled', False)
    .getOrCreate()
)

# Load the database connection options from the local credentials file.
# `yaml.safe_load` is used instead of a bare `yaml.load(...)`: recent PyYAML
# versions require an explicit Loader argument, and the safe loader cannot
# instantiate arbitrary Python objects from the file. The `with` block makes
# sure the file handle is closed.
with open('./credenciais.yml') as arquivo_credenciais:
    credenciais = yaml.safe_load(arquivo_credenciais)

# Connection properties handed to every `spark.read.jdbc` call below.
propriedades = {
    "user": credenciais['database']['username'],
    "password": credenciais['database']['password'],
    "driver": credenciais['database']['driver'],
    "url": credenciais['database']['url']
}

# JDBC URL kept under its own name for the query cells further down.
jdbc_url = propriedades["url"]

In [None]:
# Create the working directory on DBFS and report whether the call succeeded.
create_bdfs = dbutils.fs.mkdirs("/Users/dev-torugo")
print("DBFS Criado" if create_bdfs else "Criação DBFS falhou")

In [None]:
# QUESTION 3 - In the notebook, create the logic to save each table as Parquet.

# Database connection settings.
database_url = propriedades['url']

# List every table of the `public` schema through a JDBC subquery.
query = "(SELECT table_name FROM information_schema.tables WHERE table_schema = 'public') AS tables"
tabelas_ecommerce = spark.read.jdbc(url=database_url, table=query, properties=propriedades)

# Drop the entries that must not be exported.
tabelas_excluir = ["pg_stat_statements", "pg_buffercache"]
tabelas_filtradas = tabelas_ecommerce.filter(~tabelas_ecommerce.table_name.isin(tabelas_excluir))

# Final list of table names (plain Python strings) to be saved as Parquet.
tabelas = [linha[0] for linha in tabelas_filtradas.select("table_name").collect()]

# Table name -> JDBC DataFrame, reused later by the merge step.
pgsql_dfs = {}

# Read each table over JDBC, write it as Parquet and remember its DataFrame.
for tabela in tabelas:
    nome_parquet = f"/Users/dev-torugo/{tabela}.parquet"
    query = f"(SELECT * FROM {tabela}) AS subquery"
    tabela_pgsql = spark.read.jdbc(url=database_url, table=query, properties=propriedades)
    tabela_pgsql.write.parquet(nome_parquet, mode="overwrite")
    pgsql_dfs[tabela] = tabela_pgsql
    print(f"{nome_parquet} salvo")

# Show what ended up in the working directory.
display(dbutils.fs.ls("/Users/dev-torugo/"))

In [None]:
# QUESTION 4 - In the notebook, create a merge between the JDBC table and the
# Parquet files; the merge must contain insert, update and delete logic.

# QUERIES USED BY THE MERGE ROUTINE
# Every table uses the same MERGE statement; only the join key(s) and the list
# of updated columns differ, so the SQL text is generated from a small spec
# instead of being copy-pasted once per table.


def _build_merge_query(key_columns, columns):
    """Build the MERGE statement for one table.

    Args:
        key_columns: Column names compared in the ON clause (joined with AND).
        columns: Column names assigned in the UPDATE SET clause, in order.

    Returns:
        The MERGE SQL text. It targets the temp view `tabela_delta` and reads
        from the temp view `tabela_postgresql`: matched rows are updated, rows
        missing from the target are inserted, and rows missing from the source
        are deleted.
    """
    on_clause = " AND ".join(f"target.{col} = source.{col}" for col in key_columns)
    set_clause = ",\n".join(f"        target.{col} = source.{col}" for col in columns)
    return "\n".join([
        "MERGE INTO tabela_delta AS target",
        "USING tabela_postgresql AS source",
        f"ON {on_clause}",
        "WHEN MATCHED THEN",
        "    UPDATE SET",
        set_clause,
        "WHEN NOT MATCHED BY target THEN",
        "    INSERT *",
        "WHEN NOT MATCHED BY source THEN",
        "    DELETE",
    ])


# table name -> (join key columns, columns written by UPDATE SET)
_MERGE_SPECS = {
    "customers": (
        ["customer_number"],
        [
            "customer_number", "customer_name", "contact_last_name",
            "contact_first_name", "phone", "address_line1", "address_line2",
            "city", "state", "postal_code", "country",
            "sales_rep_employee_number", "credit_limit",
        ],
    ),
    "employees": (
        ["employee_number"],
        [
            "employee_number", "last_name", "first_name", "extension",
            "email", "office_code", "reports_to", "job_Title",
        ],
    ),
    "offices": (
        ["office_code"],
        [
            "office_code", "city", "phone", "address_line1", "address_line2",
            "state", "country", "postal_code", "territory",
        ],
    ),
    "orderdetails": (
        ["order_number", "product_code"],
        [
            "order_number", "product_code", "quantity_ordered", "price_each",
            "order_line_number",
        ],
    ),
    "orders": (
        ["order_number"],
        [
            "order_number", "order_date", "required_date", "shipped_date",
            "status", "comments", "customer_number",
        ],
    ),
    # NOTE(review): rows are matched on check_number alone - confirm it is
    # unique without customer_number in the source schema.
    "payments": (
        ["check_number"],
        ["customer_number", "check_number", "payment_date", "amount"],
    ),
    "product_lines": (
        ["product_line"],
        ["product_line", "text_description", "html_description", "image"],
    ),
    "products": (
        ["product_code"],
        [
            "product_code", "product_name", "product_line", "product_scale",
            "product_vendor", "product_description", "quantity_in_stock",
            "buy_price", "msrp",
        ],
    ),
}

# table name -> MERGE SQL text, looked up by the merge routine.
merge_querys = {
    tabela_nome: _build_merge_query(chaves, colunas)
    for tabela_nome, (chaves, colunas) in _MERGE_SPECS.items()
}

def parquet_jdbc_merge(pgsql_dfs, tabelas):
    """Merge the current JDBC data of each table into a Delta copy of its Parquet export.

    For every table name the function:
      1. reads `/Users/dev-torugo/<table>.parquet`,
      2. (re)writes it as a Delta table at `/Users/dev-torugo/<table>_delta`,
      3. exposes that Delta table and the JDBC DataFrame as the temp views
         `tabela_delta` and `tabela_postgresql`,
      4. runs the table's statement from `merge_querys`.

    Args:
        pgsql_dfs: Mapping of table name -> DataFrame read from PostgreSQL.
        tabelas: Iterable of table names to process.

    Returns:
        None. Progress and errors are printed. Processing stops at the first
        table that fails; the error is reported, not re-raised.

    NOTE(review): the MERGE targets the temp view `tabela_delta`; confirm the
    runtime in use resolves a view over a path-loaded Delta DataFrame as a
    valid MERGE target.
    """
    tabela = None  # kept so the error message can name the failing table
    try:
        for tabela in tabelas:
            # Resolve both lookups first so an unknown table fails before the
            # Delta location is overwritten.
            merge = merge_querys[tabela]
            pgsql_df = pgsql_dfs[tabela]

            parquet_path = f"/Users/dev-torugo/{tabela}.parquet"
            delta_path = f"/Users/dev-torugo/{tabela}_delta"
            df_parquet = spark.read.parquet(parquet_path)

            # Create the Delta table from the Parquet export.
            df_parquet.write.format('delta').mode('overwrite').option("overwriteSchema", "true").save(delta_path)
            df_delta = spark.read.format("delta").load(delta_path)

            # Temp views with the names the MERGE statements refer to.
            df_delta.createOrReplaceTempView("tabela_delta")
            pgsql_df.createOrReplaceTempView("tabela_postgresql")
            try:
                spark.sql(merge)
            finally:
                # Always remove the views so a failure leaves nothing behind.
                spark.catalog.dropTempView("tabela_postgresql")
                spark.catalog.dropTempView("tabela_delta")
            print(f"{tabela} merged.")
        print("Todas as tabelas foram processadas")
    except Exception as e:
        print(f"Erro ao processar a tabela '{tabela}': {e}")

# Usage: run the merge for the tables collected in the Question 3 cell,
# passing the JDBC DataFrames stored in `pgsql_dfs`.
parquet_jdbc_merge(pgsql_dfs, tabelas)


In [None]:
# HELPER FOR THE ANSWERS TO QUESTION 5

def query_ecommerce_data (url, table, properties, file_path='Users/dev-torugo/'):
    """Run a JDBC query through Spark and persist its result as Delta.

    Args:
        url: JDBC URL of the database.
        table: Table name or parenthesised, aliased subquery to read.
        properties: JDBC connection properties (user, password, driver, ...).
            These are the properties actually used for the read; the global
            `propriedades` is no longer consulted.
        file_path: Location where the result is written in Delta format with
            mode `overwrite` and `overwriteSchema=true`, so each call replaces
            whatever the previous call stored there.
            NOTE(review): the default is a relative path, unlike the absolute
            `/Users/dev-torugo/...` paths used elsewhere in this notebook -
            confirm this is the intended location.

    Returns:
        The DataFrame read from JDBC, or None if reading or writing failed
        (the error is printed, not raised).
    """
    try:
        df = spark.read.jdbc(url,
                            table,
                            properties=properties)
        df.write.format('delta').mode('overwrite').option("overwriteSchema", "true").save(file_path)
        return df
    except Exception as e:
        print(f'Erro: {e}')
        return None



In [None]:
# QUESTION 5.1 - Which country has the largest quantity of cancelled items?

# Parenthesised, aliased subquery so it can be passed as the JDBC `table`.
# It sums `quantity_ordered` per customer country for orders whose status is
# 'Cancelled' and keeps only the top row (ORDER BY ... DESC LIMIT 1).
query_Q5_1 = '''
(   SELECT DISTINCT c.country AS pais, SUM(dos.quantity_ordered) AS total_itens_cancelados
    FROM public.customers c 
    LEFT JOIN public.orders os ON c.customer_number = os.customer_number
    LEFT JOIN orderdetails dos ON dos.order_number = os.order_number
    WHERE os.status = 'Cancelled'
    GROUP BY c.country
    ORDER BY total_itens_cancelados DESC
    LIMIT 1) AS subquery
'''

# Run the query over JDBC using the helper defined in an earlier cell.
df_qtd_cancelados = query_ecommerce_data(jdbc_url, query_Q5_1, propriedades)

In [None]:
# Print the question 5.1 result (columns `pais` and `total_itens_cancelados`).
df_qtd_cancelados.show()

+-----+----------------------+
| pais|total_itens_cancelados|
+-----+----------------------+
|Spain|                   605|
+-----+----------------------+



In [None]:
# QUESTION 5.2 - What is the revenue of the best-selling product line, considering
# items with status 'Shipped' whose order was placed in 2005?

# Revenue is computed from the order lines themselves
# (quantity_ordered * price_each). The previous version joined `payments` on
# customer_number, which paired every order line with every payment of the
# same customer and therefore inflated SUM(p.amount).
# NOTE(review): "best-selling" is ranked by revenue, as in the original query;
# order by SUM(od.quantity_ordered) instead if it should mean units sold.
# NOTE: re-run this cell - any output rendered before this change was computed
# with the old query.
query_Q5_2 = '''
    (SELECT pd.product_line,
     SUM(od.quantity_ordered * od.price_each) AS faturamento
    FROM products pd
    LEFT JOIN orderdetails od ON od.product_code  = pd.product_code
    LEFT JOIN orders o ON o.order_number  = od.order_number
    WHERE o.status = 'Shipped' AND EXTRACT(year FROM o.order_date) = 2005
    GROUP BY pd.product_line
    ORDER BY faturamento DESC
    LIMIT 1) As subquery
'''

# Run the query over JDBC using the helper defined in an earlier cell.
df_produto_2005 = query_ecommerce_data(jdbc_url, query_Q5_2, propriedades)

In [None]:
# Print the question 5.2 result (columns `product_line` and `faturamento`).
df_produto_2005.show()

+------------+--------------------+
|product_line|         faturamento|
+------------+--------------------+
|Classic Cars|47574393.22000000...|
+------------+--------------------+



In [None]:
# QUESTION 5.3 - First name, last name and e-mail of the salespeople from Japan;
# the local-part of the e-mail must be masked.

# The e-mail is rebuilt as one '*' per character before the '@' followed by
# the original text from the '@' onwards. Employees are restricted to those
# whose office has country = 'Japan'.
query_Q5_3 = '''
(   SELECT
        e.first_name AS nome,
        e.last_name AS sobrenome,
        CONCAT(REPEAT('*', POSITION('@' IN e.email) - 1), SUBSTRING(e.email, POSITION('@' IN e.email))) AS email
    FROM employees e 
    LEFT JOIN offices o 
    ON o.office_code = e.office_code 
    WHERE o.country = 'Japan'
    ) As subquery
    '''

# Run the query over JDBC using the helper defined in an earlier cell.
df_vendedores_japao = query_ecommerce_data(jdbc_url, query_Q5_3, propriedades)

In [None]:
# Print the question 5.3 result (columns `nome`, `sobrenome` and masked `email`).
df_vendedores_japao.show()

+-------+---------+--------------------+
|   nome|sobrenome|               email|
+-------+---------+--------------------+
|   Mami|    Nishi|******@classicmod...|
|Yoshimi|     Kato|*****@classicmode...|
+-------+---------+--------------------+



In [None]:
# Stop the Spark session created in the first cell.
spark.stop()