# **MVP — Emissão de CO2  **

**Autor:** _YURI BARBOSA DOS SANTOS RIBEIRO_  

**Data:** 17/11/2025

**Matrícula:** 4052025001302


**Dataset**: [CO2 Emission by countries Year wise (1750-2022)](https://www.kaggle.com/datasets/moazzimalibhatti/co2-emission-by-countries-year-wise-17502022)

## 1 - Coleta de dados


Nesta etapa, foi escolhida uma base de dados pública, vinda do kaggle, buscando analisar a evolução histórica das emissões de CO₂ por país ao longo do tempo, identificando os principais emissores, tendências por década e outras análises.

O dataset escolhido foi:

CO2 Emission by Countries Year wise (1750–2022)

Autor: moazzimalibhatti

Link: https://www.kaggle.com/datasets/moazzimalibhatti/co2-emission-by-countries-year-wise-17502022

Essa base contém registros anuais de emissões de CO₂ por país, desde 1750 até 2022, permitindo análises temporais, comparações entre países e agregações por década, por exemplo.

### 1.1 Criação do database e metadados da origem

In [0]:
%sql

CREATE DATABASE IF NOT EXISTS mvp_co2;  -- Cria o banco de dados "mvp_co2" caso não exista
USE mvp_co2;                            -- Define "mvp_co2" como o database ativo (contexto)


In [0]:
# %python
from pyspark.sql import Row                          # Importa Row para criar registros "manuais" em DataFrame Spark

spark.sql("USE mvp_co2")                             # Define o database ativo no Spark (mesmo efeito do USE no SQL)

meta = [                                             # Cria uma lista com um registro de metadados
    Row(                                             # Cria uma linha com campos nomeados
        fonte="Kaggle – CO2 Emission by Countries Year Wise (1750–2022)",  # Nome descritivo da fonte
        url="https://www.kaggle.com/datasets/moazzimalibhatti/co2-emission-by-countries-year-wise-17502022",  # Link do dataset
        descricao="Emissões anuais de CO₂ por país entre 1750 e 2022.",    # Descrição do conteúdo
        licenca="Uso acadêmico, conforme termos da Kaggle e da fonte original.", # Nota sobre licença/uso
    )
]

df_meta = spark.createDataFrame(meta)                # Converte a lista de Row em um DataFrame Spark
df_meta.write.mode("overwrite").saveAsTable("meta_origem_dados")  # Salva (sobrescrevendo) como tabela no catálogo

display(df_meta)                                     # Exibe o DataFrame no notebook para conferência


# 2 - Coleta

Uma vez definido o conjunto de dados, foi necessário coletá-lo e armazená-lo na nuvem, dentro do ambiente Databricks. Optou-se por utilizar a biblioteca kagglehub, que faz o download autorizado dos arquivos diretamente da infraestrutura da Kaggle.

In [0]:
# %python

%pip install kagglehub                             # Instala a biblioteca KaggleHub
import kagglehub                                   # Importa a biblioteca KaggleHub
import pandas as pd                                # Importa a biblioteca Pandas com pd
import os, glob                                    # Manipulação de caminhos do sistema (os) e busca arquivos por padrão (glob)
import re                                          # Importa biblioteca de expressões regulares

spark.sql("USE mvp_co2")                           # Garante que o database do MVP está ativo

def clean_column(col: str) -> str:                 # Define função para padronizar nomes de colunas

    return re.sub(r'[ ,;{}()\n\t=]', '_', col)     # Substitui caracteres inválidos por "_" 


def etl_bronze_from_kaggle():                      # Define função etl bronze direto do kaggle

    path = kagglehub.dataset_download(             # Baixa o dataset e retorna o diretório onde foi salvo
        "moazzimalibhatti/co2-emission-by-countries-year-wise-17502022"
    )

    # Localiza o arquivo CSV
    csv_files = glob.glob(os.path.join(path, "*.csv"))
    csv_file = csv_files[0]

    # Lê o CSV com pandas
    pdf = pd.read_csv(csv_file, encoding="latin1")

    # Padroniza nomes de colunas
    pdf.columns = [clean_column(c) for c in pdf.columns]

    # Converte para Spark DataFrame
    df_bronze = spark.createDataFrame(pdf)

    # Carga na camada BRONZE
    df_bronze.write.format("delta").mode("overwrite").saveAsTable("bronze_co2_emission")

    return df_bronze


# Executa a etapa de coleta/bronze e mostra as 10 primeiras linhas
df_bronze = etl_bronze_from_kaggle()
display(df_bronze.limit(10))


## 3 - Modelagem

Nessa estapa foi construído um modelo de dados em esquema de estrela, normalmente utilizado em Data Warehouse, por facilitar consultas analíticas, permitir agregações eficientes por país, ano e década, organizar dados em dimensões e fato, etc.

O modelo é composto por:

- Tabela fato (fact_co2_emissions): contém as medidas de emissões de CO₂ (co2_emission), por país e ano, além da década.

- Dimensão de país (dim_country): uma lista única de países, com chave surrogate country_id.

- Dimensão de ano (dim_year): possui ano como chave year_key e a década correspondente.

Além disso, foi construído um Catálogo de Dados, contendo, no mínimo:

- nome da tabela,

- nome da coluna,

- tipo de dado,

- valores mínimos e máximos para campos numéricos,

- categorias possíveis (quando a cardinalidade é baixa).

### 3.1 - camada SILVER (limpeza) e modelo estrela (dimensões + fato)

In [0]:
# %python

from pyspark.sql.functions import col, monotonically_increasing_id # Importa col() para referenciar colunas e gerar IDs únicos automaticamente

spark.sql("USE mvp_co2")                              # Garante database ativo no Spark

def etl_silver():                                     # Define função etl silver

    df_bronze = spark.table("bronze_co2_emission")    # Lê a tabela Bronze do catálogo como DataFrame Spark

    df_silver = (                                     # Inicia a transformação para a camada Silver
        df_bronze
        .select(                                      # Seleciona apenas colunas necessárias e renomeia
            col("Country").alias("country"),          # Renomeia Country -> country
            col("Year").alias("year"),                # Renomeia Year -> year
            col("CO2_emission__Tons_").alias("co2_emission") # Renomeia CO2_emission__Tons_ -> co2_emission
        )
        .withColumn("year", col("year").cast("int"))  # Converte 'year' para inteiro
        .withColumn("co2_emission", col("co2_emission").cast("double")) # Converte emissão para double
        .where(col("country").isNotNull() & col("year").isNotNull())    # Remove registros sem país ou ano
    )

    df_silver.write.format("delta").mode("overwrite").saveAsTable("silver_co2_emission") # Formata como Delta, sobrescreve caso existente e salva como tabela
    return df_silver                                   # Retorna o DataFrame Silver


def etl_gold():
    """
    TRANSFORMAÇÃO + CARGA GOLD (dimensões + fato)
    - Cria dim_country (países).
    - Cria dim_year (anos e décadas).
    - Constrói fact_co2_emissions por meio de junções.
    """
    df_silver = spark.table("silver_co2_emission")     # Lê a tabela Silver do catálogo como DataFrame Spark

    # Dimensão de países
    df_dim_country = (                                 # Inicia construção da dimensão de países
        df_silver
        .select("country").distinct()                  # Seleciona somente a coluna país mantendo apenas países únicos
        .withColumn("country_id", monotonically_increasing_id()) # Cria chave surrogate para cada país
        .select("country_id", "country")               # Organiza colunas na ordem final
    )
    df_dim_country.write.format("delta").mode("overwrite").saveAsTable("dim_country") # Formata como Delta, sobrescreve caso existente e salva como tabela

    # Dimensão de anos
    df_dim_year = (                                    # Inicia construção da dimensão de ano
        df_silver
        .select("year").distinct()                     # Seleciona anos únicos
        .withColumn("decade", (col("year")/10).cast("int") * 10) # Calcula década 
        .select(col("year").alias("year_key"), "decade") # Renomeia ano para year_key e mantém decade
    )
    df_dim_year.write.format("delta").mode("overwrite").saveAsTable("dim_year") # Formata como Delta, sobrescreve caso existente e salva como tabel

    # Tabela fato
    df_fact = (                                        # Constrói a tabela fato
        df_silver.alias("s")                           # Alias para Silver como 's'
        .join(df_dim_country.alias("c"), "country")    # Junção de Silver com dim_country pela chave 'country'
        .join(df_dim_year.alias("y"), col("s.year") == col("y.year_key")) # Junção com Silver com dim_year pelo ano
        .select(                                       # Seleciona apenas as chaves e a medida para a fato
            "c.country_id",
            "y.year_key",
            "y.decade",
            "s.co2_emission"
        )
    )
    df_fact.write.format("delta").mode("overwrite").saveAsTable("fact_co2_emissions") # Formata como Delta, sobrescreve caso existente e salva como tabel

    return df_fact                                     # Retorna o DataFrame Fato


# Executa Silver e Gold para materializar o modelo estrela
df_silver = etl_silver()
df_fact   = etl_gold()

# Mostra 5 linhas de cada DataFrame
display(df_silver.limit(5))
display(df_fact.limit(5))


### 3.2 - Catálogo de Dados

In [0]:
# %python
from pyspark.sql.functions import min as spark_min, max as spark_max, countDistinct, col # Agregações para catálogo
from pyspark.sql.types import StructType, StructField, StringType # Tipos para schema fixo STRING

spark.sql("USE mvp_co2")                                 # Garante database ativo no Spark

df_fact = spark.table("fact_co2_emissions")              # Lê a tabela fato para catalogar
schema = df_fact.schema                                  # Define o Schema

# Definimos o schema manualmente como STRING para evitar conflitos de tipo
schema_catalogo = StructType([                           
    StructField("tabela", StringType(), True),
    StructField("coluna", StringType(), True),
    StructField("tipo", StringType(), True),
    StructField("valor_min", StringType(), True),
    StructField("valor_max", StringType(), True),
    StructField("categorias", StringType(), True)
])

rows = []                                                # Lista que vai acumular linhas do catálogo

for field in df_fact.schema.fields:                      # Itera por cada coluna do schema da fato
    name = field.name                                    # Nome da coluna
    dtype = field.dataType.simpleString()                # Tipo Spark em string 

    if dtype in ["double", "int", "bigint", "float", "smallint"]: # Se for numérica
        stats = df_fact.select(                          # Calcula min e max para coluna numérica
            spark_min(col(name)).alias("min"),
            spark_max(col(name)).alias("max")
        ).collect()[0]                                   # Coleta o resultado (uma linha)

        rows.append((                                    # Adiciona linha ao catálogo
            "fact_co2_emissions",                        # Nome da tabela catalogada
            name,                                        # Nome da coluna
            dtype,                                       # Tipo
            str(stats["min"]),                           # Converte mínimo para string
            str(stats["max"]),                           # Converte máximo para string
            None
        ))
    
    else:                                                # Se não for numérica
        distinct = df_fact.select(countDistinct(col(name)).alias("n")).collect()[0]["n"] # Conta número de valores distintos
        categorias = None                                # Inicialmente sem categorias listadas

        if distinct <= 20:                               # Se baixa cardinalidade, lista categorias
            categorias = ", ".join([r[name] for r in df_fact.select(name).distinct().collect()]) #extrai os valores únicos de uma coluna do Spark e os transforma em uma única frase (string) separada por vírgulas.

        rows.append((                                    # Adiciona linha ao catálogo
            "fact_co2_emissions",                        # Nome da tabela catalogada
            name,                                        # Nome da coluna
            dtype,                                       # Tipo
            None,
            None,
            categorias
        ))

df_catalogo = spark.createDataFrame(rows, schema_catalogo) # Cria DataFrame com schema fixo

df_catalogo.write.mode("overwrite").saveAsTable("catalogo_dados") # Salva catálogo como tabela

display(df_catalogo)                                    # Exibe o catálogo  


## 4 - Carga e qualidade dos dados

Nesta etapa, será feita a carga dos dados para o Data Warehouse/Data Lake. 

A pipeline de ETL foi implementada em três níveis:

- Extração (E): realizada através da função etl_bronze_from_kaggle, utilizando a biblioteca kagglehub para buscar os dados diretamente na Kaggle e carregá-los na camada BRONZE.

- Transformação (T): implementada nas funções etl_silver e etl_gold, que tratam e padronizam os dados (tipos, nulos, renomeação de colunas) e constroem dimensões e a tabela fato.

- Carga (L): concluída pelo salvamento das tabelas em formato Delta (bronze_co2_emission, silver_co2_emission, dim_country, dim_year, fact_co2_emissions), todas gerenciadas pelo catálogo do Databricks.

Em seguida foi realizada uma análise da qualidade dos dados em cada etapa relevante (Silver e Fato). 

De modo geral, o conjunto de dados apresenta boa qualidade, já que se trata de um dataset curado. Porém, foram removidas filas com país ou ano nulos na etapa Silver, para evitar impacto nas análises.

In [0]:
# %python

spark.sql("USE mvp_co2")

# Executa a pipeline completa de ETL:
df_bronze = etl_bronze_from_kaggle()                   # Extração + Bronze
df_silver = etl_silver()                               # Transformação + Silver
df_fact   = etl_gold()                                 # Transformação + Gold (dimensões + fato)


### 4.1 - Percentual de nulos por coluna (Silver e Fato)

In [0]:
# %python
from pyspark.sql.functions import count, when          # Funções para contagem condicional

def null_profile(table_name: str):                     # Função para perfil de nulos
    print(f"=== {table_name} ===")                     # Exibe o nome da tabela antes
    df = spark.table(table_name)                       # Lê tabela como DataFrame
    total = df.count()                                 # Total de linhas 
    display(                                           # Exibe um DataFrame com % nulos por coluna
        df.select([
            (count(when(col(c).isNull(), c)) / total).alias(c) # nulos/total para cada coluna
            for c in df.columns                        # para todas as colunas
        ])
    )

spark.sql("USE mvp_co2")

# Proporção de nulos na camada SILVER e na FATO
null_profile("silver_co2_emission")
null_profile("fact_co2_emissions")


### 4.2 - Estatísticas básicas (year e co2_emission)

In [0]:
# %python
df_s = spark.table("silver_co2_emission")                 # Lê Silver

display(                                                  # Exibe estatísticas descritivas
    df_s.select("year", "co2_emission")                   # Seleciona colunas numéricas
        .summary("count", "min", "max", "mean", "stddev") # Calcula resumo estatístico
)


## 5 - Análise dos dados

Nesta última etapa, procuramos fazer diversas análises utilizando consultas SQL sobre o modelo estrela. 

### 5.1 - Top 10 países que mais emitiram CO₂ na história

In [0]:
%sql

SELECT                                    -- Seleciona 
  c.country,                              -- País
  SUM(f.co2_emission) AS total_emission   -- Soma total de emissões no período
FROM fact_co2_emissions f                 -- vindo da Tabela fato
JOIN dim_country c                        -- Junta com dimensão de países 
  ON f.country_id = c.country_id          -- Pela chave surrogate
GROUP BY c.country                        -- Agrupa por país
ORDER BY total_emission DESC              -- Ordena do maior para o menor
LIMIT 10;                                 -- Limita aos 10 primeiros


Os resultados indicam a concentração da responsabilidade climática nos países que se industrializaram precocemente. É possível observar que os Estados Unidos lideram o ranking, com países europeus como Reino Unido e Alemanha logo em seguida, refletindo o impacto da Revolução Industrial e do uso intensivo de combustíveis fósseis ao longo do século XIX e XX. 

A presença da China em posição inferior, apesar de seu peso econômico atual, indica sua entrada mais tardia no processo industrial.

### 5.2 - Evolução de emissão global por década

In [0]:
%sql

SELECT                                                -- Seleciona
  decade,                                             -- Década (derivada)
  SUM(co2_emission) AS total_emission                 -- Soma total da década
FROM fact_co2_emissions                               -- vindo da Tabela fato
GROUP BY decade                                       -- Agrupa por década
ORDER BY decade;                                      -- Ordena cronologicamente


Os resultados mostram um crescimento progressivo e acelerado a partir do processo de industrialização, iniciado no século XVIII. 

Até meados do século XIX, o aumento das emissões ocorre de forma gradual, refletindo a expansão inicial do uso de carvão e da mecanização industrial. Porém, a partir do século XX, tem-se um aumento significativo, associado à industrialização em larga escala, ao crescimento populacional e à expansão do consumo energético. 

O salto mais expressivo ocorre após a década de 1950, indicando a consolidação de um modelo econômico altamente dependente de combustíveis fósseis, com impactos ambientais cumulativos de longo prazo.

### 5.3 - Top 10 países que mais emitiram CO2 a partir de 1970

In [0]:
%sql

SELECT                                          -- Seleciona
  c.country,                                    -- País
  SUM(f.co2_emission) AS emission_1970_on       -- Soma total da década
FROM fact_co2_emissions f                       -- vindo da Tabela fato
JOIN dim_country c                              -- Junta com dimensão de países 
  ON f.country_id = c.country_id                -- Relaciona pela chave surrogate do país
WHERE f.year_key >= 1970                        -- Onde o ano é maior ou igual a 1970
GROUP BY c.country                              -- Agrupa por país
ORDER BY emission_1970_on DESC                  -- Ordena da maior para a menor emissão
LIMIT 10;                                       -- Limita aos 10 primeiros 


Se compararmos estes resultados aos mostrados em 5.1, podemos notar uma reconfiguração parcial do ranking global de emissores. 

Embora os Estados Unidos permaneçam como o maior emissor no período, a China assume posição de destaque, refletindo sua rápida industrialização e expansão econômica a partir do final do século XX. Economias tradicionais europeias ainda apresentam valores elevados, mas com crescimento mais moderado. 

A entrada da Índia no ranking evidencia o papel crescente das economias emergentes no cenário contemporâneo das emissões globais.

### 5.4 - Comparação entre dois países ao longo do tempo

In [0]:
%sql

SELECT                                         -- Seleciona
  f.year_key AS year,                          -- Ano da emissão
  c.country,                                   -- Nome do país
  f.co2_emission                               -- Valor da emissão de CO₂
FROM fact_co2_emissions f                      -- Dados vindos da tabela fato de emissões
JOIN dim_country c                             -- Junta com a dimensão de países
  ON f.country_id = c.country_id               -- Relaciona pela chave surrogate do país
WHERE c.country IN ('Brazil', 'United States') -- Onde o país é Brasil ou Estados Unidos
ORDER BY year, country;                        -- Ordena primeiro pelo ano e depois pelo país


Os resultados mostram trajetórias de industrialização e impacto ambiental bem distintas entre Estados Unidos e Brasil. Enquanto os Estados Unidos apresentam emissões crescentes desde o início do século XIX, refletindo sua industrialização precoce e contínua expansão econômica, o Brasil mantém valores nulos até o início do século XX, com crescimento gradual a partir de então.

Mesmo com aumento industrial nas últimas décadas, as emissões brasileiras continuam significativamente inferiores às norte-americanas, em termos de totais.

### 5.5 - Maior percentual de participação de emissão em 2020

In [0]:
%sql

WITH ultimo_ano AS (                             -- CTE que identifica o ano mais recente disponível
  SELECT MAX(year_key) AS year_max               -- Obtém o maior valor de ano na tabela fato
  FROM fact_co2_emissions                        -- A partir da tabela fato de emissões de CO₂
),
totais AS (                                      -- CTE que filtra os dados apenas para o último ano
  SELECT                                         -- Seleciona
    f.country_id,                                -- País
    f.year_key,                                  -- Ano da emissão
    f.co2_emission                               -- Valor da emissão de CO₂
  FROM fact_co2_emissions f                      -- Dados vindos da tabela fato
  JOIN ultimo_ano u                              -- Junta com a CTE do último ano
    ON f.year_key = u.year_max                   -- Mantém somente registros do ano mais recente
),
soma_global AS (                                 -- CTE que calcula o total mundial de emissões
  SELECT SUM(co2_emission) AS total_global       -- Soma todas as emissões do último ano
  FROM totais                                    -- A partir dos dados filtrados por ano
)
SELECT
  c.country,                                     -- Nome do país
  t.year_key AS year,                            -- Ano da emissão (renomeado para year)
  t.co2_emission,                                -- Emissão de CO₂ do país
  ROUND((t.co2_emission / s.total_global) * 100, 2) AS perc_global  -- Percentual da emissão do país em relação ao total global
FROM totais t                                    -- Dados de emissões por país no último ano
CROSS JOIN soma_global s                         -- Junta com o total global (valor único)
JOIN dim_country c                               -- Junta com a dimensão de países
  ON t.country_id = c.country_id                 -- Relaciona pela chave surrogate do país
ORDER BY perc_global DESC                        -- Ordena do maior para o menor percentual global
LIMIT 15;                                        -- Retorna apenas os 15 países com maior participação


Esse resultado reforça a diferença entre emissões acumuladas e emissões atuais, onde os Estados Unidos concentram a maior parcela individual, seguidos pela China, demonstrando a centralidade dessas economias na dinâmica climática recente. Os países europeus apresentam percentuais menores, possivelmente associados à adoção de políticas de transição energética e eficiência ambiental. 



### 5.6 - Menores aumentos de emissão entre décadas (1990 vs 2010)

In [0]:
%sql

WITH por_decada AS (                             -- CTE que agrega as emissões por país e década
  SELECT                                         -- Seleciona
    c.country,                                   -- Nome do país
    f.decade,                                    -- Década da emissão (ex: 1990, 2010)
    SUM(f.co2_emission) AS total_decade          -- Soma total de CO₂ emitido na década
  FROM fact_co2_emissions f                      -- Dados da tabela fato de emissões
  JOIN dim_country c                             -- Junta com a dimensão de países
    ON f.country_id = c.country_id               -- Relaciona pela chave surrogate do país
  GROUP BY c.country, f.decade                   -- Agrupa por país e década
),
duas_decadas AS (                                -- CTE que isola e compara duas décadas específicas
  SELECT
    country,                                     -- País
    SUM(CASE WHEN decade = 1990 
             THEN total_decade END) AS dec_1990, -- Emissão total da década de 1990
    SUM(CASE WHEN decade = 2010 
             THEN total_decade END) AS dec_2010  -- Emissão total da década de 2010
  FROM por_decada                                -- Usa os dados já agregados por década
  WHERE decade IN (1990, 2010)                   -- Considera apenas as décadas de interesse
  GROUP BY country                               -- Agrupa por país
)
SELECT
  country,                                       -- Nome do país
  dec_1990,                                      -- Emissão na década de 1990
  dec_2010,                                      -- Emissão na década de 2010
  (dec_2010 - dec_1990) AS diferenca             -- Diferença absoluta entre as décadas
FROM duas_decadas                                -- Dados consolidados das duas décadas
WHERE dec_1990 IS NOT NULL                       -- Garante que o país tenha dados em 1990
  AND dec_2010 IS NOT NULL                       -- Garante que o país tenha dados em 2010
ORDER BY diferenca ASC                           -- Ordena da maior redução (valor mais negativo)
LIMIT 10;                                        -- Retorna os 10 países com maior redução


Os resultados indicam que países com os menores aumentos absolutos de emissões entre 1990 e 2010 são majoritariamente pequenos territórios ou ilhas com baixa atividade industrial.

### 5.7 - Maiores economias e suas emissões

In [0]:
%sql

SELECT                                    -- Seleciona 
  c.country,                              -- País
  SUM(f.co2_emission) AS total_emission   -- Soma total de emissões no período
FROM fact_co2_emissions f                 -- vindo da Tabela fato
JOIN dim_country c                        -- Junta com dimensão de países 
  ON f.country_id = c.country_id          -- Pela chave surrogate
WHERE c.country IN ('United States',      -- Onde o país é um dos da lista informada
'China', 'Germany', 'Japan', 'India', 
'Russia', 'United Kingdom','France', 
'Italy','Canada')
GROUP BY c.country                        -- Agrupa por país
ORDER BY total_emission DESC              -- Ordena do maior para o menor

Os resultados evidenciam a forte correlação entre desenvolvimento econômico, industrialização e impacto ambiental. 

Os Estados Unidos apresentam o maior volume de emissões históricas, seguidos por economias europeias como Reino Unido e Alemanha, cuja industrialização ocorreu de forma precoce e intensiva em combustíveis fósseis. A China, apesar de sua industrialização mais recente, já figura entre os maiores emissores acumulados, refletindo seu rápido crescimento econômico.