<a href="https://colab.research.google.com/github/TandaraJS/ELT_Integrado_Womakers/blob/main/PipelineMulheresTecnologia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Pipeline ELT - Mulheres na Tecnologia**


## **1 - Preparação do Ambiente**

### 1.1 Instalação e importação

In [None]:
# Instalação dos pacotes que não vem por padrão
!pip install dbt-sqlite prefect -q

In [2]:
import requests
import pandas as pd
import sqlite3
from prefect import task, flow, get_run_logger
import subprocess
import dbt
import logging
import yaml
import os

### 1.2 Importação da fonte de dados `CSV`

Arquivo do kaggle contendo dados mundiais sobre mulheres na tecnologia

In [3]:
# Criação do arquivo csv
!wget -O kaggle_survey_2022.csv 'https://raw.githubusercontent.com/paulalcssantos/Desafio-Pipeline-WoMakersCode/refs/heads/main/kaggle_survey_2022_mulheres_dados.csv'

--2025-12-04 19:54:44--  https://raw.githubusercontent.com/paulalcssantos/Desafio-Pipeline-WoMakersCode/refs/heads/main/kaggle_survey_2022_mulheres_dados.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 103121 (101K) [text/plain]
Saving to: ‘kaggle_survey_2022.csv’


2025-12-04 19:54:44 (6.41 MB/s) - ‘kaggle_survey_2022.csv’ saved [103121/103121]



### 1.3 Criação das fontes de dados `SQL`

Criação de um banco contendo dados das participantes do bootcamp e um data Warehouse vazio

In [4]:
# Bootcamp.db
conn_bootcamp = sqlite3.connect('bootcamp.db')

cursor_bootcamp = conn_bootcamp.cursor()

cursor_bootcamp.execute('''
CREATE TABLE IF NOT EXISTS PARTICIPANTES (
  ID_PARTICIPANTE INT,
  NOME VARCHAR(150),
  PAIS_ORIGEM VARCHAR(100)
)
''')

participantes = [
    (1, 'Maria', "Brazil"),
    (2, 'Luana', 'Portugal'),
    (3, 'Camila', 'Brazil'),
    (4, 'Luiza', 'Argentina'),
    (5, 'Silvia', 'Colombia'),
    (6, 'Paola', 'Brazil'),
    (7, 'Vitoria', 'Mexico'),
    (8, 'Caroline', 'Argentina'),
    (9, 'Marta', 'Portugal'),
    (10, 'Ana', 'Brazi')
]

cursor_bootcamp.executemany('INSERT INTO PARTICIPANTES VALUES (?,?,?)', participantes)

conn_bootcamp.commit()
conn_bootcamp.close()
print("Banco de Dados e tabela criados com sucesso!")

Banco de Dados e tabela criados com sucesso!


In [5]:
# Datawarehouse.db
conn_datawarehouse = sqlite3.connect('data_warehouse.db')
conn_datawarehouse.close()

print("Data Warehouse criado com sucesso!")

Data Warehouse criado com sucesso!


### 1.4 Criação da fonte de dados `JSON`

Criando um JSON com as habilidades técnicas

In [6]:
%%writefile habilidades_categorias.json
{
  "Ferramentas de Análise": ["Python", "R", "SQL"],
  "Ferramentas de BI": ["Power Bi", "Tableau", "Looker"],
  "Plataformas de Nuvem": ["AWS", "Google Cloud", "Microsoft Azure"]
}

Writing habilidades_categorias.json


## **2 - Extração e carregamento: `SQL` e`CSV`**

Ingestão das fontes de dados SQL e CSV no Data Warehouse


### 2.1 Configuração do logger

Criação de um arquivo de registros de eventos e mensagens de nível INFO.

In [7]:
# Configuração básica do logging
logging.basicConfig(
    level= logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='pipeline.log',
    filemode='w' #Sobrescreve o arquivo de log a cada execução
)

logger = logging.getLogger()

print("Logging configurado com sucesso!")

Logging configurado com sucesso!


### 2.2 Extração `CSV`

In [8]:
# Função para extrair os dados do kaggle
def extrai_dados_kaggle(path):
  try:
    logger.info(f'Iniciando a extração do arquivo {path}')

    # Leitura dos dados
    df = pd.read_csv(path)

    logger.info(f'Extração do CSV concluída. Leitura de {df.shape[0]} Linhas e {df.shape[1]} colunas ')

    logger.info(f'Os tipos de dados encontrados foram:\n{df.dtypes}')

    return df

  except FileNotFoundError:
    logger.error(f'Arquivo não encontrado: {path}')
    return None

  except Exception as e:
    logger.error(f'Ocorreu um erro durante a extração dos dados: {e}')
    return None

