In [0]:
!pip install python-dotenv

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import lit
import pandas as pd
import requests, time, threading
from datetime import datetime
from dotenv import load_dotenv
from pyspark.sql import SparkSession
import os

In [0]:
# Load IGDB API credentials from a local .env file (if present) into the process environment.
load_dotenv()
CLIENT_ID = os.getenv("CLIENT_ID")
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")

# Fail fast here: otherwise every request made later by the background ingestion thread
# would be sent without valid credentials and the thread would only print HTTP errors.
_missing_credentials = [
    name
    for name, value in (("CLIENT_ID", CLIENT_ID), ("ACCESS_TOKEN", ACCESS_TOKEN))
    if not value
]
if _missing_credentials:
    raise RuntimeError(
        f"Missing IGDB credentials: {', '.join(_missing_credentials)}. "
        "Set them as environment variables or in a .env file."
    )

In [0]:
# IGDB pagination settings: `step` is sent as the query `limit`, and the shared
# `offset` cursor is advanced by `step` after every poll, wrapping back to 0
# once it reaches `max_offset`.
step, max_offset = 100, 1000
offset = 0

def _join_names(items):
    """Collapse a list of ``{"name": ...}`` dicts into a ``"a, b"`` string.

    Non-list values (e.g. NaN when a game has no such field) become ``None``;
    entries without a ``name`` key are skipped instead of raising.
    """
    if not isinstance(items, list):
        return None
    return ", ".join(
        item["name"] for item in items if isinstance(item, dict) and "name" in item
    )


def get_igdb_data(page_offset=None, page_size=None, timeout=30):
    """Fetch one page of highly rated games from the IGDB ``/v4/games`` endpoint.

    The query selects games with a non-null ``total_rating`` and
    ``total_rating_count > 100``, sorted by ``total_rating`` descending.

    Args:
        page_offset: Row offset of the page. Defaults to the module-level
            ``offset`` cursor.
        page_size: Rows requested (IGDB ``limit``). Defaults to the
            module-level ``step``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A pandas DataFrame with one row per game. ``id`` is cast to ``str``, and
        ``genres`` / ``platforms`` (when present) are flattened into
        comma-separated name strings. Returns an empty DataFrame when the
        request fails, the status is not 200, or the page contains no games.
    """
    if page_offset is None:
        page_offset = offset
    if page_size is None:
        page_size = step

    headers = {
        "Client-ID": CLIENT_ID,
        "Authorization": f"Bearer {ACCESS_TOKEN}"
    }

    body = f"""
        fields id, name, total_rating, total_rating_count, first_release_date,
        game_type, rating, rating_count, summary, genres.name, platforms.name;
        sort total_rating desc;
        where total_rating != null & total_rating_count > 100;
        limit {page_size};
        offset {page_offset};
    """

    url = "https://api.igdb.com/v4/games"
    try:
        # Without a timeout a stalled connection would block the calling thread indefinitely.
        response = requests.post(url, headers=headers, data=body, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"Erro IGDB: {exc}")
        return pd.DataFrame()

    if response.status_code != 200:
        print(f"Erro IGDB: {response.status_code} - {response.text}")
        return pd.DataFrame()

    try:
        data = response.json()
    except ValueError as exc:  # body was not valid JSON
        print(f"Erro IGDB: {exc}")
        return pd.DataFrame()

    if not data:
        # Offset past the end of the result set: json_normalize([]) would yield a
        # frame with no columns and the genres/platforms lookups below would KeyError.
        return pd.DataFrame()

    pdf = pd.json_normalize(data)

    if "id" in pdf.columns:
        pdf["id"] = pdf["id"].astype(str)

    # Convert the expanded-field lists into strings; a column is absent when no
    # game on this page has that field, so only transform what is there.
    for col in ("genres", "platforms"):
        if col in pdf.columns:
            pdf[col] = pdf[col].apply(_join_names)

    return pdf

