In [103]:
from deltalake.writer import write_deltalake
from deltalake import DeltaTable
import duckdb
import pandas
import os
from dotenv import load_dotenv

In [104]:
# Load the variables defined in the .env file into the process environment.
load_dotenv()

# Azure credentials read from the environment. os.getenv returns None for any
# variable that is not set.
azure_storage_account_name = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
azure_storage_access_key = os.getenv('AZURE_STORAGE_ACCESS_KEY')
azure_storage_client_id = os.getenv('AZURE_STORAGE_CLIENT_ID')
azure_storage_client_secret = os.getenv('AZURE_STORAGE_CLIENT_SECRET')
azure_storage_tenant_id = os.getenv('AZURE_STORAGE_TENANT_ID')

In [105]:
# DuckDB connection shared by every query in this notebook; connect() is called
# without a database path.
con = duckdb.connect()

In [106]:
# INSTALL before LOAD so the cell also works on a machine where the extensions
# have never been installed; it does nothing when they are already present.
con.sql("INSTALL delta;")
con.sql("LOAD delta;")
con.sql("INSTALL azure;")
con.sql("LOAD azure;")

In [107]:
# Values interpolated into the CREATE SECRET statement, keyed by the
# environment variable each one was read from.
credenciais_spn = {
    'AZURE_STORAGE_TENANT_ID': azure_storage_tenant_id,
    'AZURE_STORAGE_CLIENT_ID': azure_storage_client_id,
    'AZURE_STORAGE_CLIENT_SECRET': azure_storage_client_secret,
    'AZURE_STORAGE_ACCOUNT_NAME': azure_storage_account_name,
}

# Fail here with a clear message instead of storing the literal text 'None'
# in the secret when a variable is missing from the environment.
faltando = [nome for nome, valor in credenciais_spn.items() if not valor]
if faltando:
    raise ValueError(f"Missing environment variables: {', '.join(faltando)}")


def _sql_literal(valor):
    """Return `valor` as a single-quoted SQL string literal with quotes escaped."""
    return "'" + valor.replace("'", "''") + "'"


# OR REPLACE keeps the cell re-runnable on the same connection, where a secret
# named azure_spn may already exist from a previous run.
con.sql(f"""
CREATE OR REPLACE SECRET azure_spn (
    TYPE AZURE,
    PROVIDER SERVICE_PRINCIPAL,
    TENANT_ID {_sql_literal(azure_storage_tenant_id)},
    CLIENT_ID {_sql_literal(azure_storage_client_id)},
    CLIENT_SECRET {_sql_literal(azure_storage_client_secret)},
    ACCOUNT_NAME {_sql_literal(azure_storage_account_name)}
);
""")


┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ true    │
└─────────┘

### Carga completa dos arquivos

In [108]:
# Credentials handed to the deltalake reader/writer functions below.
# Variables that are not set (None) are left out, so only configured values
# are passed on; deltalake documents storage_options as a str -> str mapping.
storage_options = {
    chave: valor
    for chave, valor in {
        'AZURE_STORAGE_ACCOUNT_NAME': azure_storage_account_name,
        'AZURE_STORAGE_ACCESS_KEY': azure_storage_access_key,
        'AZURE_STORAGE_CLIENT_ID': azure_storage_client_id,
        'AZURE_STORAGE_CLIENT_SECRET': azure_storage_client_secret,
        'AZURE_STORAGE_TENANT_ID': azure_storage_tenant_id,
    }.items()
    if valor is not None
}


In [109]:
def escreve_delta(df, table_name, modoEscrita):
    """Write `df` to the Delta table `table_name` under az://bronze/vendas/.

    Args:
        df: Data handed to `write_deltalake` as the table contents.
        table_name: Last path segment of the table URI.
        modoEscrita: Forwarded as the `mode` argument of `write_deltalake`.
    """
    destino = f'az://bronze/vendas/{table_name}'
    write_deltalake(destino, df, mode=modoEscrita, storage_options=storage_options)

In [110]:
def ler_delta(table_name):
    """Open the Delta table `table_name` under az://bronze/vendas/.

    Returns:
        The `DeltaTable` built for that URI with the notebook's storage_options.
    """
    return DeltaTable(
        f'az://bronze/vendas/{table_name}',
        storage_options=storage_options,
    )

