In [1]:
import time
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import joblib

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, udf
from pyspark.sql.types import StringType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression

# define plt & sns settings
sns.set_theme()
plt.rcParams["font.size"] = 20
plt.rcParams["axes.labelsize"] = 20
plt.rcParams["xtick.labelsize"] = 20
plt.rcParams["ytick.labelsize"] = 20
plt.rcParams["legend.fontsize"] = 20
plt.rcParams["legend.markerscale"] = 1.5
plt.rcParams["figure.figsize"] = (20, 10)
plt.rcParams["legend.title_fontsize"] = 20

In [2]:
TARGET_COL = 'ANNUAL_GROSS'

def fill_missing_values_with_mean(df):
    for column in df.columns:
        has_missing_values = df.filter(col(column).isNull()).count() > 0
        if has_missing_values:
            mean_value = df.select(mean(col(column))).collect()[0][0]
            tmp = df.na.fill({column : mean_value})
            df = tmp

    return df

def drop_low_correlation_cols(df):
    cols_to_drop = ['CrdR', 'Press_per', 'Clr', 'Dribble_Succ_per', 'Rec_per', 'Season', 'Current_Club', 'League']
    for column in cols_to_drop:
        tmp = df.drop(column)
        df = tmp

    return df

def Normalize(df):

    numeric_cols = [f.name for f in df.schema.fields if not isinstance(f.dataType, StringType)]
    copy = df

    # UDF for converting column type from vector to double type
    unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

    for column in numeric_cols:

        if column == TARGET_COL:
            continue

        # VectorAssembler Transformation - Converting column to vector type
        assembler = VectorAssembler(inputCols=[column], outputCol=column+'_Vect')

        # MinMaxScaler Transformation
        mmScaler = MinMaxScaler(inputCol=column+"_Vect", outputCol=column+"_Scaled")

        # Pipeline of VectorAssembler and MinMaxScaler
        pipeline = Pipeline(stages=[assembler, mmScaler])

        # Fitting pipeline on dataframe and drop original column
        tmp = pipeline.fit(copy).transform(copy).withColumn(column+"_Scaled", unlist(column+"_Scaled")).drop(column+"_Vect").drop(column)
        copy = tmp


    tmp = copy

    # Rename each transformed column to its original name
    for column in copy.columns:
        if column.endswith('_Scaled'):
            tmp = tmp.withColumnRenamed(column, column[:len(column)-7])


    return tmp

def Encoding(df):

    str_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]

    copy = df

    # # define a UDF for LENGTH column to get the first char
    # year_only = udf(lambda x: int(x[0]))

    # convert every string column into int column
    for column in str_cols:
        indexer = StringIndexer(inputCol=column, outputCol=column+'_numeric')
        copy = indexer.fit(copy).transform(copy).drop(column)

    # # apply year_only function on LENGTH column
    # copy = copy.withColumn('LENGTH_numeric', year_only(copy.LENGTH)).drop('LENGTH')

    # Rename each transformed column to its original name
    for column in copy.columns:
        if column.endswith('_numeric'):
            copy = copy.withColumnRenamed(column, column[:len(column)-8])


    return copy

In [3]:
spark = SparkSession.builder.appName("FIFA_Clusters").getOrCreate()

url_18_19 = 'https://gist.githubusercontent.com/RonBless/a6aa9cd0570a09f6718115b20ec4b590/raw/ff7915725d5a7165c76b359fcff93691fdacf282/18_19.csv'
url_19_20 = 'https://gist.githubusercontent.com/RonBless/a6aa9cd0570a09f6718115b20ec4b590/raw/ff7915725d5a7165c76b359fcff93691fdacf282/19_20.csv'
url_20_21 = 'https://gist.githubusercontent.com/RonBless/a6aa9cd0570a09f6718115b20ec4b590/raw/ff7915725d5a7165c76b359fcff93691fdacf282/20_21.csv'

spark.sparkContext.addFile(url_18_19)
spark.sparkContext.addFile(url_19_20)
spark.sparkContext.addFile(url_20_21)

df18 = spark.read.csv("file://"+SparkFiles.get("18_19.csv"), header=True, inferSchema= True)
df19 = spark.read.csv("file://"+SparkFiles.get("19_20.csv"), header=True, inferSchema= True)
df20 = spark.read.csv("file://"+SparkFiles.get("20_21.csv"), header=True, inferSchema= True)

df18_names = df18.select("Player")
df19_names = df19.select("Player")
df20_names = df20.select("Player")

