In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import string
import gc
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import string
import dask.bag as db

# SOME MEMORY-BASED OCCURING ERRORS CAN BE TRIED WITH "FAKE.TXT" FILE IN THE REPO 

################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

def normalize_text(text):
    # Lowercase all letters
    text = text.lower()
    # Remove punctuation
    text = text.translate(str.maketrans("", "", string.punctuation))
    return text

def validate(df, table_config):
    file_type = table_config['file_type']
    source_file = "./" + table_config['file_name'] + f'.{file_type}' 
    file_size = os.path.getsize(source_file)
    file_size = file_size / (1024 * 1024 * 1024)
    
    '''
    Commented due to insufficient amount of memory of my pc
    checkpoint_dir = "C:/Users/BatuhanYILMAZ/Desktop/staj/week6/checkpoints" 
    df = df.persist(checkpoint=checkpoint_dir)
    '''
    min_size = int(table_config['validations']['file_size']['min_size'])
    max_size = int(table_config['validations']['file_size']['max_size'])
    if file_size > min_size and file_size < max_size:
        print("File size validation failed")
        print(f"""Expected file size should be between {min_size} - {max_size} GB """)
        print(f"""Size of the actual file : {file_size} GB""")
        return False

    text_length = (
        df.map(lambda text: text.replace(" ", ""))
        .flatten()
        .map(len)
        .sum()
        .compute()
    )
    min_length = int(table_config['validations']['text_length']['min_length'])
    max_length = int(table_config['validations']['text_length']['max_length'])
    if text_length > min_length and text_length < max_length:
        print("Text length validation failed")
        print(f"""Expected text length should be between {min_length} - {max_length} characters""")
        print(f"""Total text length of actual file : {text_length} characters""")
        return False
    
    
    word_count = (
        df.map(normalize_text)
        .str.split()
        .flatten()
        .count()
        .compute()
    )
    min_count = int(table_config['validations']['word_count']['min_count'])
    max_count = int(table_config['validations']['word_count']['max_count'])
    if word_count > min_count and word_count < max_count:
        print("Word count validation failed")
        print(f"""Expected word count should be between {min_count} - {max_count}""")
        print(f"""Total number of words in the actual file : {word_count}""")
        return False
    '''
    Commented due to insufficient amount of memory of my pc
    pattern = table_config['validations']['format_validation']['required_pattern']
    if not any(re.search(pattern, line) for line in df.compute()):
        print("Format validation failed")
        print("Expected file format should start with:", pattern)
        return False
    '''
    print("All validations passed successfully")
    print(f"""Size of the actual file : {file_size} GB""")
    print(f"""Total text length of actual file : {text_length} characters""")
    print(f"""Total number of words in the actual file : {word_count}""")
    #print(f"""Each paragraph of the file starts with the correct pattern : [pattern] """)
    return True

def get_data_summary(bag, table_config):
    print("---------------Data Summary---------------")
    # Normalize the text and count the total number of words
    total_words = (
        bag.map(normalize_text)
        .str.split()
        .flatten()
        .count()
        .compute()
    )
    print("Total number of words in file: ", total_words)
    
    # Total length of whole words in the file   
    total_length = (
        bag.map(lambda text: text.replace(" ", ""))
        .flatten()
        .map(len)
        .sum()
        .compute()
    )
    print("Total length of words in file: ", total_length)
    
    # Get the size of the file
    file_type = table_config['file_type']
    source_file = "./" + table_config['file_name'] + f'.{file_type}' 
    file_stats = os.stat(source_file)
    print(f'File Size in Bytes is {file_stats.st_size}')
    print(f'File Size in MegaBytes is {file_stats.st_size / (1024 * 1024)}')
    print(f'File Size in GigaBytes is {file_stats.st_size / (1024 * 1024 * 1024)}')
    
    word_counts = (
    bag.map(normalize_text)
    .str.split()
    .flatten()
    .frequencies()
    .compute()
    )

    # Print the word frequencies
    print("Frequency of words:")
    for word, count in word_counts:
        print(f"{word}: {count}")

def write(bag, table_config):
    # Convert the Bag to a DataFrame
    df = db.from_sequence(bag).to_dataframe()

    # Define the output file path
    output_file = "output_file.txt.gz"

    # Write the DataFrame to a single pipe-separated text file
    df.to_csv(output_file, sep=table_config['outbound_delimiter'], compression="gzip", single_file=True, index=False)
    
    print("Gzipped file created:", output_file)

Overwriting testutility.py


In [2]:
%%writefile file.yaml
file_type: txt
dataset_name: testfile
file_name: data
table_name: edsurv
inbound_delimiter: "\n\n"
outbound_delimiter: "|"
validations:
  file_size:
    min_size: 2
    max_size: 10 # gb
  text_length:
    min_length: 1000000000
    max_length: 100000000000
  word_count:
    min_count: 1000000
    max_count: 1000000000000
  format_validation:
    required_pattern: "Lorem ipsum"

Overwriting file.yaml


In [3]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
print(source_file)

./data.txt


In [4]:
config_data

