# **1. Configuração do Colab e preparação das fontes**

In [1]:
#instalação das bibliotecas
!pip install pandas -q  #-q para não poluir a tela
!pip install prefect -q
!pip install dbt-sqlite -q
!pip install requests -q

In [2]:
#importação das bibliotecas
import pandas as pd
import requests
import json
import sqlite3

print ("Ambiente configurado com sucesso!")


Ambiente configurado com sucesso!


In [3]:
# 1ª fonte CSV do Kaggle - pipeline para falar de mulheres na tecnologia. Já foi serpado colunas necessárias para fazer o projeto.
# Foi tratado para ter apenas mulheres, retirados colunas que não precisa e renomeados e renomeados os nomes das colunas

#trazendo o CSV do github
!wget -o kaglle_survey_2022.csv "https://raw.githubusercontent.com/paulalcssantos/Desafio-Pipeline-WoMakersCode/refs/heads/main/kaggle_survey_2022_mulheres_dados.csv"
#-o é para abrir o arquivo

In [4]:
#2ºbanco de dados sql
conn_bootcamp = sqlite3.connect("bootcampBI.db") #conexão
cursor_bootcamp = conn_bootcamp.cursor() #criação de cursos

#criação de BD - igual é criado no debever
cursor_bootcamp.execute('''
  CREATE TABLE IF NOT EXISTS PARTICIPANTES (
    ID_PARTICIPANTE INT,
    NOME VARCHAR(150),
    PAIS_ORIGEM VARCHAR(100)
  )
''')

#LISTA PARTICIPANTES - POPULAÇÃO
participantes = [
    (1,'Maria','Brazil'),
    (2, 'Luana', 'Portugual'),
    (3, 'Camila', 'Brazil'),
    (4, 'Luiza', 'Argentina'),
    (5, 'Silvia', 'Colombia'),
    (6, 'Paola', 'Brazil'),
    (7, 'Vitoria', 'Mexico'),
    (8, 'Caroline', 'Argeuntina'),
    (9, 'MArta', 'Portugual'),
    (10, 'Ana', 'Brazil'),
]

#INSERINDO OS VALORES NA TABELA
cursor_bootcamp.executemany("INSERT INTO PARTICIPANTES (ID_PARTICIPANTE, NOME, PAIS_ORIGEM) VALUES (?,?,?) ", participantes)
# executemany = execute varias vezes

#abre e fecha a conexão
conn_bootcamp.commit()
conn_bootcamp.close()

print("Banco de dados e tabela criados com sucesso ")



Banco de dados e tabela criados com sucesso 


In [5]:
%%writefile habilidades_categorias.json
{
"Ferramentas de Análise": ["Pyton", "R", "SQL"]
"Ferramentas de BI": ["Power BI", "Tableau","Looker"]
"Plataformas de Nuvem": ["AES","Google Cloud", "Microsoft Azure"]
}

Writing habilidades_categorias.json


In [6]:
#4º PREPARANDO O DATA WAREHOUSE

conn_datawarehouse = sqlite3.connect("datawarehouse.db")
conn_datawarehouse.close()

print ("Data Warehouse criado com sucesso!")

Data Warehouse criado com sucesso!


# **2. Extração e Carregamento - CSV E SQL**

Três Principais regras:

*  Modularidade

    Inclapsular as funções - ao invez de ter um script unico teremos varios blocos, organizando assim o código

*  Logging

    Não serão usados o print, usaremos a biblioteca de logging para criar um histórico do que deu certo e o que não deu certo.
    Isto faz com que o processo fique mais automatizado

*  Tratamento de Erros

   O que acontece se der algum erro de execução ou se algum arquivo não ler.
   Usaremos os **blocos de try e e excep** para poder registar os erros no log e ter o pipeline execute de uma forma mais controlada.

#**2.1 Extração de Dados**

In [7]:
import logging

In [8]:
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename="pipeline.log",
    filemode="w"
)

#criação do objeto logger
logger = logging.getLogger()

print("Loggin configurado com sucesso!")

Loggin configurado com sucesso!


In [9]:
#Robo expecialista em extrair os arquivos do csv
def extrair_dados_kaglle(caminho_arquivo):
  try:
    logger.info(f"Iniciando a extração do arquivo: {caminho_arquivo}")

    #criação de dataframe no pandas
    df = pd.read_csv(caminho_arquivo)

    logger.info(f"Extração do CSV concluída. {len(df)} linhas lidas e {len(df.columns)} colunas.")

    logger.info(f"Tipo de dados das colunas: \n{df.dtypes}")

    return df

  except FileNotFoundError:
    logger.error(f"Arquivo não encontrado: {caminho_arquivo}")
    return None

  #Erro se o arquivo for encontrato, mas tiver erro na extração dos dados precisamos ter uma exessão também
  except Exception as e:
    logger.error(f"Erro durante a extração do CSV: {str(e)}")
    return None

