<a href="https://colab.research.google.com/github/Bosnic/BetCadufe-soccer/blob/main/champions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Importa as bibliotecas necessárias
import requests
import json
import pandas as pd
from datetime import datetime, date, timedelta
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

In [None]:
# Autenticação no Google Colab
from google.colab import auth
auth.authenticate_user()


In [None]:
# Define as variáveis de ambiente
API_KEY = '95968922439b247c097e74f896df90b1' # Chave da API da API-Sports
API_URL = 'https://v3.football.api-sports.io' # URL base da API da API-Sports
PROJECT_ID = 'betcadufe-soccer' # ID do projeto no BigQuery
DATASET_NAME = 'soccer' # Nome do dataset no BigQuery
FULL_LOAD_DATE = '2022-01-01' # Data para carga completa inicial
LEAGUE = 2 # ID da liga na API-Sports
SEASON = 2022 # Ano da temporada

# Define o cabeçalho da requisição para a API
HEADERS = {
    'x-rapidapi-key': API_KEY
}

In [None]:
# Define os endpoints da API a serem consumidos
endpoints = [
    {
        "table": "past_fixtures", # Nome da tabela no BigQuery
        "write_disposition": "WRITE_APPEND", # Define o método de escrita no BigQuery (append)
        "path": "fixtures", # Caminho do endpoint na API-Sports
        "quality_control": False, # Flag para controle de qualidade (não utilizado neste caso)
        "params": { # Parâmetros da requisição para a API-Sports
            "league": LEAGUE,
            "season": SEASON
        },
        "incremental_load_params": { # Parâmetros para a carga incremental
            "from": "YYYY-MM-DD", # Data de início para a carga incremental
            "to": "YYYY-MM-DD" # Data de fim para a carga incremental
        },
        "fields": [], # Campos a serem selecionados na resposta da API (não utilizado neste caso)
        "nested_fields": [ # Campos aninhados a serem extraídos da resposta da API
            "fixture.id",
            "fixture.referee",
            "fixture.timezone",
            "fixture.date",
            "fixture.timestamp",
            "fixture.periods.first",
            "fixture.periods.second",
            "fixture.venue.id",
            "fixture.venue.name",
            "fixture.venue.city",
            "fixture.status.long",
            "fixture.status.short",
            "fixture.status.elapsed",
            "league.id",
            "league.name",
            "league.country",
            "league.logo",
            "league.flag",
            "league.season",
            "league.round",
            "teams.home.id",
            "teams.home.name",
            "teams.home.logo",
            "teams.home.winner",
            "teams.away.id",
            "teams.away.name",
            "teams.away.logo",
            "teams.away.winner",
            "goals.home",
            "goals.away",
            "score.halftime.home",
            "score.halftime.away",
            "score.fulltime.home",
            "score.fulltime.away",
            "score.extratime.home",
            "score.extratime.away",
            "score.penalty.home",
            "score.penalty.away"
        ],
        "repeatable_fields": [] # Campos que se repetem na resposta da API (não utilizado neste caso)
    },
    # Próximos endpoints seguem a mesma estrutura do anterior
    {
        "table": "future_fixtures",
        "write_disposition": "WRITE_TRUNCATE", # Define o método de escrita no BigQuery (truncate)
        "path": "fixtures",
        "quality_control": False,
        "params": {
            "league": LEAGUE,
            "season": SEASON,
            "to": "2099-12-31"
        },
        "incremental_load_params": {
            "from": "YYYY-MM-DD",
        },
        "fields": [],
        "nested_fields": [
            "fixture.id",
            "fixture.timezone",
            "fixture.date",
            "fixture.timestamp",
            "fixture.venue.id",
            "fixture.venue.name",
            "fixture.venue.city",
            "fixture.status.long",
            "fixture.status.short",
            "league.id",
            "league.name",
            "league.country",
            "league.logo",
            "league.flag",
            "league.season",
            "league.round",
            "teams.home.id",
            "teams.home.name",
            "teams.home.logo",
            "teams.away.id",
            "teams.away.name",
            "teams.away.logo"
        ],
        "repeatable_fields": []
    },
    {
        "table": "players",
        "write_disposition": "WRITE_TRUNCATE", # Define o método de escrita no BigQuery (truncate)
        "path": "players",
        "quality_control": True, # Flag para controle de qualidade (não utilizado neste caso)
        "params": {
            "league": LEAGUE,
            "season": SEASON,
            "page": 1 # Número da página de resultados da API
        },
        "fields": [],
        "nested_fields": [
            "player.id",
            "player.name",
            "player.firstname",
            "player.lastname",
            "player.age",
            "player.birth.date",
            "player.birth.place",
            "player.nationality",
            "player.height",
            "player.weight",
            "player.injured",
            "player.photo"
        ],
        "repeatable_fields": []
    },
]

