## Initial Attempts to Read Data

In [6]:
import pandas as pd
import time

# Time an eager pandas load of the entire CSV into memory.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() is wall-clock time and can jump if the system
# clock is adjusted.
start_time = time.perf_counter()
df = pd.read_csv('en-fr.csv')
end_time = time.perf_counter()

print(f"Pandas read time: {end_time - start_time} seconds")
df.head()

KeyboardInterrupt: 

In [51]:
import dask.dataframe as dd

# Time Dask's read_csv call. perf_counter() is used because it is a monotonic,
# high-resolution clock meant for measuring elapsed time.
# NOTE: dd.read_csv is lazy - it only inspects the start of the file and builds
# a task graph - so this figure is the setup cost, not the cost of parsing the
# whole file. The data is only read when a result is requested (e.g. head()).
start_time = time.perf_counter()
df = dd.read_csv('en-fr.csv')
end_time = time.perf_counter()

print(f"Dask read time: {end_time - start_time} seconds")
df.head()

Dask read time: 0.014001607894897461 seconds


Unnamed: 0,en,fr
0,Changing Lives | Changing Society | How It Wor...,Il a transformé notre vie | Il a transformé la...
1,Site map,Plan du site
2,Feedback,Rétroaction
3,Credits,Crédits
4,Français,English


In [14]:
import ray

# Time Ray Data's read_csv call. perf_counter() is used because it is a
# monotonic, high-resolution clock meant for measuring elapsed time.
# NOTE: the Dataset is created lazily; the log output shows execution starting
# only when schema() is called, so the printed figure is the setup cost and
# not the cost of reading the whole file.
start_time = time.perf_counter()
df = ray.data.read_csv('en-fr.csv')
end_time = time.perf_counter()

print(f"Ray read time: {end_time - start_time} seconds")
df.schema()

2024-08-12 18:44:09,502	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in C:\Users\ADMINI~1\AppData\Local\Temp\ray\session_2024-08-12_18-43-29_320858_2652\logs\ray-data
2024-08-12 18:44:09,503	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]


Ray read time: 0.008155107498168945 seconds


- ReadCSV->SplitBlocks(4) 1: 0 bundle [00:00, ? bundle/s]

Running 0: 0 bundle [00:00, ? bundle/s]

Column  Type
------  ----
en      string
fr      string

In [18]:
import modin.pandas as modin

# Time a Modin load of the entire CSV. perf_counter() is used because it is a
# monotonic, high-resolution clock meant for measuring elapsed time.
start_time = time.perf_counter()
df = modin.read_csv('en-fr.csv')
end_time = time.perf_counter()

# Label the result as Modin (the original label said "Pandas", which made the
# output indistinguishable from the plain-pandas cell).
print(f"Modin read time: {end_time - start_time} seconds")
df.head()

