In [12]:
%pip install duckdb

Note: you may need to restart the kernel to use updated packages.


In [13]:
import duckdb 
import os
import time

#Importa as Libs e cria o Banco de Dados onde será persistido posteriormente os dados.
con = duckdb.connect(database="ibge.db",read_only=False)
# Apenas para verificar execução.


In [14]:
#Cria a tabela se ela não existir, definida por DATA_IBGE
#COD_UF
#COD_MUN
#COD_ESPECIE
#LATITUDE
#LONGITUDE

con.execute(

"""

  CREATE TABLE IF NOT EXISTS DATA_IBGE (
  COD_UF       INTEGER  NOT NULL
  ,COD_MUN      INTEGER  NOT NULL
  ,COD_ESPECIE  INTEGER  NOT NULL
  ,LATITUDE       NUMERIC(9,6) NOT NULL
  ,LONGITUDE      NUMERIC(10,6) NOT NULL
  ,NV_GEO_COORD INTEGER  NOT NULL
) 
"""
)

<duckdb.duckdb.DuckDBPyConnection at 0x1ad774ceff0>

In [15]:
#Verificando se a tabela foi criada.
con.sql("SELECT * FROM DATA_IBGE")


┌────────┬─────────┬─────────────┬──────────────┬───────────────┬──────────────┐
│ COD_UF │ COD_MUN │ COD_ESPECIE │   LATITUDE   │   LONGITUDE   │ NV_GEO_COORD │
│ int32  │  int32  │    int32    │ decimal(9,6) │ decimal(10,6) │    int32     │
├──────────────────────────────────────────────────────────────────────────────┤
│                                    0 rows                                    │
└──────────────────────────────────────────────────────────────────────────────┘

```
Uma outra possibilidade, seria utilizando o pyarrow para processamento, localizando os arquivos localmente, salvando-os em parquet, 
mas para esta solução, optamos pelo DuckDB.


import pyarrow as pa
import pyarrow.csv as pacsv
import pyarrow.parquet as papq
import os
# Especifique o diretório que você deseja listar
path = os.getcwd()
# Lista todos os arquivos e diretórios no diretório especificado
arquivo = os.listdir(path)

database_path = r"C:\Users\wilke\OneDrive\Área de Trabalho\Unifor\Engenharia de dados\IBGE\Pipe\database"
tables = []

for arquivos in arquivo:
    if arquivos.endswith('.csv'):
        files_path = os.path.join(path, arquivos)
        try:
            table = pacsv.read_csv(files_path)
            tables.append(table)
        except Exception as e:
            print(f"Erro ao ler {arquivo}: {e}")

temp = r"C:\Users\wilke\OneDrive\Área de Trabalho\Unifor\Engenharia de dados\IBGE\Pipe\database\ibge.parquet"

database = pa.concat_tables(tables)

papq.write_table(database,temp,compression='snappy')
        
print("Conversão concluída.")
```
      

In [16]:
#Obtendo caminho local
#Uma outra possibilidade seria a solução também, ser utilizada lendo diretamente em outros serviços, o google collab para fins de estudos poderia ler diretamente no google drive, fazendo a conexão com o google drive ou demais fontes conectoras em nuvem.
#No caso utilizei a leitura local.

path = os.getcwd()

print(path)



c:\Users\wilke\OneDrive\Área de Trabalho\Unifor\Engenharia de dados\IBGE\Pipe


In [17]:
%%time

con.sql(f"COPY (SELECT * FROM '{path}/*.csv') TO 'data'   (FORMAT PARQUET,PER_THREAD_OUTPUT true, COMPRESSION Snappy,OVERWRITE_OR_IGNORE 1, ROW_GROUP_SIZE 100_000);")
#salvando em parquet no folder data, onde poderia ser uma outra camada do processo, dependendo da arquitetura, parametros de encoding viabilizados pela documentação


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

CPU times: total: 1min 7s
Wall time: 18.1 s


No nosso projeto, estamos dividindo os parquets em 7 partes, isto pode ser utilizado com outras estratégias de armazenamento como no exemplo a seguir:

```bash
├── year=2021
│ ├── month=1
│ │ ├── data_1.parquet
│ │ └── data_2.parquet
│ └── month=2
│ └── data_1.parquet
└── year=2022
├── month=11
│ ├── data_1.parquet
│ └── data_2.parquet
└── month=12
└── data_1.parquet
```

In [18]:
%%time
con.sql(f"INSERT INTO DATA_IBGE SELECT * FROM '{path}/data/*.parquet'")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

CPU times: total: 45.5 s
Wall time: 10.6 s


In [19]:
%%time
con.sql("SELECT COUNT(*)  AS TOTAL_DE_LINHAS FROM DATA_IBGE")
#Verificando o total de linhas inseridas


CPU times: total: 0 ns
Wall time: 0 ns


┌─────────────────┐
│ TOTAL_DE_LINHAS │
│      int64      │
├─────────────────┤
│       111102875 │
└─────────────────┘

In [20]:
%%time
con.sql('SELECT COD_UF as UF,COUNT(COD_UF) AS Total_de_Registros FROM DATA_IBGE GROUP BY COD_UF')


CPU times: total: 0 ns
Wall time: 0 ns


┌───────┬────────────────────┐
│  UF   │ Total_de_Registros │
│ int32 │       int64        │
├───────┼────────────────────┤
│    11 │             965370 │
│    12 │             410524 │
│    13 │            1658970 │
│    14 │             260515 │
│    15 │            3911170 │
│    16 │             312665 │
│    17 │             848437 │
│    21 │            3257843 │
│    22 │            1891421 │
│    23 │            4750642 │
│     · │               ·    │
│     · │               ·    │
│     · │               ·    │
│    32 │            2221348 │
│    33 │            8962200 │
│    35 │           22953725 │
│    41 │            6122025 │
│    42 │            4181139 │
│    43 │            6456747 │
│    50 │            1507819 │
│    51 │            1985314 │
│    52 │            3960937 │
│    53 │            1318887 │
├───────┴────────────────────┤
│     27 rows (20 shown)     │
└────────────────────────────┘

In [21]:
con.close()




In [22]:
# Utilizado a documentação do Duck DB referenciada: https://duckdb.org/docs/guides/index
# Parquet Files https://duckdb.org/docs/data/partitioning/partitioned_writes
# How to head CSV Files https://duckdb.org/docs/guides/import/csv_import
