In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

appName = "Fifa_dataset"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()



In [2]:
from pyspark.sql.functions import lit

#converting all the data into one dataframe
dfs = []
for year in range(15, 23):
    file_path = f'players_{year}.csv'
    df = spark.read.csv(file_path, header= True, inferSchema = True)
    df = df.withColumn("year", lit(year))
    print(df.count())
    dfs.append(df)
    
df_final = dfs[0]
for df in dfs[1:]:
    df_final = df_final.union(df)

print(df_final.count() ) 
    #"C:\Subjects\Subjects\Systems and tool chains\Systems and Tool Chains Project\data\players_15.csv"
    #"C:/Subjects/Subjects/Systems and tool chains/Systems and Tool Chains Project/data/players_2015.csv"

16155
15623
17596
17954
18085
18483
18944
19239
142079


In [3]:
#df_final.show(3, vertical=True)
#df_final.count()
from pyspark.sql.functions import monotonically_increasing_id
df_final = df_final.withColumn("Serial_no", monotonically_increasing_id())
#df_final.filter(df_final["row_id"] == 13000).show(vertical=True)
df_final.show(1, vertical=True)
df_final.count()

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

142079

In [4]:
db_properties={}
#update your db username
db_properties['username']="postgres"
#update your db password
db_properties['password']="SaiSravan@12"
#make sure you got the right port number here
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
#make sure you had the Postgres JAR file in the right location
db_properties['driver']="org.postgresql.Driver"
db_properties['table']= "fifa.Fifa_Dataset"
#db_properties['Schemas'] = 'fifa'

df_final.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

In [5]:
df_final = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

df_final.show(1, vertical=True)


-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [6]:
df_final.count()

142079

In [7]:
#constraints checking 
df_final.agg({'overall': 'min', 
               'potential': 'min',
            'club_jersey_number':'min',
             'pace': 'min', 
               'shooting': 'min',
            'passing':'min',
             'dribbling': 'min', 
               'potential': 'min',
            'defending':'min',
             'physic': 'min', 
               'attacking_crossing': 'min',
            'attacking_finishing':'min',
             'attacking_heading_accuracy': 'min', 
               'attacking_short_passing': 'min',
            'attacking_volleys':'min',
             'skill_dribbling': 'min', 
               'skill_curve': 'min',
            'skill_fk_accuracy':'min',
             'skill_long_passing': 'min', 
               'skill_ball_control': 'min',
            'movement_acceleration':'min',
             'movement_sprint_speed': 'min', 
               'movement_agility': 'min',
            'movement_reactions':'min',
              
             'movement_balance': 'min', 
               'power_shot_power': 'min',
            'power_jumping':'min',
             'power_stamina': 'min', 
               'power_strength': 'min',
            'power_long_shots':'min',
             'mentality_aggression': 'min', 
               'mentality_interceptions': 'min',
            'mentality_positioning':'min',
             'mentality_vision': 'min', 
               'mentality_penalties': 'min',
            'mentality_composure':'min',
             'defending_marking_awareness': 'min', 
               'defending_standing_tackle': 'min',
            'defending_sliding_tackle':'min',
             'goalkeeping_diving': 'min', 
               'goalkeeping_handling': 'min',
            'goalkeeping_kicking':'min',
             'goalkeeping_positioning': 'min', 
               'goalkeeping_reflexes': 'min',
            'goalkeeping_speed':'min'}).show(vertical=True)

-RECORD 0-------------------------------
 min(mentality_interceptions)     | 3   
 min(skill_curve)                 | 4   
 min(mentality_positioning)       | 2   
 min(mentality_composure)         | 11  
 min(attacking_short_passing)     | 7   
 min(shooting)                    | 14  
 min(power_stamina)               | 10  
 min(club_jersey_number)          | 1   
 min(skill_long_passing)          | 5   
 min(power_jumping)               | 13  
 min(power_strength)              | 12  
 min(defending_standing_tackle)   | 2   
 min(mentality_vision)            | 5   
 min(skill_fk_accuracy)           | 3   
 min(skill_dribbling)             | 2   
 min(movement_agility)            | 11  
 min(goalkeeping_kicking)         | 1   
 min(dribbling)                   | 22  
 min(movement_balance)            | 10  
 min(pace)                        | 21  
 min(mentality_aggression)        | 2   
 min(movement_reactions)          | 20  
 min(movement_sprint_speed)       | 11  
 min(passing)   

