In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os

In [37]:
# Create (or reuse) the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("DataCleaning")
    .getOrCreate()
)

# Path of the folder containing all our data
root_dir = '/home/hadoop/AllData/Fixtures'  # Needs to be changed

# Predicate used to keep only the folders that belong to one league
def league_id_dir(folder_name,id):
    """Return True when ``folder_name`` begins with the league id ``id``.

    This is a plain prefix comparison, so the id '39' also matches a folder
    whose name starts with e.g. '390'.
    """
    has_league_prefix = folder_name.startswith(id)
    return has_league_prefix

In [9]:
# Define the Schema of the DataFrame.
# Each entry is (column name, Spark type); every column is nullable and the
# fields are created in the order listed here.
Data_schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in (
        # fixture
        ('fixture_id', IntegerType),
        ('fixture_referee', StringType),
        ('fixture_timezone', StringType),
        ('fixture_date', StringType),
        ('fixture_timestamp', IntegerType),
        ('fixture_periods_first', IntegerType),
        ('fixture_periods_second', IntegerType),
        # venue
        ('venue_id', IntegerType),
        ('venue_name', StringType),
        ('venue_city', StringType),
        # status
        ('status_long', StringType),
        ('status_short', StringType),
        ('status_elapsed', IntegerType),
        # league
        ('league_id', IntegerType),
        ('league_name', StringType),
        ('league_country', StringType),
        ('league_logo', StringType),
        ('league_flag', StringType),
        ('league_season', IntegerType),
        ('league_round', StringType),
        # teams
        ('teams_home_id', IntegerType),
        ('teams_home_name', StringType),
        ('teams_home_logo', StringType),
        ('teams_home_winner', BooleanType),
        ('teams_away_id', IntegerType),
        ('teams_away_name', StringType),
        ('teams_away_logo', StringType),
        ('teams_away_winner', BooleanType),
        # goals and scores
        ('goals_home', IntegerType),
        ('goals_away', IntegerType),
        ('score_halftime_home', IntegerType),
        ('score_halftime_away', IntegerType),
        ('score_fulltime_home', IntegerType),
        ('score_fulltime_away', IntegerType),
        ('score_extratime_home', IntegerType),
        ('score_extratime_away', IntegerType),
        ('score_penalty_home', IntegerType),
        ('score_penalty_away', IntegerType),
    )
])

In [None]:
# Path of the folder containing all our data
root_dir = '/home/hadoop/AllData' # Needs to be changed
# List of the leagues IDs
leagues = ['39', '135', '140', '78', '61']
# Empty RDD used to seed an empty DataFrame that carries Data_schema
emp_RDD = spark.sparkContext.emptyRDD()
# Iterate through all the league IDs
for i in leagues:
    # Folders under root_dir whose name starts with this league id
    directories = [
        entry for entry in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, entry)) and league_id_dir(entry, i)
    ]
    # Start every league from an empty frame typed with Data_schema, so a
    # league without any CSV file still has a frame to write
    combined_df = spark.createDataFrame(data = emp_RDD, schema=Data_schema)
    for folder in directories:
        folder_path = os.path.join(root_dir, folder)
        files = os.listdir(folder_path)
        csv_files = [name for name in files if name.endswith('.csv')]
        for csv_file in csv_files:
            file_path = os.path.join(folder_path, csv_file)
            df = spark.read.csv(file_path, header=True, inferSchema=True)
            # union() pairs columns by position, not by name, so every CSV is
            # expected to list its columns in the same order as Data_schema
            combined_df = combined_df.union(df)
    # Repartition the ACCUMULATED frame (the previous code repartitioned `df`,
    # i.e. only the last CSV read, and dropped everything else). A single
    # partition makes Spark write the league as one part file.
    combined_df = combined_df.repartition(1)
    combined_df.write.csv('Fixtures/'+str(i)+'data.csv', header=True)

# Merging the data by season and by league into one csv file

In [1]:
import pandas as pd
import os

In [28]:
# Path of the folder containing all our data
root_dir = '/home/hadoop/AllData' # Needs to be changed

# Predicate that keeps only the folders of the league we need
def league_id_dir(folder_name,id):
    """Tell whether ``folder_name`` starts with the league id ``id``.

    Only the prefix is compared, so '39' also matches names starting
    with e.g. '390'.
    """
    starts_with_id = folder_name.startswith(id)
    return starts_with_id

In [46]:
# Path of the folder containing all our data
root_dir = '/home/hadoop/AllData' # Needs to be changed

# List of the leagues IDs
leagues = ['39', '135', '140', '78', '61']


