In [2]:
import pyspark
import pandas

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Combine Player Datasets") \
    .getOrCreate()

# Load the three datasets
# Assuming the datasets are in CSV format with headers
# Replace the file paths with your actual data sources
player_valuations = spark.read.csv("data/raw/kaggle/player-scores/player_valuations.csv", header=True, inferSchema=True)
players_stats = spark.read.csv("data/raw/kaggle/player-scores/players.csv", header=True, inferSchema=True)
transfers = spark.read.csv("data/raw/kaggle/player-scores/transfers.csv", header=True, inferSchema=True)

# Display the schema of each dataset to understand their structure
print("Player Valuations Schema:")
player_valuations.printSchema()

print("Players Stats Schema:")
players_stats.printSchema()

print("Transfers Schema:")
transfers.printSchema()

# Get column names from each dataset to identify duplicates
valuation_cols = set(player_valuations.columns)
stats_cols = set(players_stats.columns)
transfers_cols = set(transfers.columns)

# Identify duplicate columns (excluding player_id which we use for joining)
duplicate_cols_stats = [col for col in stats_cols if col in valuation_cols and col != "player_id"]
duplicate_cols_transfers = [col for col in transfers_cols if (col in valuation_cols or col in stats_cols) and col != "player_id"]

print(f"Duplicate columns in players_stats: {duplicate_cols_stats}")
print(f"Duplicate columns in transfers: {duplicate_cols_transfers}")

# Handle duplicate columns by renaming them in the secondary datasets
players_stats_renamed = players_stats
transfers_renamed = transfers

# Rename duplicate columns in players_stats to avoid conflicts
for dup_col in duplicate_cols_stats:
    players_stats_renamed = players_stats_renamed.withColumnRenamed(dup_col, f"stats_{dup_col}")

# Rename duplicate columns in transfers to avoid conflicts
for dup_col in duplicate_cols_transfers:
    transfers_renamed = transfers_renamed.withColumnRenamed(dup_col, f"transfers_{dup_col}")

# Perform joins with renamed columns
combined_data = player_valuations.join(
    players_stats_renamed,
    player_valuations["player_id"] == players_stats_renamed["player_id"],
    "inner"
).join(
    transfers_renamed,
    player_valuations["player_id"] == transfers_renamed["player_id"],
    "inner"
)

# Drop duplicate player_id columns
combined_data = combined_data.drop(players_stats_renamed["player_id"], transfers_renamed["player_id"])

# Show a preview of the combined dataset
print("Combined Dataset Preview:")
combined_data.show(5)

# Count the number of players in each dataset and in the combined dataset
print(f"Number of players in valuations dataset: {player_valuations.count()}")
print(f"Number of players in stats dataset: {players_stats.count()}")
print(f"Number of players in transfers dataset: {transfers.count()}")
print(f"Number of players in combined dataset: {combined_data.count()}")

# Use this instead:
combined_data.coalesce(1).write.option("header", True).mode("overwrite").csv("data/processed/combined_players")

# Or even better, to get a specific filename:
combined_data.toPandas().to_csv("data/processed/combined_kaggle.csv", index=False)

# Stop the Spark session
spark.stop()

Player Valuations Schema:
root
 |-- player_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- market_value_in_eur: integer (nullable = true)
 |-- current_club_id: integer (nullable = true)
 |-- player_club_domestic_competition_id: string (nullable = true)

Players Stats Schema:
root
 |-- player_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- last_season: integer (nullable = true)
 |-- current_club_id: integer (nullable = true)
 |-- player_code: string (nullable = true)
 |-- country_of_birth: string (nullable = true)
 |-- city_of_birth: string (nullable = true)
 |-- country_of_citizenship: string (nullable = true)
 |-- date_of_birth: timestamp (nullable = true)
 |-- sub_position: string (nullable = true)
 |-- position: string (nullable = true)
 |-- foot: string (nullable = true)
 |-- height_in_cm: integer (nullable = true)
 |-- contract_expiration_date: timestamp (

Py4JJavaError: An error occurred while calling o64.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.execution.SparkPlan$$anon$1._next(SparkPlan.scala:415)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:426)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:412)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1(SparkPlan.scala:449)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1$adapted(SparkPlan.scala:448)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$4272/0x0000000101703840.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3425/0x00000001014b1040.apply(Unknown Source)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4149)
	at org.apache.spark.sql.Dataset$$Lambda$4461/0x00000001017a7840.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.Dataset$$Lambda$2101/0x0000000100fae040.apply(Unknown Source)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset$$Lambda$1753/0x0000000100e2fc40.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1764/0x0000000100e36440.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1754/0x0000000100e30840.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)


