In [543]:
!hdfs dfs -ls /home/kzaharov-369865/

Found 6 items
-rw-r--r--   3 kzaharov-369865 kzaharov-369865   28004109 2023-06-17 09:30 /home/kzaharov-369865/games.csv
-rw-r--r--   3 kzaharov-369865 kzaharov-369865         12 2023-05-22 08:01 /home/kzaharov-369865/hello-file.txt
-rw-r--r--   3 kzaharov-369865 kzaharov-369865   84908201 2023-06-16 16:35 /home/kzaharov-369865/players.csv
-rw-r--r--   3 kzaharov-369865 kzaharov-369865  360088309 2023-06-07 20:06 /home/kzaharov-369865/players_new.csv
-rw-r--r--   3 kzaharov-369865 kzaharov-369865    2526182 2023-06-16 16:36 /home/kzaharov-369865/teams.csv
-rw-r--r--   3 kzaharov-369865 kzaharov-369865       2595 2023-05-22 11:03 /home/kzaharov-369865/test.csv


In [542]:
# !hdfs dfs -rm -R /home/kzaharov-369865/players_new.csv

23/06/17 09:31:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /home/kzaharov-369865/players1.csv


In [1]:
import os
import socket
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import types as T, functions as F, SparkSession
from pyspark.sql.window import Window
from pyspark import StorageLevel
from jinja2 import Environment, FileSystemLoader
import datetime
import json

LOGIN = "kzaharov-369865"  # Your gateway.st login
APP_NAME = "2"  # Any name for your Spark-app

NORMALIZED_APP_NAME = APP_NAME.replace('/', '_').replace(':', '_').replace(' ', '_').replace('\\', '_')

APPS_TMP_DIR = os.path.join(os.getcwd(), "tmp")
APPS_CONF_DIR = os.path.join(os.getcwd(), "conf")
APPS_LOGS_DIR = os.path.join(os.getcwd(), "logs")
LOG4J_PROP_FILE = os.path.join(APPS_CONF_DIR, "pyspark-log4j-{}.properties".format(NORMALIZED_APP_NAME))
LOG_FILE = os.path.join(APPS_LOGS_DIR, 'pyspark-{}.log'.format(NORMALIZED_APP_NAME))
EXTRA_JAVA_OPTIONS = (
    "-Dlog4j.configuration=file://{} "
    "-Dspark.hadoop.dfs.replication=1 "
    "-Dhttps.protocols=TLSv1.0,TLSv1.1,TLSv1.2,TLSv1.3"
    .format(LOG4J_PROP_FILE)
)

LOCAL_IP = socket.gethostbyname(socket.gethostname())

for directory in [APPS_CONF_DIR, APPS_LOGS_DIR, APPS_TMP_DIR]:
    if not os.path.exists(directory):
        os.makedirs(directory)

env = Environment(loader=FileSystemLoader('/opt'))
template = env.get_template("pyspark_log4j.properties.template")
template.stream(logfile=LOG_FILE).dump(LOG4J_PROP_FILE)


spark = (
    SparkSession
    .builder
    .appName(APP_NAME)
    
    # Master URI/configuration
    .master("k8s://https://10.32.7.103:6443")
    
    .config("spark.driver.host", LOCAL_IP)
    
    # Web-UI port for your Spark-app
    .config("spark.ui.port", "4040")
    .config("spark.driver.bindAddress", "0.0.0.0")
    
    # How many CPU cores allocate to driver process
    .config("spark.driver.cores", "2")
    
    # How many RAM allocate to driver process
    .config("spark.driver.memory", "4g")
    
    # How many executors to create
    .config("spark.executor.instances", "3")
    
    # How many CPU cores allocate to each executor
    .config("spark.executor.cores", '2')
    
    # How many RAM allocate to each executor
    .config("spark.executor.memory", "4g")
    
    # How many extra RAM allocate to each executor pod to handle with JVM overheads
    # Total pod RAM = 'spark.executor.memory' + ('spark.executor.memory' * 'spark.kubernetes.memoryOverheadFactor')
    .config("spark.kubernetes.memoryOverheadFactor", "0.2")
    
    # How many RAM from the pool allocate to store the data
    # Additional info: https://spark.apache.org/docs/latest/tuning.html#memory-management-overview
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    
    .config("spark.network.timeout", "180s")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.driver.extraJavaOptions", EXTRA_JAVA_OPTIONS)
    
    # Namespace to create executor pods. You are allowed to create pods only in your own namespace
    .config("spark.kubernetes.namespace", LOGIN)
    
    # Extra labels to your driver/executor pods in Kubernetes
    .config("spark.kubernetes.driver.label.appname", APP_NAME)
    .config("spark.kubernetes.executor.label.appname", APP_NAME)
    
    # Spark executor image
    .config("spark.kubernetes.container.image", f"node03.st:5000/spark-executor:{LOGIN}")

    .config("spark.kubernetes.container.image.pullPolicy", "Always")
    
    # If true - delete completed/failed pods. 
    # If your executors goes down you can set 'false' to check logs and troubleshoot your app.
    .config("spark.kubernetes.executor.deleteOnTermination", "true")
    
    .config("spark.local.dir", "/tmp/spark")
    .getOrCreate()
)

