## Installing Java, Spark and PySpark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
# Tell Spark where the Java runtime and the unpacked Spark distribution live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.0.1-bin-hadoop2.7",
)

## Environment Setup and Importing Libraries

In [107]:
from time import perf_counter

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

In [35]:
# Start (or reuse) a Spark session running locally with master "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()
# SQLContext takes a SparkContext as its first argument. Pass the session's
# context plus the session itself instead of handing it the SparkSession, which
# only worked because the session happens to expose the same private attributes.
SQL_CONTEXT = SQLContext(spark.sparkContext, spark)

In [145]:
class DataAnalyzer:
  """
    Runs the FIFA 21 data challenges on a Spark DataFrame loaded from CSV.

    The CSV is read with a header but without schema inference, so every
    column is a string; numeric columns are cast explicitly wherever their
    numeric value matters. Each analysis prints a short preview and writes
    its result in ORC format.
  """

  def __init__(self, data_path="/content/data/fifa21_male2.csv", output_dir="output_data", overweight=25.0):
    """
      Sets up the (cached) input DataFrame and the output locations.

      data_path: Path of the CSV file; its first row is used as the header.
      output_dir: Directory under which each result folder is written.
      overweight: IMC (body mass index) value above which a player is
                  reported as overweight.
    """
    self.data = SQL_CONTEXT.read.csv(data_path, header=True).cache()
    self.nationality_output_file = f"{output_dir}/nationalities"
    self.top_10_position_output_file = f"{output_dir}/top_10_position"
    self.club_position_output_file = f"{output_dir}/club_position"
    self.top_10_sprint_speed_output_file = f"{output_dir}/top_10_sprint_speed"
    self.overweight_output_file = f"{output_dir}/overweight_players"
    self.overweight = overweight

  @staticmethod
  def _save_orc(frame, path, partition_by=None, single_file=False):
    """
      Writes a DataFrame to `path` as ORC, replacing any previous output.

      frame: DataFrame to persist.
      path: Destination folder.
      partition_by: Optional column name used to partition the output.
      single_file: When True the frame is coalesced to one partition before
                   writing, so the result is not spread over many part files.
    """
    if single_file:
      frame = frame.coalesce(1)

    # No "header" option here: that is a CSV setting, and ORC files carry
    # their own schema.
    writer = frame.write.format("orc").mode("overwrite")
    if partition_by is not None:
      writer = writer.partitionBy(partition_by)
    writer.save(path)

  @staticmethod
  def _compute_imc(height, weight):
    """
      Computes the IMC (body mass index) from imperial measurements.

      height: String of the form feet'inches, optionally followed by one or
              more double-quote characters.
      weight: String of the form <pounds>lbs.

      Returns the IMC rounded to 4 decimals, or None when either value is
      missing or cannot be parsed, so one bad row does not fail the job.
    """
    if height is None or weight is None:
      return None

    try:
      feet, inches = str(height).strip().rstrip('"').split("'")
      total_inches = (int(feet) * 12) + int(inches)

      weight_text = str(weight).strip()
      if weight_text.lower().endswith("lbs"):
        weight_text = weight_text[:-3]
      pounds = int(weight_text)
    except ValueError:
      # Raised both by int() on non-numeric text and by the tuple unpacking
      # when the height does not contain exactly one apostrophe.
      return None

    if total_inches <= 0:
      return None

    # 703 converts lb / in^2 to the metric kg / m^2 scale.
    return round(703 * pounds / (total_inches ** 2), 4)

  def __data_overview(self):
    """
      Prints the main characteristics of the data: shape, column names and
      the first 5 rows.
    """
    print("\n###############################")
    print("Overview of the Data:")
    print(f"Data shape: {(self.data.count(), len(self.data.columns))}")
    print("Data columns:")
    print(self.data.columns)
    print("Data first 5 rows:")
    # Use the instance's DataFrame; a bare `data` only resolves if a global
    # with that name happens to exist in the kernel.
    self.data.limit(5).show()
    print("###############################\n")

  def __group_by_nationality(self):
    """
      Selects Name and Nationality and saves them partitioned by Nationality.
    """
    nationality = self.data.select(F.col("Name"), F.col("Nationality"))

    print("Showing Data with Nationality and Name")
    nationality.limit(5).show()

    self._save_orc(nationality, self.nationality_output_file, partition_by="Nationality")

    print("Data Succesfully Partitioned by Nationality and Saved!\n\n")

  def __compute_top_10_position(self):
    """
      Computes the top ten players of each position based on Total Stats,
      which is taken as the summary of the player skills.

      F.rank() gives tied players the same rank, so a position can yield
      more than ten rows when there are ties.
    """
    # Total Stats is a string column (no schema inference), so order by its
    # integer value; ordering the raw strings would rank "999" above "2000".
    ranking_window = (
      Window
      .partitionBy(F.col("Position"))
      .orderBy(F.col("Total Stats").cast("int").desc())
    )

    top_10_position_data = (
      self.data
      .select(F.col("Position"),
              F.col("Name"),
              F.col("Total Stats"),
              F.rank().over(ranking_window).alias("ranking"))
      .filter(F.col("ranking") <= 10)
    )

    print("Showing data grouped by Position")
    top_10_position_data.limit(20).show()

    self._save_orc(top_10_position_data, self.top_10_position_output_file, partition_by="Position")

    print("Data Succesfully Partitioned by Position and Saved!\n\n")

  def __group_by_club_position(self):
    """
      Counts players per (Club, Position) pair and saves the counts
      partitioned by Club. Rows containing nulls are dropped.
    """
    club_position_data = (
      self.data
      .select(F.col("Club"), F.col("Position"), F.col("Name"))
      .groupby("Club", "Position")
      .agg(F.count(F.col("Name")).alias("Count"))
      .orderBy("Club")
      .dropna()
    )

    print("Showing data grouped by club and Position")
    club_position_data.limit(5).show()

    self._save_orc(club_position_data, self.club_position_output_file, partition_by="Club")

    print("Data Succesfully Partitioned by Club and Saved!\n\n")

  def __compute_top_10_speed(self):
    """
      Computes the average Sprint Speed of each club and keeps the ten clubs
      with the highest average.
    """
    top_10_sprint_speed_data = (
      self.data
      .select(F.col("Club"), F.col("Sprint Speed").cast("double").alias("Sprint Speed"))
      .groupBy(F.col("Club"))
      .agg(F.mean(F.col("Sprint Speed")).alias("Average Sprint Speed"))
      .orderBy(F.col("Average Sprint Speed").desc())
      .limit(10)
    )

    print("Showing Top 10 Sprint Speed Teams")
    top_10_sprint_speed_data.show()

    self._save_orc(top_10_sprint_speed_data, self.top_10_sprint_speed_output_file, single_file=True)

    print("Top Ten Average Sprint Speed Teams Saved Successfully!\n\n")

  def __select_overweight_players(self):
    """
      Computes the IMC (body mass index) of every player and keeps those
      whose IMC is greater than self.overweight.

      Players whose Height or Weight cannot be parsed get a null IMC and are
      therefore excluded by the filter.
    """
    compute_imc_udf = F.udf(self._compute_imc, T.FloatType())

    overweight_players = (
      self.data
      .select(F.col("Name"), F.col("Weight"), F.col("Height"))
      .withColumn("IMC", compute_imc_udf("Height", "Weight"))
      .filter(F.col("IMC") > self.overweight)
    ).cache()  # reused by count(), show() and the write below

    print(f"There are {overweight_players.count()} players with overweight, showing first 10.")
    overweight_players.limit(10).show()

    self._save_orc(overweight_players, self.overweight_output_file, single_file=True)

    print("Overweighted Players Saved Successfully!\n\n")

  def execute_analysis(self):
    """
      Executes all the analyses, in order, to resolve the test.
    """

    self.__data_overview()
    self.__group_by_nationality()
    self.__compute_top_10_position()
    self.__group_by_club_position()
    self.__compute_top_10_speed()
    self.__select_overweight_players()