In [9]:
# Extração do dataset do kagle
df_kaggle = extrai_dados_kaggle('kaggle_survey_2022.csv')

if df_kaggle is not None:
  display(df_kaggle.head())

Unnamed: 0,idade,genero,pais,nivel_educacional,anos_programando,cargo_atual,anos_usando_ml,salario_anual_usd,linguagens_usadas,bancos_de_dados_usados,ferramentas_bi_usadas
0,30-34,Woman,Pakistan,Professional doctorate,5-10 years,Data Scientist,Other,,,,
1,35-39,Woman,Nigeria,Master’s degree,< 1 years,"Data Analyst (Business, Marketing, Financial, ...",Non-profit/Service,$0-999,Python;R;SQL,,
2,25-29,Woman,Germany,Bachelor’s degree,3-5 years,"Data Analyst (Business, Marketing, Financial, ...",Computers/Technology,$0-999,Python;SQL,,
3,30-34,Woman,Tunisia,Master’s degree,< 1 years,Data Scientist,Academics/Education,$0-999,Python,,
4,45-49,Woman,Egypt,Doctoral degree,10-20 years,Data Administrator,Computers/Technology,"10,000-14,999",Python;SQL,PostgreSQL;SQLite;Oracle Database;Snowflake,Qlik Sense


### 2.3 Extração `SQL`

In [10]:
# Função para extrair os dados do banco de dados SQL
def extrai_dados_sql(path_database):
  try:
    logger.info(f'Iniciando a extração do banco de dados {path_database}')

    # Criando a conexão com o banco de dados
    con = sqlite3.connect(path_database)

    # Query para selecionar os dados
    query = "SELECT * FROM PARTICIPANTES"

    # Transforma a consulta em um dataframe
    df = pd.read_sql_query(query, con)

    logger.info('Extração do banco de dados realizada com sucesso')

    return df

  except sqlite3.Error as e:
    logger.error(f"Ocorreu um erro ao conectar ao banco de dados")
    return None

  except Exception as e:
    logger.error(f"Ocorreu um erro inesperado ao extrair os dados: {e}")
    return None

  finally: # garante que a conexão seja fechada
    if con:
      con.close()
      logger.info('Conexão com o banco de dados fechada')

In [11]:
# Extração do banco de dados

df_participantes = extrai_dados_sql('bootcamp.db')

if df_participantes is not None:
  display(df_participantes.head())

Unnamed: 0,ID_PARTICIPANTE,NOME,PAIS_ORIGEM
0,1,Maria,Brazil
1,2,Luana,Portugal
2,3,Camila,Brazil
3,4,Luiza,Argentina
4,5,Silvia,Colombia


### 2.4 Carga de dados `CSV` e `SQL` no data warehouse

In [12]:
# Função para o carregamento
def carregar_dados(df, tabela, path_dw):
  # Verificação da extração:
  if df is None:
    logger.warning('Dataframe vazio, Não há dados para serem carregados. Verifique a extração.')
    return

  try:
    logger.info(f'Iniciando a carga dos dados da tabela {tabela}')

    con_dw = sqlite3.connect(path_dw)

    # Uso do to_sql para transformar dataframes em tabelas no banco de dados
    df.to_sql(tabela, con_dw, if_exists = 'replace', index = False)

    logger.info(f"Carga para a tabela '{tabela}' concluída com sucesso!")

  except Exception as e:
    logger.error(f"Ocorreu um erro ao carregar os dados para a tabela '{tabela}': {e}")

  finally:
    if con_dw:
      logger.info("Conexão com o Data Warehouse fechada")

In [13]:
# Carga dos dados CSV e SQL
carregar_dados(df_kaggle, 'kaggle_survey', 'data_warehouse.db')
carregar_dados(df_participantes, 'participantes', 'data_warehouse.db')

## **3 - Extração e carregamento `API` e `JSON`**

### 3.1 Extração dos dados da API de países

