<a href="https://colab.research.google.com/github/GiannisKarampinis/VolleyballDataAnalysis/blob/main/PySpark_Semester_Project_i_karampinis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Big Data - PySpark - Semester Project
You have been asked to process the provided CSV file, "Mens-Volleyball-PlusLiga-2008-2023.csv", and extract useful information from it using Apache Spark, specifically PySpark.

The first step is to set up Google Colab to run PySpark by executing the following code block ([source](https://medium.com/@dipan.saha/pyspark-made-easy-day-2-execute-pyspark-on-google-colabs-f3e57da946a)). Once the environment is initialized, the same block also creates the PySpark session used in the rest of the notebook.


In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Extract the tgz archive.
!pip install -q findspark # Install findspark, which adds PySpark to sys.path at runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("VolleyballDataAnalysis").master("local[*]").getOrCreate()
spark

In [None]:
%cd '/content/sample_data'
!git clone 'https://github.com/GiannisKarampinis/VolleyballDataAnalysis'

source: https://stackoverflow.com/questions/50818788/using-git-clone-in-google-colab-and-using-a-csv-file

In [3]:
init_df = spark.read.csv("/content/sample_data/VolleyballDataAnalysis/Mens-Volleyball-PlusLiga-2008-2023.csv", header=True, inferSchema=True)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
init_df = init_df.withColumn("line_number", monotonically_increasing_id())
init_df.show()
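
Note that `monotonically_increasing_id` guarantees unique, increasing IDs, not consecutive ones: according to the PySpark documentation, the partition index is stored in the upper bits and the record number within the partition in the lower 33 bits. `line_number` therefore equals the CSV row position only while the data sits in a single partition. The plain-Python sketch below imitates that layout without calling Spark; the function name `simulated_ids` and the partition sizes are made up for illustration.

```python
# Illustrative only: imitates the documented bit layout of
# monotonically_increasing_id(); it does not call Spark.
PARTITION_SHIFT = 33  # the lower 33 bits hold the row index within a partition


def simulated_ids(rows_per_partition):
    """Return the IDs that layout would produce for the given partition sizes."""
    ids = []
    for partition_index, n_rows in enumerate(rows_per_partition):
        base = partition_index << PARTITION_SHIFT
        ids.extend(base + row_index for row_index in range(n_rows))
    return ids


# Two hypothetical partitions with three rows each.
ids = simulated_ids([3, 3])
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

The jump after the third ID is where the second partition starts.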

Import the functions and types used throughout this project.

In [5]:
from pyspark.sql.functions import regexp_replace, upper, concat, col, lit, to_timestamp, concat_ws, when, count, desc
from pyspark.sql.functions import sum as _sum
from pyspark.sql.types import StringType, StructField, StructType, IntegerType, DoubleType

## 1st Exercise - 1
For columns **percentage_cols = ['T1_Srv_Eff', 'T1_Rec_Pos', 'T1_Rec_Perf', 'T1_Att_Kill_Perc', 'T1_Att_Eff', 'T1_Att_Sum', 'T2_Srv_Eff', 'T2_Rec_Pos', 'T2_Rec_Perf', 'T2_Att_Kill_Perc', 'T2_Att_Eff', 'T2_Att_Sum']**, remove the percent sign ('%') and display the result without truncation.


In [None]:
# 1st Exercise - 1:
df1 = init_df  # DataFrames are immutable: every transformation returns a new DataFrame, so init_df is never modified

percentage_cols = ['T1_Srv_Eff', 'T1_Rec_Pos', 'T1_Rec_Perf', 'T1_Att_Kill_Perc', 'T1_Att_Eff',
                    'T1_Att_Sum', 'T2_Srv_Eff', 'T2_Rec_Pos', 'T2_Rec_Perf', 'T2_Att_Kill_Perc',
                    'T2_Att_Eff', 'T2_Att_Sum']

for colu in percentage_cols:
    df1 = df1.withColumn(colu, regexp_replace(colu, '%', ''))
df1.show(truncate=False)
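
After `regexp_replace` the values are still strings, so a cast (for example `.cast("double")` on the column) would be needed before doing arithmetic on them. The plain-Python sketch below mirrors the strip-then-convert idea on a few made-up cell values; `strip_percent` and `to_number` are hypothetical helper names, not part of the notebook.

```python
import re


def strip_percent(value):
    """Remove every '%' from a string, like regexp_replace(col, '%', '')."""
    return re.sub('%', '', value)


def to_number(value):
    """Strip the sign, then convert to float; None stays None (like a SQL NULL)."""
    if value is None:
        return None
    return float(strip_percent(value))


# Hypothetical cell values, not taken from the dataset.
sample = ['45%', '12.5%', '-3%', None]
cleaned = [to_number(v) for v in sample]
print(cleaned)  # [45.0, 12.5, -3.0, None]
```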

## 1st Exercise - 2
Convert team names to uppercase.

In [None]:
# 1st Exercise - 2:
for colu in ['Team_1', 'Team_2']:
    df1 = df1.withColumn(colu, upper(colu))
df1.show()

## 1st Exercise - 3
Calculate and save to a variable the number of games read from csv.

In [8]:
# 1st Exercise - 3:
numOfGames = df1.count()
print(numOfGames)

2639


## 1st Exercise - 4
Calculate the number of sets per match.

In [None]:
# 1st Exercise - 4:
df2 = init_df
# Add the two set scores column-wise (col was imported above).
df2 = df2.withColumn("Sets_per_Game", col("T1_Score") + col("T2_Score"))
#df3 = df2.select("Sets_per_Game")
df2.show()
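
Since volleyball matches are played best-of-five, `Sets_per_Game` should be 3, 4 or 5 whenever `T1_Score` and `T2_Score` hold the sets won by each side, as the code above assumes. A small plain-Python sanity check of that rule on made-up scores (the helper names are hypothetical, and forfeits or abandoned matches are not considered):

```python
def sets_in_match(t1_score, t2_score):
    """Total sets played: sets won by team 1 plus sets won by team 2."""
    return t1_score + t2_score


def is_plausible(t1_score, t2_score):
    """Best-of-five: the winner has exactly 3 sets, the loser 0, 1 or 2."""
    return max(t1_score, t2_score) == 3 and min(t1_score, t2_score) in (0, 1, 2)


# Hypothetical results, not taken from the dataset.
results = [(3, 0), (3, 1), (2, 3)]
sets_per_game = [sets_in_match(a, b) for a, b in results]
print(sets_per_game)  # [3, 4, 5]
```

Rows for which `is_plausible` returns `False` would be worth inspecting before aggregating.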

## 1st Exercise - 5
Calculate the ratio of the total number of games to the total number of sets, and write it to a csv file.

In [10]:
# 1st Exercise - 5:
df3 = df2.select("Sets_per_Game")

# Total sets:
l2 = df3.groupBy().sum().collect()[0][0]

data = [(numOfGames/l2,)]
df = spark.createDataFrame(data)

df.coalesce(1).write.csv('/content/sample_data/games_div_by_sets', header = False, mode="overwrite")
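
As a worked example of the ratio: three matches lasting 3, 4 and 5 sets give 3 games over 12 sets, i.e. 0.25. With best-of-five matches the value has to fall between 1/5 (every match goes to five sets) and 1/3 (every match ends 3-0), which gives a quick plausibility check for the number written above. The sketch uses made-up set counts and a hypothetical helper name.

```python
def games_per_set(sets_per_game):
    """Number of games divided by the total number of sets played."""
    return len(sets_per_game) / sum(sets_per_game)


# Three hypothetical matches lasting 3, 4 and 5 sets.
ratio = games_per_set([3, 4, 5])
print(ratio)  # 0.25
```

The reciprocal of this ratio is the average number of sets per game.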

## 1st Exercise - 6
Calculate and write to a csv file the number of games each team took part in (note that a team can be either the home or the away side).

In [11]:
# 1st Exercise - 6:
df6 = df1

home_teams = df6.select('Team_1')
away_teams = df6.select('Team_2')

# union() matches columns by position and keeps duplicates; the result keeps the left-hand column name (Team_1).
un_df = home_teams.union(away_teams).withColumnRenamed("Team_1", "Team")
un_df = un_df.groupBy("Team").count()

# Spark writes one CSV part file per partition, so a multi-partition DataFrame produces several files.
# coalesce(1) merges the data into a single partition so that a single part file is written.
# source: https://sparkbyexamples.com/spark/spark-write-dataframe-single-csv-file/
# Caution: on very large datasets this forces all rows through a single task, which can be slow or run out of memory.

un_df.coalesce(1).write.csv('/content/sample_data/num_of_matches_per_team', header=True, mode="overwrite")
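
The union-then-count logic can be cross-checked by hand on a few rows: every match adds one game to the home team and one to the away team, so the per-team counts must add up to twice the number of matches. A plain-Python sketch with made-up team names:

```python
from collections import Counter

# Hypothetical (home, away) pairs, not taken from the dataset.
matches = [("A", "B"), ("B", "C"), ("C", "A"), ("A", "C")]

games = Counter()
for home, away in matches:
    games[home] += 1  # one game played as the home team
    games[away] += 1  # one game played as the away team

print(dict(games))  # {'A': 3, 'B': 2, 'C': 3}
```

The same invariant can be checked on the real output: the `count` column of `un_df` should sum to `2 * numOfGames`.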

## 2nd Exercise
Create and save to a csv file a summary table for all the teams found in the initial csv file. **For each team**, present the following, in descending order of total games won:
1.   Number of games (home, away and total).
2.   Sets won and lost.
3.   Points won and lost.


In [None]:
# 2nd Exercise - 1:
# OUTPUT THIS TO CSV

# df6 is the initial dataframe for each subsequent task of exercise 2.

# A team's total wins are its home wins (Winner == 0) plus its away wins (Winner == 1).

# Games played and games won as the home team:
df7 = df6.select('Team_1', 'Winner')
home_games = df7.groupBy('Team_1').count()\
    .withColumnRenamed("count", "HomeGames")\
    .withColumnRenamed("Team_1", "Team")
home_games_won = df7.groupBy('Team_1')\
    .agg(count(when(col("Winner") == 0, True)).alias("HomeGamesWon"))\
    .withColumnRenamed("Team_1", "Team")

#home_games.show()
#home_games_won.show()

# Games played and games won as the away team:
df8 = df6.select('Team_2', 'Winner')
away_games = df8.groupBy('Team_2').count()\
    .withColumnRenamed("count", "AwayGames")\
    .withColumnRenamed("Team_2", "Team")
away_games_won = df8.groupBy('Team_2')\
    .agg(count(when(col("Winner") == 1, True)).alias("AwayGamesWon"))\
    .withColumnRenamed("Team_2", "Team")

#away_games.show()
#away_games_won.show()

# Join the per-team results on the shared "Team" column.
# Aliasing the aggregates above avoids relying on Spark's auto-generated column names.
# These are inner joins, so a team with no home games or no away games would be dropped.
joined_df1 = un_df.withColumnRenamed("count", "TotalGames")\
    .join(home_games_won, "Team")\
    .join(away_games_won, "Team")\
    .join(home_games, "Team")\
    .join(away_games, "Team")\
    .withColumn("TotalGamesWon", col("HomeGamesWon") + col("AwayGamesWon"))
joined_df1.show()
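
The win tallies can be verified on a handful of rows with plain Python. The sketch assumes the encoding used above (`Winner == 0` means the home team won, `Winner == 1` the away team); the team names and results are made up.

```python
from collections import defaultdict

# Hypothetical (home, away, winner) rows; winner 0 = home team, 1 = away team.
matches = [
    ("A", "B", 0),
    ("B", "C", 1),
    ("C", "A", 1),
    ("B", "A", 0),
]

stats = defaultdict(lambda: {"HomeGamesWon": 0, "AwayGamesWon": 0})
for home, away, winner in matches:
    stats[home], stats[away]  # make sure both teams appear, even without a win
    if winner == 0:
        stats[home]["HomeGamesWon"] += 1
    elif winner == 1:
        stats[away]["AwayGamesWon"] += 1

totals = {team: s["HomeGamesWon"] + s["AwayGamesWon"] for team, s in stats.items()}
ranking = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranking[0])  # ('A', 2)
```

Every match has exactly one winner, so the `TotalGamesWon` values should add up to the number of matches.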

In [None]:
# 2nd Exercise - 2:

sets = df6.select("Team_1", "Team_2", "T1_Score", "T2_Score", "T1_Sum", "T2_Sum", "Winner")

sets_home_won = sets.groupBy("Team_1").agg(_sum(sets.T1_Score))
sets_away_won = sets.groupBy("Team_2").agg(_sum(sets.T2_Score))

sets_won = sets_home_won.alias("sets_home_won").join(sets_away_won.alias("sets_away_won"), col("sets_home_won.Team_1") == col("sets_away_won.Team_2"))

sets_won = sets_won.withColumnRenamed("sum(T1_Score)", "SetsWonAsHomeTeam")
sets_won = sets_won.withColumnRenamed("sum(T2_Score)", "SetsWonAsAwayTeam")

#sets_won.show()

f_sets_won = sets_won.withColumn("SetsWon", col("SetsWonAsHomeTeam") + col("SetsWonAsAwayTeam"))

f_sets_won = f_sets_won.withColumnRenamed("Team_1", "Team_1_won")
f_sets_won = f_sets_won.drop("SetsWonAsHomeTeam")
f_sets_won = f_sets_won.drop("SetsWonAsAwayTeam")
f_sets_won = f_sets_won.drop("Team_2")

#f_sets_won.show()

sets_home_lost = sets.groupBy("Team_1").agg(_sum(col("T2_Score")))
sets_away_lost = sets.groupBy("Team_2").agg(_sum(col("T1_Score")))

sets_lost = sets_home_lost.alias("sets_home_lost").join(sets_away_lost.alias("sets_away_lost"),
                                                      col("sets_home_lost.Team_1") == col("sets_away_lost.Team_2"))
sets_lost = sets_lost.withColumnRenamed("sum(T2_Score)", "SetsLostAsHomeTeam")
sets_lost = sets_lost.withColumnRenamed("sum(T1_Score)", "SetsLostAsAwayTeam")

#sets_lost.show()

f_sets_lost = sets_lost.withColumn("SetsLost", col("SetsLostAsHomeTeam") + col("SetsLostAsAwayTeam"))

f_sets_lost = f_sets_lost.withColumnRenamed("Team_1", "Team_1_lost")
f_sets_lost= f_sets_lost.drop("SetsLostAsHomeTeam")
f_sets_lost = f_sets_lost.drop("SetsLostAsAwayTeam")
f_sets_lost = f_sets_lost.drop("Team_2")

#f_sets_lost.show()

joined_df1 = joined_df1.alias("joined_df1")\
            .join(f_sets_won.alias("f_sets_won"), col("joined_df1.Team") == col("f_sets_won.Team_1_won"))\
            .join(f_sets_lost.alias("f_sets_lost"), col("joined_df1.Team") == col("f_sets_lost.Team_1_lost"))

joined_df1 = joined_df1.drop("Team_1_lost")
joined_df1 = joined_df1.drop("Team_1_won")
joined_df1.show()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 2nd Exercise - 3:
points_home_won = sets.groupBy("Team_1").agg(_sum(sets.T1_Sum))
points_away_won = sets.groupBy("Team_2").agg(_sum(sets.T2_Sum))

#points_home_won.show()

points_won = points_home_won.alias("points_home_won").join(points_away_won.alias("points_away_won"),
                                                      col("points_home_won.Team_1") == col("points_away_won.Team_2"))

points_won = points_won.withColumnRenamed("sum(T1_Sum)", "pointsWonAsHomeTeam")
points_won = points_won.withColumnRenamed("sum(T2_Sum)", "pointsWonAsAwayTeam")

#points_won.show()

f_points_won = points_won.withColumn("pointsWon", col("pointsWonAsHomeTeam") + col("pointsWonAsAwayTeam"))

f_points_won = f_points_won.withColumnRenamed("Team_1", "Team_1_won")
f_points_won = f_points_won.drop("pointsWonAsHomeTeam")
f_points_won = f_points_won.drop("pointsWonAsAwayTeam")
f_points_won = f_points_won.drop("Team_2")

#f_points_won.show()


points_home_lost = sets.groupBy("Team_1").agg(_sum(sets.T2_Sum))
points_away_lost = sets.groupBy("Team_2").agg(_sum(sets.T1_Sum))

points_lost = points_home_lost.alias("points_home_lost").join(points_away_lost.alias("points_away_lost"),
                                                      col("points_home_lost.Team_1") == col("points_away_lost.Team_2"))
points_lost = points_lost.withColumnRenamed("sum(T2_Sum)", "pointsLostAsHomeTeam")
points_lost = points_lost.withColumnRenamed("sum(T1_Sum)", "pointsLostAsAwayTeam")

#points_lost.show()

f_points_lost = points_lost.withColumn("pointsLost", col("pointsLostAsHomeTeam") + col("pointsLostAsAwayTeam"))

f_points_lost = f_points_lost.withColumnRenamed("Team_1", "Team_1_lost")
f_points_lost= f_points_lost.drop("pointsLostAsHomeTeam")
f_points_lost = f_points_lost.drop("pointsLostAsAwayTeam")
f_points_lost = f_points_lost.drop("Team_2")

#f_points_lost.show()

joined_df1 = joined_df1.alias("joined_df1")\
            .join(f_points_won.alias("f_points_won"), col("joined_df1.Team") == col("f_points_won.Team_1_won"))\
            .join(f_points_lost.alias("f_points_lost"), col("joined_df1.Team") == col("f_points_lost.Team_1_lost"))


joined_df1 = joined_df1.drop("Team_1_lost")
joined_df1 = joined_df1.drop("Team_1_won")

joined_df1 = joined_df1.orderBy(desc("TotalGamesWon"))
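# The points columns were created as "pointsWon"/"pointsLost"; alias them explicitly
# so the select does not depend on Spark's case-insensitive column resolution.
joined_df1 = joined_df1.select("Team", "HomeGames", "AwayGames", "TotalGames", "SetsWon", "SetsLost",
                               col("pointsWon").alias("PointsWon"), col("pointsLost").alias("PointsLost"))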
joined_df1.coalesce(1).write.csv('/content/sample_data/final_output', header = True, mode="overwrite")
joined_df1.show(truncate=False)
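
The sets and points cells follow the same pattern: what the home team wins is what the away team loses, and vice versa. A single-pass plain-Python sketch of that symmetry, useful for checking a few teams by hand; `won_lost` is a hypothetical helper and the rows are made up, reusing only the column names from the dataset.

```python
from collections import defaultdict


def won_lost(rows, home_key, away_key):
    """Return {team: (won, lost)} by summing home_key/away_key over all matches."""
    totals = defaultdict(lambda: [0, 0])
    for row in rows:
        home, away = row["Team_1"], row["Team_2"]
        totals[home][0] += row[home_key]  # won by the home team
        totals[home][1] += row[away_key]  # lost by the home team
        totals[away][0] += row[away_key]  # won by the away team
        totals[away][1] += row[home_key]  # lost by the away team
    return {team: tuple(v) for team, v in totals.items()}


# Hypothetical rows, not taken from the dataset.
rows = [
    {"Team_1": "A", "Team_2": "B", "T1_Score": 3, "T2_Score": 1, "T1_Sum": 98, "T2_Sum": 90},
    {"Team_1": "B", "Team_2": "A", "T1_Score": 3, "T2_Score": 2, "T1_Sum": 110, "T2_Sum": 105},
]

set_totals = won_lost(rows, "T1_Score", "T2_Score")
point_totals = won_lost(rows, "T1_Sum", "T2_Sum")
print(set_totals)    # {'A': (5, 4), 'B': (4, 5)}
print(point_totals)  # {'A': (203, 200), 'B': (200, 203)}
```

Across all teams, the totals won and lost must be equal, which also holds for the `SetsWon`/`SetsLost` and points columns of the final table as long as no team was dropped by the inner joins.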