##Criando conexão com banco de dados

In [0]:
from pyspark.sql import SparkSession

# Inicializar sessão do Spark
spark = SparkSession.builder \
    .appName("PostgreSQL reader") \
    .getOrCreate()

#Detalhes da conexão
database_url = "jdbc:postgresql://psql-mock-database-cloud.postgres.database.azure.com:5432/ecom1692884890642lsbtqzttgrtzvnoc"
properties = {
    "user": "qvwyyojejjekojytdvfmnsfk@psql-mock-database-cloud",
    "password": "pemhzteldibgloibpdqjdccq",
    "driver": "org.postgresql.Driver"
}

#Função para ler tabelas
def read_table(table_name):
    return spark.read.jdbc(database_url, table_name, properties=properties)

# lendo as tabelas
customers_df = read_table("customers")
employees_df = read_table("employees")
offices_df = read_table("offices")
orderdetails_df = read_table("orderdetails")
orders_df = read_table("orders")
payments_df = read_table("payments")
product_lines_df = read_table("product_lines")
products_df = read_table("products")

#Verifica se os dados estão carregados
customers_df.show(1)




+---------------+-------------+-----------------+------------------+----------+--------------+-------------+------+--------+-----------+-------+-------------------------+------------+
|customer_number|customer_name|contact_last_name|contact_first_name|     phone| address_line1|address_line2|  city|   state|postal_code|country|sales_rep_employee_number|credit_limit|
+---------------+-------------+-----------------+------------------+----------+--------------+-------------+------+--------+-----------+-------+-------------------------+------------+
|            103|         Jake|             King|           Carine |40.32.2555|54, rue Royale|         null|Nantes|Victoria|      44000| France|                     1370|    21000.00|
+---------------+-------------+-----------------+------------------+----------+--------------+-------------+------+--------+-----------+-------+-------------------------+------------+
only showing top 1 row



#criando a lógica para salvar cada tabela como parquet

In [0]:
#criando caminho para salvar as tabelas em parquet
base_path = "/edson_arquivos_parquet/"

#Função para salvar DataFrame como Parquet
def save_as_parquet(df, table_name):
    parquet_path = base_path + table_name + ".parquet"
    df.write.mode('overwrite').parquet(parquet_path)

# Salva cada tabela como um arquivo Parquet
save_as_parquet(customers_df, "customers")
save_as_parquet(employees_df, "employees")
save_as_parquet(offices_df, "offices")
save_as_parquet(orderdetails_df, "orderdetails")
save_as_parquet(orders_df, "orders")
save_as_parquet(payments_df, "payments")
save_as_parquet(product_lines_df, "product_lines")
save_as_parquet(products_df, "products")


In [0]:
# criando tabelas temporarias, para montar as consulta em SQL
customers_df.createOrReplaceTempView("customers")
employees_df.createOrReplaceTempView("employees")
offices_df.createOrReplaceTempView("offices")
payments_df.createOrReplaceTempView("payments")
product_lines_df.createOrReplaceTempView("product_lines")
orderdetails_df.createOrReplaceTempView("orderdetails")
orders_df.createOrReplaceTempView("orders")
products_df.createOrReplaceTempView("products")


#Qual país possui a maior quantidade de itens cancelados?

In [0]:
result1 = spark.sql("""
      SELECT c.country, SUM(od.quantity_ordered) AS total_cancelled
    FROM orders o
    JOIN orderdetails od ON o.order_number = od.order_number
    JOIN customers c ON o.customer_number = c.customer_number
    WHERE o.status = 'Cancelled'
    GROUP BY c.country
    ORDER BY total_cancelled DESC
    LIMIT 1
""")
result1.show()


+-------+---------------+
|country|total_cancelled|
+-------+---------------+
|  Spain|            605|
+-------+---------------+



#Qual o faturamento da linha de produto mais vendido, considere como os itens Shipped, cujo o pedido foi realizado no ano de 2005?

In [0]:
result2 = spark.sql("""
    SELECT p.product_line, SUM(od.price_each) AS total_revenue
    FROM orders o
    JOIN orderdetails od ON o.order_number = od.order_number
    JOIN products p ON od.product_code = p.product_code
    WHERE o.status = 'Shipped' AND YEAR(o.order_date) = 2005
    GROUP BY p.product_line
    ORDER BY total_revenue DESC
    LIMIT 1
""")
result2.show()


+------------+-------------+
|product_line|total_revenue|
+------------+-------------+
|Classic Cars|     16220.62|
+------------+-------------+



#Nome, sobrenome e e-mail dos vendedores do Japão, o local-part do e-mail deve estar mascarado.

In [0]:
result3 = spark.sql("""
    SELECT e.first_name, e.last_name, 
           CONCAT(REPEAT('*', LENGTH(SPLIT(e.email, '@')[0])), '@', SPLIT(e.email, '@')[1]) as masked_email
    FROM employees e
    JOIN offices o ON e.office_code = o.office_code
    WHERE o.country = 'Japan'
""")
result3.show()


+----------+---------+--------------------+
|first_name|last_name|        masked_email|
+----------+---------+--------------------+
|      Mami|    Nishi|******@classicmod...|
|   Yoshimi|     Kato|*****@classicmode...|
+----------+---------+--------------------+



#No notebook crie um merge entre a tabela JDBC e os arquivos parquet, o merge 
###deve conter a lógica de insert, update e delete