# Iterate through all the league IDs
for i in leagues:
    # Folders under root_dir whose name starts with this league id
    directories = [
        entry for entry in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, entry)) and league_id_dir(entry, i)
    ]

    # Collect every CSV of the league first and concatenate once at the end:
    # calling pd.concat inside the loop re-copies the accumulated rows on
    # every iteration
    league_frames = []

    for folder in directories:
        folder_path = os.path.join(root_dir, folder)
        files = os.listdir(folder_path)
        csv_files = [name for name in files if name.endswith('.csv')]

        for csv_file in csv_files:
            file_path = os.path.join(folder_path, csv_file)
            df = pd.read_csv(file_path)
            league_frames.append(df)

    # pd.concat raises on an empty list, so fall back to an empty frame when
    # the league has no CSV file (as before, an empty file is written)
    combined_df = pd.concat(league_frames) if league_frames else pd.DataFrame()

    combined_df.to_csv('Fixtures/'+str(i)+'data.csv', index=False)

# Define the functions for Data Cleaning

## Drop the unnecessary columns

In [2]:
def Drop_columns(df):
    """Remove the columns that are not needed from ``df``.

    The frame is modified in place and the same object is returned.
    A ``KeyError`` is raised by pandas if any listed column is missing.
    """
    unwanted = [
        # period / time metadata
        'fixture_periods_first', 'fixture_periods_second',
        'fixture_timestamp', 'fixture_timezone',
        # extra-time and penalty scores
        'score_extratime_home', 'score_extratime_away',
        'score_penalty_home', 'score_penalty_away',
    ]
    df.drop(labels=unwanted, axis='columns', inplace=True)
    return df

## Conversion of the float column type to int

In [3]:
def Convert_Data_types(df):
    """Normalise the dtypes of the score and winner columns of ``df``.

    * The goal/score columns and ``status_elapsed`` have their missing values
      replaced by 0 and are cast to ``int``.
    * ``teams_home_winner`` / ``teams_away_winner`` have their missing values
      replaced by the string '0' and are cast to ``str``.

    The frame is modified in place and the same object is returned.
    """
    int_columns = ['goals_home','goals_away','score_halftime_home' ,'score_halftime_away','score_fulltime_home' ,'score_fulltime_away', 'status_elapsed']
    for col in int_columns:
        df[col] = df[col].fillna(0).astype(int)

    # Assign the filled column back explicitly. `df[col].fillna(..., inplace=True)`
    # works on an intermediate Series and, with pandas Copy-on-Write, never
    # reaches `df`, leaving the missing values to be stringified by astype(str).
    for col in ('teams_home_winner', 'teams_away_winner'):
        df[col] = df[col].fillna('0').astype(str)

    return df

## Convert the date column format to YYYY-MM-DD

In [4]:
def Convert_Date(df):
    """Reduce ``fixture_date`` to a calendar date and order the rows by it.

    The column is parsed with ``pd.to_datetime`` and replaced by its
    ``.dt.date`` values (YYYY-MM-DD). The frame is then sorted in place on
    that column and the same object is returned.
    """
    parsed_dates = pd.to_datetime(df['fixture_date'])
    # .dt.date keeps the day only, discarding the time-of-day part
    df['fixture_date'] = parsed_dates.dt.date
    df.sort_values(by='fixture_date', inplace=True)
    return df

## Load the league CSV files and perform the cleaning

In [9]:
files_path = '/home/hadoop/AllData/Fixtures'
cleaned_path = '/home/hadoop/AllData/Cleaned'

# Make sure the output folder exists: to_csv does not create missing folders
os.makedirs(cleaned_path, exist_ok=True)

# List all entries in the input directory
file_list = os.listdir(files_path)

# Iterate through each entry
for filename in file_list:
    file_path = os.path.join(files_path, filename)

    # Only regular .csv files can be cleaned; skip sub-folders and any other
    # file that happens to live in the input directory
    if not (filename.endswith('.csv') and os.path.isfile(file_path)):
        continue

    # Read the CSV file using Pandas
    df = pd.read_csv(file_path)

    # Drop the unwanted columns
    df = Drop_columns(df)

    # Data type conversion (from float to int)
    df = Convert_Data_types(df)

    # Convert the date format
    df = Convert_Date(df)

    # Write the cleaned frame under the same file name in the output folder
    # (the input file itself is left untouched)
    df.to_csv(os.path.join(cleaned_path, filename), index=False)
    print(f"Processed {filename}")

Processed 78data.csv
Processed 140data.csv
Processed 39data.csv
Processed 61data.csv
Processed 135data.csv
