In [11]:
class TextAnalyze:
    """Count word frequencies in a text file while skipping stop words.

    Only whitespace-separated tokens that are purely alphabetic are counted.
    A token is skipped when its lowercased form is a stop word, but the tally
    is keyed by the token's original casing, so ``US`` and ``us`` are
    separate entries.
    """

    def __init__(self, stop_words_file):
        """Load the stop words and start with an empty tally.

        Args:
            stop_words_file: Path to a text file with one stop word per line.
        """
        # Membership is tested once per token, so keep the stop words in a
        # set (constant-time lookup) instead of scanning a list every time.
        self.stop_words = set(self.load_stop_words(stop_words_file))
        self.word_count = {}

    def load_stop_words(self, stop_words_file):
        """Read the stop-word file and return its lines as a list of strings.

        Args:
            stop_words_file: Path to a text file with one stop word per line.

        Returns:
            list[str]: The file's lines with line terminators removed.
        """
        with open(stop_words_file, 'r') as file:
            stop_words = file.read().splitlines()
        return stop_words

    def process_text_file(self, file_path):
        """Stream ``file_path`` line by line and add its words to the tally.

        Args:
            file_path: Path to the text file to analyze.
        """
        with open(file_path, 'r') as file:
            for line in file:
                self.process_line(line)

    def process_line(self, line):
        """Update ``self.word_count`` with the countable words of one line.

        Args:
            line: A single line of text.
        """
        stop_words = self.stop_words
        word_count = self.word_count
        # split() with no arguments already discards leading/trailing whitespace.
        for word in line.split():
            # Keep alphabetic tokens only; the stop-word test is
            # case-insensitive, the tally key keeps the original casing.
            if word.isalpha() and word.lower() not in stop_words:
                word_count[word] = word_count.get(word, 0) + 1

    def get_top_words(self, k):
        """Return the ``k`` most frequent words seen so far.

        Args:
            k: Number of entries to return.

        Returns:
            list[tuple[str, int]]: ``(word, count)`` pairs sorted by count,
            highest first; ties keep the order in which words were first seen.
        """
        sorted_words = sorted(self.word_count.items(), key=lambda x: x[1], reverse=True)
        return sorted_words[:k]

In [12]:
# Smoke test: sequential top-10 word count on the 50 MB sample file.
analyzer = TextAnalyze("stop_words/NLTK's list of english stopwords")
analyzer.process_text_file('small_50MB_dataset.txt')
top_words = analyzer.get_top_words(10)

# Print each (word, count) pair as "word,count", most frequent first.
for word, count in top_words:
    print(f'{word},{count}')

would,21795
US,15810
economic,14464
countries,13136
new,12460
political,12229
also,12018
one,11708
global,10935
European,10824


In [16]:
#analyze performance 
import time
import psutil

def analyze_performance(filepath):
    """Run the sequential top-10 word count on ``filepath`` and print metrics.

    Prints the ten most frequent words as ``word,count`` lines, followed by
    the elapsed running time, the CPU utilization reported by psutil for the
    duration of the run, and the resident memory of this process at the end.

    Args:
        filepath: Path to the text file to analyze.
    """
    # Without an interval, cpu_percent() reports utilization since its
    # previous call. Calling it here resets that baseline so the reading
    # taken at the end covers only this run.
    psutil.cpu_percent()

    # Start measuring time (perf_counter is monotonic, unlike time.time).
    start_time = time.perf_counter()

    # get top k words
    analyzer = TextAnalyze("stop_words/NLTK's list of english stopwords")
    analyzer.process_text_file(filepath)
    top_words = analyzer.get_top_words(10)

    # print top words
    for word, count in top_words:
        print(f'{word},{count}')

    # Stop measuring time
    end_time = time.perf_counter()
    running_time = end_time - start_time
    print(f"Running Time: {running_time} seconds")

    # System-wide CPU utilization since the baseline taken above
    cpu_utilization = psutil.cpu_percent()
    print(f"CPU Utilization: {cpu_utilization}%")

    # Resident set size of this process right now (not the peak), in MB
    memory_usage = psutil.Process().memory_info().rss / 1024 / 1024
    print(f"Memory Usage: {memory_usage} MB")

