<a href="https://colab.research.google.com/github/MunirahHF/Deployment/blob/main/File_ingestion_and_schema_validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
!pip install modin[ray]
!pip install ray
!pip install pyyaml

# Required Libraries
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import ray
import os
import time

# Initialize Ray
ray.init(ignore_reinit_error=True)

# File Path
file_path = 'Latest Football  Players 2024 Data.csv'

# Pandas
start_time = time.time()
df_pandas = pd.read_csv(file_path)
pandas_time = time.time() - start_time

# Dask
start_time = time.time()
ddf = dd.read_csv(file_path)
dask_time = time.time() - start_time

# Modin
start_time = time.time()
df_modin = mpd.read_csv(file_path)
modin_time = time.time() - start_time

# Ray
import ray.data as rdf
start_time = time.time()
df_ray = rdf.read_csv(file_path)
ray_time = time.time() - start_time

print(f"Pandas read time: {pandas_time} seconds")
print(f"Dask read time: {dask_time} seconds")
print(f"Modin read time: {modin_time} seconds")
print(f"Ray read time: {ray_time} seconds")






2024-08-04 02:31:31,861	INFO worker.py:1614 -- Calling ray.init() again after it has already been called.


Pandas read time: 0.0071184635162353516 seconds
Dask read time: 0.01932382583618164 seconds
Modin read time: 0.046150922775268555 seconds
Ray read time: 2.0954504013061523 seconds


Pandas is the quickest for this task, with Dask and Modin also performing well. Ray shows significantly longer read times

In [6]:
# Clean column names by removing special characters and white spaces
def clean_column_names(df):
    df.columns = df.columns.str.replace('[^A-Za-z0-9]+', '_').str.strip()
    return df

df_pandas = clean_column_names(df_pandas)


In [9]:
import yaml

# Define the schema
schema = {
    'separator': ',',
    'columns': [
        'players_clubs',
        'players_goals',
        'players_assists',
        'players_ratings',
        'players_name',
        'players_tenor_in_one_club',
        'players_matches'
    ]
}

# Write to a YAML file
with open('config.yaml', 'w') as file:
    yaml.dump(schema, file)

print("YAML file created successfully.")


YAML file created successfully.


In [10]:


def validate_columns(df, yaml_file):
    with open(yaml_file, 'r') as file:
        config = yaml.safe_load(file)

    expected_columns = config['columns']
    actual_columns = df.columns.tolist()

    if sorted(expected_columns) == sorted(actual_columns):
        print("Column validation successful.")
    else:
        print("Column validation failed.")
        print("Expected columns:", expected_columns)
        print("Actual columns:", actual_columns)

# Validate columns for Pandas DataFrame
validate_columns(df_pandas, 'config.yaml')


Column validation failed.
Expected columns: ['players_clubs', 'players_goals', 'players_assists', 'players_ratings', 'players_name', 'players_tenor_in_one_club', 'players_matches']
Actual columns: ['Teams', 'Seasons', 'Players', 'Matches', 'Goals', 'Assists', 'Seasons Ratings']


In [11]:
# Save DataFrame to a pipe-separated gzipped file
df_pandas.to_csv('output_file.txt.gz', sep='|', compression='gzip', index=False)


In [12]:
file_size = os.path.getsize('output_file.txt.gz')

print(f"Total number of rows: {df_pandas.shape[0]}")
print(f"Total number of columns: {df_pandas.shape[1]}")
print(f"File size: {file_size} bytes")


Total number of rows: 1216
Total number of columns: 7
File size: 13020 bytes