23/08/14 17:34:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Find names that appear in all three DataFrames
common_names = df18_names.intersect(df19_names).intersect(df20_names)

# Filter each DataFrame to keep only the common names
df18_filtered = df18.join(common_names, on="Player", how="inner")
df19_filtered = df19.join(common_names, on="Player", how="inner")
df20_filtered = df20.join(common_names, on="Player", how="inner")

In [5]:
df18_filtered_sorted = df18_filtered.sort(df18_filtered.Player.asc())
df19_filtered_sorted = df19_filtered.sort(df19_filtered.Player.asc())
df20_filtered_sorted = df20_filtered.sort(df20_filtered.Player.asc())

In [6]:
df18 = fill_missing_values_with_mean(df18_filtered_sorted)
df19 = fill_missing_values_with_mean(df19_filtered_sorted)

In [7]:
df18 = drop_low_correlation_cols(df18)
df19 = drop_low_correlation_cols(df19)

In [8]:
df18_norm = Normalize(df18)
df19_norm = Normalize(df19)

In [9]:
df18_norm_enc = Encoding(df18_norm)
df19_norm_enc = Encoding(df19_norm)

In [10]:
(train18, test18) = df18_norm_enc.randomSplit([0.8, 0.2], seed=42)
(train19, test19) = df19_norm_enc.randomSplit([0.8, 0.2], seed=42)

In [11]:
features18 = train18.columns
features18.remove(TARGET_COL)

features19 = train19.columns
features19.remove(TARGET_COL)

In [12]:
vectorAssembler = VectorAssembler(inputCols = features18, outputCol = 'features')
vdf18 = vectorAssembler.transform(train18)
vdf18 = vdf18.select(['features', TARGET_COL])

vectorAssembler = VectorAssembler(inputCols = features19, outputCol = 'features')
vdf19 = vectorAssembler.transform(train19)
vdf19 = vdf18.select(['features', TARGET_COL])

dataset = vdf18.union(vdf19)

In [13]:
lr = LinearRegression(labelCol=TARGET_COL, maxIter=100, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel18 = lr.fit(dataset)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel18.coefficients))
print("Intercept: %s" % str(lrModel18.intercept))

# Summarize the model over the training set and print out some metrics
training18Summary = lrModel18.summary
print("numIterations: %d" % training18Summary.totalIterations)
print("objectiveHistory: %s" % str(training18Summary.objectiveHistory))
training18Summary.residuals.show()
print("RMSE: %f" % training18Summary.rootMeanSquaredError)
print("r2: %f" % training18Summary.r2)

23/08/14 17:35:59 WARN DAGScheduler: Broadcasting large task binary with size 1395.5 KiB



[Stage 1859:>                                                       (0 + 2) / 2]

23/08/14 17:36:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/08/14 17:36:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS



                                                                                

23/08/14 17:36:23 WARN DAGScheduler: Broadcasting large task binary with size 1399.4 KiB


                                                                                

