In [70]:
from prefect import task, flow
import duckdb
#from core.duckdb.silver_layer import deduplication, load_parquet_from_bucket, setup_duckdb_connection, save_parquet_to_bucket
from dotenv import load_dotenv
import os
from datetime import datetime

# Load environment variables from .env file
load_dotenv()

# load env variables
ACCESS = os.getenv("GCS_ACCESS_KEY")
SECRET = os.getenv("GCS_SECRET")
BUCKET_NAME = os.getenv("GCS_BUCKET_NAME")
BUCKET_PATH_BRONZE = os.getenv("GCS_BUCKET_PATH_BRONZE")
BUCKET_PATH_SILVER = os.getenv("GCS_BUCKET_PATH_SILVER")
BUCKET_PATH_GOLD = os.getenv("GCS_BUCKET_PATH_GOLD")


In [71]:
def setup_duckdb_connection(ACCESS, SECRET, db_path: str = ":memory:", read_only: str = False):
    """
    Sets up a duckdb connection and configures it for S3 access.
    """
    try:
        duckdb_connection = duckdb.connect(
            database=db_path, read_only=read_only
        )
        duckdb_connection.sql("INSTALL httpfs")
        duckdb_connection.sql("LOAD httpfs")
        duckdb_connection.sql(f"SET s3_access_key_id='{ACCESS}'")
        duckdb_connection.sql(f"SET s3_secret_access_key='{SECRET}'")
        duckdb_connection.sql("SET s3_endpoint='storage.googleapis.com'")
        return duckdb_connection
    except Exception as e:
        print(f"Failed to setup DuckDB connection: {e}")
        raise  
    

