In [0]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException
from delta.tables import *
from pyspark.sql import functions as F

In [0]:
# JDBC connection settings for the source PostgreSQL database.
# Every value can be overridden through an environment variable so real credentials
# never have to be written into the notebook. The fallbacks are the credentials of the
# mock database (psql-mock-database-cloud) this notebook was written against.
# NOTE(review): for any non-mock database remove the fallbacks and load the secrets
# from a secret manager (e.g. a Databricks secret scope) instead.
jdbcUsername = os.environ.get('JDBC_USERNAME', 'wlkkathxvpzhpvfcveqrxeig@psql-mock-database-cloud')
jdbcPassword = os.environ.get('JDBC_PASSWORD', 'vdqamaealbmsocpjmeeprcwf')
jdbcHostname = os.environ.get('JDBC_HOSTNAME', 'psql-mock-database-cloud.postgres.database.azure.com')
# Environment variables are strings; keep the port an int as before.
jdbcPort = int(os.environ.get('JDBC_PORT', '5432'))
jdbcDatabase = os.environ.get('JDBC_DATABASE', 'ecom1691087984366inbesicysqojdtiq')

In [0]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName('ListTablesExample').getOrCreate()

# PostgreSQL JDBC URL built from the connection settings of the configuration cell.
jdbcUrl = 'jdbc:postgresql://{}:{}/{}'.format(jdbcHostname, jdbcPort, jdbcDatabase)


In [0]:
def lerTabela(tabela):
    """Read one table from the PostgreSQL source over JDBC.

    Args:
        tabela: name of the source table, passed to Spark as the JDBC ``table``.

    Returns:
        The Spark DataFrame returned by ``spark.read.jdbc`` for that table.
    """
    conexao = {
        'user': jdbcUsername,
        'password': jdbcPassword,
        'driver': 'org.postgresql.Driver',
    }
    return spark.read.jdbc(url=jdbcUrl, table=tabela, properties=conexao)

def deletaTabelas(caminho_do_diretorio="dbfs:/user/hive/warehouse/", tabelas=()):
    """Recursively delete a warehouse directory (by default the whole Hive warehouse).

    ``dbutils.fs.rm`` only removes files; catalog entries of tables stored there are
    left behind, and the table-creation cell (which checks
    ``spark.catalog.tableExists``) would then skip recreating them. Pass those table
    names in ``tabelas`` to drop them from the catalog before the files are removed.

    Args:
        caminho_do_diretorio: DBFS directory to delete recursively.
        tabelas: optional iterable of table names dropped with
            ``DROP TABLE IF EXISTS`` first. Names are interpolated into SQL as-is,
            so only pass trusted identifiers.

    Returns:
        Whatever ``dbutils.fs.rm`` returns.
    """
    for tabela in tabelas:
        spark.sql(f'DROP TABLE IF EXISTS {tabela}')

    # WARNING: removes everything below the path, recursively.
    return dbutils.fs.rm(caminho_do_diretorio, True)

In [0]:
tabelas = ['customers', 'employees', 'offices', 'orderdetails', 'orders', 'payments', 'product_lines', 'products']

# Snapshot every source table from PostgreSQL into a Parquet directory on DBFS.
# A failure on one table is reported and the loop moves on to the next one.
for tabela in tabelas:
    try:
        print(f'lendo tabela: {tabela}.')
        df = lerTabela(tabela)

        parquetFilePath = f'dbfs:/ecommerce_db/{tabela}.parquet'
        # .parquet() already selects the format; 'overwrite' replaces the previous
        # snapshot when the cell is re-run.
        df.write.mode('overwrite').parquet(parquetFilePath)

    except Exception as e:
        # Name the failing table so the error can be traced from the cell output.
        print(f'erro ao processar a tabela {tabela}: {e}')

    else:
        print('parquet criado com sucesso')


lendo tabela: customers.
parquet criado com sucesso
lendo tabela: employees.
parquet criado com sucesso
lendo tabela: offices.
parquet criado com sucesso
lendo tabela: orderdetails.
parquet criado com sucesso
lendo tabela: orders.
parquet criado com sucesso
lendo tabela: payments.
parquet criado com sucesso
lendo tabela: product_lines.
parquet criado com sucesso
lendo tabela: products.
parquet criado com sucesso


In [0]:
# List the Parquet snapshot directories on DBFS (equivalent of `%fs ls`).
display(dbutils.fs.ls('dbfs:/ecommerce_db/'))

path,name,size,modificationTime
dbfs:/ecommerce_db/customers.parquet/,customers.parquet/,0,0
dbfs:/ecommerce_db/employees.parquet/,employees.parquet/,0,0
dbfs:/ecommerce_db/offices.parquet/,offices.parquet/,0,0
dbfs:/ecommerce_db/orderdetails.parquet/,orderdetails.parquet/,0,0
dbfs:/ecommerce_db/orders.parquet/,orders.parquet/,0,0
dbfs:/ecommerce_db/payments.parquet/,payments.parquet/,0,0
dbfs:/ecommerce_db/product_lines.parquet/,product_lines.parquet/,0,0
dbfs:/ecommerce_db/products.parquet/,products.parquet/,0,0


