# Desgarga de datos

Los datos utilizados en este estuido es bastante elaborado.
Esto se debe a que la fuente de datos solo proporciona "snapshots" mensuales de los datos,
por lo que en primer lugar habrá que descargarse los datos de cada mes.
Además, la cantidad de datos es bastante gigantesca,
por lo que se hará preprocesamiento y aggregación de datos
de antemano para reducir el tamaño de los datos a la hora de utilizarlos en R.

Las únicas librerias necesarias para este notebook son `aiohttp`, `pandas` y `numpy`.

Si solo se quiere obtener todos los datos necesarios, basta con ejecutar todas las celdas de este notebook en orden.

In [None]:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import io
import os
import re

%autoawait asyncio

import aiohttp
import pandas as pd
import numpy as np
import zipfile as zf

data_root = './data/'
os.makedirs(data_root, exist_ok=True)

## descarga de la fuente

La fuente de datos es http://ratings.fide.com/download_lists.phtml. Los datos son sobre todos los jugadores de ajedrez que estan de alta en la FIDE (Federación Internacional de Ajedrez).
La FIDE es la organización que se encarga de organizar los torneos de ajedrez a nivel mundial, y es la que otorga los títulos de ajedrez (GM, IM, FM, etc).

Es notable que dado que hay 11 años de "snapshots", habra que descargarse y descomprimir aproximadamente 130 archivos. Esto en total son 2GB de datos.

Dado que cada descarga y descompensión puede tardar varios segundos, se hará uso de la programación asíncrona para acelerar el proceso.

In [None]:
# Cache de archivos descargados
months = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

def time_range(start: tuple[int, str], end: tuple[int, str]):
	start_year, start_month = start
	end_year, end_month = end
	start_idx = months.index(start_month)
	end_idx = months.index(end_month)
	if start_year == end_year:
		for m in months[start_idx:end_idx + 1]:
			yield (start_year, m)
		return
	for m in months[start_idx:]:
		yield (start_year, m)
	for y in range(start_year + 1, end_year):
		for m in months:
			yield (y, m)
	for m in months[:end_idx + 1]:
		yield (end_year, m)

study_domain = set(time_range(start=(12, 'aug'), end=(23, 'jun')))

In [None]:
raw_csv_regex = re.compile(r'^(?P<month>\w{3})_(?P<year>\d{2})\.csv$')

all_files = os.listdir(data_root)

raw_csv_files = [
	(m.group(0), (int(m.group('year')), m.group('month')))
	for m in map(raw_csv_regex.match, all_files)
	if m
]

len(raw_csv_files)

In [None]:
existing_years = {y for _, y in raw_csv_files}
missing_years = study_domain - existing_years

print(f'Faltan {len(missing_years)} archivos por descargar')

In [None]:
attribute_names = ['fideid', 'name', 'country', 'sex', 'title', 'w_title', 'o_title', 'rating', 'games', 'k', 'birthday', 'flag']

# Los archivos zip pueden ser bastante grandes, por lo que su descompresión puede tomar un tiempo considerable.
# Para aliviar esto, se realiza la descompresión en un proceso separado, para que sea en paralelo saltandose el GIL.
def read_zip(bin: bytes, month, year) -> pd.DataFrame:
    filelike = io.BytesIO(bin)
    with zf.ZipFile(filelike) as zip_file:
        with zip_file.open(zip_file.namelist()[0]) as xml_file:
            print(f"Starting for {month}_{year}")
            df = pd.read_xml(xml_file, compression="zip")
            print(f"Finished for {month}_{year}")
            df.to_csv(os.path.join(data_root, f'{month}_{year}.csv'), index=False, columns=attribute_names)

# Se quiere hacer cada descarga en paralelo, por lo que se define un procedimiento asincrono que descarga un archivo, y entonces se llama a este procedimiento en paralelo.
async def download(session: aiohttp.ClientSession, executor: ProcessPoolExecutor, month, year):
    async with session.get(f'http://ratings.fide.com/download/standard_{month}{year}frl_xml.zip') as response:
        if response.status != 200:
            print(f'Error downloading {month}{year} (status: {response.status})')
            return

        zip_bin: bytes = await response.read()
        event_loop = asyncio.get_running_loop()
        await event_loop.run_in_executor(executor, read_zip, zip_bin, month, year)

