In [None]:
import re 
from pyspark.sql import DataFrame
from pyspark.sql.functions import sum
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [None]:
# Definindo informações de conexão para um banco de dados PostgreSQL.
# Aqui, estamos configurando os detalhes do driver JDBC, credenciais do banco de dados
# e a URL de conexão necessária para acessar o banco de dados.

# Driver JDBC para PostgreSQL
driver = "org.postgresql.Driver"

# Credenciais do banco de dados
database_host = "psql-mock-database-cloud.postgres.database.azure.com"
database_port = "5432"
database_name = "ecom1692302642986jdgfoiqqgnwhjgtz"
user = "bktdrkjovneylsygcyjfmgsp@psql-mock-database-cloud"
password = "vokkfcasyhwkvhytorkpjimf"

# URL de conexão construída usando as informações acima
url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}"


In [None]:
def get_remote_table(driver: str, url: str, table: str, user: str, password: str) -> DataFrame:
    """
    Lê uma tabela remota utilizando Spark DataFrame API.

    Args:
        driver (str): O driver JDBC para o banco de dados.
        url (str): A URL do banco de dados.
        table (str): O nome da tabela a ser lida.
        user (str): O nome de usuário para autenticação.
        password (str): A senha para autenticação.

    Returns:
        DataFrame: Um DataFrame contendo os dados da tabela remota.
    """
    
    remote_table = (spark.read
                    .format("jdbc")
                    .option("driver", driver)
                    .option("url", url)
                    .option("dbtable", table)
                    .option("user", user)
                    .option("password", password)
                    .load()
                    )
    
    return remote_table


In [None]:
# Obtendo dados de tabelas remotas do banco de dados utilizando a função get_remote_table.
# Aqui, estamos buscando dados de várias tabelas usando as credenciais e informações de conexão
# previamente configuradas. Cada chamada da função get_remote_table resulta em um DataFrame contendo
# os dados da tabela correspondente.

# Obtendo dados da tabela 'customers'
customers = get_remote_table(driver=driver, url=url, table='customers', user=user, password=password)

# Obtendo dados da tabela 'employees'
employees = get_remote_table(driver=driver, url=url, table='employees', user=user, password=password)

# Obtendo dados da tabela 'offices'
offices = get_remote_table(driver=driver, url=url, table='offices', user=user, password=password)

# Obtendo dados da tabela 'orderdetails'
orderdetails = get_remote_table(driver=driver, url=url, table='orderdetails', user=user, password=password)

# Obtendo dados da tabela 'orders'
orders = get_remote_table(driver=driver, url=url, table='orders', user=user, password=password)

# Obtendo dados da tabela 'payments'
payments = get_remote_table(driver=driver, url=url, table='payments', user=user, password=password)

# Obtendo dados da tabela 'product_lines'
product_lines = get_remote_table(driver=driver, url=url, table='product_lines', user=user, password=password)

# Obtendo dados da tabela 'products'
products = get_remote_table(driver=driver, url=url, table='products', user=user, password=password)


In [None]:
product_lines.show(n=5)

+------------+--------------------+----------------+-----+
|product_line|    text_description|html_description|image|
+------------+--------------------+----------------+-----+
|Classic Cars|Attention car ent...|            null| null|
| Motorcycles|Our motorcycles a...|            null| null|
|      Planes|Unique, diecast a...|            null| null|
|       Ships|The perfect holid...|            null| null|
|      Trains|Model trains are ...|            null| null|
+------------+--------------------+----------------+-----+
only showing top 5 rows



In [None]:
# Salvando DataFrames em formato Parquet.
# Neste trecho de código, estamos salvando os DataFrames resultantes das consultas
# às tabelas remotas em formato Parquet. Cada DataFrame é salvo em um local específico
# usando o método write.parquet, facilitando o armazenamento dos dados em um formato eficiente.

# Salvando DataFrame 'customers' em Parquet
customers.write.parquet("FileStore/CaseTecnico/customers.parquet")

# Salvando DataFrame 'employees' em Parquet
employees.write.parquet("FileStore/CaseTecnico/employees.parquet")

# Salvando DataFrame 'offices' em Parquet
offices.write.parquet("FileStore/CaseTecnico/offices.parquet")

# Salvando DataFrame 'orderdetails' em Parquet
orderdetails.write.parquet("FileStore/CaseTecnico/orderdetails.parquet")

# Salvando DataFrame 'orders' em Parquet
orders.write.parquet("FileStore/CaseTecnico/orders.parquet")

# Salvando DataFrame 'payments' em Parquet
payments.write.parquet("FileStore/CaseTecnico/payments.parquet")