In [8]:
df_final.agg({'overall': 'max', 
               'potential': 'max',
            'club_jersey_number':'max',
             'pace': 'max', 
               'shooting': 'max',
            'passing':'max',
             'dribbling': 'max', 
               'potential': 'max',
            'defending':'max',
             'physic': 'max', 
               'attacking_crossing': 'max',
            'attacking_finishing':'max',
             'attacking_heading_accuracy': 'max', 
               'attacking_short_passing': 'max',
            'attacking_volleys':'max',
             'skill_dribbling': 'max', 
               'skill_curve': 'max',
            'skill_fk_accuracy':'max',
             'skill_long_passing': 'max', 
               'skill_ball_control': 'max',
            'movement_acceleration':'max',
             'movement_sprint_speed': 'max', 
               'movement_agility': 'max',
            'movement_reactions':'max',
              
             'movement_balance': 'max', 
               'power_shot_power': 'max',
            'power_jumping':'max',
             'power_stamina': 'max', 
               'power_strength': 'max',
            'power_long_shots':'max',
             'mentality_aggression': 'max', 
               'mentality_interceptions': 'max',
            'mentality_positioning':'max',
             'mentality_vision': 'max', 
               'mentality_penalties': 'max',
            'mentality_composure':'max',
             'defending_marking_awareness': 'max', 
               'defending_standing_tackle': 'max',
            'defending_sliding_tackle':'max',
             'goalkeeping_diving': 'max', 
               'goalkeeping_handling': 'max',
            'goalkeeping_kicking':'max',
             'goalkeeping_positioning': 'max', 
               'goalkeeping_reflexes': 'max',
            'goalkeeping_speed':'max'}).show(vertical=True)



-RECORD 0-------------------------------
 max(mentality_interceptions)     | 93  
 max(skill_curve)                 | 94  
 max(mentality_positioning)       | 96  
 max(mentality_composure)         | 96  
 max(attacking_short_passing)     | 95  
 max(shooting)                    | 94  
 max(power_stamina)               | 97  
 max(club_jersey_number)          | 99  
 max(skill_long_passing)          | 95  
 max(power_jumping)               | 97  
 max(power_strength)              | 98  
 max(defending_standing_tackle)   | 94  
 max(mentality_vision)            | 96  
 max(skill_fk_accuracy)           | 95  
 max(skill_dribbling)             | 97  
 max(movement_agility)            | 96  
 max(goalkeeping_kicking)         | 95  
 max(dribbling)                   | 96  
 max(movement_balance)            | 97  
 max(pace)                        | 97  
 max(mentality_aggression)        | 96  
 max(movement_reactions)          | 96  
 max(movement_sprint_speed)       | 97  
 max(passing)   

# Task 2

can impute some values such as shooting, passing, dribbling, defending, and physic $\n$
can drop rows which arent needed such as nation_id, nation_position, nation_jersey_number and nation_logo_url

In [9]:
#contract ending in 2023
df_2023 = df_final.filter(df_final.year == 22)

In [10]:
df_2023.show(3,vertical=True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | RW, ST, CF           
 overall                     | 93                   
 potential                   | 93                   
 value_eur                   | 7.8E7                
 wage_eur                    | 320000.0             
 age                         | 34                   
 dob                         | 1987-06-24           
 height_cm                   | 170                  
 weight_kg                   | 72                   
 club_team_id                | 73.0                 
 club_name                   | Paris Saint-Germain  
 league_name                 | French Ligue 1       
 league_level                | 1                    
 club_position               | RW             

In [11]:
#Task2 Question1
df_2023 = df_2023.filter(df_2023.club_contract_valid_until == 2023)
def no_of_clubs(X):
    club_counts = df_2023.groupBy("club_name").count()
    top_X_clubs = club_counts.orderBy("count", ascending=False)
    #top_X_clubs.select("club_name").show(X)
    top_x_clubs = top_X_clubs.limit(X)
    top_x_clubs.show(X, vertical=True)

#no_of_clubs(25)

X = int(input("Enter the number of Top teams you want to see: "))
no_of_clubs(X)

Enter the number of Top teams you want to see: 15
-RECORD 0-------------------------
 club_name | En Avant de Guingamp 
 count     | 19                   
-RECORD 1-------------------------
 club_name | Club Atlético Lanús  
 count     | 17                   
-RECORD 2-------------------------
 club_name | Lechia Gdańsk        
 count     | 17                   
-RECORD 3-------------------------
 club_name | Barnsley             
 count     | 16                   
-RECORD 4-------------------------
 club_name | Bengaluru FC         
 count     | 16                   
-RECORD 5-------------------------
 club_name | Kasimpaşa SK         
 count     | 16                   
-RECORD 6-------------------------
 club_name | Al Tai               
 count     | 15                   
-RECORD 7-------------------------
 club_name | KAA Gent             
 count     | 15                   
-RECORD 8-------------------------
 club_name | Zagłębie Lubin       
 count     | 15                   
-RECO

In [12]:
df_final.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: date (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = true)
 

In [13]:
#typecasting
from pyspark.sql.functions import col
casted_types_df = df_final.withColumn("dob2", col("dob").cast("string")).drop("dob")
casted_types_df = casted_types_df.withColumn("club_joined2", col("club_joined").cast("string")).drop("club_joined")

#checking for NA values
from pyspark.sql.functions import *

null_counts_plays_df = casted_types_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) \
                        for c in casted_types_df.columns])