with ProcessPoolExecutor(max_workers=5) as executor:
    async with aiohttp.ClientSession() as session:
        all_futs = [download(session, executor, month, year) for year, month in missing_years]
        print(f'Downloading {len(all_futs)} files')
        grouped = [
            all_futs[i:i + 5]
            for i in range(0, len(all_futs), 5)
        ]
        for g in grouped:
            await asyncio.gather(*g)

# Aggregación de datos

Ya que 2 GB de datos no son fáciles de manejar, se hará una agregación de datos para reducir el tamaño de los datos.

La aggregación principal que se realizará es agrupar por país y año y mes, en vez de por jugador. Esto debería reducir el tamaño de los datos por varias ordenes de magnitud.

In [None]:
files = os.listdir(data_root)
raw_csv_files = [
	(m.group(0), (int(m.group('year')), m.group('month')))
	for m in map(raw_csv_regex.match, files)
	if m
]

len(raw_csv_files)

In [None]:
from collections import defaultdict
from datetime import datetime as dt

def to_datetime(p):
	return dt(2000 + p[0], months.index(p[1]) + 1, 1)


In [None]:
from math import comb
agg_dict = defaultdict(list)

l = list()

for file, period in raw_csv_files:
	file_path = os.path.join(data_root, file)
	date = to_datetime(period)
	df = pd.read_csv(file_path)
	df = df.loc[lambda d: d['rating'] > 0, :]
	df = df.loc[lambda d: d['flag'].isna() | ~d['flag'].str.contains('i').replace(np.nan, False), :]

	total_mean = df['rating'].mean()
	lambda_reciprocal = total_mean / 400 * np.log(10)

	for country, group in df.groupby('country'):


		agg_dict['country'].append(country)
		agg_dict['date'].append(date)

		# Agregados de número de jugadores
		agg_dict['count'].append(len(group))

		# Agregados de titulo
		title_gm = sum(group['title'] == 'GM')
		title_wgm = sum(group['title'] == 'WGM')
		title_im = sum(group['title'] == 'IM')
		title_wim = sum(group['title'] == 'WIM')
		title_fm = sum(group['title'] == 'FM')
		title_wfm = sum(group['title'] == 'WFM')
		title_cm = sum(group['title'] == 'CM')
		title_wcm = sum(group['title'] == 'WCM')

		total_title = title_gm + title_wgm + title_im + title_wim + title_fm + title_wfm + title_cm + title_wcm

		agg_dict['title_total'].append(total_title)


		# Agregados de rating
		agg_dict['mean'].append(group['rating'].mean())

		ratings = group['rating']
		top_100_mean = ratings.sort_values(ascending=False).head(100).mean()
		agg_dict['top_100_mean'].append(top_100_mean)

		k = len(group)
		expected_best = lambda_reciprocal * sum([1/i for i in range(1, k+1)])
		normalized_top_100 = top_100_mean - expected_best
		agg_dict['normalized_top_100'].append(normalized_top_100)
		l.append({
			'country': country,
			'date': int(date.timestamp()),
			'count': len(group),
			'title_total': total_title,
			'mean': group['rating'].mean(),
			'top_100_mean': top_100_mean,
			'normalized_top_100': normalized_top_100,
		})

agg_df = pd.DataFrame(agg_dict)

agg_df.to_csv(os.path.join(data_root, 'agg.csv'), index=False)

with open("agg.json", "w") as f:
	json.dump(l, f)

# OECD

Los datos vienen de https://stats.oecd.org/index.aspx?DataSetCode=HSL, son descargados y este script sirve para convertirlos a un formato más manejable.

In [None]:
df = pd.read_csv("well_being.csv")
df.head()

In [None]:
from collections import defaultdict
d = defaultdict(list)

indicators = [
	'social_support',
	'feeling_safe_at_night',
	'employment_rate',
	'life_expectancy_at_birth',
	'earnings',
	'housing_affordability'
]

g = df.groupby(['LOCATION', 'TIME', 'Indicator'])
for (country, year, indicator), group in g:
	indicator_normalized = indicator.replace(' ', '_').lower()
	if indicator_normalized not in indicators:
		continue
	d["country"].append(country)
	d["year"].append(year)
	d["variable"].append(indicator)
	d["value"].append(group['Value'].mean())

df_wb = pd.DataFrame(d).reset_index(drop=True)

df_wb.to_csv(os.path.join(data_root, 'well_being.csv'), index=False)

In [None]:
import json

l = []
for (country, year, indicator), group in g:
	l.append({
		'country': country,
		'year': int(year),
		'variable': indicator,
		'value': float(group['Value'].mean())
	})

with open("well_being.json", 'w') as f:
	json.dump(l, f)