def _conform_igdb_batch(pdf):
    """Return a copy of ``pdf`` with exactly the columns written to the Delta table.

    Columns the API omitted for this page are added as ``None`` so every batch has
    the same column set and order. The rating/count columns are coerced to
    numeric, and unparseable values become NaN.
    """
    expected_columns = [
        "id", "name", "total_rating", "total_rating_count", "first_release_date",
        "game_type", "rating", "rating_count", "summary", "genres", "platforms"
    ]
    batch = pdf.copy()
    for col in expected_columns:
        if col not in batch.columns:
            batch[col] = None
    for col in ["total_rating", "total_rating_count", "rating", "rating_count"]:
        batch[col] = pd.to_numeric(batch[col], errors="coerce")
    return batch[expected_columns]


def stream_igdb_data(stop_event=None, interval_seconds=60, path="/mnt/raw/igdb/games"):
    """Poll IGDB repeatedly, appending each fetched page to a Delta table.

    Each iteration:
      - fetches the page at the module-level ``offset`` via ``get_igdb_data()``;
      - advances ``offset`` by ``step``, wrapping to 0 at ``max_offset``;
      - appends the page plus an ``ingestion_time`` ISO-8601 string column to
        the Delta table at ``path``.

    Errors in one iteration are printed and the loop continues. A single bad
    request or failed write therefore no longer silently ends the thread.

    Args:
        stop_event: Optional ``threading.Event``. When given, the loop exits once
            it is set; setting it also interrupts the wait between polls.
            ``None`` keeps the run-forever behaviour.
        interval_seconds: Pause between polls, in seconds.
        path: Delta table location to append to.
    """
    global offset

    while stop_event is None or not stop_event.is_set():
        try:
            pdf = get_igdb_data()
        except Exception as exc:  # keep the polling thread alive on unexpected fetch errors
            print(f"Erro IGDB: {exc}")
            pdf = pd.DataFrame()

        # Advance even when the fetch failed, so a persistently failing page is not retried forever.
        offset += step
        if offset >= max_offset:
            offset = 0

        if pdf.empty:
            print("Nenhum dado retornado da API.")
        else:
            try:
                batch = _conform_igdb_batch(pdf)
                # One timestamp per batch, used for both the stored column and the log line.
                ingested_at = datetime.now().isoformat()
                spark_df = spark.createDataFrame(batch).withColumn("ingestion_time", lit(ingested_at))
                spark_df.write.mode("append").format("delta").save(path)
                print(f"Ingested {len(batch)} records at {ingested_at}")
            except Exception as exc:  # a failed conversion or write should not end the stream
                print(f"Erro ao gravar no Delta ({path}): {exc}")

        if stop_event is None:
            time.sleep(interval_seconds)
        elif stop_event.wait(interval_seconds):
            break

# Start continuous ingestion in a background thread. If a thread started by a previous
# run of this cell is still alive, reuse it: a second thread would append every page twice.
if not any(t.name == "igdb-ingest" and t.is_alive() for t in threading.enumerate()):
    threading.Thread(target=stream_igdb_data, name="igdb-ingest").start()

# Continuous read of the Delta table.
# NOTE(review): this requires the Delta table to exist already (the commented-out bootstrap
# cell below creates it); on a fresh path this load can fail before the thread's first write.
df_stream = spark.readStream.format("delta").load("/mnt/raw/igdb/games")

# Keep a handle to the console query (so it can be stopped with console_query.stop()), and
# likewise reuse an already-active one instead of starting a duplicate on re-run.
console_query = next((q for q in spark.streams.active if q.name == "igdb_console"), None)
if console_query is None:
    console_query = (
        df_stream.writeStream
        .queryName("igdb_console")
        .format("console")
        .outputMode("append")
        .start()
    )


In [0]:
# This code was used because there were errors creating the table and saving the data; this step created a table with the correct data and allowed the streaming to work correctly. Note: it writes in overwrite mode, so re-enabling it replaces the existing table at /mnt/raw/igdb/games.

# pdf = get_igdb_data()
# if not pdf.empty:
#     pdf["id"] = pdf["id"].astype(str)
#     spark_df = spark.createDataFrame(pdf)
#     spark_df = spark_df.withColumn("ingestion_time", lit(datetime.now().isoformat()))
#     spark_df.write.mode("overwrite").format("delta").save("/mnt/raw/igdb/games")


In [0]:
# Batch (non-streaming) read of everything ingested so far, rendered with display().
games_snapshot = spark.read.format("delta").load("/mnt/raw/igdb/games")
display(games_snapshot)