In [12]:
import pandas as pd
import yaml
import dask.dataframe as dd
import modin.pandas as md
import ray
import time
import os

In [7]:
def validateFileWithYaml(filepath):
    with open(filepath, "r") as stream:
        try:
            return yaml.safe_load(stream)
        except Exception as e:
            print(e)

def validate(data, configuration):
    data.columns = data.columns.str.lower()
    data.columns = data.columns.str.strip()
    data.columns = list(map(lambda x: x.replace(" ", "_"), list(data.columns)))
    configuration["columns"] = list(map((lambda x: x.replace(" ", "_")), configuration["columns"]))
    configuration["columns"] = list(
        map((lambda x: x.strip()), configuration["columns"])
    )
    configuration["columns"] = list(
        map((lambda x: x.lower()), configuration["columns"])
    )
    specialCharactersAndNumbers = ['1234567890!@#$%^&*()-=+[]{}\\`~/?.,<>;:\'\"']
    for i in specialCharactersAndNumbers:
        data.columns = list(map(lambda x: x.replace(i, "_"), list(data.columns)))
    for i in configuration["columns"]:
        if i not in data.columns:
            print(f"column {i} present in configuration yaml file is not found in the data!")
            return False
    if len(configuration["columns"]) != len(list(data.columns)):
        return False
    return True

In [3]:
config_data = validateFileWithYaml("file.yaml")
ray.init(ignore_reinit_error=True)
start_time = time.time()
dask = dd.read_csv("2019-Nov.csv", delimiter=config_data["inbound_delimiter"])
end_time = time.time()
print(f"{end_time - start_time} seconds using dask")
start_time = time.time()
modin = md.read_csv("2019-Nov.csv", delimiter=config_data["inbound_delimiter"])
end_time = time.time()
print(f"{end_time - start_time} seconds using Modin")
start_time = time.time()
rayDf = ray.data.read_csv("2019-Nov.csv", delimiter=config_data["inbound_delimiter"])
end_time = time.time()
print(f"{end_time - start_time} seconds using Ray")
start_time = time.time()
df_pandas = pd.read_csv("2019-Nov.csv", delimiter=config_data["inbound_delimiter"])
end_time = time.time()
print(f"{end_time - start_time} seconds using pandas")

2024-07-18 22:13:42,022	INFO worker.py:1788 -- Started a local Ray instance.


0.021056413650512695 seconds using dask
35.30572843551636 seconds using Modin
2.4987645149230957 seconds using Ray
87.8527979850769 seconds using pandas


In [21]:
# as we can see, the best approach for reading the csv file would be to use dask as it is the most time efficient method
file_type = config_data["file_type"]
source_file = "./" + config_data["file_name"] + f".{file_type}"
df = dd.read_csv(source_file, delimiter=config_data["inbound_delimiter"])
if not validate(df, config_data):
    print("validation failed")
else:
    print("validation succeeded")
    output_file = "./output.txt.gz"
    df.to_csv(output_file, sep=config_data["outbound_delimiter"], index=False, compression="gzip", single_file=True)
    

validation succeeded


In [13]:
config_data = validateFileWithYaml("file.yaml")
file_type = config_data["file_type"]
source_file = "./" + config_data["file_name"] + f".{file_type}"
df = dd.read_csv(source_file, delimiter=config_data["inbound_delimiter"])
total_rows = len(df["brand"])
num_columns = len(df.columns)
file_stats = os.stat("./output.txt.gz")
file_size = file_stats.st_size
print(f"Total Rows: {total_rows}")
print(f"Total Columns: {num_columns}")
print(f"Gz File Size:{file_size} Bytes")

Total Rows: 67501979
Total Columns: 9
Gz File Size:2813799300