null_counts_plays_df.show(truncate=False, vertical=True)

-RECORD 0-----------------------------
 sofifa_id                   | 0      
 player_url                  | 0      
 short_name                  | 0      
 long_name                   | 0      
 player_positions            | 0      
 overall                     | 0      
 potential                   | 0      
 value_eur                   | 1897   
 wage_eur                    | 1622   
 age                         | 0      
 height_cm                   | 0      
 weight_kg                   | 0      
 club_team_id                | 1630   
 club_name                   | 1630   
 league_name                 | 1630   
 league_level                | 2015   
 club_position               | 1630   
 club_jersey_number          | 1630   
 club_loaned_from            | 133774 
 club_contract_valid_until   | 1630   
 nationality_id              | 0      
 nationality_name            | 0      
 nation_team_id              | 133635 
 nation_position             | 133635 
 nation_jersey_number    

In [14]:
casted_types_df.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: double (nullable = true)
 |-- wage_eur: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: double (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |-- nationality_name: string (nullable = true)
 |-- nation_team_id: double

In [15]:
df_final = df_final.na.drop(subset=['club_name'])
df_final.count()

140449

In [16]:
#Task2 Question 2
#club with highest average number of players older than 27 years accross all years
filtered_df = df_final.filter(df_final.age > 27)
grouped_df = filtered_df.groupBy("club_name","year")
sorted_df = filtered_df.orderBy("club_name")
#sorted_df = sorted_df.orderBy("year", ascending = True)
#sorted_df.show(20, vertical=True)
club_names = filtered_df.select('club_name').distinct().collect()
dictionary = {}
for name in club_names:
    df = sorted_df.filter(sorted_df.club_name == name['club_name'])
    df = df.orderBy('year', ascending = True)
    distinct_years = df.select('year').distinct().orderBy('year')
    dictionary[name['club_name']] = (df.count() / distinct_years.count())
    #distinct_values.show()
    #print(distinct_years.count())
    #print(df.count())
    
    #df.show(1, vertical = True)
    
print(dictionary)

{'Palermo': 7.2, 'Yeovil Town': 5.4, 'CD Everton de Viña del Mar': 8.25, 'Shonan Bellmare': 11.0, 'Göztepe SK': 10.0, 'FC Voluntari': 8.333333333333334, 'Salford City': 7.666666666666667, 'Santiago Wanderers': 6.0, '1. FC Union Berlin': 9.125, 'Carpi': 3.8, 'Crucero del Norte': 14.0, 'Sagan Tosu': 12.333333333333334, 'Guangzhou FC': 12.5, 'Jiangsu FC': 11.0, 'Club Independiente Santa Fe': 9.571428571428571, 'SC Paderborn 07': 6.714285714285714, 'Puebla FC': 12.75, 'Karlsruher SC': 8.125, 'Cheltenham Town': 5.714285714285714, 'Argentinos Juniors': 9.166666666666666, 'Shandong Taishan': 13.5, 'San Lorenzo de Almagro': 10.125, 'SC Magna Wiener Neustadt': 6.0, 'SpVgg Unterhaching': 8.0, 'Shelbourne FC': 8.0, 'Universidad Católica': 10.875, 'GFC Ajaccio': 8.8, 'FC Luzern': 6.375, 'Club Athletico Paranaense': 12.666666666666666, 'AIK': 11.0, 'SC Heerenveen': 2.25, 'Sandefjord Fotball': 6.4, 'SKA Khabarovsk': 15.0, 'Fortaleza': 8.0, 'CD Universidad de Concepción': 12.0, 'GIF Sundsvall': 6.8, 

In [17]:
sorted_dict = dict(sorted(dictionary.items(), key=lambda item: item[1], reverse=True))
print(sorted_dict)

{'Dorados de Sinaloa': 19.0, 'Matsumoto Yamaga FC': 19.0, 'Shanghai Shenhua FC': 18.5, 'Qingdao FC': 18.0, 'Club Deportivo Jorge Wilstermann': 17.5, 'Altay SK': 17.0, 'Guaireña FC': 17.0, 'İstanbul Başakşehir FK': 16.625, 'BB Erzurumspor': 16.5, 'Club Olimpia': 16.5, 'Sport Huancayo': 16.5, 'Beijing Renhe FC': 16.0, 'Paraná': 16.0, 'Sandecja Nowy Sącz': 16.0, 'CFR Cluj': 15.666666666666666, 'Extremadura UD': 15.5, 'Parma': 15.166666666666666, 'SKA Khabarovsk': 15.0, 'Joinville': 15.0, 'Criciúma': 15.0, 'Ventforet Kofu': 15.0, 'Padova': 15.0, 'US Cremonese': 15.0, 'Jeonbuk Hyundai Motors': 14.75, 'Júbilo Iwata': 14.75, 'Beijing Guoan FC': 14.75, 'Sport Club do Recife': 14.666666666666666, 'Gazişehir Gaziantep F.K.': 14.666666666666666, 'Fútbol Club Juárez': 14.666666666666666, 'AFC Hermannstadt': 14.5, 'Changchun Yatai FC': 14.5, 'Club Always Ready': 14.5, 'Club Atlético Nacional Potosí': 14.5, 'Associação Chapecoense de Futebol': 14.4, 'Shimizu S-Pulse': 14.2, 'Patronato': 14.166666666

In [22]:
from pyspark.sql.types import StructType, StructField, FloatType, StringType, DoubleType
def top_Y_clubs(Y):
    sorted_list_values = list(sorted_dict.values())
    print_sorted = sorted_list_values[:Y]
    Z = Y - 1
    while (sorted_list_values[Z + 1] == sorted_list_values[Y]):
        print_sorted.append(sorted_list_values[Z + 1])
        Z = Z + 1
    length = len(print_sorted)
    list_of_dicts = []
    for key, value in sorted_dict.items():
        #print(key)
        #print(value)
        list_of_dicts.append({key: value})
    #rdd = spark.sparkContext.parallelize(list_of_dicts)
    schema = StructType([
        StructField("Club_name", StringType(), True),
        StructField("Average", DoubleType(), True)
    ])

    rdd = spark.sparkContext.parallelize(list_of_dicts)
    df = spark.createDataFrame(rdd.map(lambda x: (list(x.keys())[0], list(x.values())[0])), schema)
    df = df.limit(length)

    df.show(length, vertical = True)
    
Y_val = int(input("Enter the required number of clubs: "))
top_Y_clubs(Y_val)

Enter the required number of clubs: 6
-RECORD 0-------------------------
 Club_name | Dorados de Sinaloa   
 Average   | 19.0                 
-RECORD 1-------------------------
 Club_name | Matsumoto Yamaga FC  
 Average   | 19.0                 
-RECORD 2-------------------------
 Club_name | Shanghai Shenhua FC  
 Average   | 18.5                 
-RECORD 3-------------------------
 Club_name | Qingdao FC           
 Average   | 18.0                 
-RECORD 4-------------------------
 Club_name | Club Deportivo Jo... 
 Average   | 17.5                 
-RECORD 5-------------------------
 Club_name | Altay SK             
 Average   | 17.0                 
-RECORD 6-------------------------
 Club_name | Guaireña FC          
 Average   | 17.0                 



In [23]:
#task2 - 3 question
from pyspark.sql.types import StructType, StructField, FloatType, StringType, DoubleType, IntegerType
df_nation_position = df_final.groupBy("year", "nation_position").agg(count("*").alias("count"))
#df_nation_position.show(vertical=True)
sorted_df2 = df_nation_position.orderBy("year")
years = [15, 16, 17, 18, 19, 20, 21, 22]
dict_nation_position = {}
for year in years:
    
    sorted_df2_i = sorted_df2.filter(sorted_df2.year == year)
    #sorted_df2_i.show(vertical=True)
    sorted_df2_final = sorted_df2_i.orderBy(desc("count"))
    s = sorted_df2_final.na.drop()
    s = s.where(s.nation_position != 'SUB')
    #s.show(vertical=True)
    first_value = s.collect()[0]['nation_position']
    dict_nation_position[year] = first_value
    
#df_nation_position.show(10, vertical= True)
#print(dict_nation_position)
list_of_dicts2 = []
for key, value in dict_nation_position.items():
    list_of_dicts2.append({key: value})
#print(list_of_dicts2)
schema = StructType([
        StructField("Year", IntegerType(), True),
        StructField("Pos", StringType(), True), 
    
    ])

rdd = spark.sparkContext.parallelize(list_of_dicts2)
df = spark.createDataFrame(rdd.map(lambda x: (list(x.keys())[0], list(x.values())[0])), schema)

df.show()

+----+---+
|Year|Pos|
+----+---+
|  15| GK|
|  16|RCB|
|  17| GK|
|  18|RCB|
|  19| GK|
|  20|LCB|
|  21|LCB|
|  22|LCB|
+----+---+