#Teste da função
df_kaglle = extrair_dados_kaglle("kaglle_survey_2022.csv")

#retorna as 5 primeiras linhas
if df_kaglle is not None:
  display(df_kaglle.head())

Unnamed: 0,Unnamed: 1,Unnamed: 2,--2025-12-10 23:50:59-- https://raw.githubusercontent.com/paulalcssantos/Desafio-Pipeline-WoMakersCode/refs/heads/main/kaggle_survey_2022_mulheres_dados.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133,185.199.109.133,185.199.110.133,...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.,,,
HTTP request sent,awaiting response... 200 OK,,
Length: 103121 (101K) [text/plain],,,
Saving to: ‘kaggle_survey_2022_mulheres_dados.csv’,,,


In [10]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [11]:
# Robo especialista em extrair os arquivos do SQL
def extrair_dados_sql(caminho_banco):
    conexao = None
    try:
        logger.info(f"Iniciando a extração do banco de dados: {caminho_banco}")

        # conexão com o BD
        conexao = sqlite3.connect(caminho_banco)

        query = "SELECT * FROM PARTICIPANTES"

        # leitura do SQL
        df = pd.read_sql_query(query, conexao)

        logger.info("Extração do banco de dados concluída com sucesso!")
        return df

    except sqlite3.Error as e:
        logger.error(f"Erro ao conectar ou executar a consulta SQL: {str(e)}")
        return None

    except Exception as e:
        logger.error(f"Ocorreu um erro inesperado ao extrair os dados: {str(e)}")
        return None

    #bloco finally: boas praticas, para garantir que a conexão do banco sempre seja fechada é informada independente do try
    finally:
        if conexao:
            conexao.close()
            logger.info("Conexão com o banco de dados fechada.")

# TESTE DA FUNÇÃO
df_participantes = extrair_dados_sql("bootcampBI.db")

# Print as 5 primeiras linhas
if df_participantes is not None:
    display(df_participantes.head())



Unnamed: 0,ID_PARTICIPANTE,NOME,PAIS_ORIGEM
0,1,Maria,Brazil
1,2,Luana,Portugual
2,3,Camila,Brazil
3,4,Luiza,Argentina
4,5,Silvia,Colombia


#**2.2 Função de Carregamento de Dados**

Acabamos a etapa de extração

Agora vamos carregar os dados que acabamos de extrair e coloca-los no nosso data warehouse

In [12]:
def carregar_dados(df, nome_tabela, caminho_dw):

  #verificar de o data frame está vazio
  #se sim é por que a carga não foi realizada
    if df is None:
        logger.warning("DataFrame está vazio. Nenhum dado será carregado.")
        return

    try:
        logger.info(f"Iniciando o carregamento dos dados na tabela: {nome_tabela}")

        conexao_dw = sqlite3.connect(caminho_dw)

        #para salvar o bd no data frame usaremos o metodo to sql - tranforma a tabela em um dataframe
        df.to_sql(nome_tabela, conexao_dw, if_exists="replace", index=False)

        logger.info(f"Carga para a tabela '{nome_tabela}' concluída com sucesso!")

    except Exception as e:
        logger.error(f"Ocorreu um erro ao carregar os dados na tabela '{nome_tabela}': {str(e)}")

    finally:
        if conexao_dw:
            conexao_dw.close()
            logger.info("Conexão com o data warehouse fechada.")



In [13]:
#Executa a carga dos DAtaFrames extraídos
carregar_dados(df_kaglle, "kaggle_survey", "datawarehouse.db")
carregar_dados(df_participantes, "participantes", "datawarehouse.db")

# **3.Extração e Carregamento de API e JSON**

#**3.1 Extração de dados API**

**API - Application Program Interface**

*   É interface de um conjunto de aplicações, protocolos;
*   Permite que diversas aplicações trocem informações de forma segura e eficiente.

    Analogia: cliente -> garson -> cozinha
    Analogia: leva a requisição para um sistema -> porocessa (API- é a ponte) -> leva de volta para o sistema que fez a rerquisição
*   API usado do REST Countries

