In [2]:
#Teste myenv
import pandas as pd

# Projeto

In [None]:
import requests
import sqlite3
from plyer import notification
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

In [None]:
# Função para exibir alerta usando plyer
def alerta():
    notification.notify(
        title='Erro na API!',
        message='Ocorreu um erro ao acessar a API.',
        timeout=5  
    )

In [None]:
# URL base da API
base_url = 'https://pokeapi.co/api/v2/pokemon'

In [None]:
# Função para buscar dados da API
def buscar_dados():
    try:
        response = requests.get(base_url)
        if response.status_code == 200:
            data = response.json()
            return data
        else:
            alerta()
            return None
    except requests.exceptions.RequestException as e:
        alerta()
        return None

In [None]:
# Configurando uma sessão do Spark
spark = SparkSession.builder \
    .appName("Extracao e Tratamento de Dados de Pokemon") \
    .getOrCreate()

In [None]:
# Função para extrair dados brutos e retornar um DataFrame PySpark
def extrair_dados_brutos():
    data = buscar_dados()
    if data:
        pokemons = data['results'][:10]  # 10 primeiros pokémons
        dados_brutos = []

        for pokemon in pokemons:
            response = requests.get(pokemon['url'])
            if response.status_code == 200:
                pokemon_data = response.json()
                abilities = [ability['ability']['name'].capitalize() for ability in pokemon_data['abilities']]
                types = [t['type']['name'].capitalize() for t in pokemon_data['types']]
                dados_brutos.append({
                    'ID': pokemon_data['id'],
                    'Nome': pokemon_data['name'].capitalize(),
                    'Altura': pokemon_data['height'],
                    'Peso': pokemon_data['weight'],
                    'Habilidades': abilities,
                    'Tipos': types
                })
        df = spark.createDataFrame(dados_brutos)
        return df
    else:
        return None

In [None]:
# Função para processar os dados usando PySpark
def processar_dados_com_spark():
    # Extraindo dados brutos
    df = extrair_dados_brutos()
    if df is not None:
        # Tabela 1: Dados Básicos
        df_dados_basicos = df.select('ID', 'Nome', 'Altura', 'Peso')

        # Tabela 2: Habilidades dos Pokémon
        df_habilidades = df.select('ID', explode(col('Habilidades')).alias('Habilidade'))

        # Tabela 3: Tipos dos Pokémon
        df_tipos = df.select('ID', explode(col('Tipos')).alias('Tipo'))

        # Exibindo DataFrames tratados
        print("\nTabela 1: Dados Básicos")
        df_dados_basicos.show()

        print("\nTabela 2: Habilidades dos Pokémon")
        df_habilidades.show()

        print("\nTabela 3: Tipos dos Pokémon")
        df_tipos.show()

        # Salvando dados tratados em um banco de dados SQLite
        salvar_dados_no_sqlite(df_dados_basicos, 'pokemon_db.sqlite', 'pokemon_dados_basicos')
        salvar_dados_no_sqlite(df_habilidades, 'pokemon_db.sqlite', 'pokemon_habilidades')
        salvar_dados_no_sqlite(df_tipos, 'pokemon_db.sqlite', 'pokemon_tipos')

    else:
        print("Não foi possível extrair dados da API.")

    # Encerrando a sessão do Spark
    spark.stop()

In [None]:
# Função para salvar dados em um banco de dados SQLite
def salvar_dados_no_sqlite(dataframe, db_file, table_name):
    # Salvar DataFrame Spark no SQLite
    dataframe.write.format("jdbc") \
        .option("url", "jdbc:sqlite:" + db_file) \
        .option("dbtable", table_name) \
        .option("mode", "overwrite") \
        .save()


In [None]:
# Chamada da função principal para execução com PySpark
processar_dados_com_spark()