# Salvando tabelas como Parquet

In [0]:
HOST = 'psql-mock-database-cloud.postgres.database.azure.com'
PORT = 5432
DATABASE = 'ecom1692715661854plhkzofwthsxkoxs'
USER = 'fejsuzwpknrwbdbdwlrkqjtv@psql-mock-database-cloud'
PASSWORD = 'anyjuzyqrduelgjlkxhrplkr'

def table_to_parquet(table_name):
    remote_table = (
        spark.read
            .format('postgresql')
            .option('dbtable', table_name)
            .option('host', HOST)
            .option('port', PORT)
            .option('database', DATABASE)
            .option('user', USER)
            .option('password', PASSWORD)
            .load()
    )
    remote_table.write.mode("overwrite").parquet(f'/tmp/{table_name}.parquet')
    return remote_table.cache()

In [0]:
TABLES = ['offices', 'employees', 'customers', 'payments', 'product_lines', 'products', 'orders', 'orderdetails']
dataframes = {}

for table in TABLES:
    df = table_to_parquet(table)
    dataframes[table] = df

# Salvando tabelas como Delta

Numa primeira execução precisamos ter os arquivos no formato.
Comentado pois nas execuções posteriores não é necessário.

In [0]:
#for table in TABLES:
#    dataframes[table].write.format('delta').mode('overwrite').save(f'/tmp/delta/{table}')

# Mesclando no Delta Lake

## Carregando os parquets (previamente lidos da base PostgreSQL)

In [0]:
parquet_dataframes = {}

for table in TABLES:
    parquet_dataframes[table] = spark.read.parquet(f'/tmp/{table}.parquet').cache()

## Carregando os Deltas

In [0]:
from delta.tables import DeltaTable

delta_dataframes = {}

for table in TABLES:
    delta_dataframes[table] = DeltaTable.forPath(spark, f'/tmp/delta/{table}')

## Executando os merges

In [0]:
def do_merge(source, destination, key_fields):
    (
        destination
            .alias('dest')
            .merge(
                source.alias('src'), 
                ' AND '.join([f'src.{key_field} = dest.{key_field}' for key_field in key_fields])
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .whenNotMatchedBySourceDelete()
    ).execute()

In [0]:
KEYS = {
    'offices': ['office_code'], 
    'employees': ['employee_number'], 
    'customers': ['customer_number'], 
    'payments': ['customer_number', 'check_number'], 
    'product_lines': ['product_line'], 
    'products': {'product_code'}, 
    'orders': ['order_number'], 
    'orderdetails': ['order_number', 'order_line_number']
}

for table in TABLES:
    do_merge(
        source=parquet_dataframes[table],
        destination=delta_dataframes[table],
        key_fields=KEYS[table]
    )

# Consultas na base

Usamos os lidos do PostgreSQL no início.

## Qual país possui a maior quantidade de itens cancelados?

In [0]:
for table in TABLES:
    dataframes[table].createOrReplaceTempView(table)

In [0]:
spark.sql("""
    select c.country, count(*) as qtd_cancelados
    from 
        customers c 
        join orders o on o.customer_number = c.customer_number 
        join orderdetails d on d.order_number = o.order_number 
    group by c.country 
    order by qtd_cancelados desc
    limit 1
""").collect()

Out[28]: [Row(country='USA', qtd_cancelados=1004)]

## Qual o faturamento da linha de produto mais vendido, considere como os itens Shipped, cujo o pedido foi realizado no ano de 2005?

In [0]:
spark.sql("""
    select p.product_line, sum(d.quantity_ordered * d.price_each) as faturamento
    from
        products p
        join orderdetails d on d.product_code = p.product_code
        join orders o on d.order_number = o.order_number 
        and o.order_date between '2005-01-01' and '2005-12-31' 
        and o.status = 'Shipped'
    group by p.product_line 
    order by faturamento desc
    limit 1
""").collect()

Out[27]: [Row(product_line='Classic Cars', faturamento=Decimal('603666.99'))]

## Nome, sobrenome e e-mail dos vendedores do Japão, o local-part do e-mail deve estar mascarado.

In [0]:
spark.sql("""
    select e.first_name, e.last_name, '*****@' || split(e.email, '@')[1] as email
    from 
        employees e
        join offices o on e.office_code = o.office_code 
            and o.country = 'Japan'
""").collect()

Out[30]: [Row(first_name='Mami', last_name='Nishi', email='*****@classicmodelcars.com'),
 Row(first_name='Yoshimi', last_name='Kato', email='*****@classicmodelcars.com')]