In [14]:
#Criando função
def extrair_dados_api(url_api):
  try:
    logger.info(f" Iniciando à requisição à API. {url_api}")

    resposta = requests.get(url_api)

    # verifica se o garson (API) trouxe a API, ou seja se a requisção deu certo.
    # o retorno sempre será numérico. Se for = a 200 querr dizer que a requisão deu certo
    # retorno = 404 erro, requisição não foi bem sucedida
    if resposta.status_code == 200:
      dados_json = resposta.json() #lista de dicionario caso a requisição funcione
      logger.info(f"Dados API extraídos com sucesso! {len(dados_json)} registros de países")
      return dados_json
    else:
      logger.error(f"Falha na requisição à API. Código de status: {resposta.status_code}")
      return None

  except Exception as e:
    logger.error(f"Ocorreu um erro ao extrair dados da API: {str(e)}")

url_paises = "https://restcountries.com/v3.1/all?fields=name,cca3,region"

#Teste da função
dados_paises = extrair_dados_api(url_paises)

#transformando a lista de Json no dataframe
if dados_paises is not None:
  df_paises = pd.json_normalize(dados_paises)
  display(df_paises.head())


Unnamed: 0,cca3,region,name.common,name.official,name.nativeName.eng.official,name.nativeName.eng.common,name.nativeName.dzo.official,name.nativeName.dzo.common,name.nativeName.ita.official,name.nativeName.ita.common,...,name.nativeName.ind.official,name.nativeName.ind.common,name.nativeName.fil.official,name.nativeName.fil.common,name.nativeName.hun.official,name.nativeName.hun.common,name.nativeName.nep.official,name.nativeName.nep.common,name.nativeName.khm.official,name.nativeName.khm.common
0,ATG,Americas,Antigua and Barbuda,Antigua and Barbuda,Antigua and Barbuda,Antigua and Barbuda,,,,,...,,,,,,,,,,
1,BTN,Asia,Bhutan,Kingdom of Bhutan,,,འབྲུག་རྒྱལ་ཁབ་,འབྲུག་ཡུལ་,,,...,,,,,,,,,,
2,ITA,Europe,Italy,Italian Republic,,,,,Repubblica italiana,Italia,...,,,,,,,,,,
3,TUV,Oceania,Tuvalu,Tuvalu,Tuvalu,Tuvalu,,,,,...,,,,,,,,,,
4,AIA,Americas,Anguilla,Anguilla,Anguilla,Anguilla,,,,,...,,,,,,,,,,


#**3.2 Extração de dados JASON**

**FUNÇÃO JASON**

Última função - último robo
Ele vai tratar dados semi estruturados. Simulando um arquivo sql.
Iremos ler o arquivo habilidades json, no qual simula um aquivo de BD no SQL, onde a estrutura dele é um pouco mais fexivel.
Iremos estão fazer um robo para lidar com este tipo de dados

In [15]:
#Criando função
def extrair_categorias_habilidades_json(caminho_arquivo):
  try:
    logger.info(f"Iniciando a extração dos dados do arquivo JSON: {caminho_arquivo}")

    #open garante que ele feche automaticamente no final
    with open(caminho_arquivo, "r") as arquivo:
      #load = le o conteudo do arquivo e tr4ansforma em uma lista ou biblioteca no payton
      dados_json = json.load(arquivo)

    logger.info(f"Extração do JSON concluída com sucesso!")
    return dados_json

  except FileNotFoundError:
    logger.error(f"Arquivo não encontrado: {caminho_arquivo}")
    return None

  except Exception as e:
    logger.error(f"Ocorreu um erro ao extrair os dados do JSON: {str(e)}")
    return None

#teste da função
dados_habilidades = extrair_categorias_habilidades_json("/content/habilidades_categorias.json")

if dados_habilidades is not None:
  df_habilidades = pd.json_normalize(dados_habilidades)
  display(df_habilidades.head())


ERROR:root:Ocorreu um erro ao extrair os dados do JSON: Expecting ',' delimiter: line 3 column 1 (char 50)


In [16]:
#tecnica para trasnformar os dados anteriores em tablea
def transformar_json_em_df(dados_json):
  if dados_json is None:
    logger.warning("Dados JSON estão vazios. Nenhum dado será transformado.")
    return None

  #se não tiver vazio, iremos converter o dicionario em tupla
  lista_categorias = []

  for categoria, habilidades in dados_json.items():
    for habilidade in habilidades:
      lista_categorias.append({"CATEGORIA": categoria, "HABILIDADE": habilidade})

  df_categorias = pd.DataFrame(lista_categorias)

  logger.info("Transformação do JSON em DataFrame concluída com sucesso!")

  return df_categorias

#teste da função
df_categorias = transformar_json_em_df(dados_habilidades)

if df_categorias is not None:
  display(df_categorias.head())



# **3.3 Carregamento**

In [17]:
#carregamento para datawharehouse
#podemos reaproveitar a função carregar_dados que foi usado anteriormente
carregar_dados(df_paises, "paises", "datawarehouse.db")
carregar_dados(df_categorias, "categorias", "datawarehouse.db")