Coefficients: [-9052.491394041243,213277.50167201037,-188591.46362888237,105538.32862182349,31879.35020020141,3935.461870670422,-89792.63163089102,-13460.90275935595,0.0,-32920.009970435436,66220.73379350257,-52099.96157120458,-6603.551847789801,23844.08675588463,-6992.044846572539,-14543.591616654492,-333926.7628543401,91754.48733184997,433672.78103456896,-467440.78493532125,174610.24961366595,11390.661160907051,30931.01538517174,63566075.36133205,44039.702848446184,-8166.337968881266,3414.321679282163,-1340.713851913851,2503.770884701617,-3.432966906658719,-790.0204687753464,-790.0204687753464]
Intercept: 61339.806921695956
numIterations: 100
objectiveHistory: [0.5, 0.4048562195127415, 0.22307727987000184, 0.1359720748766768, 0.08540280616408918, 0.05062320928443028, 0.03022671388784658, 0.01948185076758977, 0.01429026824274292, 0.01173855335915408, 0.010047811162743805, 0.008082459723451767, 0.005292773616089741, 0.004604963212488235, 0.003893154131690176, 0.002074339750664723, 0.00


[Stage 1877:>                                                       (0 + 1) / 1]

+-------------------+
|          residuals|
+-------------------+
| 1744.0112953690405|
| 3673.0776926113685|
|  25372.50067537486|
| 27616.518169920164|
|-26372.467521783197|
|-147.04842934495537|
|  1717.156290305953|
|    22953.034057051|
| -17444.16716856684|
| -38647.59101147298|
|  -5898.21343120758|
| -18060.61832763377|
| -6788.469839933154|
|-3590.8500175218796|
| -22021.66416239168|
|-13261.064974018838|
|  11873.30507538229|
| -3150.849411241128|
|  37005.56945533812|
|-30731.531878367008|
+-------------------+
only showing top 20 rows

RMSE: 22808.906513
r2: 0.999984



                                                                                

In [14]:
model_path = "reg_model"
lrModel18.write().overwrite().save(model_path)


[Stage 1881:>                                                       (0 + 1) / 1]

                                                                                

In [15]:
lrModel18

LinearRegressionModel: uid=LinearRegression_0c927c19f4ac, numFeatures=32

In [16]:
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load(model_path)
loaded_model

LinearRegressionModel: uid=LinearRegression_0c927c19f4ac, numFeatures=32

In [115]:
# ====================================================== Consumer Part ==================================================================

In [116]:
from confluent_kafka import Consumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml import PipelineModel
from pyspark import SparkFiles
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel

In [117]:
conf = {'bootstrap.servers': "localhost:9092",
        'group.id': "foo",
        'auto.offset.reset': 'smallest'}

consumer = Consumer(conf)

In [118]:
running = True
def kafka_consumer(consumer):
    output_path = ""
    try:
        if not os.path.exists(output_folder):
            # Create the directory
            os.makedirs(output_folder)
            print(f"Directory '{output_folder}' created.")
        else:
            print(f"Directory '{output_folder}' already exists.")
        consumer.subscribe(topic)
        while running:
            msg = consumer.poll(timeout=3.0)
            if msg is None:
                break
                
            if msg.error():
                if msg.error():
                    raise Exception(msg.error())
                    
            print("starting prediction for ", msg.value().decode('utf-8'))
            prediction = runModel(msg.value().decode('utf-8'))
            if prediction is not None:
                prediction.show()
                output_path = output_folder + '/' + msg.value().decode('utf-8') + '.txt'
                writeToFile(output_path, prediction)

            else:
                print("The player is not valid for this time period.")



    finally:
        # Close down consumer to commit final offsets.
        print('Finished task')
        consumer.close()

def shutdown():
    running = False

In [119]:
def runModel(name):
    data = checkIfValidName(name)
    print(data)
    if data is not None:
        return loaded_model.transform(data)
    else:
        print("Error, the moesdel was not trained on the specified player")


# Make sure we have previous resutlts for that player
def checkIfValidName(name):
    url = 'https://gist.githubusercontent.com/Aviad-Hedvat/4de2aa4fa83a418fe4ab2ea3995ded36/raw/d8ec5823d09bc040c1e9f4988d7ad75202c4e30c/dataset.csv'
    spark.sparkContext.addFile(url)
    df = spark.read.csv("file://"+SparkFiles.get("dataset.csv"), header=True, inferSchema= True)
    tmp = df.filter(f'Name == "{name}"')
    if tmp.count() > 0:
        features = tmp.columns
        features.remove('Name')
        vectorAssembler = VectorAssembler(inputCols = features, outputCol = 'features')
        vdf = vectorAssembler.transform(tmp)
        vdf = vdf.select('features')
        return vdf
    else:
        return None


    
def writeToFile(output_path, prediction):
    try:
        with open(output_path, 'w') as file:
            file.write(str(prediction.select('prediction').collect()[0][0]))
        print(f"Prediction written to '{output_path}' successfully.")
    except Exception as e:
        print(f"An error occurred: {e}")
        
def defineSchema():
    return StructType([
        StructField('features', VectorUDT(), True),
        StructField('ANNUAL_GROSS', IntegerType(), True)
    ])


In [120]:
spark = SparkSession.builder\
        .appName("FIFA_Consumer")\
        .getOrCreate()
output_folder = "Predictions"
model_path = "reg_model"
topic = ["prediction"]
kafka_consumer(consumer)

Directory 'Predictions' already exists.
starting prediction for  Adam Ounas
23/08/14 18:16:51 WARN SparkContext: The path https://gist.githubusercontent.com/Aviad-Hedvat/4de2aa4fa83a418fe4ab2ea3995ded36/raw/d8ec5823d09bc040c1e9f4988d7ad75202c4e30c/dataset.csv has been added already. Overwriting of added paths is not supported in the current version.
1
DataFrame[features: vector]
+--------------------+-----------------+
|            features|       prediction|
+--------------------+-----------------+
|[0.273,0.309,0.28...|2950224.501814125|
+--------------------+-----------------+

Prediction written to 'Predictions/Adam Ounas.txt' successfully.
Finished task
