In [16]:
import pandas as pd
import os
import random
import string
from tqdm import tqdm  # Import tqdm for the progress bar

# Specify the path to the existing dataset
existing_dataset_path = "books.csv"

# Define the target file size in bytes (3 GB)
target_file_size = 3 * 1024 * 1024 * 1024  # 3 GB

# Estimate the size of each row in bytes (adjust this based on your data)
# For simplicity, let's assume each row contributes around 300 bytes to the file size
estimated_row_size = 300

# Calculate the number of rows needed to reach the target file size
rows_to_add = int(target_file_size // estimated_row_size)

# Create a temporary file to store the new data
temp_file_path = "temp_dataset.csv"

# Open the existing dataset and create a new temporary file
with open(existing_dataset_path, 'r') as existing_file, open(temp_file_path, 'w') as temp_file:
    # Copy the header from the existing dataset to the temporary file
    header = existing_file.readline()
    temp_file.write(header)

    # Iterate over the existing dataset and copy rows to the temporary file
    for line in existing_file:
        temp_file.write(line)

    # Create a tqdm progress bar for appending new rows
    progress_bar = tqdm(total=rows_to_add, desc="Appending Rows")

    # Generate and append new rows to the temporary file to reach the target size
    for _ in range(rows_to_add):
        row_id = ''.join(random.choice(string.ascii_letters) for _ in range(10))
        data = ''.join(random.choice(string.ascii_letters) for _ in range(100))
        new_row = f"{row_id},{data}\n"
        temp_file.write(new_row)
        
        # Update the progress bar
        progress_bar.update(1)

    # Close the progress bar
    progress_bar.close()

os.replace(temp_file_path, existing_dataset_path)



Appending Rows: 100%|████████████████████████████████████████████████████| 10737418/10737418 [31:59<00:00, 5594.65it/s]


In [1]:
import pandas as pd
import time

start_time = time.time()
df = pd.read_csv('books.csv')
end_time = time.time()

execution_time = end_time - start_time
print(f"Pandas took {execution_time} seconds to read the file.")


  df = pd.read_csv('books.csv')


Pandas took 83.32999444007874 seconds to read the file.


In [3]:
import dask.dataframe as dd
import time

start_time = time.time()
df = dd.read_csv('books.csv')
df = df.compute()
end_time = time.time()

execution_time = end_time - start_time
print(f"Dask took {execution_time} seconds to read the file.")


  df = reader(bio, **kwargs)


Dask took 76.67320609092712 seconds to read the file.


In [4]:
data.info(memory_usage="deep")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11737437 entries, 0 to 11737436
Data columns (total 3 columns):
 #   Column       Dtype 
---  ------       ----- 
 0   Book Title   object
 1   Author Name  object
 2   Runtime      object
dtypes: object(3)
memory usage: 2.8 GB


In [17]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing testutility.py


In [36]:
%%writefile file.yaml
file_type: csv
dataset_name: books
file_name: books
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - book_title
    - author_name
    - runtime

Overwriting file.yaml


In [37]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [38]:
config_data['inbound_delimiter']


','

In [39]:
config_data


{'file_type': 'csv',
 'dataset_name': 'books',
 'file_name': 'books',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['book_title', 'author_name', 'runtime']}

In [25]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("books.csv",delimiter=',')
df_sample.head()

  df_sample = pd.read_csv("books.csv",delimiter=',')


Unnamed: 0,Book Title,Author Name,Runtime
0,Warrior Fae,"Caroline Peckham, Susanne Valenti",29 hrs and 37 mins
1,Choose Your Enemies,Sandy Mitchell,10 hrs and 4 mins
2,The Ritual,Shantel Tessier,16 hrs and 46 mins
3,Reckless,Elsie Silver,9 hrs and 58 mins
4,The Puppeteers,Jason Chaffetz,8 hrs and 49 mins


In [40]:
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
delimiter = config_data['inbound_delimiter']

df = pd.read_csv(source_file, delimiter=delimiter)
df.head()

  df = pd.read_csv(source_file, delimiter=delimiter)


Unnamed: 0,Book Title,Author Name,Runtime
0,Warrior Fae,"Caroline Peckham, Susanne Valenti",29 hrs and 37 mins
1,Choose Your Enemies,Sandy Mitchell,10 hrs and 4 mins
2,The Ritual,Shantel Tessier,16 hrs and 46 mins
3,Reckless,Elsie Silver,9 hrs and 58 mins
4,The Puppeteers,Jason Chaffetz,8 hrs and 49 mins


In [43]:
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [44]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['book_title', 'author_name', 'runtime'], dtype='object')
columns of YAML are: ['book_title', 'author_name', 'runtime']


