In [22]:
import pathlib
from typing import List
import json
import gzip
import argparse

import aiohttp
import asyncio    
import logging
import logging.config
import yaml
import os

In [18]:
def read_config(config_file="config.yml"):
    """Load the project configuration from a YAML file.

    Args:
        config_file (str, optional): Path to the YAML config file.
            Defaults to "config.yml".

    Returns:
        Dict: The parsed configuration as a python dict
    """
    # The handle gets its own name so the `config_file` argument is not rebound.
    with open(config_file) as stream:
        # NOTE(review): yaml.Loader is the full loader and can construct
        # arbitrary Python objects; only use it on trusted config files.
        parsed = yaml.load(stream, yaml.Loader)
    return parsed


In [19]:
# Parse ../config.yaml and keep only its 'acquire' section for the cells below.
config = read_config("../config.yaml")['acquire']

In [21]:
# Base URLs of the two endpoints; a player id is appended to build each request URL.
# NOTE(review): the two endpoints use different domains (.com vs .co.uk) — confirm this is intentional.
MARKET_VALUES_API = "https://www.transfermarkt.com/ceapi/marketValueDevelopment/graph/"
TRANSFERS_API = "https://www.transfermarkt.co.uk/ceapi/transferHistory/list/"
# User agent string taken from the 'scrapy_config' section of the loaded config;
# fetch_data sends it as the 'User-Agent' request header.
USER_AGENT = config['scrapy_config']['USER_AGENT']
# Bare expression: shows the value as the cell output.
USER_AGENT

'transfermarkt-datasets/1.0 (https://github.com/dcaribou/transfermarkt-datasets)'

In [23]:
# get the player ids from the players asset from transfermarkt-scraper source
def get_player_ids(season: int) -> List[int]:
    """Get the player ids from the players asset from transfermarkt-scraper source.

    The asset is read from ``../data/raw/{season}/players.json.gz``, a gzipped
    file holding one JSON object per line. The id of each player is the last
    path segment of the object's "href" value.

    Args:
        season (int): Season whose players asset should be read

    Returns:
        List[int]: List of player ids

    Raises:
        FileNotFoundError: If the players asset does not exist for the season
    """

    # set output directory and the asset path inside it
    output_dir = f"../data/raw/{season}/"
    players_asset_path = os.path.join(output_dir, "players.json.gz")

    # create the season directory if it does not exist
    # (os.makedirs creates intermediate directories; it takes no `parents` argument)
    os.makedirs(output_dir, exist_ok=True)

    # read lines from the zipped file as text, skipping blank lines so that a
    # trailing newline does not break JSON parsing
    with gzip.open(players_asset_path, mode="rt", encoding="utf-8") as z:
        players = [json.loads(line) for line in z if line.strip()]

    player_ids = [
        int(player["href"].split("/")[-1])
        for player in players
    ]
    logging.info(f"Fetched {len(player_ids)} player ids from {players_asset_path}")

    return player_ids

In [24]:
# helper function to fetch data from API
async def fetch_data(session, url, player_id):
    """Fetch data from the API for a given URL and player ID.

    Args:
        session (aiohttp.ClientSession): The aiohttp session
        url (str): The API URL
        player_id (int): The player ID; echoed back in the result

    Returns:
        dict: ``{"response": <parsed JSON or None>, "player_id": player_id}``.
            "response" is None when the body could not be decoded as JSON
            (unexpected content type or malformed document).
    """
    headers = {
        'Content-Type': 'application/json',
        'User-Agent': USER_AGENT
    }

    # NOTE(review): ssl=False skips TLS certificate validation for this request;
    # confirm this is really needed.
    async with session.get(url=url, headers=headers, ssl=False) as response:
        try:
            # Named `payload` rather than `json` so the stdlib module is not shadowed.
            payload = await response.json()
        except (aiohttp.ContentTypeError, json.JSONDecodeError) as e:
            # A malformed body is treated like a wrong content type: log it and
            # return an empty response instead of aborting the whole batch.
            logging.error(f"Failed to fetch data for player {player_id}: {e}")
            return {"response": None, "player_id": player_id}
        return {"response": payload, "player_id": player_id}