In [14]:
# Função para extrair dados de países a partir de uma API
def extrai_dados_paises_api(url_api):
  try:
    logger.info(f'Iniciando a requisição à API {url_api}')

    response = requests.get(url_api)

    if response.status_code == 200:
      dados_paises = response.json() # Converte a resposta para uma estrutura json
      logger.info(f'Requisição à API realizada com sucesso. {len(dados_paises)} dados extraídos.')
      return dados_paises
    else:
      logger.info(f'Falha na requisição à API. Código de status: {response.status_code}')

  except exception as e:
    logger.error(f'Ocorreu um erro ao extrair os dados da API: {e}')

In [15]:
# Extração da API países
url_paises = 'https://restcountries.com/v3.1/all?fields=name,cca3,region'

dados_paises = extrai_dados_paises_api(url_paises)

# Leitura dos dados

if dados_paises is not None:
  df_paises = pd.json_normalize(dados_paises)
  display(df_paises.head())

Unnamed: 0,cca3,region,name.common,name.official,name.nativeName.eng.official,name.nativeName.eng.common,name.nativeName.dzo.official,name.nativeName.dzo.common,name.nativeName.ita.official,name.nativeName.ita.common,...,name.nativeName.ind.official,name.nativeName.ind.common,name.nativeName.fil.official,name.nativeName.fil.common,name.nativeName.hun.official,name.nativeName.hun.common,name.nativeName.nep.official,name.nativeName.nep.common,name.nativeName.khm.official,name.nativeName.khm.common
0,ATG,Americas,Antigua and Barbuda,Antigua and Barbuda,Antigua and Barbuda,Antigua and Barbuda,,,,,...,,,,,,,,,,
1,BTN,Asia,Bhutan,Kingdom of Bhutan,,,འབྲུག་རྒྱལ་ཁབ་,འབྲུག་ཡུལ་,,,...,,,,,,,,,,
2,ITA,Europe,Italy,Italian Republic,,,,,Repubblica italiana,Italia,...,,,,,,,,,,
3,TUV,Oceania,Tuvalu,Tuvalu,Tuvalu,Tuvalu,,,,,...,,,,,,,,,,
4,AIA,Americas,Anguilla,Anguilla,Anguilla,Anguilla,,,,,...,,,,,,,,,,


### 3.2 Extração do JSON habilidades

In [16]:
# Função para extração do json de habilidades

def extrai_dados_json(path_json):
  try:
    logger.info(f'Iniciando a extração do arquivo {path_json}')
   # Leitura dos dados
    df = pd.read_json(path_json)

    logger.info(f'Extração do Json concluída. Leitura de {df.shape[0]} Linhas e {df.shape[1]} colunas ')

    logger.info(f'Os tipos de dados encontrados foram:\n{df.dtypes}')

    return df

  except FileNotFoundError:
    logger.error(f'Arquivo não encontrado: {path_json}')
    return None

  except Exception as e:
    logger.error(f'Ocorreu um erro durante a extração dos dados: {e}')
    return None

In [17]:
# Extração dos dados json
df_json= extrai_dados_json('habilidades_categorias.json')

if df_json is not None:
  display(df_json.head())

Unnamed: 0,Ferramentas de Análise,Ferramentas de BI,Plataformas de Nuvem
0,Python,Power Bi,AWS
1,R,Tableau,Google Cloud
2,SQL,Looker,Microsoft Azure


In [18]:
# Função para alterar o formato do dataframe
def transforma_json(df_json):
  if df_json is None:
    logger.warning('Dados json vazios. Nada a ser transformado. Verifique a extração')
    return None

  df_json_melt = df_json.melt(
      ignore_index = False,
      var_name = 'CATEGORIA',
      value_name = 'HABILIDADE')
  logger.info(f'A transformação do json {df_json} para o formato longo foi realizado com sucesso')
  return df_json_melt

In [19]:
# Tranformação do json em formato longo
df_categorias = transforma_json(df_json)

if df_categorias is not None:
  display(df_categorias.head())

Unnamed: 0,CATEGORIA,HABILIDADE
0,Ferramentas de Análise,Python
1,Ferramentas de Análise,R
2,Ferramentas de Análise,SQL
0,Ferramentas de BI,Power Bi
1,Ferramentas de BI,Tableau


### 3.3 Carga de dados `JSON` no data warehouse

In [20]:
# Carga dos dados json
carregar_dados(df_paises, 'paises', 'data_warehouse.db')
carregar_dados(df_categorias, 'categorias', 'data_warehouse.db')

## **4 - Transformação com dbt**

### 4.1 Configuração do dbt