{'file_type': 'txt',
 'dataset_name': 'testfile',
 'file_name': 'data',
 'table_name': 'edsurv',
 'inbound_delimiter': '\n\n',
 'outbound_delimiter': '|',
 'validations': {'file_size': {'min_size': 2, 'max_size': 10},
  'text_length': {'min_length': 1000000000, 'max_length': 100000000000},
  'word_count': {'min_count': 1000000, 'max_count': 1000000000000},
  'format_validation': {'required_pattern': 'Lorem ipsum'}}}

In [5]:
import time

file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
print(source_file)

./data.txt


In [6]:
# Import data-reading libraries
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import ray as rpd
import dask.bag as db

In [7]:
# Read the file using pandas and measure computation time
start_time = time.time()
pandas_data = pd.read_csv(source_file, delimiter=config_data['inbound_delimiter'])
pandas_time = time.time() - start_time
pandas_data.head()



Unnamed: 0,"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur."
0,Excepteur sint occaecat cupidatat non proident...
1,quis nostrud exercitation ullamco laboris nisi...
2,"Lorem ipsum dolor sit amet, consectetur adipis..."
3,Excepteur sint occaecat cupidatat non proident...
4,Excepteur sint occaecat cupidatat non proident...


In [8]:
# Read the file using Dask and measure computation time
start_time = time.time()
dask_data = dd.read_csv(source_file, delimiter=config_data['inbound_delimiter']).compute()
dask_time = time.time() - start_time
dask_data.head()



Unnamed: 0,"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur."
0,Excepteur sint occaecat cupidatat non proident...
1,quis nostrud exercitation ullamco laboris nisi...
2,"Lorem ipsum dolor sit amet, consectetur adipis..."
3,Excepteur sint occaecat cupidatat non proident...
4,Excepteur sint occaecat cupidatat non proident...


In [9]:
# Read the file using Modin and measure computation time
start_time = time.time()
modin_data = mpd.read_csv(source_file, delimiter=config_data['inbound_delimiter'])
modin_time = time.time() - start_time
modin_data.head()


    import ray
    ray.init()

2023-07-18 15:56:34,370	INFO worker.py:1636 -- Started a local Ray instance.


Unnamed: 0,"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur."
0,Excepteur sint occaecat cupidatat non proident...
1,quis nostrud exercitation ullamco laboris nisi...
2,"Lorem ipsum dolor sit amet, consectetur adipis..."
3,Excepteur sint occaecat cupidatat non proident...
4,Excepteur sint occaecat cupidatat non proident...


In [10]:
# Read the file using Dask.Bag and measure computation time
start_time = time.time()
dask_bag_data = db.read_text(source_file)
dask_bag_time = time.time() - start_time

In [11]:
print("Computation times for reading the file using different methods:")
print("Pandas: {:.2f} seconds".format(pandas_time))

Computation times for reading the file using different methods:
Pandas: 33.23 seconds


In [12]:
print("Dask: {:.2f} seconds".format(dask_time))

Dask: 34.02 seconds


In [13]:
print("Modin: {:.2f} seconds".format(modin_time))

Modin: 36.14 seconds


In [14]:
print("Dask.bag: {:.2f} seconds".format(dask_bag_time))

Dask.bag: 0.01 seconds


In [15]:
# Validate the file with the expected format
util.validate(dask_bag_data,config_data)

All validations passed successfully
Size of the actual file : 2.32830810546875 GB
Total text length of actual file : 2108981248 characters
Total number of words in the actual file : 384352257


True

In [16]:
# Get the summary of data
util.get_data_summary(dask_bag_data,config_data)

---------------Data Summary---------------
Total number of words in file:  384352257
Total length of words in file:  2108981248
File Size in Bytes is 2500001792
File Size in MegaBytes is 2384.1875
File Size in GigaBytes is 2.32830810546875
Frequency of words:
lorem: 5554177
ipsum: 5570560
dolor: 11141120
sit: 5570560
amet: 5570560
consectetur: 5570560
adipiscing: 5570560
elit: 5570560
sed: 5570560
do: 5570560
eiusmod: 5570560
tempor: 5570560
incididunt: 5570560
ut: 16711680
labore: 5570560
et: 5570560
dolore: 11141120
magna: 5570560
aliqua: 5570560
enim: 5570560
ad: 5570560
minim: 5570560
veniam: 5570560
quis: 5570560
nostrud: 5570560
exercitation: 5570560
ullamco: 5570560
laboris: 5570560
nisi: 5570560
aliquip: 5570560
ex: 5570560
ea: 5570560
commodo: 5570560
consequat: 5570560
duis: 5570560
aute: 5570560
irure: 5570560
in: 16711680
reprehenderit: 5570560
voluptate: 5570560
velit: 5570560
esse: 5570560
cillum: 5570560
eu: 5570560
fugiat: 5570560
nulla: 5570560
pariatur: 5570560
except

In [19]:
# Write the Bag to a pipe-separated text file in gz format
# util.write(dask_bag_data, config_data)
# COMMENTED DUE TO INSUFFICIENT MEMORY ERROR OF MY PC