#**4. Transformação com dbt**

É uma ferramente de linha de comando.

O arquivo profiles.yml fica oculto

In [18]:
!dbt init pipeline_mulheres_na_tecnologia

[0m23:51:28  Running with dbt=1.10.15
[0m23:51:28  Creating dbt configuration folder at /root/.dbt
[0m23:51:28  
Your new dbt project "pipeline_mulheres_na_tecnologia" was created!

For more information on how to configure the profiles.yml file,
please consult the dbt documentation here:

  https://docs.getdbt.com/docs/configure-your-profile

One more thing:

Need help? Don't hesitate to reach out to us via GitHub issues or on Slack:

  https://community.getdbt.com/

Happy modeling!

[0m23:51:28  Setting up your profile.
Which database would you like to use?
[1] sqlite

(Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)

Enter a number: 1
[0m23:52:53  Profile pipeline_mulheres_na_tecnologia written to /root/.dbt/profiles.yml using target's sample configuration. Once updated, you'll be able to start developing with dbt.


biblioteca yaml:
Configurando o arquivo oculto

linguagem de formatação usada nos arquivos de formatação do dbt

biblioteca os

In [19]:
import yaml
import os

In [20]:
# configurar caminho ABSOLUTO do banco
caminho_banco = "/content/pipeline_mulheres_na_tecnologia/datawarehouse.db"

#dicionario em payton que representa a estrutura do arquivo oculto
#mais facil e seguro
#configurações do dbt - perfil
profiles_config = {
    #nome utilizado no perfil. Deve ser igual ao nome de estrutura de pastas que foi dado anteriormente
    "pipeline_mulheres_na_tecnologia": {
        #define o tipo de desenvolviemnto utilizado (desenvolviemnto ou produção)
        "target": "dev",
        #exceção de configuração
        "outputs": {
            "dev": {
                #configuração mais importante é o tipo, no nosso caso é o sqlite
                "type": "sqlite",
               #denine a quantidade de tgarefas que se pode execultar pçor trás. O SQLITe só execulta 1
                "threads": 1,

                # Parâmetro para versões mais recentes
                "schemas_and_tables": {
                    "main": "/content/pipeline_mulheres_na_tecnologia/datawarehouse.db"
                },

                # Parâmetros para versões anteriores
                "database": "/content/pipeline_mulheres_na_tecnologia",
                "schema": "main",

                # Parâmetro opcional
                "schema_directory": "."
            }
        }
    }
}
# Criar pasta do dbt
dbt_profile_dir = os.path.expanduser("~/.dbt")
os.makedirs(dbt_profile_dir, exist_ok=True)

# Caminho do profiles.yml
profiles_path = os.path.join(dbt_profile_dir, "profiles.yml")

#converte o dicionario criado em arquivo yaml
with open(profiles_path, "w") as f:
    yaml.dump(profiles_config, f)

In [21]:
#verifica se a conexão com, o dbt está funcionando
%cd /content/pipeline_mulheres_na_tecnologia

/content/pipeline_mulheres_na_tecnologia


# **5. Criando Modelos de Staging com dbt**

Staging = area de preparação
Analogia = na cozinha seria a etapa de lavar os alimentos, prepara-los para cozinhar.

Isso significa:

*   Renomear colunas para um padrão limpo e consistente.
*   Converter os tipos de dados para os formatos corretos (ex: texto para número, texto para data).
*   Fazer castings e limpezas simples.


O objetivo da camada de staging NÃO é juntar tabelas ou aplicar regras de negócio complexas.

É apenas criar uma base limpa e confiável a partir de cada fonte bruta.

//

Fez o download do sata_warehouse,db e abriu no debver para visualizar como estão as tabelas.


Temos então as seguintes tabelas:
*   Categorias;
*   Kaggle_survey
*   Paises
*   Participantes

//

**ETAPAS FEITAS ANTES DO CÓDIGO**

1.   Criação de um arquivo de stagig
      Na pasta: pipline_mulheres_na_tecnologia -> models -> cria pasta com nomE "*staging*"
2.   Dentro da pasta staging será criado um arquivo de source.yml
     Vai funcionar para criar um diconário de dados e para textar a qualidade das fontes de dados   
     Funciona como se fosse um mapa


  OU SE PODE CRIAR VIA CODIGO

In [22]:
#CODIGO PARA CRIAR AS PASTAS

import os

# Definir caminho da pasta staging
pasta_staging = "/content/pipeline_mulheres_na_tecnologia/models/staging"

# Criar a pasta (caso não exista)
os.makedirs(pasta_staging, exist_ok=True)

# Caminho completo do arquivo a ser criado
arquivo_yml = os.path.join(pasta_staging, "source.yml")

# Criar arquivo vazio
with open(arquivo_yml, "w") as f:
    pass

print("Arquivo criado com sucesso em:", arquivo_yml)

Arquivo criado com sucesso em: /content/pipeline_mulheres_na_tecnologia/models/staging/source.yml


In [23]:
%%writefile /content/pipeline_mulheres_na_tecnologia/models/staging/source.yml
version: 2

#passando o conjunto de fontes. Onde estão as tabelas
#funciona como um mapa
sources:
  - name: dados_brutos
    # Onde estão os dados? No data warehouse
    database: "/content/pipeline_mulheres_na_tecnologia/datawarehouse.db"
    schema: main

    tables:
      - name: kaggle_survey
        description: "Dados brutos da pesquisa Kaggle, filtrado por mulheres na área de dados"

      - name: participantes
        description: "Dados das participantes do bootcamp"

      - name: paises
        description: "Dados dos países extraídos da API REST Countries"

      - name: categorias
        description: "Mapeamento das habilidades e categorias, extraído de arquivo JSON"

Overwriting /content/pipeline_mulheres_na_tecnologia/models/staging/source.yml


# **5.1 Staging para Kaggle Survey**

Criação do 1º modelo de staging para fazer o tratamento da 1ª coluna de salario

Criamos outro arquivo na pasta staging com o nome da tabela: stg_kaggle_survey.sql

In [24]:
#CODIDO PARA CRIAR ARQUIVO DENTRO DA PASTA STAGING

# Caminho completo do arquivo SQL
arquivo_sql = os.path.join(pasta_staging, "stg_kaggle_survey.sql")

# Criar arquivo vazio
with open(arquivo_sql, "w") as f:
    pass

print("Arquivo criado com sucesso em:", arquivo_sql)

Arquivo criado com sucesso em: /content/pipeline_mulheres_na_tecnologia/models/staging/stg_kaggle_survey.sql


In [25]:
#sobrescerver o arquivo que acabamos de criar:
%%writefile /content/pipeline_mulheres_na_tecnologia/models/staging/stg_kaggle_survey.sql

-- sobrescrever o arquivo que acabamos de criar
-- consulta SQL com as colunas que precisamos para fazermos a análise

SELECT
    PAIS,
    NIVEL_EDUCACIONAL,
    ANOS_PROGRAMANDO,
    CARGO_ATUAL,
    ANOS_USANDO_ML,

        -- Transformação da coluna de salário (tratando casos com '-' e strings vazias)
    CASE
        -- Se tiver "-" no meio, pegar apenas o primeiro número
        WHEN SALARIO_ANUAL_USD LIKE '%-%' THEN
            CAST(
                REPLACE(
                    SUBSTR(SALARIO_ANUAL_USD, 1, INSTR(SALARIO_ANUAL_USD, '-') - 1),
                ',', '') AS REAL
            )

        -- Se não tiver "-", converter direto
        WHEN SALARIO_ANUAL_USD IS NOT NULL THEN
            CAST(REPLACE(SALARIO_ANUAL_USD, ',', '') AS REAL)
        ELSE NULL
    END AS SALARIO_ANUAL_USD,

    LINGUAGENS_USADAS,
    BANCOS_DE_DADOS_UTILIZADOS,
    FERRAMENTAS_BI_USADOS

FROM {{ source('dados_brutos', 'kaggle_survey') }};

Overwriting /content/pipeline_mulheres_na_tecnologia/models/staging/stg_kaggle_survey.sql


In [26]:
!dbt run


[0m23:52:59  Running with dbt=1.10.15
[0m23:52:59  Encountered an error:
Runtime Error
  Credentials in profile "pipeline_mulheres_na_tecnologia", target "dev" invalid: 'schemas_and_paths' is a required property


# **5.2 Staging para participantes:**

O IDEAL AQUI É FAZER O MESMO COM O TRATAMENTO DAS OUTRAS CELULAS

participantes
habilidades e categorias

Dentro da pasta STAGING criou o arquivo
*   stg_participantes.sql
*   stg_paises.sql



In [27]:
#CODIDO PARA CRIAR ARQUIVO DENTRO DA PASTA STAGING

# Caminho completo do arquivo SQL
arquivo_sql = os.path.join(pasta_staging, "stg_participantes.sql")

# Criar arquivo vazio
with open(arquivo_sql, "w") as f:
    pass

print("Arquivo criado com sucesso em:", arquivo_sql)

Arquivo criado com sucesso em: /content/pipeline_mulheres_na_tecnologia/models/staging/stg_participantes.sql


In [28]:
%%writefile /content/pipeline_mulheres_na_tecnologia/models/staging/stg_participantes.sql

SELECT  ID_PARTICIPANTE,
        NOME_COMPLETO,
        PAIS_ORIGEM
FROM {{ source('dados_brutos', 'participantes') }};


Overwriting /content/pipeline_mulheres_na_tecnologia/models/staging/stg_participantes.sql


# **5.3 Staging para países:**

Dentro da pasta STAGING criou o arquivo
*   stg_paises.sql

In [29]:
#CODIDO PARA CRIAR ARQUIVO DENTRO DA PASTA STAGING

# Caminho completo do arquivo SQL
arquivo_sql = os.path.join(pasta_staging, "stg_paises.sql")

# Criar arquivo vazio
with open(arquivo_sql, "w") as f:
    pass

print("Arquivo criado com sucesso em:", arquivo_sql)

Arquivo criado com sucesso em: /content/pipeline_mulheres_na_tecnologia/models/staging/stg_paises.sql


In [30]:
%%writefile /content/pipeline_mulheres_na_tecnologia/models/staging/stg_paises.sql

#renomear as colinas
SELECT "name.common"    AS PAIS,
       cca3             AS CODIGO_PAIS,
       region           AS REGIAO
FROM {{ source('dados_brutos', 'paises') }}

Overwriting /content/pipeline_mulheres_na_tecnologia/models/staging/stg_paises.sql


# **6. Criando Modelos Finais (*Marts*) com dbt**

**Camada de Marts**:

Tabelas finais de fato para consumir os dados

Associação: prato pronto parfa ser servido

Será aplicado a regra de negocio

Será aplicado o joins

Será criado as visões que repsonderam as perguntas do projeto.

**Dimenssões**: um termo de modelagem que representa uma tabela que descreve 'quem, o quê, onde e quando'

Criando uma pasta para ser uma tabela dimenssão para responder as perguntas.

Pasta de model -> criar uma nova pasta para o marts

dentro da pasta criar um arquivo dim_desenvolvedoras.sql

In [31]:
#CODIDO PARA CRIAR AS PASTAS

import os

# Caminho da nova subpasta
pasta_marts = "/content/pipeline_mulheres_na_tecnologia/models/marts"

# Criar a pasta (caso não exista)
os.makedirs(pasta_marts, exist_ok=True)

# Caminho completo do arquivo SQL
arquivo_sql = os.path.join(pasta_marts, "dim_desenvolvedoras.sql")

# Criar arquivo vazio
with open(arquivo_sql, "w") as f:
    pass

print("Arquivo criado com sucesso em:", arquivo_sql)

Arquivo criado com sucesso em: /content/pipeline_mulheres_na_tecnologia/models/marts/dim_desenvolvedoras.sql


In [32]:
%%writefile /content/pipeline_mulheres_na_tecnologia/models/marts/dim_desenvolvedoras.sql

#se não colocar será configurado como bio
{{config(materialized='table')}}

WITH stg_kaggle AS (
  SELECT *FROM {{ref('stg_kaggle_survey'),}}
),
stg_paises AS (
  SELECT *FROM {{ref('stg_paises'),}}
)

SELECT  stg_kaggle.PAIS,
        stg_paises.CODIGO_PAIS,
        stg_paises.REGIAO
        stg_kaggle.NIVEL_EDUCACIONAL,
        stg_kaggle.ANOS_PROGRAMANDO,
        stg_kaggle.CARGO_ATUAL,
        stg_kaggle.ANOS_USANDO_ML,
        stg_kaggle.SALARIO_ANUAL_USD,
        stg_kaggle.LINGUAGENS_USADAS,
        stg_kaggle.BANCOS_DE_DADOS_UTILIZADOS
        stg_kaggle.FERRAMENTAS_BI_USADOS,
FROM stg_kaggle
  #junção com a tabela temporária.
  LEFT JOIN stg_paises ON stg_kaggle.PAIS = stg_paises.NOME_PAIS

Overwriting /content/pipeline_mulheres_na_tecnologia/models/marts/dim_desenvolvedoras.sql


In [33]:
#texte para confirmar se as alterações deram certo
!dbt run

[0m23:53:06  Running with dbt=1.10.15
[0m23:53:06  Encountered an error:
Runtime Error
  Credentials in profile "pipeline_mulheres_na_tecnologia", target "dev" invalid: 'schemas_and_paths' is a required property


# **7. Orquestração e Monitoramento do Pipeline com Prefect**

# **7.1 Orquestração com Prefect**

Atá agora montamos o paipline, extraimos e fizemos o carregamento. Desta forma, temos até agora scripts isolados (kaggole, json, transformação com o dbt, o data warehause)

A **orquestração** é juntar todas as peças para que possamos juntar o pipline e execultar tudo junto. Ou seja, a orquestração, ordena todas as tarefas que acontecem individualmente e execulta o pipline de uma forma definida e automática.

ORDENA PARA QUE OCORRA A EXECULÇÃO NO MOMENTO CERTO DE FORMA ORGANIZADA.

RESPONSABILIDADES:
1.   Gerenciar as dependências:
      garante que a tarefa de transformação (dbt) só começe depois que todas as tarefas de extração e carga terminarem com sucesso
2.   Otimizar o fluxo (Parelelismo): analisar se as tarefas dependem uma da outra e execultar o que não tiver relacionado em paralelo
2.   Monitorar o Processo (O Painel de Controle):
      painel de controle (os logs) onde podemoss ver o status de cada tarefa
2.   Lidar com falhas:
      o codigo tenta execultar até 3 vezes (bos definimos a quantidade de vezes)


Filosofia do Prefect é natural para quem programa em Payton. Ela possui dois conceitos principais que usamos como decoradores:
*   **@task**: Qualquer função Python que executa uma unidade de trabalho (como extrair um CSV ou rodar um comando dbt) pode ser transformada em uma tarefa monitorável apenas adicionando @task em cima dela.

      Temos varias funções execultadas separadas e podemos transforma-las em tarefas apenas adicionando o comando @task

*   **@flow** (Fluxo): É uma função Python especial que define a ordem em que as tarefas (@task) são executadas. É aqui que dizemos: “Execute a extração primeiro, e SÓ DEPOIS, execute a transformação"Item da lista

     Aqui conseguimos definir a ordem das tarefas.

In [34]:
#Célula completa: definição e execução do pipline orquestrado

# 1 Instalações
!pip install pandas prefect dbt-sqlite requests -q

# 2 - Imports
import pandas as pd
import requests
import json
import sqlite3
from prefect import task, flow, get_run_logger
import subprocess
import os
import logging


logging.basicConfig(
    level=logging. INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='pipeline.log',
    filemode='w'
)

logger = logging.getLogger()

print("Logging configurado com sucesso!")



Logging configurado com sucesso!


In [35]:
# TAREFAS DE EXTRAÇÃO E CARGA (EL)
# Cada função de extração e carga que já construnimos agora é uma @task do Prefect.

@task(retries=3, retry_delay_seconds=5)
def extrair_dados_kaggle(caminho_arquivo):
  try:
    logger.info(f"Inicando a extração do arquivo: {caminho_arquivo}")
    df = pd.read_csv(caminho_arquivo)
    logger.info(f"Extração do CSV concluída. {len(df)} linhas lidas e {len()} colunas.")
    logger.info(f"Tipos de dados das colunas: \n{df.dtypes}")
    return df
  except FileNotFoundError:
    logger.error(f"Arquivo não encontrado: {caminho_arquivo}")
    return None

@task
def extrair_dados_sql(caminho_banco):
  try:
    logger.info(f"Iniciando a extração do bando de dados:{caminho_banco}")
    conexao = sqlite3.connect(caminho_banco)
    query = "SELECT * FROM PARTICIPANTES"
    df = pd.read_sql_query(query, conexao)

    logger.info(f"extração do banco de dados concluída com sucesso!")
    return df

  except sqlite3.Error as e:
    logger.error(f"Ocorreu um erro ao conectar ao banco de dados")
    return None

  except Exception as e:
    logger.error(f"Ocorreu um erro inesperado ao extrair os dados: {e}")
    return None

  finally:
    if conexao:
      conexao.close()
      logger.info("Conexão ao banco de dados fechada.")

  @task
  def extrair_categorias_habilidades_json(caminho_arquivo):
    try:
      logger.info(f"Iniciando a extração do arquivo JSON:{caminho_arquivo}")

      with open(caminho_arquivo, 'r') as arquivo:
        dados_json = json.load(arquivo)

      logger.info(f"Extração do arquivo JSON concluída com sucesso!")

      if dados_json is None:
        logger.warning(f"Dados JSON vazios. Nada a ser transformado.")
        return None

      lista_categorias = []

      for categtoria, habilidades in dados_json.items():
        for habilidade in habilidades:
          lista_categorias.append({"CATEGORIA": categoria, "HABILIDADE": habilidade})

      df_categorias = pd.DataFrame(lista_categorias)

      logger.info(f"Transformação do arquivo JSON em DataFrame concluída com sucesso!")
      return df_categoria

    except FileNotFoundError:
      logger.error(f"Arquivo não encontrado: {caminho_arquivo}")
      return None

    except Exception as e:
      logger.error(f"Ocorreu um erro inesperado ao extrair os dados: {e}")
      return None

  @task
  def carregar_dados_spaises_api(url_api):
    try:
      logger.info(f"Iniciando a requisição à API: {url_api}")
      resposta = requests.get(url_api)

      if resposta.status_code == 200:
        dados_json = resposta.json()
        logger.info(f"Dados da API extraidos com sucesso. {len(dados_json)} registgros de países.")
        return dados_json
      else:
        logger.error(f"Falha na requisição à API. Código de status: {resposta.status_code}")
        return None
    except Exception as e:
      logger.error(f"Ocorreu um erro inesperado ao extrair os dados.")
      return None

@task
def carregar_dados(df, nome_tabela, caminho_dw):
  if df is None:
    logger.warning(f"DataFrame vazio. Nada a ser carregado.")
    return

  try:
    logger.info(f"Iniciando o carregamento dos dados na tabela {nome_tabela}")
    conexao_dw = sqlite3.connect(caminho_dw)
    df.to_sql(nome_tabela, conexao_dw, if_exists='replace', index=False)
    logger.info(f"Carga para a tabela {nome_tabela} concluído com sucesso!")
  except Exception as e:
    logger.error(f"Ocorreu um erro ao carregar os dados na tabela {nome_tabela}: {e}")
  finally:
    if conexao_dw:
      conexao_dw.close()
      logger.info("Conexão ao data warehouse fechada.")

In [36]:
# TAREFAS DE TRASNFORMAÇÃO (T)
# Tarefas que excecultam os comandos do dbt via terminal

@task
def executar_dbt_run():

    logger = get_run_logger()
    logger.info("Iniciando 'dbt run'...")

    try:
        resultado = subprocess.run(
            ["dbt", "run"],
            cwd="/content/pipeline_mulheres_na_tecnologia",
            capture_output=True,
            text=True,
            check=True
        )

        logger.info("dbt run finalizado com sucesso!")
        logger.info(resultado.stdout)
        return True

    except subprocess.CalledProcessError as e:
        logger.error("Erro ao executar DBT:")
        logger.error(e.stderr)
        return False

    except Exception as e:
        logger.error(f"Erro inesperado durante o DBT: {e}")
        return False

In [37]:
# O FLUXO PRINCIPAL (@flow)
# Orquestra a execução de todas as tarefas na ordem correta.",

@flow(name="Pipeline ETL - Mulheres na Tecnologia")
def pipeline_principal():
    logger = get_run_logger()
    logger.info("INICIANDO O PIPELINE DE ETL")

# Fase 1: extração e Carga (EL)
# O Prefect executa estas tarefas em paralelo
df_kaglle = extrair_dados_kaglle('/content/pipeline_mulheres_na_tecnologia/kaglle_survey_2022.csv')
df_participantes = extrair_dados_sql('/content/pipeline_mulheres_na_tecnologia/bootcampBI.db')
df_paises = pd.json_normalize(
    carregar_dados_spaises_api("https://restcountries.com/v3.1/all?fields=name,cca3,region")
    )
dados_habilidades = extrair_categorias_habilidades_json("/content/pipeline_mulheres_na_tecnologia/habilidades_categorias.json")


# As cargas dependem das extrações (ddependência implícita)
carga_kaggle = carregar_dados(df_kaglle, 'kaglle_survey', 'data_warehouse.db')
carga_participantes = carregar_dados(df_participantes, 'participantes', 'data_warehouse.db')
carga_paises = carregar_dados(df_paises, 'paises', 'data_warehouse.db')
carga_habilidades = carregar_dados(dados_habilidades, 'categorias', 'data_warehouse.db')

# Fase 2: Transformação(T)
# A transformação com dbt só pode começar depois que TODAS as cargas terminarem.
# Usamos 'wait_for' para criar essa dependência explícita.
dbt_run = executar_dbt_run(
    wait_for=[carga_kaggle, carga_participantes, carga_paises, carga_habilidades])

logger.info("PIPELINE DE ETL CONCLUÍDO COM SUCESSO!")

# EXECUÇÃO DO FLUXO
if __name__ == "__main__":
  pipeline_principal()

ERROR:root:Arquivo não encontrado: /content/pipeline_mulheres_na_tecnologia/kaglle_survey_2022.csv
INFO:prefect:Starting temporary server on http://127.0.0.1:8934
See https://docs.prefect.io/v3/concepts/server#how-to-guides for more information on running a dedicated Prefect server.


RuntimeError: Timed out while attempting to connect to ephemeral Prefect API server.