RayTaskError(MemoryError): [36mray::_deploy_ray_func()[39m (pid=6176, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 1858, in ray._raylet.execute_task
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\modin\core\execution\ray\common\engine_wrapper.py", line 53, in _deploy_ray_func
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\modin\logging\logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\modin\core\storage_formats\pandas\parsers.py", line 362, in parse
    return PandasParser.generic_parse(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\modin\logging\logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\modin\core\storage_formats\pandas\parsers.py", line 222, in generic_parse
    pandas_df = callback(BytesIO(to_read), **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\modin\logging\logger_decorator.py", line 144, in run_and_log
    return obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\modin\core\storage_formats\pandas\parsers.py", line 386, in read_callback
    return pandas.read_csv(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\pandas\io\parsers\readers.py", line 1026, in read_csv
    return _read(filepath_or_buffer, kwds)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\pandas\io\parsers\readers.py", line 626, in _read
    return parser.read(nrows)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\pandas\io\parsers\readers.py", line 1923, in read
    ) = self._engine.read(  # type: ignore[attr-defined]
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\anaconda3\Lib\site-packages\pandas\io\parsers\c_parser_wrapper.py", line 234, in read
    chunks = self._reader.read_low_memory(nrows)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "parsers.pyx", line 838, in pandas._libs.parsers.TextReader.read_low_memory
  File "parsers.pyx", line 921, in pandas._libs.parsers.TextReader._read_rows
  File "parsers.pyx", line 1066, in pandas._libs.parsers.TextReader._convert_column_data
  File "parsers.pyx", line 1105, in pandas._libs.parsers.TextReader._convert_tokens
  File "parsers.pyx", line 1272, in pandas._libs.parsers.TextReader._convert_with_dtype
  File "parsers.pyx", line 1285, in pandas._libs.parsers.TextReader._string_convert
  File "parsers.pyx", line 1514, in pandas._libs.parsers._string_box_utf8
numpy.core._exceptions._ArrayMemoryError: Unable to allocate 2.00 MiB for an array with shape (262144,) and data type object

## Computational Efficiency Analysis

Dask and Ray both perform fairly well, each taking around 0.01 seconds for the `read_csv` call. Note that both libraries evaluate lazily, so this time covers only setting up the read, not parsing the whole file. However, pandas and Modin, which load the file eagerly, both run into the problem of not being able to allocate enough memory for the dataset when running on my computer. The performance of Ray is slightly better, but we will proceed with Dask since Ray had some errors.

## Perform Basic Data Validation

In [106]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Parse the YAML file at ``filepath`` and return the loaded content.

    If parsing raises ``yaml.YAMLError`` the error is logged with
    ``logging.error`` and ``None`` is returned instead.
    """
    with open(filepath, 'r') as config_stream:
        try:
            parsed_config = yaml.safe_load(config_stream)
        except yaml.YAMLError as parse_error:
            logging.error(parse_error)
            # Parsing failed: the caller receives None.
            parsed_config = None
    return parsed_config

def col_header_val(df,table_config):
    '''
    Standardize the column names of ``df`` and validate them against
    the names listed in ``table_config['columns']``.

    Names are standardized by lower-casing, replacing spaces with
    underscores and stripping leading/trailing underscores. The same rule
    is applied to the configured names, so a configured "First Name"
    matches a file header "first_name". The comparison ignores column order.

    Side effect: ``df.columns`` is overwritten with the standardized names.

    Parameters
    ----------
    df : dataframe whose ``columns`` hold string names and can be reassigned
    table_config : mapping with a ``'columns'`` list of expected names

    Returns
    -------
    1 if the standardized file columns equal the expected columns, else 0.
    On failure the differences are printed and both column lists are logged
    at INFO level.
    '''
    def _normalize(name):
        # One rule for both sides, so file headers and configured names agree.
        return name.lower().replace(' ', '_').strip('_')

    df.columns = [_normalize(col) for col in df.columns]

    # Compare sorted name lists directly; there is no need to build a
    # re-ordered copy of the frame just to check its header.
    actual_col = sorted(df.columns)
    expected_col = sorted(_normalize(col) for col in table_config['columns'])

    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = list(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {df.columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting utility.py


In [108]:
%%writefile file.yaml
# Settings for the English-French translation dataset used by this notebook.
# Extension of the source file; joined with file_name to build the input path.
file_type: csv
# NOTE(review): not read by the notebook cells shown here - confirm its use.
dataset_name: English_French_Translation
# Base name of the source file, without extension.
file_name: en-fr
# Base name of the compressed output file.
table_name: English_French_Translation
# Presumably the field separator of the source file - confirm against the reader.
inbound_delimiter: ","
# Field separator used when writing the output file.
outbound_delimiter: "|"
# Presumably the number of header rows in the source file - TODO confirm.
skip_leading_rows: 1
# Column names the source file is expected to contain (checked by col_header_val).
columns: 
    - en
    - fr

Overwriting file.yaml


In [1]:
import utility as util
# Load the dataset settings from file.yaml.
# read_config_file returns None if the YAML cannot be parsed.
config_data = util.read_config_file("file.yaml")

In [5]:
import dask.dataframe as dd
# Read the source file using the settings from the config file.
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
# Honour the configured input delimiter, just as the write step honours
# outbound_delimiter; fall back to "," (the reader's default) if it is absent.
df = dd.read_csv(source_file, sep=config_data.get('inbound_delimiter', ','))

df.head()

Unnamed: 0,en,fr
0,Changing Lives | Changing Society | How It Wor...,Il a transformé notre vie | Il a transformé la...
1,Site map,Plan du site
2,Feedback,Rétroaction
3,Credits,Crédits
4,Français,English


In [113]:
# Row count. .compute() makes Dask evaluate the lazy result, which requires
# a pass over the file.
df.shape[0].compute()

22520376

In [7]:
# Validate the file's column names against the `columns` list in the config.
# Returns 1 on success and 0 on failure; it also rewrites df.columns with the
# standardized (lower-case, underscore-separated) names.
util.col_header_val(df,config_data)

column name and column length validation passed


1

## Write the gz file

In [11]:
# Write the validated data as a single gzip-compressed text file, using the
# output delimiter from the config.
file_type = 'gz'
compression_type = 'gzip'
output_file = "./" + config_data['table_name'] + f'.{file_type}'
# index=False: the index Dask assigns when reading a CSV restarts at 0 in every
# partition, so writing it would only add a meaningless, unnamed leading column.
df.to_csv(
    output_file,
    sep=config_data['outbound_delimiter'],
    compression=compression_type,
    single_file=True,
    index=False,
)
# NOTE: on my computer this step runs out of memory for a file of this size
# (see the error output of this cell). It should work on a server or another
# machine with more RAM.

ParserError: Error tokenizing data. C error: out of memory