In [14]:
# Sequential baseline: 50 MB sample file.
filepath = 'small_50MB_dataset.txt'
analyze_performance(filepath)

would,21795
US,15810
economic,14464
countries,13136
new,12460
political,12229
also,12018
one,11708
global,10935
European,10824
Running Time: 7.834611892700195 seconds
CPU Utilization: 11.5%
Memory Usage: 130.60546875 MB


In [17]:
# Sequential baseline: 300 MB file.
filepath = 'data_300MB.txt'
analyze_performance(filepath)

European,316713
Mr,210158
would,178550
also,175427
must,153717
Commission,138001
Member,119742
like,107437
Parliament,85550
one,84992
Running Time: 56.86176419258118 seconds
CPU Utilization: 14.4%
Memory Usage: 44.8125 MB


In [18]:
# Sequential baseline: 2.5 GB file.
filepath = 'data_2.5GB.txt'
analyze_performance(filepath)

said,1572125
would,903267
one,755766
also,704422
de,620092
last,573309
two,566546
first,557474
people,557166
new,546573
Running Time: 486.91321206092834 seconds
CPU Utilization: 26.1%
Memory Usage: 196.24609375 MB


In [19]:
# Sequential baseline: 16 GB file.
filepath = 'data_16GB.txt'
analyze_performance(filepath)

said,10397763
would,5738120
one,4664992
also,4446636
two,3535784
last,3520997
first,3458376
people,3447034
new,3377098
could,3286270
Running Time: 3063.7685120105743 seconds
CPU Utilization: 33.4%
Memory Usage: 283.8359375 MB


In [60]:
import threading
import heapq # use heap to sort the hashmap

class WordCounterThread(threading.Thread):
    """Thread that tallies the countable words of one partition of lines.

    A token is counted when it is purely alphabetic and its lowercased form
    is not a stop word; the tally is keyed by the token's original casing.
    """

    def __init__(self, partition, stop_words):
        """Store the lines to process and the stop words to skip.

        Args:
            partition: Iterable of text lines handled by this thread.
            stop_words: Container of lowercase stop words (tested with ``in``).
        """
        super().__init__()
        self.stop_words = stop_words
        self.partition = partition
        self.word_count = {}

    def run(self):
        """Count the words of every line in the partition into ``word_count``."""
        tally = self.word_count
        skipped = self.stop_words
        for row in self.partition:
            for token in row.strip().split():
                if not token.isalpha():
                    continue
                if token.lower() in skipped:
                    continue
                tally[token] = tally.get(token, 0) + 1

    def get_word_count(self):
        """Return the word -> count dictionary built by ``run``."""
        return self.word_count

