In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import time
import logging
import subprocess
import yaml
import datetime 
import gc
import re
import gzip
import time
import warnings
from dask import dataframe as dd
import csv
import gzip

In [7]:
# Time how long Dask takes to open the CSV.
# NOTE(review): Dask builds its dataframe lazily, so this presumably measures
# graph construction rather than a full read of the file -- confirm before
# comparing the figure with the pandas / Modin timings.
from dask import dataframe as dd
start = time.perf_counter()  # monotonic clock meant for measuring intervals
EnFr_df = dd.read_csv('en-fr.csv')
end = time.perf_counter()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.018071889877319336 sec


In [5]:
# Time how long pandas takes to read the CSV into memory.
import pandas as pd
# perf_counter is monotonic; time.time() follows the system clock and can jump
# while a multi-minute read is in progress.
start = time.perf_counter()
EnFr_df1 = pd.read_csv('en-fr.csv',delimiter=',')
end = time.perf_counter()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  255.34760904312134 sec


In [58]:
# Time how long Modin (on Ray) takes to read the CSV.
# Modin gets its own alias: importing it as `pd` would rebind the notebook-wide
# `pd` name, and every later `pd.read_csv` call would silently run on Modin
# instead of pandas.
import modin.pandas as mpd
import ray
# Restart Ray so re-running this cell does not fail on an already-initialised session.
ray.shutdown()
ray.init()
start = time.perf_counter()  # monotonic clock meant for measuring intervals
df = mpd.read_csv('en-fr.csv')
end = time.perf_counter()
print("Read csv with modin and ray: ",(end-start),"sec")

Read csv with modin and ray:  346.1644399166107 sec


In [2]:
# Dask reported the shortest read time of the three readers above, so the file is opened with Dask here
from dask import dataframe as dd
df = dd.read_csv('en-fr.csv',delimiter=',')

In [11]:
%%writefile utility.py

def read_config_file(filepath):
    """Load the YAML configuration stored at ``filepath``.

    Args:
        filepath: Path of the YAML file to read.

    Returns:
        The parsed YAML content, or ``None`` when the file is not valid YAML
        (the parser error is logged).
    """
    # Imported locally because the module written by this cell has no import
    # section; without these the first call fails with NameError.
    import logging
    import yaml

    with open(filepath, 'r') as stream:
        try:
            # NOTE(security): yaml.Loader can construct arbitrary Python
            # objects. Fine for a locally written config; use yaml.safe_load
            # before reading files that come from anywhere else.
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)
            return None

