In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


### Write YAML file

In [2]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: test_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - en_description
    - en_label
    - item_id

Overwriting file.yaml


In [3]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [4]:
config_data['inbound_delimiter']

','

In [5]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['en_description', 'en_label', 'item_id']}

In [6]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("test_data.csv",delimiter=',')
df_sample.head()

Unnamed: 0,item_id,en_label,en_description
0,1,Universe,totality of space and all contents
1,2,Earth,third planet from the Sun in the Solar System
2,3,life,matter capable of extracting energy from the e...
3,4,death,permanent cessation of vital functions
4,5,human,"common name of Homo sapiens, unique extant spe..."


In [7]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

Unnamed: 0,item_id,en_label,en_description
0,1,Universe,totality of space and all contents
1,2,Earth,third planet from the Sun in the Solar System
2,3,life,matter capable of extracting energy from the e...
3,4,death,permanent cessation of vital functions
4,5,human,"common name of Homo sapiens, unique extant spe..."


In [8]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [9]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['item_id', 'en_label', 'en_description'], dtype='object')
columns of YAML are: ['en_description', 'en_label', 'item_id']


In [10]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [11]:
pd.read_csv("./test_data.csv")

Unnamed: 0,item_id,en_label,en_description
0,1,Universe,totality of space and all contents
1,2,Earth,third planet from the Sun in the Solar System
2,3,life,matter capable of extracting energy from the e...
3,4,death,permanent cessation of vital functions
4,5,human,"common name of Homo sapiens, unique extant spe..."
...,...,...,...
51450311,77257472,2dFGRS TGN256Z026,
51450312,77257483,2dFGRS TGS171Z171,
51450313,77257484,2dFGRS TGS373Z078,
51450314,77257491,2dFGRS TGS374Z114,


In [12]:
df

Unnamed: 0,item_id,en_label,en_description
0,1,Universe,totality of space and all contents
1,2,Earth,third planet from the Sun in the Solar System
2,3,life,matter capable of extracting energy from the e...
3,4,death,permanent cessation of vital functions
4,5,human,"common name of Homo sapiens, unique extant spe..."
...,...,...,...
51450311,77257472,2dFGRS TGN256Z026,
51450312,77257483,2dFGRS TGS171Z171,
51450313,77257484,2dFGRS TGS373Z078,
51450314,77257491,2dFGRS TGS374Z114,


In [19]:
import gzip

In [22]:
# Write the file in pipe-separated text file (|) in gz format
output_file = "output_data.txt"
df.to_csv(output_file, sep="|", index=False)

# Compress the file to gz format
compressed_file = "output_data.txt.gz"
with open(output_file, "rt") as f_in:
    with gzip.open(compressed_file, "wt") as f_out:
        f_out.writelines(f_in)

# summary:

51450316 rows × 3 columns

In [17]:
file_size = os.path.getsize("test_data.csv")
size_in_kb = file_size / 1024
size_in_mb = size_in_kb / 1024
size_in_gb = size_in_mb / 1024

print(size_in_gb)

3.857588341459632