O dbt permite transformar os dados do data warehouse de forma organizada, versionada e automatizada.

In [21]:
# Iniciando
!dbt init pipeline_mulheres_tecnologia

[0m19:55:38  Running with dbt=1.10.15
[0m19:55:38  Creating dbt configuration folder at /root/.dbt
[0m19:55:38  
Your new dbt project "pipeline_mulheres_tecnologia" was created!

For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:

  https://docs.getdbt.com/docs/configure-your-profile

One more thing:

Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:

  https://community.getdbt.com/

Happy modeling!

[0m19:55:38  Setting up your profile.
Which database would you like to use?
[1] sqlite

(Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)

Enter a number: 1
[0m19:56:36  Profile pipeline_mulheres_tecnologia written to /root/.dbt/profiles.yml using target's sample configuration. Once updated, you'll be able to start developing with dbt.


In [22]:
# Configuração do perfil do dbt
profiles_config = {
    'pipeline_mulheres_tecnologia': {
        'target': 'dev', # define o tipo de ambiente, podendo ser desenvolvimento ou produção
        'outputs': { # configuração do ambiente
            'dev': {
                'type': 'sqlite',
                'threads': 1, # Quantidade de tarefas em parelelo.

                # Parâmetro para versões mais recentes
                'schemas_and_paths': {
                    'main': '../data_warehouse.db'
                },

                # Parâmetro para versões mais antigas
                'database': '../data_warehouse.db',

                # Parâmetro para versões mais antigas
                'schema': 'main',

                # Parâmetro opcional
                'schema_directory': '.'
            }
        }
    }}

# Caminho onde o dbt espera encontrar o arquivo de perfis
dbt_profile_dir = os.path.expanduser('~/.dbt/')
os.makedirs(dbt_profile_dir, exist_ok=True)
profiles_path = os.path.join(dbt_profile_dir, 'profiles.yml')

# Converte o dicionário de configuração em um arquivo yaml
with open(profiles_path, 'w') as f:
    yaml.dump(profiles_config, f)

Verificação da configuração

In [23]:
%cd pipeline_mulheres_tecnologia/
!dbt debug

/content/pipeline_mulheres_tecnologia
[0m19:56:52  Running with dbt=1.10.15
[0m19:56:52  dbt version: 1.10.15
[0m19:56:52  python version: 3.12.12
[0m19:56:52  python path: /usr/bin/python3
[0m19:56:52  os info: Linux-6.6.105+-x86_64-with-glibc2.35
[0m19:56:52  Using profiles dir at /root/.dbt
[0m19:56:52  Using profiles.yml file at /root/.dbt/profiles.yml
[0m19:56:52  Using dbt_project.yml file at /content/pipeline_mulheres_tecnologia/dbt_project.yml
[0m19:56:52  adapter type: sqlite
[0m19:56:52  adapter version: 1.10.0
[0m19:56:53  Configuration:
[0m19:56:53    profiles.yml file [[32mOK found and valid[0m]
[0m19:56:53    dbt_project.yml file [[32mOK found and valid[0m]
[0m19:56:53  Required dependencies:
[0m19:56:53   - git [[32mOK found[0m]

[0m19:56:53  Connection:
[0m19:56:53    database: ../data_warehouse.db
[0m19:56:53    schema: main
[0m19:56:53    schemas_and_paths: {'main': '../data_warehouse.db'}
[0m19:56:53    schema_directory: .
[0m19:56:53  Regi

## 5 - Camada de staging

Preparação inicial dos dados brutos

### 5.1  Criação do diretório de staging

In [24]:
!mkdir /content/pipeline_mulheres_tecnologia/models/staging/

In [25]:
%cd /content/pipeline_mulheres_tecnologia/models/staging

/content/pipeline_mulheres_tecnologia/models/staging


In [26]:
!touch source.yml

### 5.2  Criação de um mapeamento das fontes de dados

Arquivo de configuração `.yml` contendo informações de onde encontrar os dados brutos

In [27]:
%%writefile /content/pipeline_mulheres_tecnologia/models/staging/source.yml
version: 2

sources:
  - name: dados_brutos
    database: data_warehouse.db
    schema: main

    tables:
      - name: kaggle_survey
        description: "Dados brutos da pesquisa Kaggle, filtrado por mulheres na área de dados"
      - name: participantes
        description: "Dados das participantes do bootcamp BI"
      - name: paises
        description: "Dados dos países extraídos da API REST Countries"
      - name: categorias
        description: "Mapeamento das habilidades e categorias, extraído de um arquivo JSON"

Overwriting /content/pipeline_mulheres_tecnologia/models/staging/source.yml


### 5.3 Criação da camada de staging

In [28]:
!touch stg_kaggle_survey.sql

#### 5.3.1 Staging dados kaggle

In [29]:
%%writefile /content/pipeline_mulheres_tecnologia/models/staging/stg_kaggle_survey.sql

SELECT PAIS,
        NIVEL_EDUCACIONAL,
        ANOS_PROGRAMANDO,
        CARGO_ATUAL,
        ANOS_USANDO_ML,
        CASE WHEN SALARIO_ANUAL_USD LIKE '%-%'
             THEN CAST(REPLACE(SUBSTR(SALARIO_ANUAL_USD, 1, INSTR(SALARIO_ANUAL_USD, '-') - 1), ',', '') AS REAL)
             WHEN SALARIO_ANUAL_USD IS NOT NULL
             THEN CAST(REPLACE(SALARIO_ANUAL_USD, ',', '') AS REAL)
             ELSE NULL
        END AS SALARIO_ANUAL_USD,
        LINGUAGENS_USADAS,
        BANCOS_DE_DADOS_USADOS,
        FERRAMENTAS_BI_USADAS
FROM {{ source('dados_brutos', 'kaggle_survey') }}

Overwriting /content/pipeline_mulheres_tecnologia/models/staging/stg_kaggle_survey.sql


#### 5.3.2 Staging dados participantes

In [30]:
!touch stg_participantes.sql

In [31]:
%%writefile /content/pipeline_mulheres_tecnologia/models/staging/stg_participantes.sql
SELECT
  ID_PARTICIPANTE,
  NOME,
  PAIS_ORIGEM
FROM {{ source('dados_brutos', 'participantes') }}

Overwriting /content/pipeline_mulheres_tecnologia/models/staging/stg_participantes.sql


#### 5.3.3 Staging dados paises

In [32]:
!touch stg_paises.sql

In [33]:
%%writefile /content/pipeline_mulheres_tecnologia/models/staging/stg_paises.sql
SELECT "name.common" AS NOME_PAIS,
       cca3          AS CODIGO_PAIS,
       region        AS REGIAO
FROM {{ source('dados_brutos', 'paises') }}

Overwriting /content/pipeline_mulheres_tecnologia/models/staging/stg_paises.sql


In [34]:
%cd /content/pipeline_mulheres_tecnologia

/content/pipeline_mulheres_tecnologia


In [35]:
!dbt run

[0m19:58:26  Running with dbt=1.10.15
[0m19:58:27  Registered adapter: sqlite=1.10.0
[0m19:58:27  Unable to do partial parsing because saved manifest not found. Starting full parse.
[0m19:58:29  Found 5 models, 4 data tests, 4 sources, 416 macros
[0m19:58:29  
[0m19:58:29  Concurrency: 1 threads (target='dev')
[0m19:58:29  
[0m19:58:30  1 of 5 START sql table model main.my_first_dbt_model ........................... [RUN]
[0m19:58:30  1 of 5 OK created sql table model main.my_first_dbt_model ...................... [[32mOK[0m in 0.07s]
[0m19:58:30  2 of 5 START sql view model main.stg_kaggle_survey ............................. [RUN]
[0m19:58:30  2 of 5 OK created sql view model main.stg_kaggle_survey ........................ [[32mOK[0m in 0.06s]
[0m19:58:30  3 of 5 START sql view model main.stg_paises .................................... [RUN]
[0m19:58:30  3 of 5 OK created sql view model main.stg_paises ............................... [[32mOK[0m in 0.04s]
[0m19:58:

## 6 - Data marts - dim_desenvolvedoras

Modelo final, contendo as tabelas de países e do kaggle conectadas

In [36]:
!mkdir /content/pipeline_mulheres_tecnologia/models/marts

In [37]:
%cd /content/pipeline_mulheres_tecnologia/models/marts

/content/pipeline_mulheres_tecnologia/models/marts


In [38]:
!touch dim_desenvolvedoras.sql

In [39]:
%%writefile /content/pipeline_mulheres_tecnologia/models/marts/dim_desenvolvedoras.sql

-- Configuração de materialização: vamos criar como uma tabela física.
{{config(materialized = 'table')}}

WITH stg_kaggle AS (
    SELECT * FROM {{ref('stg_kaggle_survey')}}
),
stg_paises AS (
    SELECT * FROM {{ref('stg_paises')}}
)

SELECT stg_kaggle.PAIS,
       stg_paises.CODIGO_PAIS,
       stg_paises.REGIAO,
       stg_kaggle.NIVEL_EDUCACIONAL,
       stg_kaggle.ANOS_PROGRAMANDO,
       stg_kaggle.CARGO_ATUAL,
       stg_kaggle.ANOS_USANDO_ML,
       stg_kaggle.SALARIO_ANUAL_USD,
       stg_kaggle.LINGUAGENS_USADAS,
       stg_kaggle.BANCOS_DE_DADOS_USADOS,
       stg_kaggle.FERRAMENTAS_BI_USADAS
FROM stg_kaggle
   LEFT JOIN stg_paises ON stg_kaggle.PAIS = stg_paises.NOME_PAIS

Overwriting /content/pipeline_mulheres_tecnologia/models/marts/dim_desenvolvedoras.sql


In [40]:
%cd /content/pipeline_mulheres_tecnologia

/content/pipeline_mulheres_tecnologia


In [41]:
!dbt run

[0m19:59:10  Running with dbt=1.10.15
[0m19:59:11  Registered adapter: sqlite=1.10.0
[0m19:59:12  Found 6 models, 4 data tests, 4 sources, 416 macros
[0m19:59:12  
[0m19:59:12  Concurrency: 1 threads (target='dev')
[0m19:59:12  
[0m19:59:12  1 of 6 START sql table model main.my_first_dbt_model ........................... [RUN]
[0m19:59:12  1 of 6 OK created sql table model main.my_first_dbt_model ...................... [[32mOK[0m in 0.11s]
[0m19:59:12  2 of 6 START sql view model main.stg_kaggle_survey ............................. [RUN]
[0m19:59:12  2 of 6 OK created sql view model main.stg_kaggle_survey ........................ [[32mOK[0m in 0.10s]
[0m19:59:12  3 of 6 START sql view model main.stg_paises .................................... [RUN]
[0m19:59:12  3 of 6 OK created sql view model main.stg_paises ............................... [[32mOK[0m in 0.07s]
[0m19:59:12  4 of 6 START sql view model main.stg_participantes ............................. [RUN]
[0m19:

## 7 - Orquestração do pipeline com prefect

Automatização das tarefas de ELT

In [None]:
# 1 Instalação dos pacotes
!pip install pandas prefect dbt-sqlite requests -q

# 2 Importação
import pandas as pd
import requests
import json
import sqlite3
from prefect import task, flow, get_run_logger
import subprocess
import os
import logging

# 3 Cpnfiguração do logger
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='pipeline.log',
    filemode='w'
)

logger = logging.getLogger()

print("Logging configurado com sucesso!")

# Definição das tarefas de extração e carga (EL) como @tasks

@task(retries=3, retry_delay_seconds=5)
def extrair_dados_kaggle(caminho_arquivo):
  try:
    logger.info(f"Inicando a extração do arquivo: {caminho_arquivo}")

    df = pd.read_csv(caminho_arquivo)

    logger.info(f"Extração do CSV concluída. {len(df)} linhas lidas e {len(df.columns)} colunas.")

    logger.info(f"Tipos de dados das colunas: \n{df.dtypes}")

    return df

  except FileNotFoundError:
    logger.error(f"Arquivo não encontrado: {caminho_arquivo}")
    return None
  except Exception as e:
    logger.error(f"Ocorreu um erro inesperado ao extrair os dados: {e}")
    return None

@task
def extrair_dados_sql(caminho_banco):
  try:
    logger.info(f"Inicando a extração do banco de dados: {caminho_banco}")

    conexao = sqlite3.connect(caminho_banco)

    query = "SELECT * FROM PARTICIPANTES"

    df = pd.read_sql_query(query, conexao)

    logger.info(f"Extração do banco de dados concluída com sucesso!")
    return df

  except sqlite3.Error as e:
    logger.error(f"Ocorreu um erro ao conectar ao banco de dados")
    return None
  except Exception as e:
    logger.error(f"Ocorreu um erro inesperado ao extrair os dados: {e}")
    return None

  finally:
    if conexao:
      conexao.close()
      logger.info("Conexão com o banco de dados fechada.")

@task
def extrair_categorias_habilidades_json(caminho_arquivo):
  try:
    logger.info(f"Iniciando a extração do arquivo JSON: {caminho_arquivo}")

    with open(caminho_arquivo, 'r') as arquivo:
      dados_json = json.load(arquivo)

    logger.info("Extração do JSON concluída com sucesso!")

    if dados_json is None:
      logger.warning("Dados JSON vazios. Nada a ser transformado")
    return None

    lista_categorias = []

    for categoria, habilidades in dados_json.items():
      for habilidade in habilidades:
        lista_categorias.append({"CATEGORIA": categoria, "HABILIDADE" : habilidade})

    df_categorias = pd.DataFrame(lista_categorias)

    logger.info("Transformação do JSON em DataFrame concluída com sucesso!")
    return df_categorias

  except FileNotFoundError:
    logger.error(f"Arquivo não encontrado: {caminho_arquivo}")
    return None
  except Exception as e:
    logger.error(f"Ocorreu um erro inesperado ao extrair os dados: {e}")
    return None

@task
def extrair_dados_paises_api(url_api):
  try:
    logger.info(f"Iniciando a requisição à API: {url_api}")

    resposta = requests.get(url_api)

    if resposta.status_code == 200:
      dados_json = resposta.json()
      logger.info(f"Dados da API extraídos com sucessos. {len(dados_json)} registros de países.")
      return dados_json
    else:
      logger.error(f"Falha na requisição à API. Código de status: {resposta.status_code}")
      return None

  except Exception as e:
      logger.error(f"Ocorreu um erro inesperado ao extrair os dados")
      return None

@task
def carregar_dados(df, nome_tabela, caminho_dw):
  if df is None:
    logger.warning("DataFrame vazio. Nada a ser carregado.")
    return

  try:
    logger.info(f"Iniciando o carregamento dos dados na tabela: {nome_tabela}")

    conexao_dw = sqlite3.connect(caminho_dw)

    df.to_sql(nome_tabela, conexao_dw, if_exists='replace', index=False)

    logger.info(f"Carga para a tabela '{nome_tabela}' concluída com sucesso!")

  except Exception as e:
      logger.error(f"Ocorreu um erro ao carregar os dados para a tabela '{nome_tabela}': {e}")

  finally:
    if conexao_dw:
      conexao_dw.close()
      logger.info("Conexão com o Data Warehouse fechada.")


# Tarefas de transformação (T) via dbt

@task
def executar_dbt_run():
    logger = get_run_logger()
    logger.info("Iniciando 'dbt run'...")
    try:
        subprocess.run(['dbt', 'run'], check=True, cwd='/content/pipeline_mulheres_tecnologia')
        logger.info("'dbt run' concluído com sucesso.")
    except subprocess.CalledProcessError as e:
        logger.error(f"Falha no 'dbt run': {e}")
        raise

# FLUXO PRINCIPAL (@flow)
# Orquestra a execução de todas as tarefas na ordem correta.

@flow(name="Pipeline ETL - Mulheres na Tecnologia")
def pipeline_principal():
    logger = get_run_logger()
    logger.info("### Iniciando Pipeline ELT ###")

    # Fase 1: Extração e Carga (EL)
    # Execução em parelelo
    df_kaggle = extrair_dados_kaggle('/content/kaggle_survey_2022.csv')
    df_participantes = extrair_dados_sql('/content/bootcamp.db') # Changed from bootcampBI.db to bootcamp.db
    df_paises = pd.json_normalize(extrair_dados_paises_api("https://restcountries.com/v3.1/all?fields=name,cca3,region"))
    dados_habilidades = extrair_categorias_habilidades_json("/content/habilidades_categorias.json")

    # Fase 1.2 As cargas dependem das extrações
    carga_kaggle = carregar_dados(df_kaggle, 'kaggle_survey', 'data_warehouse.db')
    carga_participantes = carregar_dados(df_participantes, 'participantes', 'data_warehouse.db')
    carga_paises = carregar_dados(df_paises, 'paises', 'data_warehouse.db')
    carga_habilidades = carregar_dados(dados_habilidades, 'categorias', 'data_warehouse.db')

    #  Fase 2: Transformação (T): só ocorre após o término da fase 1
    dbt_run = executar_dbt_run(wait_for=[carga_kaggle, carga_participantes, carga_paises, carga_habilidades])

    logger.info("PIPELINE DE ETL CONCLUÍDo COM SUCESSO")

# Execução
if __name__ == "__main__":
    pipeline_principal()