# Define os endpoints da API a serem consumidos de forma iterativa,
# ou seja, para cada item do endpoint principal, este endpoint será chamado
iterable_endpoints = {
    "past_fixtures": [ # Define os endpoints iteráveis para o endpoint "past_fixtures"
        {
            "table": "fixturesStatistics", # Nome da tabela no BigQuery
            "write_disposition": "WRITE_APPEND", # Define o método de escrita no BigQuery (append)
            "path": "fixtures/statistics", # Caminho do endpoint na API-Sports
            "query_param": { # Define o parâmetro da requisição para a API-Sports que será usado para iterar sobre os dados do endpoint principal
                "fixture": "fixture.id" # O valor do campo "fixture.id" do endpoint principal será usado como valor para o parâmetro "fixture" da requisição
            },
            "fixed_params": {}, # Parâmetros fixos da requisição para a API-Sports
            "fields": ["fixture"], # Campos a serem selecionados na resposta da API
            "nested_fields": [ # Campos aninhados a serem extraídos da resposta da API
                "team.id",
                "team.name",
                "team.logo"
            ],
            "repeatable_fields": [ # Campos que se repetem na resposta da API
                "statistics"
            ]
        },
        {
            "table": "fixturesLineups", # Nome da tabela no BigQuery
            "write_disposition": "WRITE_APPEND", # Define o método de escrita no BigQuery (append)
            "path": "fixtures/lineups", # Caminho do endpoint na API-Sports
            "query_param": { # Define o parâmetro da requisição para a API-Sports que será usado para iterar sobre os dados do endpoint principal
                "fixture": "fixture.id" # O valor do campo "fixture.id" do endpoint principal será usado como valor para o parâmetro "fixture" da requisição
            },
            "fixed_params": {}, # Parâmetros fixos da requisição para a API-Sports
            "fields": [ # Campos a serem selecionados na resposta da API
                "fixture",
                "formation"
            ],
            "nested_fields": [ # Campos aninhados a serem extraídos da resposta da API
                "team.id"
            ],
            "repeatable_fields": [ # Campos que se repetem na resposta da API
                "startXI"
            ]
        }
    ]
}

In [None]:
# Função para buscar os dados da API
def fetch_data(path, params, headers):
    """
    Busca dados da API em várias páginas.

    Args:
        path (str): O caminho do endpoint da API.
        params (dict): Os parâmetros da consulta da API.
        headers (dict): Os cabeçalhos da requisição da API.

    Returns:
        list: Uma lista de dicionários contendo os dados da API.
    """
    all_data = [] # Inicializa uma lista vazia para armazenar todos os dados da API
    while True: # Loop infinito para buscar dados de todas as páginas
        response = requests.get(f"{API_URL}/{path}", headers=headers, params=params) # Faz a requisição para a API
        data = response.json() # Converte a resposta da API para um dicionário Python
        print(response.text) # Imprime a resposta da API no console
        if 'response' in data and data['response']: # Verifica se a resposta da API contém dados
            all_data.extend(data['response']) # Adiciona os dados da página atual à lista de todos os dados
            if 'page' in params: # Verifica se a requisição foi feita com paginação
                params['page'] += 1 # Incrementa o número da página para a próxima requisição
            else:
                break # Sai do loop se não houver paginação
        else:
            break # Sai do loop se não houver dados na resposta da API
    return all_data # Retorna a lista de todos os dados da API

