In [3]:
from pyspark.sql.functions import col, to_date, mean, count
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, DateType, StringType 
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# Explicit schemas for the CSV inputs. Every column is declared nullable.
# NOTE(review): Spark applies an explicit CSV schema by column position by
# default, so the field order here presumably has to match the files - confirm.

# Player master data.
schemaPlayers = (
    StructType()
    .add("player_id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("country_of_citizenship", StringType(), True)
    .add("date_of_birth", DateType(), True)
    .add("position", StringType(), True)
    .add("sub_position", StringType(), True)
)

# Market valuations of a player on a given date.
# The "player_club_domestic_competition_id" string column is currently left
# out of this schema (it was commented out in the field list).
schemaPlayerValuations = (
    StructType()
    .add("player_id", IntegerType(), True)
    .add("date", DateType(), True)
    .add("market_value_in_eur", IntegerType(), True)
)

# One row per player appearance, with the counters recorded for it.
schemaAppearances = (
    StructType()
    .add("player_id", IntegerType(), True)
    .add("player_club_id", IntegerType(), True)
    .add("date", DateType(), True)
    .add("player_name", StringType(), True)
    .add("competition_id", StringType(), True)
    .add("yellow_cards", IntegerType(), True)
    .add("red_cards", IntegerType(), True)
    .add("goals", IntegerType(), True)
    .add("assists", IntegerType(), True)
    .add("mins", IntegerType(), True)
)

# Clubs and the domestic competition they are attached to.
schemaClubs = (
    StructType()
    .add("club_id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("domestic_competition_id", StringType(), True)
)

# Competition reference data.
schemaCompetitions = (
    StructType()
    .add("competition_id", StringType(), True)
    .add("name", StringType(), True)
    .add("sub_type", StringType(), True)
    .add("type", StringType(), True)
    .add("country_id", IntegerType(), True)
    .add("country_name", StringType(), True)
    .add("dlc", StringType(), True)
)

In [4]:
# Load the three CSV inputs with their explicit schemas (the first line of
# each file is treated as a header) and print the resulting schema of each.
appearancesDF = spark.read.schema(schemaAppearances).csv(
    "/user/student/BD/out/appearances", header=True
)
appearancesDF.printSchema()

playerValuationsDF = spark.read.schema(schemaPlayerValuations).csv(
    "/user/student/BD/out/player_valuations", header=True
)
playerValuationsDF.printSchema()

playersDF = spark.read.schema(schemaPlayers).csv(
    "/user/student/BD/out/players", header=True
)
playersDF.printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- player_club_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- player_name: string (nullable = true)
 |-- competition_id: string (nullable = true)
 |-- yellow_cards: integer (nullable = true)
 |-- red_cards: integer (nullable = true)
 |-- goals: integer (nullable = true)
 |-- assists: integer (nullable = true)
 |-- mins: integer (nullable = true)

root
 |-- player_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- market_value_in_eur: integer (nullable = true)

root
 |-- player_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- country_of_citizenship: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- position: string (nullable = true)
 |-- sub_position: string (nullable = true)



In [5]:
# Season windows as (first day, last day) ISO date strings: each one runs
# from 1 July of a year to 20 June of the next, for the 2017 .. 2023 starts.
dates = [
    (f"{season_start}-07-01", f"{season_start + 1}-06-20")
    for season_start in range(2017, 2024)
]

# (competition id, country name) pairs; the country name is compared with a
# player's citizenship to derive the "is_native" flag.
passport = [
    ("IT1", "Italy"),
    ("L1", "Germany"),
    ("RU1", "Russia"),
    ("GB1", "England"),
    ("ES1", "Spain"),
    ("FR1", "France"),
    ("TR1", "Turkey"),
]

# Age bands as (lowest age, highest age) in whole years, five years each.
age_groups = [(youngest, youngest + 4) for youngest in (16, 21, 26)]

# Player position labels used to split the output groups.
positions = [
    "Attack",
    "Midfield",
    "Defender",
]

In [8]:
# Names of the groups already written during this run: the first write of a
# group overwrites its output directory, later seasons are appended to it.
group_list = []


def _cup_features(season_appearances, competition_id, prefix):
    """Aggregate per-player statistics for one competition.

    Args:
        season_appearances: appearances DataFrame already limited to a season.
        competition_id: value of ``competition_id`` to keep (e.g. "CL").
        prefix: prefix for the output columns (``<prefix>_goals``, ...).

    Returns:
        DataFrame with ``player_id`` plus summed goals, assists, yellow and
        red cards and the mean of ``mins`` rounded to two decimals.
    """
    return (
        season_appearances
        .filter(F.col("competition_id") == competition_id)
        .groupBy("player_id")
        .agg(
            F.sum("goals").alias(f"{prefix}_goals"),
            F.sum("assists").alias(f"{prefix}_assists"),
            F.sum("yellow_cards").alias(f"{prefix}_yc"),
            F.sum("red_cards").alias(f"{prefix}_rc"),
            F.round(F.mean("mins"), 2).alias(f"{prefix}_mins"),
        )
    )


def _edge_valuations(season_valuations, pick_date, date_alias, value_alias):
    """Return each player's valuation on the earliest or latest date in a season.

    Args:
        season_valuations: valuations DataFrame already limited to a season.
        pick_date: aggregate choosing the date per player (``F.min`` / ``F.max``).
        date_alias: output name for the chosen date column.
        value_alias: output name for the market value on that date.

    Returns:
        DataFrame with ``player_id``, ``date_alias`` and ``value_alias``.
    """
    edge_dates = season_valuations.groupBy("player_id") \
        .agg(pick_date("date").alias("date"))
    return edge_dates \
        .join(season_valuations, ["player_id", "date"]) \
        .select(
            "player_id",
            F.col("date").alias(date_alias),
            F.col("market_value_in_eur").alias(value_alias),
        )


for start_date, end_date in dates:

    # Appearances inside the season window (both bounds inclusive).
    season_appearances = appearancesDF.filter(
        (F.col("date") >= start_date) & (F.col("date") <= end_date)
    )

    # Per-player statistics in the "CL" and "EL" competitions.
    allCLFeaturesDF = _cup_features(season_appearances, "CL", "cl")
    allELFeaturesDF = _cup_features(season_appearances, "EL", "el")

    # Valuation per season: first and last valuation inside the window.
    seasonPlayerValuations = playerValuationsDF.filter(
        (F.col("date") >= start_date) & (F.col("date") <= end_date)
    )
    startPlayerValuations = _edge_valuations(
        seasonPlayerValuations, F.min, "start_date", "start_val")
    lastPlayerValuations = _edge_valuations(
        seasonPlayerValuations, F.max, "last_date", "last_val")

    # Price difference between the last and the first valuation.
    valsDF = startPlayerValuations \
        .join(lastPlayerValuations, "player_id") \
        .withColumn("change", F.col("last_val") - F.col("start_val"))

    # Player attributes with the age at the end of the season. This depends
    # only on the season, so it is built once here instead of once per league.
    # Age is the day difference divided by 365, rounded to two decimals.
    age = F.round(F.datediff(F.lit(end_date), F.col("date_of_birth")) / 365, 2)
    slicedPlayersDF = playersDF.select(
        "player_id", "name",
        F.col("country_of_citizenship").alias("citizenship"),
        F.col("position").alias("pos"),
        age.alias("age"),
    ).dropna(subset=["age"])

    for passport_id, country in passport:

        # Per-player statistics in the domestic league of this iteration.
        allFeaturesDF = season_appearances \
            .filter(F.col("competition_id") == passport_id) \
            .groupBy("player_id") \
            .agg(
                F.sum("goals").alias("total_goals"),
                F.sum("assists").alias("total_assists"),
                F.sum("yellow_cards").alias("yellow_cards"),
                F.sum("red_cards").alias("red_cards"),
                F.round(F.mean("mins"), 2).alias("minutes_played"),
                F.sum("mins").alias("sum_minutes_played"),
            )

        # Valuations + league statistics + player attributes; "is_native" is 1
        # when the citizenship equals the league's country. CL/EL statistics
        # are left-joined and missing numeric values are replaced by 0.
        fullDF = valsDF \
            .join(allFeaturesDF, "player_id") \
            .join(slicedPlayersDF, "player_id", "inner") \
            .withColumn(
                "is_native",
                F.when(F.col("citizenship") == F.lit(country), 1).otherwise(0)
            ) \
            .select(
                "player_id", "name", "pos", "age", "is_native",
                "start_date", "start_val", "last_date", "last_val",
                "total_goals", "total_assists", "yellow_cards", "red_cards",
                "minutes_played", "sum_minutes_played", "change"
            ) \
            .join(allCLFeaturesDF, "player_id", "left") \
            .join(allELFeaturesDF, "player_id", "left") \
            .fillna(0)

        # The frame is written once per (age band, position) below; caching it
        # avoids recomputing the whole join plan for every one of those writes.
        fullDF.cache()
        try:
            # Loop over age ranges and positions.
            for age_min, age_max in age_groups:
                for position in positions:
                    group_name = f"{passport_id}_{age_min}_{age_max}_{position}"
                    # "age" is fractional, so the upper bound is half-open:
                    # with `age <= age_max` players aged e.g. 20.5 or 25.7
                    # matched no band at all and were silently dropped.
                    df = fullDF.filter(
                        (F.col("age") >= age_min)
                        & (F.col("age") < age_max + 1)
                        & (F.col("pos") == position)
                    )
                    output_path = f"/user/student/BD/csv_handler/{group_name}"
                    if group_name in group_list:
                        df.write.mode("append").csv(output_path, header=True)
                    else:
                        df.write.mode("overwrite").csv(output_path, header=True)
                        group_list.append(group_name)

                    print(f"{group_name} successfully written.\n")
        finally:
            # Release the cached data even if a write fails or is interrupted.
            fullDF.unpersist()

2024-06-26 18:26:34,103 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

IT1_16_20_Attack successfully written.



                                                                                

IT1_16_20_Midfield successfully written.



                                                                                

IT1_16_20_Defender successfully written.



                                                                                

IT1_21_25_Attack successfully written.



                                                                                

IT1_21_25_Midfield successfully written.



                                                                                

IT1_21_25_Defender successfully written.



                                                                                

IT1_26_30_Attack successfully written.



KeyboardInterrupt: 