In [0]:
pathEcommerceFiles = 'dbfs:/ecommerce_db'
arquivos = dbutils.fs.ls(pathEcommerceFiles)
# Column names of each Parquet snapshot, keyed by the bare table name.
columns = {}

# Register each Parquet snapshot as a Delta table named "delta_<table>",
# leaving tables that are already in the catalog untouched.
for arquivo in arquivos:
    nomeFormatado = arquivo.name.replace('.parquet/', '')
    deltaTableName = f'delta_{nomeFormatado}'
    df = spark.read.format('parquet').load(str(arquivo.path))
    columns[nomeFormatado] = df.columns

    try:
        if not spark.catalog.tableExists(deltaTableName):
            df.write.format('delta').saveAsTable(deltaTableName)
            print('Tabela', deltaTableName, 'criada com sucesso')
        else:
            print(f'A tabela {deltaTableName} já existe.')

    except Exception as e:
        print(e)

Tabela delta_customers criada com sucesso
Tabela delta_employees criada com sucesso
Tabela delta_offices criada com sucesso
Tabela delta_orderdetails criada com sucesso
Tabela delta_orders criada com sucesso
Tabela delta_payments criada com sucesso
Tabela delta_product_lines criada com sucesso
Tabela delta_products criada com sucesso


In [0]:
pathEcommerceFiles = 'dbfs:/ecommerce_db'
arquivos = dbutils.fs.ls(pathEcommerceFiles)
# Column names of each Parquet snapshot, keyed by Delta table name.
colunasDelta = {}
# Merge key column(s) of each Delta table. Every column that identifies a row must be
# listed: with a partial key several source rows match the same target row and the
# MERGE is rejected.
primaryKey = {'delta_offices': ('office_code',),
              'delta_customers': ('customer_number',),
              'delta_employees': ('employee_number',),
              'delta_orderdetails': ('order_number', 'product_code'),
              'delta_orders': ('order_number',),
              # NOTE(review): 'check_number' is assumed from the classicmodels schema
              # (one payment per customer + check) - confirm the column name.
              'delta_payments': ('customer_number', 'check_number'),
              'delta_product_lines': ('product_line',),
              'delta_products': ('product_code',)}

# Upsert each Parquet snapshot into its Delta table: update matching rows, insert
# new ones and delete target rows that are no longer present in the snapshot.
for arquivo in arquivos:
    nomeFormatado = arquivo.name.replace('.parquet/', '')
    deltaTableName = 'delta_' + nomeFormatado

    try:
        dfParquet = spark.read.format('parquet').load(str(arquivo.path))

        # Only this table's columns: accumulating names across tables would make the
        # merge reference columns that do not exist in the current snapshot.
        colunasDelta[deltaTableName] = dfParquet.columns
        mapeamento = {col: f'dfParquet.{col}' for col in dfParquet.columns}
        condicao = ' AND '.join(
            f'deltaTable.{chave} = dfParquet.{chave}' for chave in primaryKey[deltaTableName]
        )

        # The tables were registered with saveAsTable, so resolve them by name, not by path.
        deltaTable = DeltaTable.forName(spark, deltaTableName)
        (deltaTable.alias('deltaTable')
            .merge(dfParquet.alias('dfParquet'), condicao)
            .whenMatchedUpdate(set=mapeamento)
            .whenNotMatchedInsert(values=mapeamento)
            # Deletes target rows with no source match; requires Delta Lake 2.3+.
            .whenNotMatchedBySourceDelete()
            .execute())

        print('Dados mesclados com sucesso para a tabela', deltaTableName)

    except Exception as e:
        print(f'erro ao mesclar a tabela {deltaTableName}: {e}')




In [0]:
%sql
-- Country whose customers have the most cancelled orders.
-- LIMIT 1 returns a single row; ties on contador are not broken deterministically.
select
  c.country,
  count(c.country) as contador
from delta_orders o
inner join delta_customers c
  on c.customer_number = o.customer_number
where o.status = 'Cancelled'
group by c.country
order by contador desc
limit 1

country,contador
New Zealand,2


In [0]:
%sql
-- Product with the highest billed amount (price_each * quantity_ordered)
-- across shipped orders dated in 2005.
select
  od.product_code,
  sum(od.price_each * od.quantity_ordered) as faturado
from delta_orders o
inner join delta_orderdetails od
  on od.order_number = o.order_number
where o.status = 'Shipped'
  and extract(year from o.order_date) = 2005
group by od.product_code
order by faturado desc
limit 1

product_code,faturado
S18_3232,52978.28


In [0]:
%sql
-- Employees of offices located in Japan; everything before '@' in the e-mail is replaced by 'xxx'.
select
  e.first_name,
  e.last_name,
  regexp_replace(e.email, '^[^@]+', 'xxx') as email_mascarado
from delta_employees e
inner join delta_offices o
  on o.office_code = e.office_code
where o.country = 'Japan'

first_name,last_name,email_mascarado
Mami,Nishi,xxx@classicmodelcars.com
Yoshimi,Kato,xxx@classicmodelcars.com