# Função para buscar dados de endpoints iteráveis
def fetch_iterable_data(main_data, iterable_endpoint):
    """
    Busca dados de um endpoint iterável para cada item em um DataFrame principal.

    Args:
        main_data (pd.DataFrame): O DataFrame principal contendo dados de um endpoint anterior.
        iterable_endpoint (dict): As configurações do endpoint iterável.

    Returns:
        list: Uma lista de dicionários contendo os dados do endpoint iterável para cada item no DataFrame principal.
    """
    all_data = [] # Inicializa uma lista para armazenar todos os dados
    for _, item in main_data.iterrows(): # Itera sobre cada linha do DataFrame principal
        query_params = {key: item[value.replace('.', '__')] for key, value in iterable_endpoint["query_param"].items()} # Cria os parâmetros de consulta para o endpoint iterável com base nos valores da linha atual do DataFrame principal
        params = query_params.copy() # Copia os parâmetros de consulta
        params.update(iterable_endpoint['fixed_params']) # Atualiza os parâmetros de consulta com os parâmetros fixos do endpoint iterável
        data = fetch_data(iterable_endpoint['path'], params, HEADERS) # Busca os dados do endpoint iterável

        iterated_data = [{**query_params, **item} for item in data] # Combina os parâmetros de consulta com os dados da resposta
        all_data.extend(iterated_data) # Adiciona os dados combinados à lista de todos os dados
    return all_data # Retorna a lista de todos os dados

# Função para preparar o DataFrame
def prepare_dataframe(data, fields, nested_fields, repeatable_fields):
    """
    Prepara um DataFrame a partir de uma lista de dicionários, extraindo campos aninhados e repetíveis.

    Args:
        data (list): A lista de dicionários contendo os dados.
        fields (list): Uma lista de nomes de campos a serem incluídos no DataFrame.
        nested_fields (list): Uma lista de nomes de campos aninhados a serem extraídos.
        repeatable_fields (list): Uma lista de nomes de campos repetíveis a serem extraídos.

    Returns:
        pd.DataFrame: O DataFrame preparado.
    """
    for item in data: # Itera sobre cada dicionário na lista de dados
        for field in repeatable_fields: # Itera sobre cada campo repetível
            if field in item: # Verifica se o campo repetível existe no dicionário atual
                for sub_item in item[field]: # Itera sobre cada item na lista do campo repetível
                    for key in sub_item.keys(): # Itera sobre cada chave no item da lista do campo repetível
                        sub_item[key] = str(sub_item[key]) # Converte o valor da chave para string

    meta_fields = fields + repeatable_fields # Cria uma lista de campos que serão usados como metadados no DataFrame

    df = pd.json_normalize(data, sep='__', meta=meta_fields) # Cria um DataFrame a partir da lista de dicionários, usando "__" como separador para campos aninhados

    all_fields = fields + nested_fields + repeatable_fields # Cria uma lista de todos os campos desejados no DataFrame
    column_names = [col.replace('.', '__') for col in all_fields] # Substitui "." por "__" nos nomes dos campos para corresponder ao formato do DataFrame

    existing_columns = [col for col in column_names if col in df.columns] # Cria uma lista de colunas que existem tanto na lista de campos desejados quanto no DataFrame

    df = df[existing_columns] # Seleciona apenas as colunas existentes no DataFrame
    return df # Retorna o DataFrame

# Função para criar um conjunto de dados no BigQuery caso ele não exista
def create_dataset_if_not_exists(client, dataset_name):
    """
    Cria um conjunto de dados no BigQuery se ele não existir.

    Args:
        client: O cliente do BigQuery.
        dataset_name (str): O nome do conjunto de dados.
    """
    try:
        client.get_dataset(dataset_name) # Tenta obter o conjunto de dados
    except NotFound:
        print(f"Dataset {dataset_name} not found. Creating...") # Imprime uma mensagem se o conjunto de dados não for encontrado
        client.create_dataset(dataset_name) # Cria o conjunto de dados