# Salvando DataFrame 'product_lines' em Parquet
product_lines.write.parquet("FileStore/CaseTecnico/product_lines.parquet")

# Salvando DataFrame 'products' em Parquet
products.write.parquet("FileStore/CaseTecnico/products.parquet")


In [None]:
%fs ls /FileStore/CaseTecnico/

path,name,size,modificationTime
dbfs:/FileStore/CaseTecnico/customers.parquet/,customers.parquet/,0,1692501896613
dbfs:/FileStore/CaseTecnico/employees.parquet/,employees.parquet/,0,1692501896613
dbfs:/FileStore/CaseTecnico/offices.parquet/,offices.parquet/,0,1692501896613
dbfs:/FileStore/CaseTecnico/orderdetails.parquet/,orderdetails.parquet/,0,1692501896613
dbfs:/FileStore/CaseTecnico/orders.parquet/,orders.parquet/,0,1692501896613
dbfs:/FileStore/CaseTecnico/payments.parquet/,payments.parquet/,0,1692501896613
dbfs:/FileStore/CaseTecnico/product_lines.parquet/,product_lines.parquet/,0,1692501896613
dbfs:/FileStore/CaseTecnico/products.parquet/,products.parquet/,0,1692501896613


In [None]:
# Lendo arquivos Parquet como DataFrames.
# Neste trecho de código, estamos lendo os arquivos Parquet que foram previamente
# salvos como DataFrames. Cada arquivo Parquet é lido usando o método read.parquet,
# resultando em um DataFrame que contém os dados do arquivo.

# Lendo arquivo Parquet 'customers'
customers_parquet = spark.read.parquet("/FileStore/CaseTecnico/customers.parquet")

# Lendo arquivo Parquet 'employees'
employees_parquet = spark.read.parquet("/FileStore/CaseTecnico/employees.parquet")

# Lendo arquivo Parquet 'offices'
offices_parquet = spark.read.parquet("/FileStore/CaseTecnico/offices.parquet")

# Lendo arquivo Parquet 'orderdetails'
orderdetails_parquet = spark.read.parquet("/FileStore/CaseTecnico/orderdetails.parquet")

# Lendo arquivo Parquet 'orders'
orders_parquet = spark.read.parquet("/FileStore/CaseTecnico/orders.parquet")

# Lendo arquivo Parquet 'payments'
payments_parquet = spark.read.parquet("/FileStore/CaseTecnico/payments.parquet")

# Lendo arquivo Parquet 'product_lines'
product_lines_parquet = spark.read.parquet("/FileStore/CaseTecnico/product_lines.parquet")

# Lendo arquivo Parquet 'products'
products_parquet = spark.read.parquet("/FileStore/CaseTecnico/products.parquet")


In [None]:
product_lines_parquet.show(n=10)

+----------------+--------------------+----------------+-----+
|    product_line|    text_description|html_description|image|
+----------------+--------------------+----------------+-----+
|    Classic Cars|Attention car ent...|            null| null|
|     Motorcycles|Our motorcycles a...|            null| null|
|          Planes|Unique, diecast a...|            null| null|
|           Ships|The perfect holid...|            null| null|
|          Trains|Model trains are ...|            null| null|
|Trucks and Buses|The Truck and Bus...|            null| null|
|    Vintage Cars|Our Vintage Car m...|            null| null|
+----------------+--------------------+----------------+-----+



In [None]:
# Obtendo novos dados da tabela 'product_lines'
product_lines = get_remote_table(driver=driver, url=url, table='product_lines', user=user, password=password)

In [None]:
# identificar as mudanças entre as fontes de dados
changes = product_lines.subtract(product_lines_parquet)
changes.show()

+--------------+--------------------+--------------------+-----+
|  product_line|    text_description|    html_description|image|
+--------------+--------------------+--------------------+-----+
|Uno com Escada|A cada dia cresce...|                null| null|
|  Vintage Cars|Our Vintage Car m...|https://www.pexel...| null|
+--------------+--------------------+--------------------+-----+



In [None]:
changes.toJSON().collect()

