In [0]:
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [0]:
#### Variable initialisation

spark = SparkSession.builder.appName("SparkJob").getOrCreate()

# Database connection settings.
# Each value can be overridden through an ECOM_DB_* environment variable so the
# credentials do not have to live in the notebook; the literal fallbacks keep
# the notebook runnable exactly as before when no variable is set.
# NOTE(review): the fallbacks look like credentials for a mock database — confirm,
# and drop the user/password defaults before pointing this at a real database.
driver = "org.postgresql.Driver"
database_host = os.environ.get("ECOM_DB_HOST", "psql-mock-database-cloud.postgres.database.azure.com")
database_port = os.environ.get("ECOM_DB_PORT", "5432")
database_name = os.environ.get("ECOM_DB_NAME", "ecom1692290925429vkltgmdlpclbtrmh")
user = os.environ.get("ECOM_DB_USER", "mttdnzukfyledzwzollrcydx@psql-mock-database-cloud")
password = os.environ.get("ECOM_DB_PASSWORD", "anljjxxmzjlpbtvkvegkwnyt")
url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}"

# Data dictionary: table name -> list of the columns forming its primary key.
# The merge cell joins source and target rows on these columns.
dict_table_key = {'customers': ['customer_number'], 
                  'employees': ['employee_number'], 
                  'offices': ['office_code'], 
                  'orderdetails': ['order_line_number', 'order_number'], 
                  'orders': ['order_number'], 
                  'payments': ['check_number'], 
                  'product_lines': ['product_line'], 
                  'products': ['product_code']}

In [0]:
#### Save every table as parquet

# One JDBC read per table listed in the data dictionary; each table lands in
# its own "/parquet/<table>.parquet" location, replacing any previous copy.
for table in dict_table_key:
    jdbc_reader = spark.read.format("jdbc").options(
        driver=driver,
        url=url,
        dbtable=table,
        user=user,
        password=password,
    )
    df = jdbc_reader.load()

    df.write.mode("overwrite").parquet(f"/parquet/{table}.parquet")

#df=spark.read.parquet("/parquet/product_lines.parquet")
#df.show()

In [0]:
#### Merge between the JDBC tables and the parquet files; the merge covers insert, update and delete.

for table, p_key in dict_table_key.items():
    jdbc_view = table + "_jdbc"
    delta_view = table + "_delta"
    parquet_path = "/parquet/" + table + ".parquet"
    delta_path = "/tmp/delta/" + table

    # Current state of the table in the database = SOURCE side of the merge.
    (spark.read
         .format("jdbc")
         .option("driver", driver)
         .option("url", url)
         .option("dbtable", table)
         .option("user", user)
         .option("password", password)
         .load()
         .createOrReplaceTempView(jdbc_view))

    try:
        # The data is copied to delta because Databricks only runs MERGE INTO on that format.
        spark.read.parquet(parquet_path).write.format("delta").mode("overwrite").save(delta_path)
        # Name the format explicitly instead of relying on the session's default data source.
        spark.read.format("delta").load(delta_path).createOrReplaceTempView(delta_view)

        # Dynamic SQL for each table, built from the target's column list and the primary key.
        lst_columns = spark.table(delta_view).columns
        str_set = ', '.join('TARGET.{0}=SOURCE.{0}'.format(c) for c in lst_columns)
        str_columns = ', '.join(lst_columns)
        str_key = ' AND '.join('TARGET.{0}=SOURCE.{0}'.format(c) for c in p_key)

        # Matched keys are updated, keys only in the database are inserted,
        # keys only in the snapshot are deleted.
        sql = """
        MERGE INTO {0} AS TARGET
        USING {1} AS SOURCE
        ON {2}
        WHEN MATCHED THEN 
        UPDATE SET {3}
        WHEN NOT MATCHED BY TARGET THEN
        INSERT ({4})
        VALUES ({4})
        WHEN NOT MATCHED BY SOURCE THEN
        DELETE
        """.format(delta_view, jdbc_view, str_key, str_set, str_columns)

        spark.sql(sql)

        # Save the merged result back to the parquet location.
        spark.table(delta_view).write.mode("overwrite").parquet(parquet_path)
    finally:
        # Always clean up, even when a step above fails: the view would otherwise
        # keep pointing at files that are about to be removed.
        spark.catalog.dropTempView(delta_view)
        # Remove the temporary delta table.
        dbutils.fs.rm(delta_path, recurse=True)