# Função para obter o esquema de uma tabela no BigQuery
def get_table_schema(client, dataset_name, table_name):
    """
    Obtém o esquema de uma tabela no BigQuery.

    Args:
        client: O cliente do BigQuery.
        dataset_name (str): O nome do conjunto de dados.
        table_name (str): O nome da tabela.

    Returns:
        google.cloud.bigquery.schema.SchemaField: O esquema da tabela.
    """
    table_ref = client.dataset(dataset_name).table(table_name) # Cria uma referência à tabela
    table = client.get_table(table_ref) # Obtém a tabela
    return table.schema # Retorna o esquema da tabela

# Função para carregar dados no BigQuery
def load_data_to_bigquery(client, dataset_name, table_name, data, write_disposition, partition_column=None, clustering_fields=None):
    """
    Carrega dados em uma tabela no BigQuery.

    Args:
        client: O cliente do BigQuery.
        dataset_name (str): O nome do conjunto de dados.
        table_name (str): O nome da tabela.
        data (pd.DataFrame): O DataFrame contendo os dados a serem carregados.
        write_disposition (str): A disposição de gravação para o trabalho de carregamento.
        partition_column (str, optional): O nome da coluna de partição.
        clustering_fields (list, optional): Uma lista de nomes de campos para agrupamento em cluster.
    """
    table_ref = client.dataset(dataset_name).table(table_name) # Cria uma referência à tabela

    try:
        table = client.get_table(table_ref) # Tenta obter a tabela
        schema = table.schema # Obtém o esquema da tabela
    except NotFound:
        schema = None # Define o esquema como None se a tabela não for encontrada
        print(f"Schema para a tabela {DATASET_NAME}.{table_name} não encontrada.") # Imprime uma mensagem se o esquema não for encontrado

    job_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition, # Define a disposição de gravação
    )
    if schema:
        job_config.schema = schema # Define o esquema se ele for encontrado
    else:
        job_config.autodetect = True # Habilita a detecção automática de esquema se o esquema não for encontrado

    json_str = data.to_json(orient='records', date_format='iso') # Converte o DataFrame para JSON
    job = client.load_table_from_json(json.loads(json_str), table_ref, job_config=job_config) # Cria um trabalho de carregamento para carregar os dados do JSON na tabela
    job.result() # Aguarda a conclusão do trabalho de carregamento
    print(f"Foram carregadas {len(data)} linhas em {dataset_name}.{table_name}") # Imprime uma mensagem com o número de linhas carregadas

# Função para obter a última data de atualização da tabela de controle
def get_last_update(client, now):
    """
    Obtém a última data de atualização da tabela de controle.

    Args:
        client: O cliente do BigQuery.
        now (datetime): A data e hora atuais.

    Returns:
        datetime: A última data de atualização.
    """
    table_name = f'{PROJECT_ID}.{DATASET_NAME}.updates' # Define o nome da tabela de controle

    try:
        client.get_table(table_name) # Tenta obter a tabela de controle
    except NotFound:
        print(f"Table {table_name} not found. Initializing with FULL_LOAD_DATE: {FULL_LOAD_DATE}") # Imprime uma mensagem se a tabela de controle não for encontrada
        full_load_date = datetime.strptime(FULL_LOAD_DATE, '%Y-%m-%d') # Converte a data de carregamento completo para datetime
        initial_data = [{'updated_at': full_load_date}] # Cria uma lista de dicionários com a data de carregamento completo
        load_data_to_bigquery(client, DATASET_NAME, 'updates', pd.DataFrame(initial_data), 'WRITE_TRUNCATE') # Carrega a data de carregamento completo na tabela de controle
        return full_load_date # Retorna a data de carregamento completo

    query = f'SELECT MAX(updated_at) AS last_update FROM `{table_name}`' # Define a consulta para obter a última data de atualização

    try:
        query_job = client.query(query) # Executa a consulta
        results = query_job.result() # Obtém os resultados da consulta

        for row in results: # Itera sobre os resultados da consulta
            print(f"Ultima data de atualização: {row.last_update}") # Imprime a última data de atualização
            return row.last_update if row.last_update else now - timedelta(days=1) # Retorna a última data de atualização ou a data atual menos 1 dia se a última data de atualização for None
    except Exception as e:
        print(f"An error occurred while executing the query: {e}") # Imprime uma mensagem se ocorrer um erro ao executar a consulta
        return None # Retorna None se ocorrer um erro