def load_parquet_from_bucket(duckdb_connection, table_name, bucket_path):
    """
    Loads parquet data from S3 and creates a table in DuckDB
    """
    try:
        duckdb_connection.sql(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM 's3://{bucket_path}'")
    except Exception as e:
        print(f"Failed to load parquet from bucket: {e}")
        raise

duckdb_conn = setup_duckdb_connection(ACCESS, SECRET)


### Bronze data

In [16]:
bucket_path_bronze = f"{BUCKET_NAME}/{BUCKET_PATH_BRONZE}"
load_parquet_from_bucket(duckdb_conn, table_name='bronze_table', bucket_path=f"{bucket_path_bronze}/year=2023/month=07/day=27/*/*/*.parquet")
df_bronze = duckdb_conn.sql("SELECT * FROM bronze_table").df()
df_bronze.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 887 entries, 0 to 886
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   date          887 non-null    object 
 1   aisle_name    887 non-null    object 
 2   product_name  887 non-null    object 
 3   brand         838 non-null    object 
 4   price         887 non-null    float64
 5   package       887 non-null    object 
 6   store_name    887 non-null    object 
 7   store_city    887 non-null    object 
 8   search_term   887 non-null    object 
dtypes: float64(1), object(8)
memory usage: 62.5+ KB


In [29]:
bucket_path_bronze = f"{BUCKET_NAME}/{BUCKET_PATH_BRONZE}"
load_parquet_from_bucket(duckdb_conn, table_name='bronze_table', bucket_path=f"{bucket_path_bronze}/year=2023/month=07/day=27/*/*/*.parquet")
df_bronze = duckdb_conn.sql("SELECT * FROM bronze_table").df()
df_bronze.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1100 entries, 0 to 1099
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   date          1100 non-null   object 
 1   aisle_name    1100 non-null   object 
 2   product_name  1100 non-null   object 
 3   brand         1047 non-null   object 
 4   price         1100 non-null   float64
 5   package       1100 non-null   object 
 6   store_name    1100 non-null   object 
 7   store_city    1100 non-null   object 
 8   search_term   1100 non-null   object 
dtypes: float64(1), object(8)
memory usage: 77.5+ KB


In [30]:
df_bronze.search_term.unique()

array(['carne', 'fini', 'fruta', 'haribo', 'limpeza', 'shampoo',
       'verdura', '~q'], dtype=object)

In [95]:
df_bronze.head()

Unnamed: 0,date,aisle_name,product_name,brand,price,package,store_name,store_city,search_term
0,26-07-2023,Balas de gelatina,Goma sortida,Dori,14.9,1kg,Atacadão,FLN,Dori
1,26-07-2023,Balas de gelatina,Gomets minhoca acida,Dori,11.79,600g,Atacadão,FLN,Dori
2,26-07-2023,Balas de gelatina,Bala goma tubo creme frutas,Dori,16.9,30 unidades,Atacadão,FLN,Dori
3,26-07-2023,Chocolates para confeitaria,Confeito de chocolate colorido Chococandy,Dori,14.89,500g,Atacadão,FLN,Dori
4,26-07-2023,Chocolates para confeitaria,Granulado sabor chocolate,Dori,3.29,Pacote 120g,Atacadão,FLN,Dori


### Silver data

#### load transaction_info

In [79]:
bucket_path_silver = f"{BUCKET_NAME}/{BUCKET_PATH_SILVER}" 
load_parquet_from_bucket(duckdb_conn, table_name='transaction_info', bucket_path=f"{bucket_path_silver}/year=2023/month=07/day=26/transaction_info.parquet")
df_transaction_info = duckdb_conn.sql("SELECT * FROM transaction_info").df()
df_transaction_info.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 918 entries, 0 to 917
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   date        918 non-null    datetime64[ns]
 1   product_id  918 non-null    object        
 2   price       918 non-null    float64       
 3   store_id    918 non-null    object        
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 28.8+ KB


In [80]:
df_transaction_info.head()

Unnamed: 0,date,product_id,price,store_id
0,2023-07-26,Balas de gelatinaGoma sortidaDori1kgDori,14.9,AtacadãoFLN
1,2023-07-26,Balas de gelatinaGomets minhoca acidaDori600gDori,11.79,AtacadãoFLN
2,2023-07-26,Balas de gelatinaBala goma tubo creme frutasDo...,16.9,AtacadãoFLN
3,2023-07-26,Chocolates para confeitariaConfeito de chocola...,14.89,AtacadãoFLN
4,2023-07-26,Chocolates para confeitariaGranulado sabor cho...,3.29,AtacadãoFLN


In [99]:
bucket_path_silver = f"{BUCKET_NAME}/{BUCKET_PATH_SILVER}" 
load_parquet_from_bucket(duckdb_conn, table_name='transaction_info', bucket_path=f"{bucket_path_silver}/year=2023/month=07/day=26/transaction_info.parquet")
df_transaction_info = duckdb_conn.sql("SELECT * FROM transaction_info").df()
df_transaction_info.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 918 entries, 0 to 917
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   date        918 non-null    datetime64[ns]
 1   price       918 non-null    float64       
 2   product_id  918 non-null    uint64        
 3   store_id    918 non-null    uint64        
dtypes: datetime64[ns](1), float64(1), uint64(2)
memory usage: 28.8 KB


#### load store_info

In [76]:
bucket_path_silver = f"{BUCKET_NAME}/{BUCKET_PATH_SILVER}" 
load_parquet_from_bucket(duckdb_conn, table_name='store_info', bucket_path=f"{bucket_path_silver}/year=2023/month=07/day=26/store_info.parquet")
df_store_info = duckdb_conn.sql("SELECT * FROM store_info").df()
df_store_info.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12 entries, 0 to 11
Data columns (total 3 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   store_id    12 non-null     object
 1   store_name  12 non-null     object
 2   store_city  12 non-null     object
dtypes: object(3)
memory usage: 416.0+ bytes


In [81]:
df_store_info.head()

Unnamed: 0,store_id,store_name,store_city
0,AtacadãoFLN,Atacadão,FLN
1,Pague MenosFLN,Pague Menos,FLN
2,Droga RaiaFLN,Droga Raia,FLN
3,PanvelFLN,Panvel,FLN
4,Drogaria CatarinenseFLN,Drogaria Catarinense,FLN


#### load product_info


In [31]:
bucket_path_silver = f"{BUCKET_NAME}/{BUCKET_PATH_SILVER}" 
load_parquet_from_bucket(duckdb_conn, table_name='product_info', bucket_path=f"{bucket_path_silver}/year=2023/month=07/day=27/product_info_*.parquet")
df_product_info = duckdb_conn.sql("SELECT * FROM product_info").df()
df_product_info.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 690 entries, 0 to 689
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   product_id    690 non-null    uint64
 1   aisle_name    690 non-null    object
 2   product_name  690 non-null    object
 3   brand         690 non-null    object
 4   package       690 non-null    object
 5   search_term   690 non-null    object
dtypes: object(5), uint64(1)
memory usage: 32.5+ KB


In [33]:
df_product_info.search_term.unique()

array(['carne', 'fini', 'fruta', 'haribo', 'limpeza', 'verdura', '~q',
       'shampoo'], dtype=object)

In [106]:
df_product_info.head()

Unnamed: 0,product_id,aisle_name,product_name,brand,package,search_term
0,3916981574898139977,Balas de gelatina,Goma sortida,Dori,1kg,Dori
1,16616297229445218020,Balas de gelatina,Gomets minhoca acida,Dori,600g,Dori
2,4230605146124349718,Balas de gelatina,Bala goma tubo creme frutas,Dori,30 unidades,Dori
3,5871477341612877546,Chocolates para confeitaria,Confeito de chocolate colorido Chococandy,Dori,500g,Dori
4,6937026296695067779,Chocolates para confeitaria,Granulado sabor chocolate,Dori,Pacote 120g,Dori


### Load Gold data

In [72]:
all_gold_files_path = f"{BUCKET_NAME}/{BUCKET_PATH_GOLD}/year=2023/month=07/day=27/*.parquet"
load_parquet_from_bucket(duckdb_conn, table_name='agg_table', bucket_path=all_gold_files_path)

In [73]:
duckdb_conn.sql('DESCRIBE agg_table')

┌──────────────┬─────────────┬─────────┬─────────┬─────────┬───────┐
│ column_name  │ column_type │  null   │   key   │ default │ extra │
│   varchar    │   varchar   │ varchar │ varchar │ varchar │ int32 │
├──────────────┼─────────────┼─────────┼─────────┼─────────┼───────┤
│ date         │ TIMESTAMP   │ YES     │ NULL    │ NULL    │  NULL │
│ aisle_name   │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ product_name │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ brand        │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ package      │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ search_term  │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ price        │ DOUBLE      │ YES     │ NULL    │ NULL    │  NULL │
│ store_name   │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ store_city   │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
└──────────────┴─────────────┴─────────┴─────────┴─────────┴───────┘

In [45]:
all_gold_files_path = f"{BUCKET_NAME}/{BUCKET_PATH_GOLD}/year=2023/month=07/day=27/*.parquet"
load_parquet_from_bucket(duckdb_conn, table_name='agg_table', bucket_path=all_gold_files_path)
df = duckdb_conn.sql("SELECT * FROM agg_table").df()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 887 entries, 0 to 886
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   date          887 non-null    datetime64[ns]
 1   aisle_name    887 non-null    object        
 2   product_name  887 non-null    object        
 3   brand         887 non-null    object        
 4   package       887 non-null    object        
 5   search_term   887 non-null    object        
 6   price         887 non-null    float64       
 7   store_name    887 non-null    object        
 8   store_city    887 non-null    object        
dtypes: datetime64[ns](1), float64(1), object(7)
memory usage: 62.5+ KB


In [25]:
all_gold_files_path = f"{BUCKET_NAME}/{BUCKET_PATH_GOLD}/year=2023/month=07/day=27/golden_table_205958.parquet"
load_parquet_from_bucket(duckdb_conn, table_name='agg_table', bucket_path=all_gold_files_path)
df = duckdb_conn.sql("SELECT * FROM agg_table").df()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 887 entries, 0 to 886
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   date          887 non-null    datetime64[ns]
 1   aisle_name    887 non-null    object        
 2   product_name  887 non-null    object        
 3   brand         887 non-null    object        
 4   package       887 non-null    object        
 5   search_term   887 non-null    object        
 6   price         887 non-null    float64       
 7   store_name    887 non-null    object        
 8   store_city    887 non-null    object        
dtypes: datetime64[ns](1), float64(1), object(7)
memory usage: 62.5+ KB


In [48]:
all_gold_files_path

'cornershop-raw/gold/year=2023/month=07/day=27/*.parquet'

In [52]:
all_gold_files_path = f"{BUCKET_NAME}/{BUCKET_PATH_GOLD}/*/*/*/*.parquet"
load_parquet_from_bucket(duckdb_conn, table_name='agg_table', bucket_path=all_gold_files_path)
df = duckdb_conn.sql("SELECT * FROM agg_table").df()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5105 entries, 0 to 5104
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   date          5105 non-null   datetime64[ns]
 1   aisle_name    5105 non-null   object        
 2   product_name  5105 non-null   object        
 3   brand         5105 non-null   object        
 4   package       5105 non-null   object        
 5   search_term   5105 non-null   object        
 6   price         5105 non-null   float64       
 7   store_name    5105 non-null   object        
 8   store_city    5105 non-null   object        
dtypes: datetime64[ns](1), float64(1), object(7)
memory usage: 359.1+ KB


In [53]:
df.search_term.unique()

array(['Dori', 'Doritos', 'Fini', 'Haribo', 'Leite', 'Torcida', 'carne',
       'fini', 'fruta', 'haribo', 'limpeza', 'verdura', '~q', 'shampoo'],
      dtype=object)

_______________

In [56]:
representative_terms = {
    "frutas": [
        "laranja",
        "limão",
        "ameixa",
        "romã",
        "verde",
        "carambola",
        "caqui",
        "carambola",
        "groselha",
        "lichia",
        "verde",
        "kiwi",
        "pêssego",
        "verde",
        "carambola",
        "coco",
        "verde",
        "carambola",
        "abacaxi",
        "verde",
        "carambola",
        "jabuticaba",
        "carambola",
        "groselha",
        "nectarina",
        "verde",
        "carambola",
        "carambola",
        "verde",
        "groselha",
    ],
    "carnes": [
        "peru",
        "toucinho",
        "coxa",
        "tilápia",
        "coelho",
        "lagarto",
        "lombo",
        "porco",
        "chorizo",
        "porco",
        "costeleta",
        "toucinho",
        "picanha",
        "toucinho",
        "chorizo",
        "carne",
        "sol",
        "seca",
        "contrafilé",
        "toucinho",
        "linguiça",
        "frango",
        "peito",
        "toucinho",
        "maminha",
        "toucinho",
        "coelho",
        "lula",
        "toucinho",
        "coelho",
    ],
    "congelados": [
        "kibe",
        "nuggets",
        "bolinho",
        "empada",
        "torta",
        "hambúrguer",
        "torta",
        "hambúrguer",
        "batata",
        "empanado",
        "peixe",
        "torta",
        "croissant",
        "torta",
        "hambúrguer",
        "lasanha",
        "torta",
        "hambúrguer",
        "queijo",
        "pão",
        "torta",
        "batata",
        "frita",
        "torta",
        "esfiha",
        "torta",
        "hambúrguer",
        "hambúrguer",
        "torta",
        "batata",
    ],
    "laticinios": [
        "nata",
        "sorvete",
        "requeijão",
        "coalhada",
        "sorvete",
        "requeijão",
        "leite",
        "condensado",
        "sorvete",
        "requeijão",
        "sorvete",
        "queijo",
        "margarina",
        "sorvete",
        "requeijão",
        "sorvete",
        "requeijão",
        "queijo",
        "iogurte",
        "sorvete",
        "requeijão",
        "manteiga",
        "sorvete",
        "requeijão",
        "queijo",
        "sorvete",
        "requeijão",
        "creme",
        "leite",
        "sorvete",
    ],
    "cereais": [
        "milho",
        "massa",
        "lasanha",
        "cereal",
        "infantil",
        "barra",
        "granola",
        "fubá",
        "arroz",
        "farinha",
        "mandioca",
        "trigo",
        "arroz",
        "trigo",
        "farelo",
        "pipoca",
        "trigo",
        "cevada",
        "farofa",
        "pronta",
        "trigo",
        "biscoito",
        "trigo",
        "fubá",
        "amendoim",
        "fubá",
        "arroz",
        "chia",
        "trigo",
        "fubá",
    ],
    "outros": [
        "óleo",
        "ovo",
        "bala",
        "azeite",
        "óleo",
        "waffer",
        "fini",
        "óleo",
        "waffer",
        "geleia",
        "óleo",
        "waffer",
        "mortadela",
        "óleo",
        "haribo",
        "apresuntado",
        "óleo",
        "waffer",
        "bolo",
        "óleo",
        "waffer",
        "mostarda",
        "óleo",
        "haribo",
        "haribo",
        "óleo",
        "waffer",
        "achocolatado",
        "waffer",
        "apresuntado",
    ],
    "bebidas": [
        "suco",
        "fruta",
        "vodka",
        "refrigerante",
        "vodka",
        "vinho",
        "vodka",
        "vinho",
        "suco",
        "cerveja",
        "vodka",
        "vinho",
        "chá",
        "vodka",
        "vinho",
        "café",
        "vodka",
        "vinho",
        "vinho",
        "vodka",
        "suco",
    ],
    "higiene_pessoal": [
        "tônico",
        "facial",
        "acetona",
        "barbear",
        "tônico",
        "bucal",
        "desodorante",
        "tônico",
        "bucal",
        "hidratante",
        "tônico",
        "bucal",
        "absorvente",
        "dente",
        "cílios",
        "perfume",
        "tônico",
        "blush",
        "cabelo",
        "tônico",
        "dente",
        "bucal",
        "antisséptico",
        "tônico",
        "dental",
        "fio",
        "bucal",
        "higiênico",
        "tônico",
        "cílios",
    ],
    "limpeza": [
        "vidros",
        "mofo",
        "balde",
        "perfumado",
        "vidros",
        "lustra",
        "vassoura",
        "vidros",
        "balde",
        "rejuntes",
        "vidros",
        "lustra",
        "sanitária",
        "vidros",
        "lustra",
        "cera",
        "vidros",
        "vassoura",
        "repelente",
        "vidros",
        "lustra",
        "multiuso",
        "vidros",
        "lustra",
        "limpa",
        "vidros",
        "vassoura",
        "sabão",
        "vidros",
        "lustra",
    ],
    "verduras": [
        "wasabi",
        "cebolinha",
        "hortelã",
        "batata",
        "doce",
        "wasabi",
        "jiló",
        "wasabi",
        "cebola",
        "couve",
        "wasabi",
        "forte",
        "berinjela",
        "wasabi",
        "cebola",
        "chicória",
        "wasabi",
        "forte",
        "pepino",
        "wasabi",
        "cebola",
        "brócolis",
        "forte",
        "ervilha",
        "alho",
        "poró",
        "wasabi",
        "rúcula",
        "wasabi",
        "brócolis",
    ],
}

item_list = [term for terms in representative_terms.values() for term in terms]

In [67]:
item_list

['laranja',
 'limão',
 'ameixa',
 'romã',
 'verde',
 'carambola',
 'caqui',
 'carambola',
 'groselha',
 'lichia',
 'verde',
 'kiwi',
 'pêssego',
 'verde',
 'carambola',
 'coco',
 'verde',
 'carambola',
 'abacaxi',
 'verde',
 'carambola',
 'jabuticaba',
 'carambola',
 'groselha',
 'nectarina',
 'verde',
 'carambola',
 'carambola',
 'verde',
 'groselha',
 'peru',
 'toucinho',
 'coxa',
 'tilápia',
 'coelho',
 'lagarto',
 'lombo',
 'porco',
 'chorizo',
 'porco',
 'costeleta',
 'toucinho',
 'picanha',
 'toucinho',
 'chorizo',
 'carne',
 'sol',
 'seca',
 'contrafilé',
 'toucinho',
 'linguiça',
 'frango',
 'peito',
 'toucinho',
 'maminha',
 'toucinho',
 'coelho',
 'lula',
 'toucinho',
 'coelho',
 'kibe',
 'nuggets',
 'bolinho',
 'empada',
 'torta',
 'hambúrguer',
 'torta',
 'hambúrguer',
 'batata',
 'empanado',
 'peixe',
 'torta',
 'croissant',
 'torta',
 'hambúrguer',
 'lasanha',
 'torta',
 'hambúrguer',
 'queijo',
 'pão',
 'torta',
 'batata',
 'frita',
 'torta',
 'esfiha',
 'torta',
 'hambú

In [63]:
# Using dictionary comprehension to remove duplicates from all lists and preserve the names
supermercado_dict = {name: list(dict.fromkeys(lst)) for name, lst in representative_terms.items()}

unique_items = set(item for lst in supermercado_dict.values() for item in lst)
total_unique_items = len(unique_items)
print("Número total de produtos:", total_unique_items)

Número total de produtos: 146


In [65]:
itens= list(unique_items)

In [66]:
itens

['sorvete',
 'trigo',
 'costeleta',
 'groselha',
 'farinha',
 'vinho',
 'frango',
 'pipoca',
 'iogurte',
 'desodorante',
 'coxa',
 'chorizo',
 'maminha',
 'kibe',
 'perfume',
 'nectarina',
 'laranja',
 'cebolinha',
 'farofa',
 'wasabi',
 'esfiha',
 'rúcula',
 'azeite',
 'ovo',
 'cabelo',
 'cerveja',
 'coelho',
 'condensado',
 'margarina',
 'peito',
 'facial',
 'milho',
 'nuggets',
 'verde',
 'barra',
 'absorvente',
 'croissant',
 'abacaxi',
 'brócolis',
 'higiênico',
 'coalhada',
 'óleo',
 'manteiga',
 'batata',
 'cevada',
 'apresuntado',
 'infantil',
 'hambúrguer',
 'empada',
 'fubá',
 'waffer',
 'acetona',
 'vodka',
 'mofo',
 'nata',
 'multiuso',
 'queijo',
 'leite',
 'cereal',
 'jiló',
 'suco',
 'perfumado',
 'bolo',
 'lasanha',
 'geleia',
 'farelo',
 'lombo',
 'alho',
 'cílios',
 'ameixa',
 'tilápia',
 'pêssego',
 'bala',
 'limpa',
 'cebola',
 'pronta',
 'arroz',
 'antisséptico',
 'caqui',
 'fini',
 'toucinho',
 'coco',
 'bolinho',
 'mandioca',
 'vidros',
 'jabuticaba',
 'blush',
 