def create_partitions(file_path, num_partitions):
    """Read a text file and split its lines into contiguous partitions.

    The whole file is read into memory. Lines keep their original order and
    their line terminators. Partition sizes differ by at most one line: when
    the line count is not divisible by ``num_partitions``, the earliest
    partitions each receive one extra line.

    Args:
        file_path: Path to the text file to split.
        num_partitions: Number of partitions to produce; must be >= 1.

    Returns:
        list[list[str]]: Exactly ``num_partitions`` lists of lines (some may
        be empty when the file has fewer lines than partitions).

    Raises:
        ValueError: If ``num_partitions`` is smaller than 1.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")

    with open(file_path, 'r') as file:
        lines = file.readlines()

    # Spread the remainder over the first partitions instead of piling it
    # onto the last one, so no worker gets a disproportionate share.
    lines_per_partition, remainder = divmod(len(lines), num_partitions)

    partitions = []
    start = 0
    for i in range(num_partitions):
        end = start + lines_per_partition + (1 if i < remainder else 0)
        partitions.append(lines[start:end])
        start = end

    return partitions

def load_stop_words(file_path):
    """Return the set of whitespace-separated tokens found in ``file_path``.

    Args:
        file_path: Path to the stop-word file.

    Returns:
        set[str]: Every distinct token in the file.
    """
    with open(file_path, 'r') as handle:
        return {token for row in handle for token in row.split()}

def count_words_parallel(file_path, stop_words_file, num_threads, k):
    """Count words with one ``WordCounterThread`` per partition of the file.

    Args:
        file_path: Path to the text file to analyze.
        stop_words_file: Path to the stop-word file.
        num_threads: Number of partitions, and of threads started.
        k: Number of top entries to return.

    Returns:
        dict[str, int]: The ``k`` most frequent words mapped to their total
        counts, inserted from most to least frequent.
    """
    chunks = create_partitions(file_path, num_threads)
    ignored = load_stop_words(stop_words_file)

    # Launch one worker per partition.
    workers = []
    for index in range(num_threads):
        worker = WordCounterThread(chunks[index], ignored)
        workers.append(worker)
        worker.start()

    # Block until every worker has finished its partition.
    for worker in workers:
        worker.join()

    # Fold the per-thread tallies into a single dictionary.
    totals = {}
    for worker in workers:
        for token, tally in worker.get_word_count().items():
            totals[token] = totals.get(token, 0) + tally

    # Select the k keys with the largest totals.
    ranked = heapq.nlargest(k, totals, key=totals.get)
    return {token: totals[token] for token in ranked}

# Usage example: threaded top-10 count on the 50 MB sample with 4 threads.
# These module-level names stay defined for later cells; the benchmark
# cells below pass `stop_words_file` and `file_path` as arguments.
file_path = 'small_50MB_dataset.txt'
stop_words_file = "stop_words/NLTK's list of english stopwords"
num_threads = 4
k = 10
top_word_count = count_words_parallel(file_path, stop_words_file, num_threads, k)
print(top_word_count)


{'would': 21795, 'US': 15810, 'economic': 14464, 'countries': 13136, 'new': 12460, 'political': 12229, 'also': 12018, 'one': 11708, 'global': 10935, 'European': 10824}


In [46]:
#analyze performance 

def analyze_performance_2(filepath, stop_words_file, num_threads, k):
    """Run ``count_words_parallel`` on ``filepath`` and print metrics.

    Prints the top-``k`` word counts, the elapsed running time, the CPU
    utilization reported by psutil for the duration of the run, and the
    resident memory of this process at the end.

    Args:
        filepath: Path to the text file to analyze.
        stop_words_file: Path to the stop-word file.
        num_threads: Number of worker threads (and partitions).
        k: Number of top words to report.
    """
    # Without an interval, cpu_percent() reports utilization since its
    # previous call. Calling it here resets that baseline so the reading
    # taken at the end covers only this run.
    psutil.cpu_percent()

    # Start measuring time (perf_counter is monotonic, unlike time.time).
    start_time = time.perf_counter()

    # get top k words -- use the `filepath` argument, not a notebook global,
    # so the benchmark always measures the file the caller asked for.
    top_word_count = count_words_parallel(filepath, stop_words_file, num_threads, k)
    print(top_word_count)

    # Stop measuring time
    end_time = time.perf_counter()
    running_time = end_time - start_time
    print(f"Running Time: {running_time} seconds")

    # System-wide CPU utilization since the baseline taken above
    cpu_utilization = psutil.cpu_percent()
    print(f"CPU Utilization: {cpu_utilization}%")

    # Resident set size of this process right now (not the peak), in MB
    memory_usage = psutil.Process().memory_info().rss / 1024 / 1024
    print(f"Memory Usage: {memory_usage} MB")

In [47]:
# Threaded version: 50 MB sample file, 4 threads, top 10 words.
file_path = 'small_50MB_dataset.txt'
analyze_performance_2(file_path,stop_words_file, 4, 10)

{'would': 21795, 'US': 15810, 'economic': 14464, 'countries': 13136, 'new': 12460, 'political': 12229, 'also': 12018, 'one': 11708, 'global': 10935, 'European': 10824}
Running Time: 14.21212387084961 seconds
CPU Utilization: 31.4%
Memory Usage: 2358.015625 MB


In [48]:
# Threaded version: 300 MB file, 4 threads, top 10 words.
file_path = 'data_300MB.txt'
analyze_performance_2(file_path,stop_words_file, 4, 10)

{'European': 316713, 'Mr': 210158, 'would': 178550, 'also': 175427, 'must': 153717, 'Commission': 138001, 'Member': 119742, 'like': 107437, 'Parliament': 85550, 'one': 84992}
Running Time: 16.15682315826416 seconds
CPU Utilization: 27.5%
Memory Usage: 1871.66015625 MB


In [49]:
# Threaded version: 300 MB file, 8 threads, top 10 words.
file_path = 'data_300MB.txt'
analyze_performance_2(file_path,stop_words_file, 8, 10)

{'European': 316713, 'Mr': 210158, 'would': 178550, 'also': 175427, 'must': 153717, 'Commission': 138001, 'Member': 119742, 'like': 107437, 'Parliament': 85550, 'one': 84992}
Running Time: 15.069303035736084 seconds
CPU Utilization: 26.3%
Memory Usage: 1857.77734375 MB


In [50]:
# Threaded version: 2.5 GB file, 4 threads, top 10 words.
file_path = 'data_2.5GB.txt'
analyze_performance_2(file_path,stop_words_file, 4, 10)

{'said': 1572125, 'would': 903267, 'one': 755766, 'also': 704422, 'de': 620092, 'last': 573309, 'two': 566546, 'first': 557474, 'people': 557166, 'new': 546573}
Running Time: 154.55087280273438 seconds
CPU Utilization: 34.9%
Memory Usage: 2695.95703125 MB


In [51]:
# Threaded version: 2.5 GB file, 8 threads, top 10 words.
file_path = 'data_2.5GB.txt'
analyze_performance_2(file_path,stop_words_file, 8, 10)

{'said': 1572125, 'would': 903267, 'one': 755766, 'also': 704422, 'de': 620092, 'last': 573309, 'two': 566546, 'first': 557474, 'people': 557166, 'new': 546573}
Running Time: 150.8342468738556 seconds
CPU Utilization: 32.4%
Memory Usage: 2580.6484375 MB


In [52]:
# Threaded version: 16 GB file, 4 threads, top 10 words.
file_path = 'data_16GB.txt'
analyze_performance_2(file_path,stop_words_file, 4, 10)

{'said': 10397763, 'would': 5738120, 'one': 4664992, 'also': 4446636, 'two': 3535784, 'last': 3520997, 'first': 3458376, 'people': 3447034, 'new': 3377098, 'could': 3286270}
Running Time: 1302.087240934372 seconds
CPU Utilization: 28.8%
Memory Usage: 2544.13671875 MB


In [53]:
# Threaded version: 16 GB file, 8 threads, top 10 words.
file_path = 'data_16GB.txt'
analyze_performance_2(file_path,stop_words_file, 8, 10)

{'said': 10397763, 'would': 5738120, 'one': 4664992, 'also': 4446636, 'two': 3535784, 'last': 3520997, 'first': 3458376, 'people': 3447034, 'new': 3377098, 'could': 3286270}
Running Time: 1433.357969045639 seconds
CPU Utilization: 38.8%
Memory Usage: 3808.78515625 MB


From the results above, we can see that multithreading combined with partitioning significantly reduces the running time on large inputs. Using more threads sometimes gives a further speed-up, but CPU and memory usage are much higher.

In [59]:
# using a thread pool (concurrent.futures): each worker fills its own dict, and the dicts are merged afterwards
from concurrent.futures import ThreadPoolExecutor

def count_words_concurrent(file_path, stop_words_file, num_threads, k):
    """Count words with a ``ThreadPoolExecutor``, one task per partition.

    Each task tallies its partition into a private dictionary; the partial
    tallies are then merged in partition order.

    Args:
        file_path: Path to the text file to analyze.
        stop_words_file: Path to the stop-word file.
        num_threads: Number of partitions and maximum number of pool workers.
        k: Number of top entries to return.

    Returns:
        dict[str, int]: The ``k`` most frequent words mapped to their total
        counts, inserted from most to least frequent.
    """
    chunks = create_partitions(file_path, num_threads)
    ignored = load_stop_words(stop_words_file)

    def count_words(partition):
        """Tally alphabetic, non-stop-word tokens of one partition."""
        tally = {}
        for row in partition:
            for token in row.strip().split():
                if not token.isalpha() or token.lower() in ignored:
                    continue
                tally[token] = tally.get(token, 0) + 1
        return tally

    # map() yields the partial tallies in the same order as the partitions.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        partial_tallies = list(pool.map(count_words, chunks))

    # Fold the partial tallies into a single dictionary.
    totals = {}
    for partial in partial_tallies:
        for token, amount in partial.items():
            totals[token] = totals.get(token, 0) + amount

    # Select the k keys with the largest totals.
    ranked = heapq.nlargest(k, totals, key=totals.get)
    return {token: totals[token] for token in ranked}

# Usage example: thread-pool top-10 count on the 50 MB sample with 4 workers.
# These module-level names stay defined for later cells; the benchmark
# cells below pass `stop_words_file` and `file_path` as arguments.
file_path = 'small_50MB_dataset.txt'
stop_words_file = "stop_words/NLTK's list of english stopwords"
num_threads = 4
k = 10
top_word_count = count_words_concurrent(file_path, stop_words_file, num_threads, k)
print(top_word_count)

{'would': 21795, 'US': 15810, 'economic': 14464, 'countries': 13136, 'new': 12460, 'political': 12229, 'also': 12018, 'one': 11708, 'global': 10935, 'European': 10824}


In [61]:
#analyze performance 

def analyze_performance_3(filepath, stop_words_file, num_threads, k):
    """Run ``count_words_concurrent`` on ``filepath`` and print metrics.

    Prints the top-``k`` word counts, the elapsed running time, the CPU
    utilization reported by psutil for the duration of the run, and the
    resident memory of this process at the end.

    Args:
        filepath: Path to the text file to analyze.
        stop_words_file: Path to the stop-word file.
        num_threads: Number of pool workers (and partitions).
        k: Number of top words to report.
    """
    # Without an interval, cpu_percent() reports utilization since its
    # previous call. Calling it here resets that baseline so the reading
    # taken at the end covers only this run.
    psutil.cpu_percent()

    # Start measuring time (perf_counter is monotonic, unlike time.time).
    start_time = time.perf_counter()

    # get top k words -- use the `filepath` argument, not a notebook global,
    # so the benchmark always measures the file the caller asked for.
    top_word_count = count_words_concurrent(filepath, stop_words_file, num_threads, k)
    print(top_word_count)

    # Stop measuring time
    end_time = time.perf_counter()
    running_time = end_time - start_time
    print(f"Running Time: {running_time} seconds")

    # System-wide CPU utilization since the baseline taken above
    cpu_utilization = psutil.cpu_percent()
    print(f"CPU Utilization: {cpu_utilization}%")

    # Resident set size of this process right now (not the peak), in MB
    memory_usage = psutil.Process().memory_info().rss / 1024 / 1024
    print(f"Memory Usage: {memory_usage} MB")

In [63]:
# Thread-pool version: 300 MB file, 4 workers, top 10 words.
file_path = 'data_300MB.txt'
analyze_performance_3(file_path,stop_words_file, 4, 10)

{'European': 316713, 'Mr': 210158, 'would': 178550, 'also': 175427, 'must': 153717, 'Commission': 138001, 'Member': 119742, 'like': 107437, 'Parliament': 85550, 'one': 84992}
Running Time: 15.011220932006836 seconds
CPU Utilization: 39.3%
Memory Usage: 654.55859375 MB


In [64]:
# Thread-pool version: 300 MB file, 8 workers, top 10 words.
# NOTE(review): this cell previously passed 4 despite its "8 threads" label,
# so any output saved with it presumably reflects a 4-worker run; re-run to refresh.
file_path = 'data_300MB.txt'
analyze_performance_3(file_path,stop_words_file, 8, 10)

{'European': 316713, 'Mr': 210158, 'would': 178550, 'also': 175427, 'must': 153717, 'Commission': 138001, 'Member': 119742, 'like': 107437, 'Parliament': 85550, 'one': 84992}
Running Time: 13.844422817230225 seconds
CPU Utilization: 36.9%
Memory Usage: 630.23046875 MB