In [111]:
def tabela_delta_existe(table_name):
    """Return True when a Delta table exists at az://bronze/vendas/<table_name>.

    Only the "table not found" error counts as absence. Any other failure
    (for example bad credentials or a network error) propagates to the caller
    instead of being reported as a missing table.

    Args:
        table_name: Last path segment of the table URI.

    Returns:
        True if the table could be opened, False if it was not found.
    """
    # Imported locally so this cell does not depend on an edit to the import cell.
    from deltalake.exceptions import TableNotFoundError

    uri = f'az://bronze/vendas/{table_name}'
    try:
        DeltaTable(uri, storage_options=storage_options)
    except TableNotFoundError:
        return False
    return True

## Escreve os arquivos caso as tabelas não existam

In [74]:
# Initial load: for each table that does not exist yet, read its landing CSV
# and create the Delta table from it.
arquivos = ['brands', 'categories', 'customers', 'products', 'staffs', 'stores','order_items', 'orders','stocks'] 
for tabela in arquivos:
    if not tabela_delta_existe(tabela):
        df = con.sql(f"""
            SELECT * FROM 'abfss://landing/bike_store/{tabela}.csv'
            """).to_df()
        # escreve_delta has no return value, so nothing is assigned from it.
        escreve_delta(df, tabela, 'append')


## Incremental

### Dimensões

In [40]:
arquivos = ['brands', 'categories', 'customers', 'products', 'staffs', 'stores']  

for tabela in arquivos:
    # Key column prefix: the table name without its trailing "s", except for
    # 'categories', which maps to 'category'.
    coluna = 'category' if tabela == 'categories' else tabela[:-1]

    # Current contents of the landing CSV for this dimension.
    new_df = con.sql(f"""
        SELECT * FROM 'abfss://landing/bike_store/{tabela}.csv'
        """).to_df()

    # ler_delta opens the existing table; there is no existence check here.
    tabela_dtl = ler_delta(tabela)

    # Insert-only merge: source rows whose <coluna>_id has no match in the
    # target are inserted; no clause is defined for matched rows.
    merge_builder = tabela_dtl.merge(
        source=new_df,
        predicate=f'target.{coluna}_id = source.{coluna}_id',
        source_alias="source",
        target_alias="target",
    )
    merge_builder.when_not_matched_insert_all().execute()


In [39]:
# Read the customers Delta table into a pandas DataFrame and display it.
customers = ler_delta('customers').to_pandas()
customers

Unnamed: 0,customer_id,first_name,last_name,phone,email,street,city,state,zip_code
0,1,Debra,Burks,,debra.burks@yahoo.com,9273 Thorne Ave.,Orchard Park,NY,14127
1,2,Kasha,Todd,,kasha.todd@yahoo.com,910 Vine Street,Campbell,CA,95008
2,3,Tameka,Fisher,,tameka.fisher@aol.com,769C Honey Creek St.,Redondo Beach,CA,90278
3,4,Daryl,Spence,,daryl.spence@aol.com,988 Pearl Lane,Uniondale,NY,11553
4,5,Charolette,Rice,(916) 381-6003,charolette.rice@msn.com,107 River Dr.,Sacramento,CA,95820
...,...,...,...,...,...,...,...,...,...
1440,1441,Jamaal,Morrison,,jamaal.morrison@msn.com,796 SE. Nut Swamp St.,Staten Island,NY,10301
1441,1442,Cassie,Cline,,cassie.cline@gmail.com,947 Lafayette Drive,Brooklyn,NY,11201
1442,1443,Lezlie,Lamb,,lezlie.lamb@gmail.com,401 Brandywine Street,Central Islip,NY,11722
1443,1444,Ivette,Estes,,ivette.estes@gmail.com,88 N. Canterbury Ave.,Canandaigua,NY,14424


### Fatos

#### order_items

In [115]:
# Current contents of the Delta table as a pandas DataFrame; the query below
# reads it through the name `order_items`.
order_items = ler_delta('order_items').to_pandas()

# Rows of the landing file that have no (order_id, product_id) match in the
# Delta table. The key columns are compared directly: comparing hash(...)
# values could pair two different keys that hash alike and silently skip a
# new row.
# NOTE(review): item_id looks like the line identifier within an order —
# confirm whether the match key should be (order_id, item_id) instead.
df = con.sql("""
    WITH dlt_order_items AS (
        SELECT * FROM order_items
    ),
    arquivo_order_items AS (
        SELECT * FROM 'abfss://landing/bike_store/order_items.csv'
    )
    SELECT
        AR.*
    FROM arquivo_order_items AR
    LEFT JOIN dlt_order_items DLT
        ON AR.order_id = DLT.order_id
        AND AR.product_id = DLT.product_id
    WHERE DLT.order_id IS NULL
""").to_df()

