<a href="https://colab.research.google.com/github/Yushamsi/Data-Ingestion-Combined-Flights/blob/main/Data-Ingestion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# *Data Ingestion*

In [None]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import os
import psutil
import time
import yaml
import testutility as util

In [None]:
# Location of the CSV file that every reader below is benchmarked against
file_path = "your_large_file.csv"

## Reading the file

In [None]:
# Function to measure CPU and memory usage
def measure_usage(library_name, read_function, file_path):
    """Time one file read and print CPU and memory deltas around it.

    Args:
        library_name: Label used as the prefix of every printed line.
        read_function: Callable invoked as ``read_function(file_path)``.
        file_path: Path handed unchanged to ``read_function``.

    Returns:
        None. The read time, CPU change and memory change are printed.
    """
    # Baseline samples. cpu_percent(interval=1) blocks for one second, so it
    # is taken before the timer starts.
    start_cpu = psutil.cpu_percent(interval=1)
    start_memory = psutil.virtual_memory().used / (1024 ** 3)  # Convert to GB
    start_time = time.perf_counter()

    # Keep a reference to the result so the loaded data is still alive when
    # memory is sampled below.
    loaded = read_function(file_path)

    # Stop the clock immediately after the read. Sampling the CPU first would
    # add its one-second blocking interval to the reported read time.
    end_time = time.perf_counter()
    end_memory = psutil.virtual_memory().used / (1024 ** 3)  # Convert to GB
    end_cpu = psutil.cpu_percent(interval=1)

    # Calculate differences
    cpu_usage = end_cpu - start_cpu
    memory_usage = end_memory - start_memory
    read_time_seconds = end_time - start_time
    read_time_minutes = read_time_seconds / 60

    # Print results
    print(f"{library_name} read time: {read_time_seconds:.2f} seconds ({read_time_minutes:.2f} minutes)")
    print(f"{library_name} CPU usage change: {cpu_usage:.2f}%")
    print(f"{library_name} Memory usage change: {memory_usage:.2f} GB")

Reading with Pandas

In [None]:
# Measure Pandas: pd.read_csv is passed as the reader and is called with file_path
measure_usage("Pandas", pd.read_csv, file_path)

Reading with Dask


In [None]:
# Dask reads lazily: the data is only loaded when an action such as .compute() runs.
# Wrapping read_csv and compute in one callable makes measure_usage time the actual load.
measure_usage("Dask", lambda path: dd.read_csv(path).compute(), file_path)

Reading with Modin (Pandas on Ray)

In [None]:
# Modin with Ray
# modin.pandas is already imported at the top of the notebook, and Modin
# documents MODIN_ENGINE as something to set before that import. Select the
# engine through modin.config as well so the choice applies to this run.
from modin.config import Engine

os.environ["MODIN_ENGINE"] = "ray"  # Use Ray as the backend
Engine.put("ray")
measure_usage("Modin Ray", mpd.read_csv, file_path)


Reading with Modin (Pandas on Dask)

In [None]:
# Modin with Dask
# Changing MODIN_ENGINE after modin.pandas has been imported (and after an
# engine has already been used) is not a reliable way to switch backends.
# Switch explicitly through modin.config so this run really uses Dask.
from modin.config import Engine

os.environ["MODIN_ENGINE"] = "dask"  # Use Dask as the backend
Engine.put("dask")
measure_usage("Modin Dask", mpd.read_csv, file_path)

# Cleaning Data

In [None]:
def clean_column_names(df):
    """Normalise the column labels of ``df`` in place and return ``df``.

    Each label is stripped of surrounding whitespace, has its spaces
    replaced by underscores, and then loses every character that is
    neither a word character nor whitespace.
    """
    tidied = (
        df.columns.str.strip()
        .str.replace(' ', '_', regex=True)
        .str.replace('[^\\w\\s]', '', regex=True)
    )
    df.columns = tidied
    return df

In [None]:
# Load the data first: no earlier cell binds `df` (measure_usage only prints
# its measurements), so using it directly would raise NameError.
df = pd.read_csv(file_path)

# Apply the function to your DataFrame
df = clean_column_names(df)

# Display the cleaned column names
print(df.columns)

# YAML

In [None]:
%%writefile schema.yaml
# Schema describing the expected input file.
file_type: csv
dataset_name: testfile
file_name: test_data
# The notebook's config-loading cell reads this key as its read separator.
inbound_delimiter: ","
# The notebook's config-loading cell reads this key as its write separator.
outbound_delimiter: "|"
# NOTE(review): not referenced by any notebook cell; presumably the number of
# header rows to skip when reading. Confirm against testutility.
skip_leading_rows: 1
# Expected column names; the notebook prints these next to the DataFrame's columns.
columns:
    - city
    - price
    - distance

In [None]:
# Load the YAML schema. The %%writefile cell in this notebook creates
# schema.yaml, so that is the file to read here.
config_data = util.read_config_file("schema.yaml")

# Access file settings and column names from the schema
read_separator = config_data['inbound_delimiter']
write_separator = config_data['outbound_delimiter']
columns = config_data['columns']

print(f"Read Separator: {read_separator}")
print(f"Write Separator: {write_separator}")
print(f"Columns: {columns}")


# Comparing Schema File to DF

In [None]:
# Run the testutility header check on the DataFrame and the loaded config.
# NOTE(review): presumably compares df's column names with config_data['columns'];
# the helper is defined outside this notebook, so confirm there.
util.col_header_val(df, config_data)

In [None]:
# Show both column lists side by side for a manual comparison
print("Columns of DF are:", df.columns)
print("Columns of YAML are:", config_data['columns'])

In [None]:
# Gate the rest of the pipeline on the header check: a result of 0 means failure
header_check = util.col_header_val(df, config_data)
if header_check != 0:
    print("col validation passed")
    # write the code to perform further action
    # in the pipeline
else:
    print("validation failed")
    # write code to reject the file