In [146]:
# Build the analyzer before starting the clock, then time the complete run.
data_analyzer = DataAnalyzer()
print("Starting to Resolve Test!")
start = perf_counter()
data_analyzer.execute_analysis()
elapsed_seconds = round(perf_counter() - start, 4)
print(f"Test Done Successfully in {elapsed_seconds} seconds")

Starting to Resolve Test!

###############################
Overview of the Data:
Data shape: (17125, 107)
Data columns:
['ID', 'Name', 'Age', 'OVA', 'Nationality', 'Club', 'BOV', 'BP', 'Position', 'Player Photo', 'Club Logo', 'Flag Photo', 'POT', 'Team & Contract', 'Height', 'Weight', 'foot', 'Growth', 'Joined', 'Loan Date End', 'Value', 'Wage', 'Release Clause', 'Contract', 'Attacking', 'Crossing', 'Finishing', 'Heading Accuracy', 'Short Passing', 'Volleys', 'Skill', 'Dribbling', 'Curve', 'FK Accuracy', 'Long Passing', 'Ball Control', 'Movement', 'Acceleration', 'Sprint Speed', 'Agility', 'Reactions', 'Balance', 'Power', 'Shot Power', 'Jumping', 'Stamina', 'Strength', 'Long Shots', 'Mentality', 'Aggression', 'Interceptions', 'Positioning', 'Vision', 'Penalties', 'Composure', 'Defending', 'Marking', 'Standing Tackle', 'Sliding Tackle', 'Goalkeeping', 'GK Diving', 'GK Handling', 'GK Kicking', 'GK Positioning', 'GK Reflexes', 'Total Stats', 'Base Stats', 'W/F', 'SM', 'A/W', 'D/W', 'IR', 