Out[31]: ['{"product_line":"Uno com Escada","text_description":"A cada dia cresce a fama dos Fiat Uno com escada no teto, que costumam ser veículos de serviço das firmas no Brasil. Há tempos surgem memes e piadas na internet envolvendo esse “superesportivo” considerando o seu alto desempenho, especialmente quando levam nas suas portas adesivos de empresas de telecomunicações."}',
 '{"product_line":"Vintage Cars","text_description":"Our Vintage Car models realistically portray automobiles produced from the early 1900s through the 1940s. Materials used include Bakelite, diecast, plastic and wood. Most of the replicas are in the 1:18 and 1:24 scale sizes, which provide the optimum in detail and accuracy. Prices range from $30.00 up to $180.00 for some special limited edition replicas. All models include a certificate of authenticity from their manufacturers and come fully assembled and ready for display in the home or office.","html_description":"https://www.pexels.com/photo/blue-sedan-71

In [None]:
def merge_data_sources(df_jdbc: DataFrame, df_parquet: DataFrame, key: str, path_to_parquet: str) -> None:
    """
    Realiza a ação de merge entre duas fontes de dados: uma DataFrame da fonte JDBC e outra DataFrame
    dos arquivos Parquet. A função identifica registros a serem inseridos, atualizados e excluídos para
    manter a consistência entre as fontes de dados. Os resultados são salvos como arquivos Parquet.

    Args:
        df_jdbc (DataFrame): DataFrame da fonte JDBC contendo os dados atualizados.
        df_parquet (DataFrame): DataFrame dos arquivos Parquet como fonte de armazenamento local.
        key (str): Nome da coluna que é chave primária para identificação de registros.
        path_to_parquet (str): Caminho para onde os arquivos Parquet atualizados serão salvos.

    Returns:
        None: A função não retorna valor. Os resultados são salvos nos arquivos Parquet especificados.
    """
    # Identificar mudanças entre as fontes de dados
    changes = df_jdbc.subtract(df_parquet)

    # Identificar registros a serem excluídos
    deletes = df_parquet.join(changes, key, "inner")

    # Identificar registros a serem inseridos
    inserts = changes

    # Identificar registros a serem atualizados
    updates = df_jdbc.join(df_parquet.alias("parquet_alias"), key, "inner") \
                     .select("parquet_alias.*")

    # Remover registros a serem excluídos dos arquivos Parquet
    df_updated = df_parquet.join(deletes, key, "left_anti")

    # Adicionar registros a serem inseridos aos arquivos Parquet
    df_updated = df_updated.union(inserts)

    # Atualizar registros a serem atualizados nos arquivos Parquet
    df_updated = df_updated.join(updates, updates.columns, "left_anti")

    # Salvar os resultados de volta como Parquet
    df_updated.write.mode("overwrite").parquet(path_to_parquet)

In [None]:
# Executando merge entre as fontes de dados
merge_data_sources(
    df_jdbc=product_lines, 
    df_parquet=product_lines_parquet, 
    key="product_line", 
    path_to_parquet="/FileStore/CaseTecnico/product_lines.parquet"
)

In [None]:
new_product_lines_parquet = spark.read.parquet("/FileStore/CaseTecnico/product_lines.parquet")
new_product_lines_parquet.show()

+----------------+--------------------+--------------------+-----+
|    product_line|    text_description|    html_description|image|
+----------------+--------------------+--------------------+-----+
|     Motorcycles|Our motorcycles a...|                null| null|
|           Ships|The perfect holid...|                null| null|
|Trucks and Buses|The Truck and Bus...|                null| null|
|    Classic Cars|Attention car ent...|                null| null|
|          Trains|Model trains are ...|                null| null|
|          Planes|Unique, diecast a...|                null| null|
|  Uno com Escada|A cada dia cresce...|                null| null|
|    Vintage Cars|Our Vintage Car m...|https://www.pexel...| null|
+----------------+--------------------+--------------------+-----+



In [None]:
# Questão N1: Qual país possui a maior quantidade de itens cancelados?

# Calculando a quantidade de itens cancelados por país:
# - Filtrar os pedidos com status 'Cancelled'
# - Realizar um join com a tabela de clientes baseado no número do cliente
# - Agrupar por país e contar a quantidade de pedidos cancelados por país
# - Ordenar em ordem decrescente pela contagem

cancelled_count_by_country = orders \
    .filter("status = 'Cancelled'") \
    .join(customers, on=orders.customer_number == customers.customer_number, how="inner") \
    .groupBy(customers.country) \
    .count() \
    .orderBy("count", ascending=0)

# Salvando a contagem de itens cancelados por país em formato Delta:
# - Utilizando o método write.mode("overwrite").format("delta").save() para salvar em Delta
# - O caminho "/FileStore/CaseTecnico/cancelled_count_by_country.delta" indica onde os dados serão salvos

cancelled_count_by_country.write.mode("overwrite").format("delta").save("/FileStore/CaseTecnico/cancelled_count_by_country.delta")

# Exibindo o resultado da contagem de itens cancelados por país
cancelled_count_by_country.show(n=1)


+-----------+-----+
|    country|count|
+-----------+-----+
|New Zealand|    2|
+-----------+-----+
only showing top 1 row



In [None]:
# Questão N2: Qual o faturamento da linha de produto mais vendido?
# Considerando os itens Shipped cujos pedidos foram realizados no ano de 2005.

# Calculando o faturamento da linha de produto mais vendido:
# - Filtrar os pedidos com status 'Shipped' e que foram realizados no ano de 2005
# - Realizar joins para unir os pedidos, detalhes do pedido, produtos e linhas de produto
# - Selecionar a linha de produto e calcular o valor total (quantidade * preço) para cada item
# - Agrupar por linha de produto e somar os valores totais

revenue_product_line = orders \
    .filter("status = 'Shipped' AND (order_date >= '2005-01-01' and order_date <= '2005-12-31')") \
    .join(orderdetails, on=orderdetails.order_number == orders.order_number, how="inner") \
    .join(products, on=products.product_code == orderdetails.product_code , how="inner") \
    .join(product_lines, on=product_lines.product_line == products.product_line, how="inner") \
    .select(
        product_lines.product_line,
        (orderdetails.quantity_ordered * orderdetails.price_each).alias("revenue_amount")
    ) \
    .groupBy(product_lines.product_line) \
    .agg(sum("revenue_amount").alias("revenue")) \
    .orderBy("revenue", ascending=False)
    
    

# Salvando o resultado em formato Delta:
revenue_product_line.write.mode("overwrite").format("delta").save("/FileStore/CaseTecnico/revenue_product_line.delta")

# Exibindo o faturamento da linha de produto mais vendido:
revenue_product_line.show(n=1)


+------------+---------+
|product_line|  revenue|
+------------+---------+
|Classic Cars|603666.99|
+------------+---------+
only showing top 1 row



In [None]:
# Questão N3: Nome, sobrenome e e-mail dos vendedores do Japão, o local-part do e-mail deve estar mascarado.

# Definindo a UDF para mascarar o local-part do e-mail
def mask_email(email):
    regex = r'^([^@]+)@'
    match = re.match(regex, email)
    local_part = match.group(1)
    return email.replace(local_part, f"{local_part[0]}{'*' * (len(local_part)-2)}{local_part[-1]}")

# Criando a UDF
mask_email_udf = udf(mask_email, StringType())

# Filtrar os vendedores do escritório com código '5' (Japão)
japan_employees = employees \
    .filter("office_code = '5'") \
    .withColumn("e-mail", mask_email_udf("email")) \
    .select("first_name", "last_name", "e-mail")

# Salvando o resultado em formato Delta:
japan_employees.write.mode("overwrite").format("delta").save("/FileStore/CaseTecnico/japan_employees.delta")

# Exibindo os vendedores do escritório '5' com e-mails mascarados
japan_employees.show()


+----------+---------+--------------------+
|first_name|last_name|              e-mail|
+----------+---------+--------------------+
|      Mami|    Nishi|m****i@classicmod...|
|   Yoshimi|     Kato|y***o@classicmode...|
+----------+---------+--------------------+



In [None]:
japan_employees.show(truncate=50)

+----------+---------+---------------------------+
|first_name|last_name|                     e-mail|
+----------+---------+---------------------------+
|      Mami|    Nishi|m****i@classicmodelcars.com|
|   Yoshimi|     Kato| y***o@classicmodelcars.com|
+----------+---------+---------------------------+



In [None]:
%fs ls /FileStore/CaseTecnico/

path,name,size,modificationTime
dbfs:/FileStore/CaseTecnico/cancelled_count_by_country.delta/,cancelled_count_by_country.delta/,0,1692506802517
dbfs:/FileStore/CaseTecnico/customers.parquet/,customers.parquet/,0,1692506802517
dbfs:/FileStore/CaseTecnico/employees.parquet/,employees.parquet/,0,1692506802517
dbfs:/FileStore/CaseTecnico/japan_employees.delta/,japan_employees.delta/,0,1692506802517
dbfs:/FileStore/CaseTecnico/offices.parquet/,offices.parquet/,0,1692506802517
dbfs:/FileStore/CaseTecnico/orderdetails.parquet/,orderdetails.parquet/,0,1692506802517
dbfs:/FileStore/CaseTecnico/orders.parquet/,orders.parquet/,0,1692506802517
dbfs:/FileStore/CaseTecnico/payments.parquet/,payments.parquet/,0,1692506802517
dbfs:/FileStore/CaseTecnico/product_lines.parquet/,product_lines.parquet/,0,1692506802517
dbfs:/FileStore/CaseTecnico/products.parquet/,products.parquet/,0,1692506802517


In [None]:
# %fs rm -r /FileStore/CaseTecnico/