In [45]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [47]:
### Creating test file for this demo:
testdata = {
    'Book Title' : ['a','b','c','d','e'],
    'Author Name' : ['a','b','c','d','e'],
    'Runtime' : [34, 30, 16,33,22],
}
import pandas as pd
df = pd.DataFrame(testdata, columns=['Book Title', 'Author Name','Runtime'])
df.to_csv("test_data_ingestion.csv",index=False)

In [48]:
testdata


{'Book Title': ['a', 'b', 'c', 'd', 'e'],
 'Author Name': ['a', 'b', 'c', 'd', 'e'],
 'Runtime': [34, 30, 16, 33, 22]}

In [5]:
pip install modin

Collecting modin
  Downloading modin-0.23.1-py3-none-any.whl (1.1 MB)
Collecting pandas<2.1,>=2
  Downloading pandas-2.0.3-cp39-cp39-win_amd64.whl (10.8 MB)
Installing collected packages: pandas, modin
  Attempting uninstall: pandas
    Found existing installation: pandas 2.1.0
    Uninstalling pandas-2.1.0:
      Successfully uninstalled pandas-2.1.0



ERROR: Could not install packages due to an OSError: [WinError 5] Access is denied: 'C:\\Users\\97798\\anaconda3\\Lib\\site-packages\\~andas.libs\\msvcp140-59fdf63e48138046aebeb6ddb5b4e960.dll'
Consider using the `--user` option or check the permissions.



In [7]:
pip install modin[ray]


Collecting modin[ray]
Note: you may need to restart the kernel to use updated packages.  Using cached modin-0.23.1-py3-none-any.whl (1.1 MB)

Collecting ray[default]!=2.5.0,>=1.13.0
  Downloading ray-2.6.3-cp39-cp39-win_amd64.whl (22.4 MB)
Collecting pyarrow>=7.0.0
  Downloading pyarrow-13.0.0-cp39-cp39-win_amd64.whl (24.4 MB)
Collecting gpustat>=1.0.0
  Downloading gpustat-1.1.1.tar.gz (98 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting virtualenv<20.21.1,>=20.0.24
  Downloading virtualenv-20.21.0-py3-none-any.whl (8.7 MB)
Collecting opencensus
  Downloading opencensus-0.11.2-py2.py3-none-any.whl (128 kB)
Collecting py-spy>=0.2.0
  Downloading py_spy-0.3.14-py2.py3-none-win_amd64.whl (1.4 MB)
Collecting aiohttp



In [8]:
pip install modin[dask]


Building wheels for collected packages: gpustat
  Building wheel for gpustat (PEP 517): started
  Building wheel for gpustat (PEP 517): finished with status 'done'
  Created wheel for gpustat: filename=gpustat-1.1.1-py3-none-any.whl size=26445 sha256=507dd874794cef6a936f9ccc2d8642a593b0780f00b65ae78388f114b8c76f51
  Stored in directory: c:\users\97798\appdata\local\pip\cache\wheels\12\7d\d7\444dca5ad3c5ea8c4e00ff211673505e0214fa9b0303dbd3b6
Successfully built gpustat
Installing collected packages: ansicon, jinxed, platformdirs, opencensus-context, nvidia-ml-py, distlib, blessed, virtualenv, ray, py-spy, opencensus, gpustat, colorful, aiohttp-cors, pyarrow, modin
Successfully installed aiohttp-cors-0.7.0 ansicon-1.89.0 blessed-1.20.0 colorful-0.5.5 distlib-0.3.7 gpustat-1.1.1 jinxed-1.2.0 modin-0.23.1 nvidia-ml-py-12.535.108 opencensus-0.11.2 opencensus-context-0.1.3 platformdirs-3.10.0 py-spy-0.3.14 pyarrow-13.0.0 ray-2.6.3 virtualenv-20.21.0





Collecting dask>=2.22.0
  Downloading dask-2022.2.1-py3-none-any.whl (1.1 MB)
Installing collected packages: dask
  Attempting uninstall: dask
    Found existing installation: dask 2023.9.1
    Uninstalling dask-2023.9.1:
      Successfully uninstalled dask-2023.9.1
Successfully installed dask-2022.2.1
