In [1]:
import glob
import pandas as pd

data_files = glob.glob("chess_ratings/*.csv")
all_dfs = [pd.read_csv(file, sep='|') for file in data_files]

  all_dfs = [pd.read_csv(file, sep='|') for file in data_files]


In [2]:
data = pd.concat(all_dfs)
del all_dfs
data["Year / month"] = [f"{y} / {m if (len(str(m)) == 2) else ('0' + str(m))}" for y, m in zip(data["Year"], data["Mon"])]

In [3]:
data

Unnamed: 0,ID Number,Name,Fed,Sex,Tit,WTit,OTit,FOA,Rat,Gms,K,B-day,Flag,Year,Mon,Year / month
0,644498,,FRA,M,,,,,2008,1,20,0,,2017,12,2017 / 12
1,20504578,,MAW,M,,,,,1617,0,40,0,,2017,12,2017 / 12
2,35077023,A Chakravarthy,IND,M,,,,,1151,0,40,1986,i,2017,12,2017 / 12
3,10207538,"A E M, Doshtagir",BAN,M,,,,,1840,0,40,1974,i,2017,12,2017 / 12
4,10206612,"A K M, Sourab",BAN,M,,,,,1708,4,40,0,,2017,12,2017 / 12
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
352221,21836060,"Zyto, Karol",POL,M,,,,,1071,0,40,2006,,2019,12,2019 / 12
352222,34190632,"Zyubin, Sergey",RUS,M,,,,,1884,0,40,1975,i,2019,12,2019 / 12
352223,24232602,"Zyuzev, Pavel",RUS,M,,,,,1852,0,40,1993,i,2019,12,2019 / 12
352224,1189980,"Zywert, Kacper",POL,M,,,,,1565,0,20,1999,i,2019,12,2019 / 12


In [4]:
# unify flag columns
data.loc[data['Flag'] == 'wi', 'Flag'] = 'i'
data.loc[data['Flag'] == 'w', 'Flag'] = ''

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "20g") \
    .appName('chess-app') \
    .getOrCreate()

In [6]:
string_cols = ['Name', 'Tit', 'WTit', 'OTit', 'FOA', 'Flag']

data[string_cols] = data[string_cols].fillna('')

In [7]:
#smol = data[data["ID Number"] > 100000000]
#smol = data.drop(columns=['Name', 'Fed', 'Sex', 'B-day', 'Year / month'])
smol = data

df = spark.createDataFrame(smol)
df

DataFrame[ID Number: bigint, Name: string, Fed: string, Sex: string, Tit: string, WTit: string, OTit: string, FOA: string, Rat: bigint, Gms: bigint, K: bigint, B-day: bigint, Flag: string, Year: bigint, Mon: bigint, Year / month: string]

In [8]:
df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+---------+--------------------+---+---+---+----+----+---+----+---+---+-----+----+----+---+------------+
|ID Number|                Name|Fed|Sex|Tit|WTit|OTit|FOA| Rat|Gms|  K|B-day|Flag|Year|Mon|Year / month|
+---------+--------------------+---+---+---+----+----+---+----+---+---+-----+----+----+---+------------+
|332244031|Acevedoarango, Ed...|COL|  M|   |    |    |   |1477|  0| 40| 1968|    |2017| 12|   2017 / 12|
|351043662|    Ackermann, Bernd|FRA|  M|   |    |    |   |1505|  0| 40| 1964|    |2017| 12|   2017 / 12|
|373200321|     Albonico, Carlo|ITA|  M|   |    |    |   |1622|  0| 40| 1957|   i|2017| 12|   2017 / 12|
|324226816|Alves, Marcos Vil...|BRA|  M|   |    |    |   |1607|  0| 40| 1987|    |2017| 12|   2017 / 12|
|383266891|        Alzaid, Zaid|KUW|  M|   |    |    |   |1308|  5| 20| 1972|    |2017| 12|   2017 / 12|
+---------+--------------------+---+---+---+----+----+---+----+---+---+-----+----+----+---+------------+
only showing top 5 rows



                                                                                

In [8]:
import numpy as np


def get_mean(df):
    return df.mapValues(lambda v: (v, 1)) \
             .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
             .mapValues(lambda v: v[0] / v[1]) \


def get_sum(df):
    return df.reduceByKey(lambda a, b: a + b)


def get_string(df):    
    return df.reduceByKey(lambda a, b: (a if len(a) > len(b) else b))


def zip_apply(f, x, y):
    if isinstance(x, list):
        return [f(a, b) for a, b in zip(x, y)]
    return f(x, y)