def col_header_val(df,table_config):
    """Normalise the column names of ``df`` and check them against the config.

    Names are lower-cased, every non-word character becomes ``_``, leading and
    trailing ``_`` are stripped and runs of ``_`` are collapsed to one. The
    cleaned names are written back to ``df.columns`` (the caller's frame is
    modified), then compared, ignoring order, with the lower-cased entries of
    ``table_config['columns']``.

    Args:
        df: DataFrame whose header is validated; its columns are renamed.
        table_config: Mapping holding the expected names under ``'columns'``.

    Returns:
        1 when the sorted column names equal the expected ones, otherwise 0.
    """
    # Imported locally because the module written by this cell has no import
    # section of its own.
    import logging

    # Vectorised clean-up. This also removes the call to a `replacer` helper
    # that this version of the module never defines.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(name.lower() for name in table_config['columns'])
    # Only the names are compared, so sort the header itself; reindexing the
    # frame would copy every row just to reorder the columns.
    actual_col = df.columns.sort_values()
    if list(actual_col) == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = list(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Writing utility.py


In [20]:
%%writefile utility.py
import yaml
import logging
import re

def read_cfg(path):
    """Parse the YAML file at ``path`` and return its content.

    A YAML syntax error is logged and ``None`` is returned instead.
    """
    with open(path, 'r') as cfg_file:
        try:
            parsed = yaml.safe_load(cfg_file)
        except yaml.YAMLError as err:
            logging.error(err)
            return None
        return parsed

def replacer(string, char):
    """Collapse every run of two or more ``char`` in ``string`` into one.

    ``char`` is matched literally, so regex metacharacters such as ``.`` or
    ``*`` can be passed safely.

    Args:
        string: Text to clean.
        char: Character (or literal substring) whose repeats are collapsed.

    Returns:
        ``string`` with each run of repeated ``char`` replaced by a single one.
    """
    if not char:
        # Nothing to collapse, and an empty pattern cannot be repeated.
        return string
    # Escape so `char` is never read as regex syntax; the group makes the
    # quantifier apply to the whole of `char`.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps backslashes in `char` literal.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_cfg):
    """Normalise the column names of ``df`` and check them against the config.

    Names are lower-cased, every non-word character becomes ``_``, leading and
    trailing ``_`` are stripped and repeated ``_`` are collapsed with
    ``replacer``. The cleaned names are written back to ``df.columns`` (the
    caller's frame is modified), then compared, ignoring order, with the
    lower-cased entries of ``table_cfg["columns"]``.

    Args:
        df: DataFrame whose header is validated; its columns are renamed.
        table_cfg: Mapping holding the expected names under ``"columns"``.

    Returns:
        1 when the sorted column names equal the expected ones, otherwise 0.
    """
    # Raw string: '\w' in a plain literal is an invalid escape sequence.
    cleaned = df.columns.str.lower().str.replace(r'[^\w]', '_', regex=True)
    df.columns = [replacer(name.strip('_'), '_') for name in cleaned]

    expected_col = sorted(name.lower() for name in table_cfg["columns"])
    # Only the names are compared, so sort them directly; reindexing the frame
    # would copy every row just to reorder the columns.
    actual_col = sorted(df.columns)

    if expected_col == actual_col:
        print("column name validation passed")
        return 1

    print("column name validation failed")
    mismatch = list(set(actual_col).difference(expected_col))
    print("Columns not in YAML file: ", mismatch)
    missing = list(set(expected_col).difference(actual_col))
    print("Columns not in data file: ", missing)
    return 0

Overwriting utility.py


In [59]:
%%writefile language.yaml
# file_name and file_type are joined by the notebook into the path of the source file.
file_type: csv
dataset_name: file
file_name: en-fr
# Delimiter handed to read_csv when the source file is loaded.
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected column names; the header check lower-cases them and ignores their order.
columns: 
      - fr
      - en
      

Writing language.yaml


In [3]:
# Load the pipeline settings from the YAML config through the helper module.
import importlib
import utility as util
# utility.py is (re)written by %%writefile cells in this notebook. Reload it so
# a copy imported earlier in the same kernel session is not used by mistake.
util = importlib.reload(util)
config_data = util.read_cfg("language.yaml")

In [4]:
# Display the parsed configuration to confirm what was loaded from language.yaml
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'en-fr',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['fr', 'en']}

In [5]:
# Preview the first rows of the source file through Dask
df_sample = dd.read_csv('en-fr.csv',delimiter=',')
df_sample.head()

Unnamed: 0,en,fr
0,Changing Lives | Changing Society | How It Wor...,Il a transformé notre vie | Il a transformé la...
1,Site map,Plan du site
2,Feedback,Rétroaction
3,Credits,Crédits
4,Français,English


In [6]:
# Build the source path from the config: "./<file_name>.<file_type>"
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"

In [7]:
# Load the source file with the delimiter declared in the config.
# The delimiter is passed by keyword: pandas deprecated positional arguments
# after the path (FutureWarning since 1.3) and rejects them from 2.0 on.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,en,fr
0,Changing Lives | Changing Society | How It Wor...,Il a transformé notre vie | Il a transformé la...
1,Site map,Plan du site
2,Feedback,Rétroaction
3,Credits,Crédits
4,Français,English


In [8]:
# Check the file header against the columns declared in the config and report the result
print("validation failed" if util.col_header_val(df, config_data) == 0 else "col validation passed")
    

column name validation passed
col validation passed


In [9]:
# Re-display the loaded configuration after the header validation
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'en-fr',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['fr', 'en']}

In [40]:
# Write the frame as a gzip-compressed, fully quoted text file.
# The output name and the separator are taken from the config instead of being
# hard-coded, so the export follows language.yaml
# (currently 'en-fr.csv.gz' and '|').
df.to_csv(f"{config_data['file_name']}.{config_data['file_type']}.gz",
          sep=config_data['outbound_delimiter'],
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          # NOTE: pandas 1.5 renamed this keyword to `lineterminator` and 2.0
          # removed the old spelling; rename it when upgrading pandas.
          line_terminator='\n')

In [10]:
# Number of rows in the loaded frame
df.shape[0]

22520376

In [11]:
# Size of the source CSV in bytes
os.stat('en-fr.csv').st_size

8410507707

In [12]:
# Number of columns in the loaded frame
df.shape[1]

2

In [13]:
# Size of the gzip-compressed output in bytes
os.stat('en-fr.csv.gz').st_size

2668433373