In [7]:
# Revised code with memory optimizations
from pyspark.sql import Window
from pyspark.sql.functions import (
    collect_list, col, concat_ws, when, lit, first,
    isnan, isnull, row_number
)
from pyspark.sql.types import NumericType

# 1. Tune Spark configuration first (add these at Spark session initialization)
# spark = SparkSession.builder \
#     .config("spark.driver.memory", "8g") \
#     .config("spark.executor.memory", "4g") \
#     .getOrCreate()

# 2. Clean columns with memory-efficient operations
def clean_field(df, column_name):
    if column_name not in df.columns:
        return df
    
    data_type = df.schema[column_name].dataType
    is_numeric = isinstance(data_type, NumericType)
    
    condition = col(column_name).isNull() | (col(column_name) == "")
    if is_numeric:
        condition = condition | isnan(col(column_name))
    
    return df.withColumn(
        column_name,
        when(condition, lit("N/A")).otherwise(col(column_name))
    )

print("Cleaning data...")
for column in combined_data.columns:
    combined_data = clean_field(combined_data, column)

# 3. Optimize grouping and joining operations
compressed_data_temp = combined_data.select("player_id").distinct()

# Use window functions instead of multiple joins for static columns
window_spec = Window.partitionBy("player_id").orderBy(col("date").desc())

static_columns_aggregated = combined_data.withColumn(
    "row_num", row_number().over(window_spec)
).filter("row_num = 1").drop("row_num")

compressed_data_temp = compressed_data_temp.join(
    static_columns_aggregated.select(
        "player_id", *static_columns
    ),
    on="player_id",
    how="left"
)

# 4. Streamline history collection with limit
MAX_HISTORY_ENTRIES = 5  # Adjust based on memory constraints
for column in change_tracking_columns:
    if column in combined_data.columns:
        history_window = Window.partitionBy("player_id").orderBy(col("date").desc())
        limited_history = combined_data.withColumn(
            "rank", row_number().over(history_window)
        ).filter(f"rank <= {MAX_HISTORY_ENTRIES}").drop("rank")
        
        value_history = limited_history.groupBy("player_id").agg(
            concat_ws(", ", collect_list(col(column))).alias(f"{column}_history")
        )
        
        compressed_data_temp = compressed_data_temp.join(
            value_history,
            on="player_id",
            how="left"
        )

# 5. Optimized data quality check
print("\nRunning efficient data quality checks...")
compressed_data_temp.cache()  # Cache for multiple actions
total_count = compressed_data_temp.count()

# Batch null checks using single computation
null_counts = {}
for column in compressed_data_temp.columns:
    null_counts[column] = compressed_data_temp.filter(
        col(column).isNull() | (col(column) == "")
    ).count()

print("\nData Quality Report:")
for col_name, count in null_counts.items():
    if count > 0:
        print(f"Column '{col_name}' has {count} null/empty values ({count/total_count*100:.2f}%)")

# 6. Safe write operation
print("Saving data...")
try:
    # Write with multiple partitions
    (compressed_data_temp
     .repartition(10)  # Adjust based on dataset size
     .write
     .option("header", True)
     .mode("overwrite")
     .csv("data/processed/compressed_players")
    )
    print("Successfully wrote partitioned data")
except Exception as e:
    print(f"Write failed: {str(e)}")
    print("Consider using Parquet format for better compression")

compressed_data_temp.unpersist()
print("Operation complete")

Cleaning data quality issues...

Data Quality Report:


Py4JJavaError: An error occurred while calling o2419.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 743.0 failed 1 times, most recent failure: Lost task 2.0 in stage 743.0 (TID 1665) (NW61813.nwmissouri.edu executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: Java heap space


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\s536877\Documents\GitHub\finaldata\.venv\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ~~~~~~~~~~~~~~~~~~~~^^
  File "C:\Python313\Lib\socket.py", line 719, in readinto
    return self._sock.recv_into(b)
           ~~~~~~~~~~~~~~~~~~~~^^^
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\s536877\Documents\GitHub\finaldata\.venv\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\s536877\Documents\GitHub\finaldata\.venv\Lib\site-packages\py4j\clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
        "Error while sending o