In [12]:
# !hdfs dfs -cat /home/kzaharov-369865/games.zip | gzip -d | hdfs dfs -put - /home/kzaharov-369865/games

In [11]:
# !unzip hdfs dfs -cat /home/kzaharov-369865/games.zip

In [46]:
# spark.stop()

In [2]:
# df = spark.read.load('/home/kzaharov-369865/players_new.csv', format='csv',\
#                      chunksize=10, header=True, maxColumns=98000)

In [10]:
# df.printSchema()

In [9]:
# df.select(F.col('height')).show(2)

In [8]:
# cols = (F.col('height'), 'firstName', 'active', 'jersey', 'id', 'weight', 'dateOfBirth', 'age', 'slug', 'debutYear',\
#        F.col('collegeAthlete.draft.round'), F.col('collegeAthlete.draft.year'), 'collegeAthlete.draft.selection',\
#        'status.abbreviation', 'experience.years',\
#        'position.name', 'position.id', 'team.id.team', 'team.start.season', 'draft.team.id.team',\
#        'college.shortName', 'collegeAthlete.jersey', 'collegeAthlete.height', 'collegeAthlete.weight',\
#        'position.parent.abbreviation', 'collegeAthlete.birthPlace.city',\
#        'statistics.splits.categories.1.stats.0.value', 'statistics.splits.categories.1.stats.0.displayValue',\
#        'statistics.splits.categories.1.stats.37.displayValue')

# df = df.select(*cols)

In [315]:
import pandas as pd
import numpy as np
from pyspark.ml.feature import Imputer

In [316]:
# df = spark.read.load('/home/kzaharov-369865/test.csv', format='csv', header=False)
# df = spark.read.csv('/home/kzaharov-369865/test.csv', header=False)

In [317]:
teams = spark.read.csv('/home/kzaharov-369865/teams.csv', header=True, inferSchema=True)

In [322]:
def add_comma_to_dot_names(s: str) -> str:
    split_s = list(s)
    flag = False
    res = s
    
    for i in split_s:
        if i == '.':
            flag = True
            
    if flag:
        res = '`' + ''.join(list(s)) + '`'
        
    return res

In [499]:
def rename_dot_columns(columns: str) -> str:
    new_columns = []
    
    for col in columns:
        flag = False
        m = [0]
        
        split_col = list(col)
        for i in range(len(split_col)):
            if split_col[i] == '.':
                flag = True
                m.append(i)
        m.append(len(split_col))
        
        if flag:
            temp = []
            for j in range(len(m) - 1):
                temp.append(split_col[m[j]:m[j+1]])
            
            temp2 = [''.join(temp[0])]
            for i in temp[1:]:
                i.pop(0)
                temp2.append(''.join(i))
            rename_col = ''.join(temp2)
            
        else:
            rename_col = col
            
        new_columns.append(rename_col)

    return new_columns