# Função para registrar a data e hora da atualização
def log_update(client,now):
    """
    Registra a data e hora da atualização na tabela de controle.

    Args:
        client: O cliente do BigQuery.
        now (datetime): A data e hora atuais.
    """
    table_name = f'{PROJECT_ID}.{DATASET_NAME}.updates' # Define o nome da tabela de controle
    updated_at = [{'updated_at': now}] # Cria uma lista de dicionários com a data e hora atuais
    load_data_to_bigquery(client, DATASET_NAME, 'updates', pd.DataFrame(updated_at), 'WRITE_APPEND') # Carrega a data e hora atuais na tabela de controle

# Função para atualizar os parâmetros de data para a carga incremental
def incremental_params_update(table, incremental_load_params, params, last_update, now):
    """
    Atualiza os parâmetros de data para a carga incremental.

    Args:
        table (str): O nome da tabela.
        incremental_load_params (dict): O dicionário de parâmetros de carga incremental.
        params (dict): O dicionário de parâmetros da API.
        last_update (datetime): A última data de atualização.
        now (datetime): A data e hora atuais.

    Returns:
        dict: O dicionário de parâmetros da API atualizado.
    """
    if table == "past_fixtures": # Verifica se a tabela é "past_fixtures"
        params.update({ # Atualiza os parâmetros da API com as datas de início e fim da carga incremental
            'from': '2022-01-01',  # Ajuste para o início do ano de 2022
            'to': '2022-12-31'     # Ajuste para o final do ano de 2022
        })
    elif table == "future_fixtures": # Verifica se a tabela é "future_fixtures"
        params.update({ # Atualiza os parâmetros da API com a data de início da carga incremental
            'from': '2022-01-01',  # Ajuste para o início do ano de 2022
            'to': '2022-12-31'     # Ajuste para o final do ano de 2022
        })
    else:
        params # Retorna os parâmetros da API sem alterações se a tabela não for "past_fixtures" nem "future_fixtures"
    return params # Retorna os parâmetros da API atualizados