In [25]:
# for each player id, get the market value data from the API
async def get_market_values(player_ids: List[int]) -> List[dict]:
    """Request the market value data of every given player from the API.

    All requests share one aiohttp session and are awaited together.

    Args:
        player_ids (List[int]): Ids of the players to request

    Returns:
        List[dict]: One fetch_data result per player id, in input order
    """

    logging.info(f"Requesting market values for {len(player_ids)} players")

    async with aiohttp.ClientSession() as client:
        pending = []
        for pid in player_ids:
            endpoint = MARKET_VALUES_API + str(pid)
            pending.append(fetch_data(client, endpoint, pid))

        # gather runs the requests concurrently and keeps the input order
        results = await asyncio.gather(*pending)

    return results

In [26]:
# for each player id, get the transfer history data from the API
async def get_transfers(player_ids: List[int]) -> List[dict]:
    """Request the transfer history of every given player from the API.

    All requests share one aiohttp session and are awaited together.

    Args:
        player_ids (List[int]): Ids of the players to request

    Returns:
        List[dict]: One fetch_data result per player id, in input order
    """

    logging.info(f"Requesting transfer history for {len(player_ids)} players")

    async with aiohttp.ClientSession() as client:
        # one coroutine per player, each hitting TRANSFERS_API + <player id>
        coroutines = map(
            lambda pid: fetch_data(client, TRANSFERS_API + str(pid), pid),
            player_ids,
        )

        # gather runs the requests concurrently and keeps the input order
        results = await asyncio.gather(*coroutines)

    return results


In [27]:
def persist_data(data: List[dict], path: str) -> None:
    """Write the records to a file as newline-delimited JSON.

    The file at ``path`` is overwritten; each record becomes one line.

    Args:
        data (List[dict]): Records to serialize, one JSON document per line
        path (str): Destination file path
    """
    with open(path, "w") as sink:
        for record in data:
            sink.write(json.dumps(record) + "\n")

In [15]:
def run_for_season(season: int) -> None:
    """Run all steps for a given season.

    Reads the player ids of the season, requests market values and transfer
    history for each of them, and writes both result sets as
    newline-delimited JSON under ``data/raw/transfermarkt-api/{season}/``.

    Args:
        season (int): The season to process
    """
    # NOTE(review): these targets are relative to the working directory
    # ("data/raw/..."), while get_player_ids reads from "../data/raw/..." —
    # confirm both are meant to resolve to the same data root.
    target_market_values_path = f"data/raw/transfermarkt-api/{season}/market_values.json"
    target_transfers_path = f"data/raw/transfermarkt-api/{season}/transfers.json"

    logging.info(f"Starting player data acquisition for season {season}")

    # create target directories if they do not exist
    pathlib.Path(target_market_values_path).parent.mkdir(parents=True, exist_ok=True)
    pathlib.Path(target_transfers_path).parent.mkdir(parents=True, exist_ok=True)

    # get player IDs for the season
    player_ids = get_player_ids(season)

    # collect market values and transfers for players in SEASON
    # NOTE(review): asyncio.run cannot be called while another event loop is
    # running in the same thread — verify how this is invoked from a notebook.
    market_values = asyncio.run(get_market_values(player_ids))
    transfers = asyncio.run(get_transfers(player_ids))

    # filter out player ids in responses that are not in the original list;
    # a set makes each membership check O(1) instead of a scan of the id list
    known_ids = set(player_ids)
    transfers = [item for item in transfers if item["player_id"] in known_ids]

    logging.info(f"Persisting market values and transfers for season {season}")

    # persist market values and transfers to files
    persist_data(market_values, target_market_values_path)
    persist_data(transfers, target_transfers_path)