Importando Biblioteca

In [0]:
# Pyspark API python para apache spark
from pyspark.sql import functions as F 


Carregando o DataSet

In [0]:
# Import do dataset de emissões e remoções de carbono em Python
dados_nacionais = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("workspace.dados.dados_nacionais_final")
)

In [0]:
# Import do dataset de créditos de carbono em Python
dados_creditos = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("workspace.dados.data_mundial_bank")
)

In [0]:
# Carrega a versão otimizada dos dados.(leitura mais rapida)
dados_nacionais = spark.read.table("workspace.dados.dados_nacionais_final")
dados_creditos = spark.read.table("workspace.dados.data_mundial_bank")


Conhecendo o tipo de dados do dataset

In [0]:
# Tamanho do Dataset de emissões e remoções
quantidade_de_colunas = len(dados_nacionais.columns)
quantidade_de_linhas = dados_nacionais.count()

print(f"Quantidade de Linhas: {quantidade_de_linhas}")
print(f"Quantidade de Colunas: {quantidade_de_colunas}")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-5342418914219572>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Tamanho do Dataset[39;00m
[0;32m----> 2[0m quantidade_de_colunas [38;5;241m=[39m [38;5;28mlen[39m(dados_nacionais[38;5;241m.[39mcolumns)
[1;32m      3[0m quantidade_de_linhas [38;5;241m=[39m dados_nacionais[38;5;241m.[39mcount()
[1;32m      5[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mQuantidade de Linhas: [39m[38;5;132;01m{[39;00mquantidade_de_linhas[38;5;132;01m}[39;00m[38;5;124m"[39m)

File [0;32m/databricks/python/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:289[0m, in [0;36mDataFrame.columns[0;34m(self)[0m
[1;32m    287[0m [38;5;129m@property[39m
[1;32m    288[0m [38;5;28;01mdef[39;00m [38;5;21mcolumns[39m([38;5;28mself[39m) [38;5;241m-[39m[

Converter de formato largo para longo

In [0]:
# Identificar colunas de ano
anos = [c for c in dados_nacionais.columns if c.isdigit()]

# Explodir anos em linhas
dados_nacionais_long = dados_nacionais.select(
    "Emissão/Remoção/Bunker", "Gás", "Setor de emissão",
    "Categoria emissora", "Sub-categoria emissora",
    "Produto ou sistema", "Detalhamento", "Recorte",
    "Atividade geral", "Estado", "Bioma",
    F.explode(F.array([
        F.struct(F.lit(a).alias("Ano"), F.col(a).alias("Valor")) for a in anos
    ])).alias("temp")
).select(
    "*", "temp.Ano", "temp.Valor"
).drop("temp")

# Converter tipos
dados_nacionais_long = dados_nacionais_long.withColumn("Ano", F.col("Ano").cast("int"))
dados_nacionais_long = dados_nacionais_long.withColumn("Valor", F.round(F.col("Valor").cast("double"),2))

# Filtrar valores válidos
dados_nacionais_long = dados_nacionais_long.filter(F.col("Valor").isNotNull())

display(dados_nacionais_long.limit(100))




Limpeza dos dados

In [0]:
# Remoção de duplicatas
colunas_chave = [
    "Emissão/Remoção/Bunker",
    "Gás",
    "Setor de emissão",
    "Categoria emissora", 
    "Sub-categoria emissora", 
    "Produto ou sistema", 
    "Detalhamento", 
    "Recorte", 
    "Atividade geral",
    "Estado", 
    "Bioma",
    "Ano", 
    "Setor de emissão", 
    "Sub-categoria emissora"
]

dados_nacionais_sem_duplicatas = dados_nacionais_long.dropDuplicates(subset=colunas_chave)

linhas_duplicadas = dados_nacionais_long.count() - dados_nacionais_sem_duplicatas.count()
print("Linhas Duplicadas Removidas:", linhas_duplicadas)
display(dados_nacionais_sem_duplicatas)



Substituição de zero por null

In [0]:
# Lista das colunas numéricas para substituir o 0 por 'null'.

colunas_numericas = [
    col_name for col_name, col_type in dados_nacionais_sem_duplicatas.dtypes
    if col_type in (
        'int', 'double', 'float', 'long', 'short', 'decimal'
    )
]
# Contagem dos zeros por linha
expressoes_count = [
    F.when(F.col(c) == 0, 1).otherwise(0) for c in colunas_numericas
]

if not expressoes_count:
    print("Nenhuma coluna numérica encontrada para substituir '0's.")
else:
    zeros_array = F.array(*expressoes_count)
    zeros_por_linha = F.aggregate(
        zeros_array,
        F.lit(0),
        lambda acc, x: acc + x
    ).alias("zeros_por_linha")
    contagem_dados_nacionais_sem_duplicatas = dados_nacionais_sem_duplicatas.select(
        zeros_por_linha
    ).agg(
        F.sum("zeros_por_linha").alias("total_substituicoes")
    )
    total_substituicoes = contagem_dados_nacionais_sem_duplicatas.first()["total_substituicoes"]
    total_substituicoes = total_substituicoes if total_substituicoes is not None else 0

    total_linhas = dados_nacionais_sem_duplicatas.count()
    total_colunas = len(dados_nacionais_sem_duplicatas.columns)
    total_celulas = total_linhas * total_colunas
    
    # Porcentagem de substituição
    if total_celulas > 0:
        porcentagem = (total_substituicoes / total_celulas) * 100
    else:
        porcentagem = 0

    print("--- Estatísticas da Substituição ---")
    print(f"Número total de substituições (células com 0): {total_substituicoes}")
    print(f"Total de células no DataFrame: {total_celulas} ({total_linhas} linhas x {total_colunas} colunas)")
    print(f"Porcentagem de células modificadas: {porcentagem:.6f} %")

#Substituição de zero por null
dados_nacionais_sem_duplicatas = dados_nacionais_sem_duplicatas.na.replace(0, None)

display(dados_nacionais_sem_duplicatas)



In [0]:
# Filtrar os anos de 2013 a 2023
dados_filtrados = dados_nacionais_sem_duplicatas.filter(
    (F.col("Ano") >= 2013) & (F.col("Ano") <= 2023)
)

display(dados_filtrados)




Estatísticas e Tamanho do Dataset Tratado pra realizar análises e retirar insights

In [0]:
# Tamanho do Dataset Tratado e Filtrado
quantidade_de_colunas_1 = len(dados_filtrados.columns)
quantidade_de_linhas_1 = dados_filtrados.count()

print(f"Quantidade de Linhas: {quantidade_de_linhas_1}")
print(f"Quantidade de Colunas: {quantidade_de_colunas_1}")



In [0]:
# Mostra o tipo de dado (schema) de cada coluna
dados_filtrados.printSchema()



**Tabela dados_filtrados limpa e tratada para carregar no Power BI**

In [0]:
# Estatísticas completas, incluindo quartis e mediana (50%)
dados_filtrados.summary().show()



Criar tabela com Soma de Emissão e Remoção por Ano com filtro no Gás e coluna de Emissão/Remoção/Bunker

In [0]:
# Tabela com os dados filtrados pelo gas CO2e (t) GWP-AR6

filtro_gas = "CO2e (t) GWP-AR6"

dados_filtrados_com_gas = dados_filtrados.filter(F.col("Gás") == filtro_gas)

display(dados_filtrados_com_gas)



In [0]:
# Filtros a serem aplicados na agregação

valores_emissao = ["Emissão", "Bunker"]
coluna_categoria = "Emissão/Remoção/Bunker" 

soma_multiplos_anos = (
    dados_filtrados_com_gas
        .groupBy("Ano")
        .agg(
            # Emissão ou Bunker → Emissão
            F.round(
                F.sum(
                    F.when(
                        F.col(coluna_categoria).isin(["Emissão", "Bunker"]),
                        F.col("Valor").cast("double")
                    )
                ), 2
            ).alias("Emissão"),

            # Remoção → Remoção
            F.round(
                F.sum(
                    F.when(
                        F.col(coluna_categoria) == "Remoção",
                        F.col("Valor").cast("double")
                    )
                ), 2
            ).alias("Remoção")
        )
        .sort("Ano", ascending=False)
)

display(soma_multiplos_anos)



In [0]:
# Estatísticas completas, incluindo quartis e mediana (50%)
soma_multiplos_anos.summary().show()



Emissões e Remoções por Bioma

In [0]:
# Filtros aplicados para obter as emissões e remoções por biomas
(
    dados_filtrados_com_gas
    .groupBy("Bioma")
    .agg(
        # soma emissões
        F.round(
            F.sum(
                F.when(F.col(coluna_categoria).isin(valores_emissao), 
                       F.col("Valor").cast("double"))
                 .otherwise(0)
            ), 2
        ).alias("Emissão"),

        # soma remoções
        F.round(
            F.sum(
                F.when(F.col(coluna_categoria) == "Remoção", 
                       F.col("Valor").cast("double"))
                 .otherwise(0)
            ), 2
        ).alias("Remoção"),
    )
    .orderBy(F.desc("Emissão"))
    .display()
)



Emissões e Remoções por estado

In [0]:
# Total de emissões por estado, de todos os gases de 2013 - 2023
(
    dados_filtrados_com_gas 
    .groupBy("Estado")
    .agg(
        # soma emissões
        F.sum(
            F.when(
                F.col(coluna_categoria).isin(valores_emissao),
                F.col("Valor").cast("double")
            )
        ).alias("Emissão"),

        # soma remoções
        F.sum(
            F.when(
                F.col(coluna_categoria) == "Remoção",
                F.col("Valor").cast("double")
            )
        ).alias("Remoção")
    )
    .withColumn("Emissão", F.round(F.col("Emissão"), 2))
    .withColumn("Remoção", F.round(F.col("Remoção"), 2))
    .orderBy(F.desc("Emissão"))
    .display()
)



Emissões por gás

In [0]:
# Total de emissões por gás, de todos os gases de 2013 - 2023
(
    dados_filtrados
    .filter(F.col(coluna_categoria).isin(valores_emissao))  
    .groupBy("Gás")
    .agg(
        F.round(
            F.sum("Valor").cast("double"), 
            2
        ).alias("total_emissao")
    )
    .orderBy(F.desc("total_emissao"))
    .display()
)



Ranking de setores emissores

In [0]:
# Rankeamento dos setores emissores de gases
(
    dados_filtrados_com_gas
    .groupBy("Setor de emissão")
    .agg(
        # total de emissões (Emissão + Bunker)
        F.round(
            F.sum(
                F.when(
                    F.col(coluna_categoria).isin(valores_emissao),
                    F.col("Valor").cast("double")
                ).otherwise(0)
            ), 2
        ).alias("total_emissao"),

        # total de remoções
        F.round(
            F.sum(
                F.when(
                    F.col(coluna_categoria) == "Remoção",
                    F.col("Valor").cast("double")
                ).otherwise(0)
            ), 2
        ).alias("total_remocao")
    )
    .orderBy(F.desc("total_emissao"))
    .display()
)



Evolução temporal por bioma

In [0]:
# Evolução temporal por bioma, apenas emissões de todos os gases, 2013 - 2023
(
    dados_filtrados_com_gas
    .filter(F.col(coluna_categoria).isin(valores_emissao))  
    .groupBy("Ano", "Bioma")
    .agg(
        F.round(
            F.sum("Valor").cast("double"), 
            2
        ).alias("total_emissao")
    )
    .orderBy("Ano", "Bioma") 
    .display()
)




Balanço líquido, se houver linhas de remoção

In [0]:
# Calcular saldo por Bioma 
dados_filtrados_balanco_bioma = (
    dados_filtrados_com_gas
    .groupBy("Bioma")
    .agg(
        F.round(
            F.sum("Valor").cast("double"),
            2
        ).alias("saldo_carbono")
    )
)

display(
    dados_filtrados_balanco_bioma.orderBy(F.desc("saldo_carbono"))
)



In [0]:
# Calcular saldo por Estado 
dados_saldo = (
    dados_filtrados_com_gas
    .groupBy("Estado")
    .agg(
        F.round(
            F.sum("Valor").cast("double"),
            2  
        ).alias("saldo_carbono")
    )
)

display(
    dados_saldo.orderBy(F.desc("saldo_carbono"))
)