def reduce_by_months(df, what, agg_func, agg_func_name='', n_months=1):
    idmap = {c: i for i, c in enumerate(df.columns)}
    name = f"{what}:{agg_func_name}"
        
    if n_months > 12:
        raise ValueError("Invalid months: ", n_months)
    elif n_months == 12:
        key_func = lambda x: (x[idmap['ID Number']], x[idmap['Year']])
        out_key_func = lambda x: (x[0][0], f"{name}/{x[0][1]}//yearly", x[1])
    else:
        key_func = lambda x: (x[idmap['ID Number']], x[idmap['Year']], x[idmap['Mon']] // n_months)
        out_key_func = lambda x: (x[0][0], f"{name}/{x[0][1]}/{int(x[0][2] * n_months)}/{n_months}", x[1])
    
    if isinstance(what, list):
        res = df.rdd.map(lambda x: (key_func(x), tuple([x[idmap[w]] for w in what])))
    else:
        res = df.rdd.map(lambda x: (key_func(x), x[idmap[what]]))

    res = agg_func(res)
    res = res.map(out_key_func) \
             .toDF(['ID Number', 'Colname', name])
    
    return res

In [26]:
red = reduce_by_months(df, 'Rat', get_mean, agg_func_name='mean', n_months=12) \
        .groupBy('ID Number') \
        .pivot('Colname') \
        .max('Rat:mean')

red.show(5)

+---------+---------------------+---------------------+---------------------+
|ID Number|Rat:mean/2017//yearly|Rat:mean/2018//yearly|Rat:mean/2019//yearly|
+---------+---------------------+---------------------+---------------------+
|345279188|               1333.0|   1357.6666666666667|   1373.0833333333333|
|327137030|               1996.0|               1996.0|               1996.0|
|651017121|                 null|                 null|               1341.0|
|551023512|                 null|               1187.0|               1187.0|
|551026147|                 null|               1489.0|   1545.6666666666667|
+---------+---------------------+---------------------+---------------------+
only showing top 5 rows



In [11]:
red = reduce_by_months(df, 'Gms', get_sum, agg_func_name='sum', n_months=6) \
        .groupBy('ID Number') \
        .pivot('Colname') \
        .max('Gms:sum')

red.show(5)

+---------+---------+--------+---------+--------+--------+---------+--------+
|ID Number|2017/12/6|2018/0/6|2018/12/6|2018/6/6|2019/0/6|2019/12/6|2019/6/6|
+---------+---------+--------+---------+--------+--------+---------+--------+
|551023512|     null|       8|        0|       0|       0|        0|       0|
|327137030|        0|       0|        0|       0|       0|        0|       0|
|551061635|     null|    null|     null|    null|      10|        0|      10|
|551026147|     null|    null|        0|       5|       6|        0|       0|
|345279188|        0|       3|        0|      12|       4|        0|      13|
+---------+---------+--------+---------+--------+--------+---------+--------+
only showing top 5 rows



In [17]:
red = reduce_by_months(df, 'Gms', get_sum, agg_func_name='sum', n_months=12) \
        .groupBy('ID Number') \
        .pivot('Colname') \
        .max('Gms:sum')

red.show(5)

+---------+--------------------+--------------------+--------------------+
|ID Number|Gms:sum/2017//yearly|Gms:sum/2018//yearly|Gms:sum/2019//yearly|
+---------+--------------------+--------------------+--------------------+
|345279188|                   0|                  15|                  17|
|327137030|                   0|                   0|                   0|
|551026147|                null|                   5|                   6|
|551023512|                null|                   8|                   0|
|551061635|                null|                null|                  20|
+---------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [30]:
red = reduce_by_months(df, 'Tit', get_string, agg_func_name='str', n_months=12) \
        .groupBy('ID Number') \
        .pivot('Colname').count()

red.show(5)

+---------+--------------------+--------------------+--------------------+
|ID Number|Tit:str/2017//yearly|Tit:str/2018//yearly|Tit:str/2019//yearly|
+---------+--------------------+--------------------+--------------------+
|345279188|                   1|                   1|                   1|
|327137030|                   1|                   1|                   1|
|651017121|                null|                null|                   1|
|551026147|                null|                   1|                   1|
|551023512|                null|                   1|                   1|
+---------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [20]:
df.drop_duplicates().show(5)

+---------+------------------+---+---+---+----+----+---+----+---+---+-----+----+----+---+------------+
|ID Number|              Name|Fed|Sex|Tit|WTit|OTit|FOA| Rat|Gms|  K|B-day|Flag|Year|Mon|Year / month|
+---------+------------------+---+---+---+----+----+---+----+---+---+-----+----+----+---+------------+
|137122537|   Tonoyan, Robert|FID|  M|   |    |    |   |1880|  2| 20| 1994|    |2018|  4|   2018 / 04|
|551012243|Jourd`hui, Trystan|FRA|  M|   |    |    |   |1184|  0| 40| 2004|    |2018|  5|   2018 / 05|
|366196523|  Kulkarni, Rakesh|IND|  M|   |    |    |AIM|1340|  0| 40| 1984|    |2018|  5|   2018 / 05|
|551000067|  Pironti, Nicolas|FRA|  M|   |    |    |   |1353|  1| 40| 1958|    |2018|  5|   2018 / 05|
|551013886|    Poirson, Simon|FRA|  M|   |    |    |   |1333|  0| 40| 2003|    |2018|  5|   2018 / 05|
+---------+------------------+---+---+---+----+----+---+----+---+---+-----+----+----+---+------------+
only showing top 5 rows



In [9]:
# TODO move this to utils

def parse_column(c):
    year = c[:4]
    if 'yearly' in c:
        return year, None
    
    mon = c.split('/')[1]
    return year, mon


def drop_conditions(c, last_year=2019, last_month=None):
    c = parse_column(c)
    if last_month is None and c[0] >= last_year:
        return True
    
    if last_month is None:
        return False
    
    return c[0] >= last_year and c[1] >= last_month
    

def x_y_split(df, last_year=2019, last_month=None):
    drop_cols = [c for c in df.columns if '201' in c and drop_conditions(c)]
    
    for c in drop_cols:
        c = parse_column(c)
        
        if c[0] != last_year:
            continue
        if last_month is None or c[1] != last_month:
            continue

        y_col = c
        
    x_data = df.drop(columns=drop_cols)
    y_data = df[y_col]
    
    return x_data, y_data

In [10]:
name_to_func = {
    'sum': get_sum,
    'mean': get_mean,
    'str': get_string
}

def get_dataset(df, select_data, n_months=12):
    base_df = df.drop_duplicates().select('ID Number', 'Sex', 'K', 'B-day', 'Flag')
    
    for what, func in select_data:
        red = reduce_by_months(df, what, name_to_func[func], agg_func_name=func, n_months=n_months) \
            .groupBy('ID Number') \
            .pivot('Colname')
        
        red = red.count() if func == 'str' else red.max(f'{what}:{func}')
        print(f"Counts before join: base_df {base_df.count()}, reduced {red.count()}")
        base_df = base_df.join(red, "ID Number")
        print(f"Counts after join: base_df {base_df.count()}")
        
        red.unpersist()
        del red
        
    return base_df

In [11]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = get_dataset(df, [('Gms', 'sum'), ('Tit', 'str')])

Counts before join: base_df 8081358, reduced 354525
Counts after join: base_df 8081358
Counts before join: base_df 8081358, reduced 354525
Counts after join: base_df 8081358


In [12]:
pdf.show(5)
pdf = pdf.toPandas()

+---------+---+---+-----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|ID Number|Sex|  K|B-day|Flag|Gms:sum/2017//yearly|Gms:sum/2018//yearly|Gms:sum/2019//yearly|Tit:str/2017//yearly|Tit:str/2018//yearly|Tit:str/2019//yearly|
+---------+---+---+-----+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   100064|  M| 40| 2000|   i|                   0|                   0|                   0|                   1|                   1|                   1|
|   100064|  M| 40| 2000|   i|                   0|                   0|                   0|                   1|                   1|                   1|
|   100064|  M| 40| 2000|   i|                   0|                   0|                   0|                   1|                   1|                   1|
|   100064|  M| 40| 2000|   i|                   0|       

In [13]:
pdf

Unnamed: 0,ID Number,Sex,K,B-day,Flag,Gms:sum/2017//yearly,Gms:sum/2018//yearly,Gms:sum/2019//yearly,Tit:str/2017//yearly,Tit:str/2018//yearly,Tit:str/2019//yearly
0,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
1,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
2,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
3,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
4,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...
8081353,651006324,M,40,1959,,,,7.0,,,1.0
8081354,651006324,M,40,1959,,,,7.0,,,1.0
8081355,651006324,M,40,1959,,,,7.0,,,1.0
8081356,651019221,M,40,1948,,,,7.0,,,1.0


In [29]:
spark.stop()

In [31]:
pdf

Unnamed: 0,ID Number,Sex,K,B-day,Flag,Gms:sum/2017//yearly,Gms:sum/2018//yearly,Gms:sum/2019//yearly,Tit:str/2017//yearly,Tit:str/2018//yearly,Tit:str/2019//yearly
0,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
1,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
2,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
3,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
4,100064,M,40,2000,i,0.0,0.0,0.0,1.0,1.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...
8081353,651006324,M,40,1959,,,,7.0,,,1.0
8081354,651006324,M,40,1959,,,,7.0,,,1.0
8081355,651006324,M,40,1959,,,,7.0,,,1.0
8081356,651019221,M,40,1948,,,,7.0,,,1.0


In [None]:
# TODO
#  - odeber sparku mem
#  - z B-day udelej vek/vekovy skupiny
#  - inactivity flag jeste udelat
#  - fce co udela Gms,... proste vsecky featury per n months
#  - ML part