In [None]:
#Task-I

In [1]:
# Uncomment the following lines if you are using Windows!
import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()



In [16]:
from pyspark.sql.functions import lit, monotonically_increasing_id
years = range(15,23)
players = []

for year in years:
    df = spark.read.csv(f"players_{year}.csv", header = True, inferSchema = True)
    df = df.withColumn('year',lit(year))
    df = df.withColumn('unique_id',monotonically_increasing_id())
    players.append(df)
    
merged_df = players[0]
for df in players[1:]:
    merged_df = merged_df.union(df)

# merged_df.show(vertical=True)


In [27]:
table_name = "FIFA_Players"
spark.sql("CREATE SCHEMA IF NOT EXISTS fifa")
fully_qualified_table_name = "fifa." + table_name

db_properties={}
db_properties['username']="postgres"
db_properties['password']="gilgamesh"
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['table']="fully_qualified_table_name"
db_properties['driver']="org.postgresql.Driver"


merged_df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()


In [28]:
#Task-II

In [35]:
from pyspark.sql.functions import col
df_2022 = merged_df.filter(merged_df["year"] == 22)
def x_clubs(df_2022, x):
    contract_counts = df_2022.filter(df_2022["club_contract_valid_until"] == 2023) \
                                  .groupBy("club_name") \
                                  .count() \
                                  .sort(col("count").desc())

    top_clubs = contract_counts.limit(x)
    return top_clubs

In [37]:
# df_2022.show(vertical=True)
top_x_clubs = x_clubs(df_2022, 10)
top_x_clubs.show()

+--------------------+-----+
|           club_name|count|
+--------------------+-----+
|En Avant de Guingamp|   19|
| Club Atlético Lanús|   17|
|       Lechia Gdańsk|   17|
|            Barnsley|   16|
|        Kasimpaşa SK|   16|
|        Bengaluru FC|   16|
|        FC Barcelona|   15|
|  SV Wehen Wiesbaden|   15|
|          CA Osasuna|   15|
|      Zagłębie Lubin|   15|
+--------------------+-----+



In [42]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

year_counts = merged_df.filter(merged_df["age"] > 27) \
                        .groupby("club_name", "year") \
                        .agg(F.count("*").alias("count"))

def y_clubs(year_counts, y):

    avg_counts = year_counts.groupby("club_name") \
                                      .agg(F.avg("count").alias("avg_count")) \
                                      .orderBy(F.desc("avg_count"))
    
    avg_counts = avg_counts.withColumn("rank", F.dense_rank().over(Window.orderBy(F.desc("avg_count"))))
    top_y = avg_counts.filter(F.col("rank") <= y).drop("rank")
    
    years_present = year_counts.groupby("club_name").agg(F.countDistinct("year").alias("years_present"))
    top_y = top_y.join(years_present, on="club_name", how="left")

    top_y = top_y.withColumn("final_avg_count", F.col("avg_count") * \
                             F.when(F.col("years_present") > 0,\
                                            1 / F.col("years_present")).otherwise(1))

    return top_y

In [43]:
top_y = y_clubs(year_counts, 5)

# Show the results
top_y.show()

+--------------------+---------+-------------+---------------+
|           club_name|avg_count|years_present|final_avg_count|
+--------------------+---------+-------------+---------------+
|                null|   109.25|         null|         109.25|
|  Dorados de Sinaloa|     19.0|            1|           19.0|
| Matsumoto Yamaga FC|     19.0|            1|           19.0|
| Shanghai Shenhua FC|     18.5|            4|          4.625|
|          Qingdao FC|     18.0|            2|            9.0|
|Club Deportivo Jo...|     17.5|            2|           8.75|
+--------------------+---------+-------------+---------------+



In [47]:
from pyspark.sql import functions as F

def most_frequent_position(df):

    position_counts = df.groupby("year", "nation_position") \
                             .agg(F.count("*").alias("count")) \
                             .orderBy("year", F.desc("count"))

    window_spec = Window.partitionBy("year").orderBy(F.desc("count"))
    ranked_positions = position_counts.withColumn("rank", F.rank().over(window_spec))

    most_frequent_positions = ranked_positions.filter(F.col("rank") == 1).select("year", "nation_position", "count")

    return most_frequent_positions

In [48]:
most = most_frequent_position(merged_df)
most.show()

+----+---------------+-----+
|year|nation_position|count|
+----+---------------+-----+
|  15|           null|15074|
|  16|           null|14608|
|  17|           null|16515|
|  18|           null|16804|
|  19|           null|16981|
|  20|           null|17356|
|  21|           null|17817|
|  22|           null|18480|
+----+---------------+-----+