In [0]:
def cast_dataframe(target_df, reference_df):
    """
    Convertendo colunas de target_df para corresponder aos tipos de dados de reference_df.
    Argumentos:
    - target_df (DataFrame): DataFrame cujas colunas precisam ser convertidas.
    - reference_df (DataFrame): DataFrame utilizado como referência para o casting.
    
    Retorna:
    - DataFrame: target_df após a conversão.
    """
    for column in target_df.columns:
        ref_column = column.replace("parquet_", "")
        target_df = target_df.withColumn(column, target_df[column].cast(reference_df.schema[ref_column].dataType))
    return target_df

def upsert_table(jdbc_df, parquet_df, primary_keys):
    # Gerando hashes para jdbc_df
    hash_expression = md5(concat_ws("|", *jdbc_df.columns)).alias("row_hash")
    jdbc_df = jdbc_df.withColumn("row_hash", hash_expression)

    # Gerando hashes para parquet_df
    hash_expression = md5(concat_ws("|", *parquet_df.columns)).alias("row_hash")
    parquet_df = parquet_df.withColumn("row_hash", hash_expression)

    # Identificando registros novos e atualizados
    new_records = parquet_df.join(jdbc_df, on=["row_hash"], how="left_anti")
    updated_records = parquet_df.join(jdbc_df, on=["row_hash"], how="inner")

    # Identificando registros excluídos
    deleted_records_hash = jdbc_df.join(parquet_df, on=["row_hash"], how="left_anti").select("row_hash")

    # Seleção explícita das colunas de parquet_df
    columns_to_select = [f"parquet_{col}" for col in jdbc_df.columns if col != "row_hash"]
    updated_records = updated_records.select(*columns_to_select)
    new_records = new_records.select(*columns_to_select)

    # Casting
    new_records = cast_dataframe(new_records, jdbc_df)
    updated_records = cast_dataframe(updated_records, jdbc_df)

    # Excluindo registros deletados
    result_df = jdbc_df.join(deleted_records_hash, on="row_hash", how="left_anti").drop("row_hash")

    # Adicionando registros atualizados e novos
    return result_df.union(new_records).union(updated_records)

In [0]:
# Iniciar uma sessão Spark
spark = SparkSession.builder.appName("Upsert").getOrCreate()
from pyspark.sql.functions import col, md5, concat_ws



def rename_parquet_columns(df):
    return df.select([col(column).alias(f"parquet_{column}") for column in df.columns])


# Renomeando as colunas em parquet_df antes da junção
def rename_parquet_columns(df):
    return df.select([col(column).alias(f"parquet_{column}") for column in df.columns])

# Para a tabela customers
jdbc_df = spark.sql('SELECT * FROM customers')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/customers.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_customers_df = upsert_table(jdbc_df, parquet_df, ["parquet_customer_number"])

# Para a tabela employees
jdbc_df = spark.sql('SELECT * FROM employees')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/employees.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_employee_df = upsert_table(jdbc_df, parquet_df, ["parquet_employee_number"])

# Para a tabela offices
jdbc_df = spark.sql('SELECT * FROM offices')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/offices.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_office_df = upsert_table(jdbc_df, parquet_df, ["parquet_office_code"])

# Para a tabela orderdetails
jdbc_df = spark.sql('SELECT * FROM orderdetails')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/orderdetails.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_orderdetails_df = upsert_table(jdbc_df, parquet_df, ["parquet_order_number", "parquet_product_code"])

# Para a tabela orders
jdbc_df = spark.sql('SELECT * FROM orders')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/orders.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_order_df = upsert_table(jdbc_df, parquet_df, ["parquet_order_number"])

# Para a tabela payments
jdbc_df = spark.sql('SELECT * FROM payments')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/payments.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_payments_df = upsert_table(jdbc_df, parquet_df, ["parquet_customer_number", "parquet_check_number"])

# Para a tabela product_lines
jdbc_df = spark.sql('SELECT * FROM product_lines')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/product_lines.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_product_lines_df = upsert_table(jdbc_df, parquet_df, ["parquet_product_line"])

# Para a tabela products
jdbc_df = spark.sql('SELECT * FROM products')
parquet_df = spark.read.parquet("/edson_arquivos_parquet/products.parquet")
parquet_df = rename_parquet_columns(parquet_df)
merged_products_df = upsert_table(jdbc_df, parquet_df, ["parquet_product_code"])



#Salve os resultados em formato delta.

In [0]:
# Para o dataframe merged_customers_df
path_customers = "/edson_arquivos_delta/customers"
merged_customers_df.write.format("delta").mode("overwrite").save(path_customers)

# Para o dataframe merged_employee_df
path_employee = "/edson_arquivos_delta/employee"
merged_employee_df.write.format("delta").mode("overwrite").save(path_employee)

# Para o dataframe merged_office_df
path_office = "/edson_arquivos_delta/office"
merged_office_df.write.format("delta").mode("overwrite").save(path_office)

# Para o dataframe merged_orderdetails_df
path_orderdetails = "/edson_arquivos_delta/orderdetails"
merged_orderdetails_df.write.format("delta").mode("overwrite").save(path_orderdetails)

# Para o dataframe merged_order_df
path_order = "/edson_arquivos_delta/order"
merged_order_df.write.format("delta").mode("overwrite").save(path_order)

# Para o dataframe merged_payments_df
path_payments = "/edson_arquivos_delta/payments"
merged_payments_df.write.format("delta").mode("overwrite").save(path_payments)

# Para o dataframe merged_product_lines_df
path_product_lines = "/edson_arquivos_delta/product_lines"
merged_product_lines_df.write.format("delta").mode("overwrite").save(path_product_lines)

# Para o dataframe merged_products_df
path_products = "/edson_arquivos_delta/products"
merged_products_df.write.format("delta").mode("overwrite").save(path_products)
