
# Projeto Prático - Pipeline Python para Dados via APIs

## Objetivo do projeto

#### Construir um pipeline de extração → tratamento → disponibilização de dados usando Python (ou PySpark/Databricks, conforme preferência). Cada participante deve extrair dados de pelo menos duas APIs públicas distintas, realizar transformações e entregá-los em formato consumível via GIT (CSV / Parquet / tabela em Snowflake/S3 / REST endpoint, Dashboard e entre outros de sua preferência) disponibilizando os dados da melhor maneira possível, aplicando práticas de Engenharia de Dados (orquestração, particionamento, testes de qualidade, documentação).

##### Desenvolvido por: Douglas Santos Azambuja

In [0]:
# importar Bibliotecas 

import requests
import pandas as pd
import pyspark as spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

In [0]:
# URL da API
url = "https://disease.sh/v3/covid-19/countries"

# Faz a chamada GET
response = requests.get(url, headers={"accept": "application/json"})

# Verifica se deu certo
if response.status_code == 200:
    data = response.json()
    
    # Normaliza JSON aninhado para DataFrame
    df_covid = pd.json_normalize(data)

    # Criar coluna com a data e hora da importação, usei o timedetal para retornar o horário mais compativel com o nosso do Brasil
    df_covid["dt_import_covid"] = pd.to_datetime(datetime.now() - timedelta(hours=3))
    
    # Exibe as primeiras linhas
    display(df_covid)
else:
    print("Erro:", response.status_code, response.text)

In [0]:
# Verificando o tipo de dados

df_covid.info()

In [0]:
# Verificando a quantidade de campos nulos

df_covid.isnull().sum()

In [0]:
## Importando a segunda API com dos dados relacionados a quantidade de vacinas de covid aplicadas

# URL da API
url = "https://disease.sh/v3/covid-19/vaccine/coverage/countries?lastdays=1"

# Faz a chamada GET
response = requests.get(url, headers={"accept": "application/json"})

# Verifica se deu certo
if response.status_code == 200:
    data = response.json()
    
    # Normaliza JSON aninhado para DataFrame
    df_vacina = pd.json_normalize(data)

    # Criar coluna com a data e hora da importação, usei o timedetal para retornar o horário mais compativel com o nosso do Brasil
    df_vacina["dt_import_vacina"] = pd.to_datetime(datetime.now() - timedelta(hours=3))
    
    # Exibe as primeiras linhas
    display(df_vacina)
else:
    print("Erro:", response.status_code, response.text)

In [0]:
# Verificando se tem campos nulos

df_vacina.isnull().sum()

In [0]:
# Renomeando o nome do campo para vaccina

df_vacina = df_vacina.rename(columns={"timeline.7/21/25": "vaccina"})

display(df_vacina)

In [0]:
# Fazendo um left join pela coluna "country"

df_consolidado = pd.merge(df_covid, df_vacina, on='country', how='left')

# Quando fiz o merge o tipo da coluna foi alterado para float
df_consolidado["vaccina"] = df_consolidado["vaccina"].astype("Int64")

display(df_consolidado)

In [0]:
df_nvaccina = pd.merge(df_covid,df_vacina,on="country",how="left",indicator=True)

# Filtra apenas os que não foram encontrados em df_vacina
df_nvaccina = df_nvaccina[df_nvaccina["_merge"] == "left_only"]

# Remove a coluna auxiliar se quiser
df_nvaccina = df_nvaccina.drop(columns=["_merge","vaccina","dt_import_vacina"])

display(df_nvaccina)

In [0]:
# Selecionando apenas as colunas que acredito serem mais relevantes para o estudo

df_consolidado1 = df_consolidado[["continent","country","population" ,"cases", "deaths","vaccina","recovered"]]

display(df_consolidado1)

In [0]:
# Acabei encontrando dois países com dados vazios na coluna continent

# Remover apenas linhas onde 'continent' está vazio ou nulo

df_consolidado1 = df_consolidado1[df_consolidado1["continent"].notna() & (df_consolidado1["continent"] != "")]

display(df_consolidado1)

In [0]:
# Separando alguns percentis de vacina para analise

df_consolidado1["percentage_vaccina"] = ((df_consolidado1["vaccina"] / df_consolidado1["population"]) * 100).round(2)

display(df_consolidado1)

In [0]:
# Separando alguns percentis de mortes para analise

df_consolidado1["percentage_deaths"] = ((df_consolidado1["deaths"] / df_consolidado1["cases"]) * 100).round(2)

display(df_consolidado1)

In [0]:
# Se parando apenas o continent Europa

df_europe = df_consolidado1[df_consolidado1["continent"] == "Europe"]

display(df_europe)

In [0]:
# Ordena pela coluna 'vacinados' do maior para o menor e pega os 10 primeiros

top10_vaccina_euro = df_europe.sort_values(by="percentage_vaccina", ascending=False).head(10) 

display(top10_vaccina_euro)

In [0]:
# Ordena pela coluna 'deaths' do maior para o menor e pega os 10 primeiros

top10_deaths_euro = df_europe.sort_values(by="percentage_deaths", ascending=False).head(10)

display(top10_deaths_euro)

In [0]:
# Gráfico de barras
plt.figure(figsize=(12,5))
bars = plt.bar(top10_vaccina_euro["country"], top10_vaccina_euro["percentage_vaccina"], color='darkgreen')

for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, height + 0, f'{height}%', ha='center', va='bottom')

plt.title("Top países da Europa por percentual de vacinados")
plt.xlabel("País")
plt.ylabel("Percentual de vacinados (%)")
plt.ylim(0, 600)  # eixo y de 0 a 100%
plt.show()

In [0]:
# Gráfico de barras
plt.figure(figsize=(12,5))
bars = plt.bar(top10_deaths_euro["country"], top10_deaths_euro["percentage_deaths"], color='darkred')

for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, height + 0, f'{height}%', ha='center', va='bottom')
    
plt.title("Top países da Europa por percentual de mortos")
plt.xlabel("País")
plt.ylabel("Percentual de mortos (%)")
plt.ylim(0, 6)  # eixo y de 0 a 100%
plt.show()

In [0]:
# Converte para Spark DataFrame
df_spark_consolidado = spark.createDataFrame(df_consolidado1)

# Salva no catálogo/ schema correto
df_spark_consolidado.write.mode("overwrite").saveAsTable(
    "estudo.default.covid_vacinados_parquet"
)

# Salva em parquet dentro do Workspace (visível na UI)
df_consolidado1.to_parquet("/Workspace/Users/doazambuja@gmail.com/projetos-desenvolvimento-pessoal/notebooks/projeto-upskill/dados_covid.parquet", index=False)

In [0]:
# Consultando a tabela 

df_consulta = spark.table("estudo.default.covid_vacinados_parquet")
df_consulta.show()