In [528]:
def clear_columns(data, ths: list = [31, 1]):

    float_attr = np.where(np.array(data.dtypes).T[1] == 'double')[0]
    int_attr = np.where(np.array(data.dtypes).T[1] == 'int')[0]
    bool_attr= np.where(np.array(data.dtypes).T[1] == 'boolean')[0]
    str_attr = np.where(np.array(data.dtypes).T[1] == 'string')[0]
    

    if np.any(float_attr):
        float_columns = np.array(data.dtypes)[float_attr].T[0]
    else:
        float_columns = np.array([])

    if np.any(int_attr):
        int_columns = np.array(data.dtypes)[int_attr].T[0]
    else:
        int_columns = np.array([])

    if np.any(bool_attr):
        bool_columns = np.array(data.dtypes)[bool_attr].T[0]
    else:
        bool_columns = np.array([])

    if np.any(str_attr):
        str_columns = np.array(data.dtypes)[str_attr].T[0]
    else:
        str_columns = np.array([])
        
        
    float_columns = list(map(lambda x: add_comma_to_dot_names(x), float_columns))
    int_columns = list(map(lambda x: add_comma_to_dot_names(x), int_columns))
    bool_columns = list(map(lambda x: add_comma_to_dot_names(x), bool_columns))
    str_columns = list(map(lambda x: add_comma_to_dot_names(x), str_columns))
    
    #float attributes
    data_float_cols = data.select(*float_columns)
    data_float_cols_describe = data_float_cols.describe()
    
    data_float_cols_std = data_float_cols_describe.filter(F.col('summary') == 'stddev')
    data_float_cols_count = data_float_cols_describe.filter(F.col('summary') == 'count')

    data_float_cols_std_dict = data_float_cols_std.collect()[0].asDict()
    data_float_cols_count_dict = data_float_cols_count.collect()[0].asDict()

    del data_float_cols_std_dict['summary']
    del data_float_cols_count_dict['summary']
    
    f1 = [key for (key, value) in data_float_cols_std_dict.items() if value != '0.0']
    f2 = [key for (key, value) in data_float_cols_count_dict.items() if int(value) >= ths[0]]
    non_const_float = np.intersect1d(f1, f2)
    
    #integer attributes
    data_int_cols = data.select(*int_columns)
    data_int_cols_describe = data_int_cols.describe()
    
    data_int_cols_std = data_int_cols_describe.filter(F.col('summary') == 'stddev')
    data_int_cols_count = data_int_cols_describe.filter(F.col('summary') == 'count')

    data_int_cols_std_dict = data_int_cols_std.collect()[0].asDict()
    data_int_cols_count_dict = data_int_cols_count.collect()[0].asDict()

    del data_int_cols_std_dict['summary']
    del data_int_cols_count_dict['summary']
    
    i1 = [key for (key, value) in data_int_cols_std_dict.items() if value != '0.0']
    i2 = [key for (key, value) in data_int_cols_count_dict.items() if int(value) >= ths[1]]
    non_const_int = np.intersect1d(i1, i2)
        
    #string attributes
    counts = []
    for i in str_columns:
        counts.append([i, data.select(i).distinct().count()])
        
    non_const_str = np.array(counts).T[0][np.where(np.array(counts).T[1].astype(int) > 1)[0]]

    
    #boolean attributes
    data_bool_cols = data.select(*bool_columns)
    
    
    
    #FILL NAN VALUES
    
    #Float
    data_float = data.select(*list(map(lambda x: add_comma_to_dot_names(x), non_const_float)))
    non_const_float = rename_dot_columns(non_const_float) 
    data_float = data_float.toDF(*non_const_float)
    
    fill_float = Imputer(
            inputCols = data_float.columns,
            outputCols = data_float.columns
        ).setStrategy("median")
    data_float_clean = fill_float.fit(data_float).transform(data_float)
    
    
    #Integer
    data_int = data.select(*list(map(lambda x: add_comma_to_dot_names(x), non_const_int)))
    non_const_int = rename_dot_columns(non_const_int) 
    data_int = data_int.toDF(*non_const_int)
    
    fill_int = Imputer(
            inputCols = data_int.columns,
            outputCols = data_int.columns
        ).setStrategy("median")
    data_int_clean = fill_int.fit(data_int).transform(data_int)
    
    
    
#     data.loc[:, non_const_float_players] = data[non_const_float_players].fillna(data[non_const_float_players].median().fillna(0))
#     data.loc[:, non_const_int_players] = data[non_const_int_players].fillna(data[non_const_int_players].median().fillna(0))
#     data.loc[:, non_const_str_players] = data[non_const_str_players].fillna(data[non_const_str_players].describe().loc['top'])

#     data = data[np.hstack([non_const_float_players, non_const_int_players, bool_columns_players, non_const_str_players])]



#     data = data_float_clean.unionByName(data_int_clean)

    cols = np.hstack([data_float_clean.columns, data_int_clean.columns])
    data = pd.concat([data_float_clean.toPandas(), data_int_clean.toPandas(), data_bool_cols.toPandas()], axis=1)
    data = spark.createDataFrame(data)
    
    return data

In [522]:
clean_teams = clear_columns(teams)
clean_teams = clean_teams.drop('statistics.season.$ref', 'statistics.seasonType.$ref', 'displayName')

In [523]:
print(clean_teams.count(), len(clean_teams.columns))

999 465


In [579]:
clean_teams = clean_teams.withColumnRenamed('id', 'team_id')

## Clear players

In [524]:
players = spark.read.csv('/home/kzaharov-369865/players.csv', header=True, inferSchema=True)

In [529]:
clean_players = clear_columns(players, ths=[10000, 31])
clean_players = clean_players.drop('Unnamed: 0', 'college.id', 'collegeAthlete.college.id')

In [530]:
print(clean_players.count(), len(clean_players.columns))

17748 231


In [589]:
clean_players = clean_players.withColumnRenamed('teamidteam', 'team_id')

## Merge players with teams

In [593]:
pd_players = clean_players.toPandas()

In [600]:
pd_teams = clean_teams.toPandas()

In [602]:
players_w_teams = pd.merge(pd_players, pd_teams, on=['team_id'], how='left')

In [605]:
players_w_teams.shape

(556288, 695)

## Merge teams with games

In [606]:
games = spark.read.csv('/home/kzaharov-369865/games.csv', header=True, inferSchema=True)

In [607]:
games = clear_columns(games)

In [608]:
print(games.count(), len(games.columns))

15936 335


In [None]:
final_df = pd.merge(players_w_teams, games.toPandas(), on=['team_id'], how='left')

In [None]:
final_df.to_csv('matches.csv', index=False)