# Append only when the file brought rows that are not in the table yet.
if not df.empty:
    escreve_delta(df, 'order_items', 'append')


In [94]:
# Re-read the order_items Delta table into pandas and display it.
order_items = ler_delta('order_items').to_pandas()
order_items

Unnamed: 0,order_id,item_id,product_id,quantity,list_price,discount
0,1,1,20,1,599.99,0.20
1,1,2,8,2,1799.99,0.07
2,1,3,10,2,1549.00,0.05
3,1,4,16,2,599.99,0.05
4,1,5,4,1,2899.99,0.20
...,...,...,...,...,...,...
4717,1614,2,159,2,2299.99,0.07
4718,1614,3,213,2,269.99,0.20
4719,1615,1,197,2,2299.99,0.20
4720,1615,2,214,1,899.99,0.07


In [112]:
# Read the order_items landing CSV into a pandas DataFrame.
consulta_order_items = "SELECT * FROM 'abfss://landing/bike_store/order_items.csv'"
df_order_items = con.sql(consulta_order_items).to_df()

# Preview the first rows to check that the DataFrame was loaded.
df_order_items.head()

Unnamed: 0,order_id,item_id,product_id,quantity,list_price,discount
0,1,1,20,1,599.99,0.2
1,1,2,8,2,1799.99,0.07
2,1,3,10,2,1549.0,0.05
3,1,4,16,2,599.99,0.05
4,1,5,4,1,2899.99,0.2


#### orders

In [81]:
# Current contents of the Delta table as a pandas DataFrame; the query below
# reads it through the name `orders`.
orders = ler_delta('orders').to_pandas()

# Rows of the landing file dated after the latest order_date already stored.
# dlt_orders always yields exactly one row. When the Delta table is empty its
# MAX(order_date) is NULL, and the explicit IS NULL branch then loads every
# row; a bare "order_date > NULL" comparison would load none.
# NOTE(review): rows dated on or before the stored maximum are never picked
# up by this filter — confirm late-arriving orders are not expected.
df = con.sql("""
    WITH arquivo_orders AS (
        SELECT
            *
        FROM 'abfss://landing/bike_store/orders.csv'
    ),
    dlt_orders AS (
        SELECT
            MAX(order_date) AS order_date
        FROM orders
    )
    SELECT
        AR.*
    FROM arquivo_orders AR
    CROSS JOIN dlt_orders DLT
    WHERE DLT.order_date IS NULL
       OR AR.order_date > DLT.order_date
""").to_df()

# Append only when the file brought rows newer than the stored maximum.
if not df.empty:
    escreve_delta(df, 'orders', 'append')


In [82]:
# Re-read the orders Delta table into pandas and preview the first five rows.
orders = ler_delta('orders').to_pandas()
orders.head(5)


Unnamed: 0,order_id,customer_id,order_status,order_date,required_date,shipped_date,store_id,staff_id
0,1616,137,3,2024-12-28,2018-12-28,,3,8
1,1,259,4,2016-01-01,2016-01-03,2016-01-03,1,2
2,2,1212,4,2016-01-01,2016-01-04,2016-01-03,2,6
3,3,523,4,2016-01-02,2016-01-05,2016-01-03,2,7
4,4,175,4,2016-01-03,2016-01-04,2016-01-05,1,3


#### stocks

In [53]:
# Read the whole stocks landing CSV into pandas.
consulta_stocks = """
    SELECT
        *
    FROM 'abfss://landing/bike_store/stocks.csv'     
"""
dados = con.sql(consulta_stocks).to_df()

# Write it to the stocks Delta table in 'overwrite' mode.
escreve_delta(dados, 'stocks', 'overwrite')

In [54]:
# Read the stocks Delta table into pandas and preview the first five rows.
stocks = ler_delta('stocks').to_pandas()
stocks.head(5)


Unnamed: 0,store_id,product_id,quantity
0,1,1,27
1,1,2,5
2,1,3,6
3,1,4,23
4,1,5,22


In [55]:
# Number of rows in the stocks DataFrame.
stocks.shape[0]

939

## Fecha a conexão

In [84]:
# Close the DuckDB connection used throughout the notebook.
con.close()