In [0]:
#### Data analysis answering the questions below; each result is saved in delta format

# Which country has the largest quantity of cancelled items?
# Register the parquet snapshots this query joins as temp views.
for view_name in ("orders", "customers", "orderdetails"):
    spark.read.parquet("/parquet/" + view_name + ".parquet").createOrReplaceTempView(view_name)

# Sum the ordered quantity of cancelled orders per customer country and keep the top row.
sql = """
    select sum(ord_det.quantity_ordered) qtde_cancelados, country 
      from orders ord
inner join customers cus on cus.customer_number = ord.customer_number
inner join orderdetails ord_det on ord_det.order_number = ord.order_number
     where ord.status = 'Cancelled'
  group by country
  order by 1 desc
     limit 1
"""

result_pais_cancelados = spark.sql(sql)
result_pais_cancelados.show()
result_pais_cancelados.write.format("delta").mode("overwrite").save("/delta/result_pais_cancelados")

+---------------+-------+
|qtde_cancelados|country|
+---------------+-------+
|            605|  Spain|
+---------------+-------+



In [0]:
# What is the revenue of the best-selling product line, counting only Shipped items whose order was placed in 2005?
# Register the parquet snapshots this query joins as temp views.
for view_name in ("product_lines", "products", "orders", "orderdetails"):
    spark.read.parquet("/parquet/" + view_name + ".parquet").createOrReplaceTempView(view_name)

# Revenue = quantity_ordered * price_each, summed per product line; the top row is kept.
sql = """
    select pl.product_line, sum(od.quantity_ordered * od.price_each) as faturamento 
      from product_lines pl
inner join products p on p.product_line = pl.product_line
inner join orderdetails od on od.product_code = p.product_code
inner join orders o on o.order_number = od.order_number
     where o.status = 'Shipped' and extract(year from o.order_date) = 2005
group by pl.product_line
order by 2 desc
   limit 1
"""

result_faturamento_produto = spark.sql(sql)
result_faturamento_produto.show()
result_faturamento_produto.write.format("delta").mode("overwrite").save("/delta/result_faturamento_produto")

+------------+-----------+
|product_line|faturamento|
+------------+-----------+
|Classic Cars|  603666.99|
+------------+-----------+



In [0]:
# First name, last name and e-mail of the sales reps in Japan; the local-part of the e-mail must be masked.
spark.read.parquet("/parquet/employees.parquet").createOrReplaceTempView("employees")
spark.read.parquet("/parquet/offices.parquet").createOrReplaceTempView("offices")

# The mask is anchored at the start of the string and stops at the first '@'.
# A plain replace() of the local-part text would also rewrite any occurrence of
# that same text inside the domain (e.g. 'classic@classicmodelcars.com').
sql = """
    select first_name, last_name, regexp_replace(email, '^[^@]*', '########') email
      from employees emp
inner join offices off on off.office_code = emp.office_code
where off.country = 'Japan'
"""

result_vend_japao = spark.sql(sql)
result_vend_japao.show()
result_vend_japao.write.format("delta").mode("overwrite").save("/delta/result_vend_japao")

+----------+---------+--------------------+
|first_name|last_name|               email|
+----------+---------+--------------------+
|      Mami|    Nishi|########@classicm...|
|   Yoshimi|     Kato|########@classicm...|
+----------+---------+--------------------+

