
# Overview of the Code

This code performs a data analysis pipeline using PySpark. It combines the players and competitions datasets to compute the three highest-valued players for every club, together with each player's share of the total market value of their competition. Below is a general description of the steps involved:




## Data loading

- Creates and configures the Spark environment.
- Configures Hadoop to support Google Cloud Storage (GCS) as a filesystem.
- Enables seamless reading of files stored in GCS.

- Reads datasets from a GCS bucket into Spark DataFrames.


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Spark configuration: standalone master URL, application name and the
# driver/executor resource settings used by this pipeline.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Value_pipeline")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes on the Hadoop
# configuration so that gs:// paths can be resolved.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")


# --- Players dataset -------------------------------------------------------
gsc_file_path_pla = 'gs://data_de2024_bz/players.csv'
# Comma-separated file whose first row holds the column names.
players_df = (
    spark.read
    .format("csv")
    .options(header="true", delimiter=",")
    .load(gsc_file_path_pla)
)
players_df.printSchema()
players_df.show(5)

# --- Competitions dataset --------------------------------------------------
gsc_file_path_com = 'gs://data_de2024_bz/competitions.csv'
# Same CSV layout as above: header row, comma delimiter.
competitions_df = (
    spark.read
    .format("csv")
    .options(header="true", delimiter=",")
    .load(gsc_file_path_com)
)
competitions_df.printSchema()
competitions_df.show(5)

root
 |-- player_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- last_season: string (nullable = true)
 |-- current_club_id: string (nullable = true)
 |-- player_code: string (nullable = true)
 |-- country_of_birth: string (nullable = true)
 |-- city_of_birth: string (nullable = true)
 |-- country_of_citizenship: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- sub_position: string (nullable = true)
 |-- position: string (nullable = true)
 |-- foot: string (nullable = true)
 |-- height_in_cm: string (nullable = true)
 |-- contract_expiration_date: string (nullable = true)
 |-- agent_name: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- url: string (nullable = true)
 |-- current_club_domestic_competition_id: string (nullable = true)
 |-- current_club_name: string (nullable = true)
 |-- market_value_in_eur: string (nullable = true)
 |--

### Rename and drop

- Rename columns whose names would otherwise clash
- Drop unnecessary columns

In [None]:
# Columns that the rest of the pipeline never reads.
unused_player_columns = [
    'player_code', 'country_of_birth', 'city_of_birth', 'country_of_citizenship',
    'date_of_birth', 'sub_position', 'position', 'foot', 'height_in_cm',
    'contract_expiration_date', 'agent_name', 'image_url', 'url',
]
unused_competition_columns = [
    'competition_code', 'sub_type', 'city_of_birth', 'country_id', 'country_name',
    'domestic_league_code', 'confederation', 'url', 'is_major_national_league',
]

# Both frames have a "name" column; give each a distinct name so the two do
# not clash once the frames are joined, then discard the unused columns.
players_df = (
    players_df
    .withColumnRenamed("name", "player_name")
    .drop(*unused_player_columns)
)
competitions_df = (
    competitions_df
    .withColumnRenamed("name", "competition_name")
    .drop(*unused_competition_columns)
)

### Joining and changing the dataframes

- Join the two dataframes
- Change and add columns
- Select the top 3 best players for all the clubs
- Create a dataframe with all the necessary columns

In [None]:
from pyspark.sql.functions import *
from pyspark.sql import Row, Window
from pyspark.sql.functions import col, sum as spark_sum


# The wildcard import in this cell shadows Python builtins such as round() and
# sum(); referring to Spark functions through an explicit alias keeps it
# unambiguous which implementation is being called.
from pyspark.sql import functions as F

# Attach competition details to every player. This is a LEFT join: players
# whose competition id has no match are kept, with null competition columns.
joinExpression = players_df["current_club_domestic_competition_id"] == competitions_df["competition_id"]
merged_value_df = players_df.join(competitions_df, joinExpression, "left").drop("competition_id")
merged_value_df.show(20)

# The CSV columns are all read as strings (see the printed schema), so the
# market value must be cast before it can be summed or ordered numerically.
merged_value_df = merged_value_df.withColumn(
    "market_value_in_eur", F.col("market_value_in_eur").cast("long")
)

# Total market value of all players in each competition.
competition_totals = merged_value_df.groupBy("competition_name").agg(
    F.sum("market_value_in_eur").alias("total_market_value")
)

# Bring the per-competition total back onto every player row.
# NOTE: null join keys never match, so players for which the left join above
# found no competition (null competition_name) are dropped by this inner join.
df_with_totals = merged_value_df.join(competition_totals, on="competition_name", how="inner")

# Player's share of the competition's total market value, as a percentage
# rounded to 4 decimals. F.round keeps the column numeric; format_number would
# turn it into a formatted string, which is then exported as a string column.
result_df = df_with_totals.withColumn(
    "market_value_percentage_competition",
    F.round(F.col("market_value_in_eur") / F.col("total_market_value") * 100, 4),
)

# Rank players inside each club from most to least valuable. player_id is a
# tie-breaker so that players with equal market value are ranked the same way
# on every run (row_number alone picks arbitrarily among ties).
window_spec = Window.partitionBy(F.col("current_club_name")).orderBy(
    F.col("market_value_in_eur").desc(),
    F.col("player_id").asc(),
)
df_with_row_number = result_df.withColumn("rank", F.row_number().over(window_spec))

# Keep only the three highest-ranked players per club; the helper rank column
# is not part of the result.
top_3_players = df_with_row_number.filter(F.col("rank") <= 3).drop("rank")

# Final projection that is exported downstream.
final_df = top_3_players.select(
    "current_club_name",
    "market_value_in_eur",
    "player_name",
    "competition_name",
    "market_value_percentage_competition",
)

final_df.show(20)



+---------+----------+--------------+--------------------+-----------+---------------+------------------------------------+--------------------+-------------------+---------------------------+----------------+---------------+
|player_id|first_name|     last_name|         player_name|last_season|current_club_id|current_club_domestic_competition_id|   current_club_name|market_value_in_eur|highest_market_value_in_eur|competition_name|           type|
+---------+----------+--------------+--------------------+-----------+---------------+------------------------------------+--------------------+-------------------+---------------------------+----------------+---------------+
|       10|  Miroslav|         Klose|      Miroslav Klose|       2015|            398|                                 IT1|Società Sportiva ...|            1000000|                   30000000|         serie-a|domestic_league|
|       26|     Roman|  Weidenfeller|  Roman Weidenfeller|       2017|             16|          

### Exporting to BigQuery

This cell exports the processed data to a BigQuery table.

In [None]:
# Cloud Storage bucket that the BigQuery connector uses for its temporary
# export data.
bucket = "temp_de2024_bz"
spark.conf.set('temporaryGcsBucket', bucket)

# Write the result to BigQuery, replacing any existing contents of the table.
(
    final_df.write
    .format('bigquery')
    .mode("overwrite")
    .option('table', 'de2024-435509.labdataset.top3_valued_players')
    .save()
)

In [None]:
# Stop the SparkSession (and with it the underlying SparkContext) as the last
# step of the notebook; `spark` cannot be used for further work after this call.
spark.stop()