In [None]:
# Função principal que orquestra a extração, transformação e carregamento dos dados
def main(request=None):
    """
    Função principal que orquestra a extração, transformação e carregamento dos dados.

    Args:
        request: A requisição HTTP.

    Returns:
        tuple: Uma tupla contendo a mensagem de sucesso e o código de status HTTP.
    """
    client = bigquery.Client(project=PROJECT_ID) # Cria um cliente do BigQuery
    create_dataset_if_not_exists(client, DATASET_NAME) # Cria o conjunto de dados se ele não existir
    now = datetime.now() # Obtém a data e hora atuais
    # now = datetime(2022, 6, 1, 0, 0) # Para testes, defina uma data específica
    last_update = get_last_update(client, now) # Obtém a última data de atualização
    main_endpoints = endpoints.copy() # Faz uma cópia da lista de endpoints principais
    main_iterable_endpoints = iterable_endpoints.copy() # Faz uma cópia do dicionário de endpoints iteráveis

    for endpoint in main_endpoints: # Itera sobre cada endpoint principal
        params = endpoint.get('params', {}) # Obtém os parâmetros do endpoint
        table = endpoint.get('table') # Obtém o nome da tabela
        incremental_load_params = endpoint.get('incremental_load_params') # Obtém os parâmetros de carga incremental
        path = endpoint.get('path') # Obtém o caminho do endpoint
        fields = endpoint.get('fields') # Obtém os campos a serem selecionados
        nested_fields = endpoint.get('nested_fields') # Obtém os campos aninhados
        repeatable_fields = endpoint.get('repeatable_fields') # Obtém os campos repetíveis
        write_disposition = endpoint.get('write_disposition') # Obtém a disposição de gravação
        quality_control = endpoint.get('quality_control', False) # Obtém a flag de controle de qualidade

        if incremental_load_params: # Verifica se há parâmetros de carga incremental
            params = incremental_params_update(table, incremental_load_params, params, last_update, now) # Atualiza os parâmetros da API com as datas de início e fim da carga incremental

        raw_data = fetch_data(path, params, HEADERS) # Busca os dados brutos da API

        if not raw_data:
            print(f"Endpoint {table} não retornou dados.")  # Diagnóstico do porquê a tabela não está sendo criada
        else:
            print(f"Dados obtidos para o endpoint {table}.")

        if raw_data: # Verifica se há dados brutos
            prepared_data = prepare_dataframe(raw_data, fields, nested_fields, repeatable_fields) # Prepara o DataFrame
            load_data_to_bigquery(client, DATASET_NAME, table, prepared_data, write_disposition) # Carrega os dados no BigQuery

            if table in main_iterable_endpoints: # Verifica se há endpoints iteráveis para o endpoint principal atual
                for iterable_endpoint in main_iterable_endpoints[table]: # Itera sobre cada endpoint iterável
                    iterable_table = iterable_endpoint.get('table') # Obtém o nome da tabela do endpoint iterável
                    iterable_fields = iterable_endpoint.get('fields') # Obtém os campos a serem selecionados do endpoint iterável
                    iterable_nested_fields = iterable_endpoint.get('nested_fields') # Obtém os campos aninhados do endpoint iterável
                    iterable_repeatable_fields = iterable_endpoint.get('repeatable_fields') # Obtém os campos repetíveis do endpoint iterável
                    iterable_write_disposition = iterable_endpoint.get('write_disposition') # Obtém a disposição de gravação do endpoint iterável

                    print(f"Buscando os dados iteráveis para o endpoint: {iterable_table}") # Imprime uma mensagem informando que está buscando dados iteráveis
                    detailed_data = fetch_iterable_data(prepared_data, iterable_endpoint) # Busca os dados do endpoint iterável

                    if detailed_data: # Verifica se há dados do endpoint iterável
                        prepared_iterable_data = prepare_dataframe(detailed_data, iterable_fields, iterable_nested_fields, iterable_repeatable_fields) # Prepara o DataFrame do endpoint iterável
                        load_data_to_bigquery(client, DATASET_NAME, iterable_table, prepared_iterable_data, iterable_write_disposition) # Carrega os dados do endpoint iterável no BigQuery
                    else:
                        print(f"Não foram encontrados dados para o endpoint: {iterable_table}") # Imprime uma mensagem se não houver dados do endpoint iterável
        else:
            print(f"Não foram encontrados dados para o endpoint: {table}") # Imprime uma mensagem se não houver dados brutos

    log_update(client, now) # Registra a data e hora da atualização

    return "success", 200 # Retorna uma mensagem de sucesso e o código de status HTTP 200

main() # Executa a função principal se o script for executado como um programa principal

Ultima data de atualização: 2024-11-08 16:53:36.929000+00:00
{"get":"fixtures","parameters":{"league":"2","season":"2022","from":"2022-01-01","to":"2022-12-31"},"errors":[],"results":185,"paging":{"current":1,"total":1},"response":[{"fixture":{"id":865411,"referee":"R. Saggi","timezone":"UTC","date":"2022-06-21T13:00:00+00:00","timestamp":1655816400,"periods":{"first":1655816400,"second":1655820000},"venue":{"id":831,"name":"V\u00edkingsv\u00f6llur","city":"Reykjav\u00edk"},"status":{"long":"Match Finished","short":"FT","elapsed":90,"extra":null}},"league":{"id":2,"name":"UEFA Champions League","country":"World","logo":"https:\/\/media.api-sports.io\/football\/leagues\/2.png","flag":null,"season":2022,"round":"Preliminary Round"},"teams":{"home":{"id":2249,"name":"La Fiorita","logo":"https:\/\/media.api-sports.io\/football\/teams\/2249.png","winner":false},"away":{"id":3342,"name":"Inter Club d'Escaldes","logo":"https:\/\/media.api-sports.io\/football\/teams\/3342.png","winner":true}},

('success', 200)