# BDS Final Project
Ethan Assefa, Thomas Burrell, Tatev Gomtsyan

### Loads in and reads data in parallel across all the Ray workers:
Preprocessing first to ensure empty strings dont cause problems, then read in full datasets.
- Done for each of the three categories we are interested in (Appliances, All Beauty, and Video Games)

In [2]:
import ray
import gzip
import json
import os
import random  # Import random module for sampling

# Ensure Ray is shut down and then reinitialize
ray.shutdown()
try:
    # Official way to initialize Ray with memory management
    ray.init(address='ray://172.31.28.176:10001')
except Exception as e:
    print(f"Failed to initialize Ray: {e}")

@ray.remote(num_cpus=2)
def load_data(file_path, start, end):
    texts = []
    with gzip.open(file_path, 'rt', encoding='utf-8') as f:
        f.seek(start)
        if start != 0:
            f.readline()  # Skip partial line
        while f.tell() < end:
            line = f.readline()
            if line:
                json_data = json.loads(line)
                texts.append(json_data['text'])
    return texts

def read_data(file_path, num_workers, sample_fraction=0.1):
    file_size = os.path.getsize(file_path)
    chunk_size = file_size // num_workers
    futures = []
    for i in range(num_workers):
        start = i * chunk_size
        end = start + chunk_size if i != num_workers - 1 else file_size
        futures.append(load_data.remote(file_path, start, end))
    
    all_texts = []
    for future in ray.get(futures):
        all_texts.extend(future)

    # Sample a fraction of the data randomly
    sampled_texts = random.sample(all_texts, int(len(all_texts) * sample_fraction))
    return sampled_texts

if __name__ == "__main__":
    file_paths = ['Appliances.jsonl.gz', 'All_Beauty.jsonl.gz', 'Musical_Instruments.jsonl.gz', 'Video_Games.jsonl.gz']
    num_workers = 7  # Define the number of Ray workers to use

    # Cycle through the given files and pull texts
    for file_path in file_paths:
        texts = read_data(file_path, num_workers, sample_fraction=1)  # Read data from each file
        var_name = file_path.split('.')[0] + "_full_texts"  # Create variable name based on the file name
        globals()[var_name] = texts  # Assign texts to a dynamically named variable
        print(f"Loaded {len(texts)} texts from {file_path} into {var_name}.")

SIGTERM handler is not set because current thread is not the main thread.


Loaded 606201 texts from Appliances.jsonl.gz into Appliances_full_texts.
Loaded 196645 texts from All_Beauty.jsonl.gz into All_Beauty_full_texts.
Loaded 863026 texts from Musical_Instruments.jsonl.gz into Musical_Instruments_full_texts.
Loaded 1370708 texts from Video_Games.jsonl.gz into Video_Games_full_texts.


### Test Run of Sentiment Analysis
Run small subset test on "local" head node without distributing across workers first to make sure there is no problem with the sentiment analysisx logic of our code:
- Conducted on the appliances dataset

In [3]:
import ray
import os
from flair.models import TextClassifier
from flair.data import Sentence

def test_local_sentiment_analysis(texts, model_path):
    classifier = TextClassifier.load(model_path)
    results = []
    for text in texts:
        sentence = Sentence(text)
        try:
            classifier.predict(sentence)
            results.append((sentence.text, sentence.labels[0].value, sentence.labels[0].score))
        except Exception as e:
            results.append((sentence.text, 'Error', str(e)))
    return results

# Example usage with a small subset
sample_texts = Appliances_full_texts[:10]
model_path = os.path.expanduser('~/.flair/models/sentiment-en-mix-distillbert_4.pt')
local_results = test_local_sentiment_analysis(sample_texts, model_path)
print(local_results)

[("They close, won't deteriorate like many other hard plastic types. I read in reviews a tinkling noise, but i haven't noticed and been a year.", 'POSITIVE', 0.9998600482940674), ("K-cups are such a burden on the environment. In fact, the inventor of the Keurig. The inventor of the K-cup, John Sylvan said, &quot;I feel bad sometimes that I ever did it.&quot; Now, there is a way to lessen your burden on the environment by using reusable K-cups! These are solid, nice, pretty and reusable cups that can easily be filled with coffee, tea or the drink of your choice. They work just like regular K-cups except you don't throw them away after a use. I feel so much better about my coffee consumption now that I don't have to throw them away. I highly recommend this product due to the green factor.<br /><br />I received this item at a discount in exchange for my honest review.", 'POSITIVE', 0.9998317956924438), ('Great deal.', 'POSITIVE', 0.9862760305404663), ('I love this because the ice cubes ar

### Take subset of full data 
We decided to take a percentage of each full dataset for this exercise, with the knowledge that the infrastructure can be scaled to handle the entire dataset if we wish.
- Took 10% for each of the first three categories we are interested in (Appliances, All Beauty, and Musical Instruments)
- Took 5% for the last and largest category (Video Games)

In [1]:
import ray
import gzip
import json
import os
import random  # Import random module for sampling

# Ensure Ray is shut down and then reinitialize
ray.shutdown()
try:
    # Official way to initialize Ray with memory management
    ray.init(address='ray://172.31.28.176:10001')
except Exception as e:
    print(f"Failed to initialize Ray: {e}")

@ray.remote
def load_data(file_path, start, end):
    texts = []
    with gzip.open(file_path, 'rt', encoding='utf-8') as f:
        f.seek(start)
        if start != 0:
            f.readline()  # Skip partial line
        while f.tell() < end:
            line = f.readline()
            if line:
                json_data = json.loads(line)
                texts.append(json_data['text'])
    return texts

def read_data(file_path, num_workers, sample_fraction=0.1):
    file_size = os.path.getsize(file_path)
    chunk_size = file_size // num_workers
    futures = []
    for i in range(num_workers):
        start = i * chunk_size
        end = start + chunk_size if i != num_workers - 1 else file_size
        futures.append(load_data.remote(file_path, start, end))
    
    all_texts = []
    for future in ray.get(futures):
        all_texts.extend(future)

    # Sample a fraction of the data randomly
    sampled_texts = random.sample(all_texts, int(len(all_texts) * sample_fraction))
    return sampled_texts

if __name__ == "__main__":
    configurations = [
        {'file_path': 'Appliances.jsonl.gz', 'sample_fraction': 0.1},
        {'file_path': 'All_Beauty.jsonl.gz', 'sample_fraction': 0.1},
        {'file_path': 'Musical_Instruments.jsonl.gz', 'sample_fraction': 0.1},
        {'file_path': 'Video_Games.jsonl.gz', 'sample_fraction': 0.05}
    ]
    #file_paths = ['Appliances.jsonl.gz', 'All_Beauty.jsonl.gz', 'Musical_Instruments.jsonl.gz', 'Video_Games.jsonl.gz']
    num_workers = 7  # Define the number of Ray workers to use

    # Cycle through the given files and pull texts
    for config in configurations:
        texts = read_data(config['file_path'], num_workers, sample_fraction=config['sample_fraction'])  # Read data from each file
        var_name = config['file_path'].split('.')[0] + "_sampled_texts"  # Create variable name based on the file name
        globals()[var_name] = texts  # Assign texts to a dynamically named variable
        print(f"Loaded {len(texts)} texts from {config['file_path']} into {var_name}.")

SIGTERM handler is not set because current thread is not the main thread.


Loaded 60620 texts from Appliances.jsonl.gz into Appliances_sampled_texts.
Loaded 19664 texts from All_Beauty.jsonl.gz into All_Beauty_sampled_texts.
Loaded 86302 texts from Musical_Instruments.jsonl.gz into Musical_Instruments_sampled_texts.
Loaded 68535 texts from Video_Games.jsonl.gz into Video_Games_sampled_texts.


## Full System
We begin with 7 EC2 Instances (T3.Large), each with 2 CPUs. The system capabilities are laid out here:

In [6]:
import ray

# Ensure Ray is shut down and then reinitialize
ray.shutdown()
try:
    # Official way to initialize Ray with memory management
    ray.init(address='ray://172.31.28.176:10001')
except Exception as e:
    print(f"Failed to initialize Ray: {e}")

# Print the status of the Ray cluster.
print('''This cluster consists of
    {} nodes in total
    {} CPU resources in total
    {} Memory resources in total
    {} Object store memory resources in total
    '''.format(len(ray.nodes()), ray.cluster_resources()['CPU'],
           ray.cluster_resources()['memory'] / (1024*1024*1024),
           ray.cluster_resources()['object_store_memory'] / (1024*1024*1024)))

ray.shutdown()

This cluster consists of
    7 nodes in total
    14.0 CPU resources in total
    35.468619922176 Memory resources in total
    15.508581158705056 Object store memory resources in total
    


## Run sentiment analysis in parallel across the Ray workers for Each Review Category:
Four configurations each:
1. Number of Workers: 2 | Number of CPUs per Worker: 1 | Batch Size: 100
3. Number of Workers: 7 | Number of CPUs per Worker: 2 | Batch Size: 100
4. Number of Workers: 7 | Number of CPUs per Worker: 2 | Batch Size: 200
5. Number of Workers: 14 | Number of CPUs per Worker: 1 | Batch Size: 100
 
### Appliances Amazon Reviews
We begin with the Appliances sampled data, running the sentiment analysis six times with each system configuration.
- A log of events and warnings is kept to ensure we are aware of any trouble when running (`senti_analy_appliances.log`)
- Timers are set to provide the runtimes for each system configuration (time data saved to `appliances_timing.csv`)
  - Total time, time per batch, average time per text
- Results from the sentiment analysis on each review text are collected (saved to `appliances_results.csv`)
  - Text content, sentiment, confidence

In [10]:
import ray
import os
from flair.models import TextClassifier
from flair.data import Sentence
import logging
import pandas as pd
import time

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename='senti_analy_appliances.log', filemode='w')

# Ensure Ray is shut down and then reinitialize
ray.shutdown()
ray.init(address='ray://172.31.28.176:10001')

# Set up key parameters
model_path = os.path.expanduser('~/.flair/models/sentiment-en-mix-distillbert_4.pt') # Path to the prebuilt sentiment analyzer on each worker

# Contains code for sentiment analysis, loops through different system configurations
if __name__ == "__main__":
    configurations = [
        {'num_workers': 2, 'cpu_per_worker': 1, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 200},
        {'num_workers': 14, 'cpu_per_worker': 1, 'batch_size': 100}       
    ]
    
    # Initialize DataFrame to store results
    result_df_initialized = False
    
    for config in configurations:
        num_workers = config['num_workers']
        cpu_per_worker = config['cpu_per_worker']
        batch_size = config['batch_size']
        
        @ray.remote(num_cpus=cpu_per_worker)
        class SentimentAnalyzer:
            def __init__(self, model_path):
                # Initialize logging inside actor
                self.logger = logging.getLogger(__name__)
                self.logger.setLevel(logging.INFO)
                handler = logging.FileHandler('senti_analy_appliances.log')
                handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
                self.logger.addHandler(handler)
                
                self.classifier = TextClassifier.load(model_path)
                self.logger.info("Model loaded successfully.")
        
            def analyze_sentiments(self, texts):
                start_time = time.time()
                results = []
                for text in texts:
                    self.logger.info(f"Processing text: {text[:15]}...")  # Log first 15 characters of text
                    sentence = Sentence(text)
                    try:
                        self.classifier.predict(sentence)
                        if sentence.labels:
                            results.append((sentence.text, sentence.labels[0].value, sentence.labels[0].score))
                            self.logger.info("Processed result.")
                        else:
                            results.append((sentence.text, 'No Prediction', 0))
                            self.logger.warning("No prediction made for sentence.")
                    except Exception as e:
                        error_message = str(e)
                        results.append((sentence.text, 'Error', error_message))
                        logging.error(f"Error processing text: {text[:15]}..., error: {error_message}")
                end_time = time.time()
                processing_time = end_time - start_time
                return results, processing_time, len(texts)
        
        def preprocess_texts(texts):
            # Remove empty strings and strip whitespace
            return [text.strip() for text in texts if text.strip()]
        
        def distribute_analysis(texts, model_path, num_workers, batch_size):
            texts = preprocess_texts(texts)
            chunk_size = len(texts) // num_workers
            analyzers = [SentimentAnalyzer.remote(model_path) for _ in range(num_workers)]
            futures = []
            
            start_time = time.time()
            for i in range(num_workers):
                start_index = i * chunk_size
                end_index = start_index + chunk_size if i != num_workers - 1 else len(texts)
                for j in range(start_index, end_index, batch_size):
                    batch = texts[j:j+batch_size]
                    futures.append((len(batch), analyzers[i].analyze_sentiments.remote(batch)))
        
            results = []
            time_per_batch = []
            for batch_size, future in futures:
                batch_results, batch_time, _ = ray.get(future)
                results.extend(batch_results)
                time_per_batch.append((batch_size, batch_time))
            
            total_time = time.time() - start_time
            average_time_per_text = total_time / len(texts) if texts else 0
            return results, total_time, average_time_per_text, time_per_batch, num_workers, len(analyzers) * cpu_per_worker
        
        # Distribute and process the analysis
        results, total_time, average_time_per_text, time_per_batch, _, _ = distribute_analysis(
            Appliances_sampled_texts, model_path, num_workers, batch_size
        )
        
        # Save results to DataFrame and CSV only if not done already
        if not result_df_initialized:
            df = pd.DataFrame(results, columns=['Text', 'Sentiment', 'Confidence'])
            df.to_csv('appliances_results.csv', index=False)
            result_df_initialized = True
        
        # Prepare timing data for CSV
        timing_data = {
            'Total Time (s)': [total_time],
            'Average Time per Text (s)': [average_time_per_text],
            'Total Workers': [num_workers],
            'CPU per Worker': [cpu_per_worker],
            'Batch Size': [batch_size]
        }
        
        # Convert time_per_batch into a DataFrame and merge with timing_data
        batch_timing_df = pd.DataFrame(time_per_batch, columns=['Batch Size', 'Time per Batch'])
        for column, value in timing_data.items():
            batch_timing_df[column] = value[0]  # Add the same value to all rows
        
        # Append to or create the timing CSV
        if os.path.exists('appliances_timing.csv'):
            with open('appliances_timing.csv', 'a') as f:
                batch_timing_df.to_csv(f, header=False, index=False)
        else:
            batch_timing_df.to_csv('appliances_timing.csv', index=False)

        print(f"Processed {len(results)} sentiment analyses with configuration: Workers={num_workers}, CPUs={cpu_per_worker}, Batch Size={batch_size}")
        print(f"Timing data appended to 'appliances_timing.csv'.")

Processed 60598 sentiment analyses with configuration: Workers=2, CPUs=1, Batch Size=100
Timing data appended to 'appliances_timing.csv'.
Processed 60859 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=100
Timing data appended to 'appliances_timing.csv'.
Processed 61459 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=200
Timing data appended to 'appliances_timing.csv'.
Processed 61531 sentiment analyses with configuration: Workers=14, CPUs=1, Batch Size=100
Timing data appended to 'appliances_timing.csv'.


SIGTERM handler is not set because current thread is not the main thread.


### All Beauty Amazon Reviews
We continue with the All Beauty sampled data, running the sentiment analysis six times with each system configuration.
- A log of events and warnings is kept to ensure we are aware of any trouble when running (`senti_analy_allbeauty.log`)
- Timers are set to provide the runtimes for each system configuration (time data saved to `allbeauty_timing.csv`)
  - Total time, time per batch, average time per text
- Results from the sentiment analysis on each review text are collected (saved to `allbeauty_results.csv`)
  - Text content, sentiment, confidence

In [11]:
import ray
import os
from flair.models import TextClassifier
from flair.data import Sentence
import logging
import pandas as pd
import time

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename='senti_analy_allbeauty.log', filemode='w')

# Ensure Ray is shut down and then reinitialize
ray.shutdown()
ray.init(address='ray://172.31.28.176:10001')

# Set up key parameters
model_path = os.path.expanduser('~/.flair/models/sentiment-en-mix-distillbert_4.pt') # Path to the prebuilt sentiment analyzer on each worker

# Contains code for sentiment analysis, loops through different system configurations
if __name__ == "__main__":
    configurations = [
        {'num_workers': 2, 'cpu_per_worker': 1, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 200},
        {'num_workers': 14, 'cpu_per_worker': 1, 'batch_size': 100}       
    ]
    
    # Initialize DataFrame to store results
    result_df_initialized = False
    
    for config in configurations:
        num_workers = config['num_workers']
        cpu_per_worker = config['cpu_per_worker']
        batch_size = config['batch_size']
        
        @ray.remote(num_cpus=cpu_per_worker)
        class SentimentAnalyzer:
            def __init__(self, model_path):
                # Initialize logging inside actor
                self.logger = logging.getLogger(__name__)
                self.logger.setLevel(logging.INFO)
                handler = logging.FileHandler('senti_analy_allbeauty.log')
                handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
                self.logger.addHandler(handler)
                
                self.classifier = TextClassifier.load(model_path)
                self.logger.info("Model loaded successfully.")
        
            def analyze_sentiments(self, texts):
                start_time = time.time()
                results = []
                for text in texts:
                    self.logger.info(f"Processing text: {text[:15]}...")  # Log first 15 characters of text
                    sentence = Sentence(text)
                    try:
                        self.classifier.predict(sentence)
                        if sentence.labels:
                            results.append((sentence.text, sentence.labels[0].value, sentence.labels[0].score))
                            self.logger.info("Processed result.")
                        else:
                            results.append((sentence.text, 'No Prediction', 0))
                            self.logger.warning("No prediction made for sentence.")
                    except Exception as e:
                        error_message = str(e)
                        results.append((sentence.text, 'Error', error_message))
                        logging.error(f"Error processing text: {text[:15]}..., error: {error_message}")
                end_time = time.time()
                processing_time = end_time - start_time
                return results, processing_time, len(texts)
        
        def preprocess_texts(texts):
            # Remove empty strings and strip whitespace
            return [text.strip() for text in texts if text.strip()]
        
        def distribute_analysis(texts, model_path, num_workers, batch_size):
            texts = preprocess_texts(texts)
            chunk_size = len(texts) // num_workers
            analyzers = [SentimentAnalyzer.remote(model_path) for _ in range(num_workers)]
            futures = []
            
            start_time = time.time()
            for i in range(num_workers):
                start_index = i * chunk_size
                end_index = start_index + chunk_size if i != num_workers - 1 else len(texts)
                for j in range(start_index, end_index, batch_size):
                    batch = texts[j:j+batch_size]
                    futures.append((len(batch), analyzers[i].analyze_sentiments.remote(batch)))
        
            results = []
            time_per_batch = []
            for batch_size, future in futures:
                batch_results, batch_time, _ = ray.get(future)
                results.extend(batch_results)
                time_per_batch.append((batch_size, batch_time))
            
            total_time = time.time() - start_time
            average_time_per_text = total_time / len(texts) if texts else 0
            return results, total_time, average_time_per_text, time_per_batch, num_workers, len(analyzers) * cpu_per_worker
        
        # Distribute and process the analysis
        results, total_time, average_time_per_text, time_per_batch, _, _ = distribute_analysis(
            All_Beauty_sampled_texts, model_path, num_workers, batch_size
        )
        
        # Save results to DataFrame and CSV only if not done already
        if not result_df_initialized:
            df = pd.DataFrame(results, columns=['Text', 'Sentiment', 'Confidence'])
            df.to_csv('allbeauty_results.csv', index=False)
            result_df_initialized = True
        
        # Prepare timing data for CSV
        timing_data = {
            'Total Time (s)': [total_time],
            'Average Time per Text (s)': [average_time_per_text],
            'Total Workers': [num_workers],
            'CPU per Worker': [cpu_per_worker],
            'Batch Size': [batch_size]
        }
        
        # Convert time_per_batch into a DataFrame and merge with timing_data
        batch_timing_df = pd.DataFrame(time_per_batch, columns=['Batch Size', 'Time per Batch'])
        for column, value in timing_data.items():
            batch_timing_df[column] = value[0]  # Add the same value to all rows
        
        # Append to or create the timing CSV
        if os.path.exists('allbeauty_timing.csv'):
            with open('allbeauty_timing.csv', 'a') as f:
                batch_timing_df.to_csv(f, header=False, index=False)
        else:
            batch_timing_df.to_csv('allbeauty_timing.csv', index=False)

        print(f"Processed {len(results)} sentiment analyses with configuration: Workers={num_workers}, CPUs={cpu_per_worker}, Batch Size={batch_size}")
        print(f"Timing data appended to 'allbeauty_timing.csv'.")

Processed 19728 sentiment analyses with configuration: Workers=2, CPUs=1, Batch Size=100
Timing data appended to 'allbeauty_timing.csv'.
Processed 20213 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=100
Timing data appended to 'allbeauty_timing.csv'.
Processed 20813 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=200
Timing data appended to 'allbeauty_timing.csv'.
Processed 20916 sentiment analyses with configuration: Workers=14, CPUs=1, Batch Size=100
Timing data appended to 'allbeauty_timing.csv'.


### Musical Instruments Amazon Reviews
We continue with the Musical Instruments sampled data, running the sentiment analysis six times with each system configuration.
- A log of events and warnings is kept to ensure we are aware of any trouble when running (`senti_analy_musicalinstruments.log`)
- Timers are set to provide the runtimes for each system configuration (time data saved to `musicalinstruments_timing.csv`)
  - Total time, time per batch, average time per text
- Results from the sentiment analysis on each review text are collected (saved to `musicalinstruments_results.csv`)
  - Text content, sentiment, confidence

In [2]:
import ray
import os
from flair.models import TextClassifier
from flair.data import Sentence
import logging
import pandas as pd
import time

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename='senti_analy_musicalinstruments.log', filemode='w')

# Ensure Ray is shut down and then reinitialize
ray.shutdown()
ray.init(address='ray://172.31.28.176:10001')

# Set up key parameters
model_path = os.path.expanduser('~/.flair/models/sentiment-en-mix-distillbert_4.pt') # Path to the prebuilt sentiment analyzer on each worker

# Contains code for sentiment analysis, loops through different system configurations
if __name__ == "__main__":
    configurations = [
        {'num_workers': 2, 'cpu_per_worker': 1, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 200},
        {'num_workers': 14, 'cpu_per_worker': 1, 'batch_size': 100}       
    ]
    
    # Initialize DataFrame to store results
    result_df_initialized = False
    
    for config in configurations:
        num_workers = config['num_workers']
        cpu_per_worker = config['cpu_per_worker']
        batch_size = config['batch_size']
        
        @ray.remote(num_cpus=cpu_per_worker)
        class SentimentAnalyzer:
            def __init__(self, model_path):
                # Initialize logging inside actor
                self.logger = logging.getLogger(__name__)
                self.logger.setLevel(logging.INFO)
                handler = logging.FileHandler('senti_analy_musicalinstruments.log')
                handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
                self.logger.addHandler(handler)
                
                self.classifier = TextClassifier.load(model_path)
                self.logger.info("Model loaded successfully.")
        
            def analyze_sentiments(self, texts):
                start_time = time.time()
                results = []
                for text in texts:
                    self.logger.info(f"Processing text: {text[:15]}...")  # Log first 15 characters of text
                    sentence = Sentence(text)
                    try:
                        self.classifier.predict(sentence)
                        if sentence.labels:
                            results.append((sentence.text, sentence.labels[0].value, sentence.labels[0].score))
                            self.logger.info("Processed result.")
                        else:
                            results.append((sentence.text, 'No Prediction', 0))
                            self.logger.warning("No prediction made for sentence.")
                    except Exception as e:
                        error_message = str(e)
                        results.append((sentence.text, 'Error', error_message))
                        logging.error(f"Error processing text: {text[:15]}..., error: {error_message}")
                end_time = time.time()
                processing_time = end_time - start_time
                return results, processing_time, len(texts)
        
        def preprocess_texts(texts):
            # Remove empty strings and strip whitespace
            return [text.strip() for text in texts if text.strip()]
        
        def distribute_analysis(texts, model_path, num_workers, batch_size):
            texts = preprocess_texts(texts)
            chunk_size = len(texts) // num_workers
            analyzers = [SentimentAnalyzer.remote(model_path) for _ in range(num_workers)]
            futures = []
            
            start_time = time.time()
            for i in range(num_workers):
                start_index = i * chunk_size
                end_index = start_index + chunk_size if i != num_workers - 1 else len(texts)
                for j in range(start_index, end_index, batch_size):
                    batch = texts[j:j+batch_size]
                    futures.append((len(batch), analyzers[i].analyze_sentiments.remote(batch)))
        
            results = []
            time_per_batch = []
            for batch_size, future in futures:
                batch_results, batch_time, _ = ray.get(future)
                results.extend(batch_results)
                time_per_batch.append((batch_size, batch_time))
            
            total_time = time.time() - start_time
            average_time_per_text = total_time / len(texts) if texts else 0
            return results, total_time, average_time_per_text, time_per_batch, num_workers, len(analyzers) * cpu_per_worker
        
        # Distribute and process the analysis
        results, total_time, average_time_per_text, time_per_batch, _, _ = distribute_analysis(
            Musical_Instruments_sampled_texts, model_path, num_workers, batch_size
        )
        
        # Save results to DataFrame and CSV only if not done already
        if not result_df_initialized:
            df = pd.DataFrame(results, columns=['Text', 'Sentiment', 'Confidence'])
            df.to_csv('musicalinstruments_results.csv', index=False)
            result_df_initialized = True
        
        # Prepare timing data for CSV
        timing_data = {
            'Total Time (s)': [total_time],
            'Average Time per Text (s)': [average_time_per_text],
            'Total Workers': [num_workers],
            'CPU per Worker': [cpu_per_worker],
            'Batch Size': [batch_size]
        }
        
        # Convert time_per_batch into a DataFrame and merge with timing_data
        batch_timing_df = pd.DataFrame(time_per_batch, columns=['Batch Size', 'Time per Batch'])
        for column, value in timing_data.items():
            batch_timing_df[column] = value[0]  # Add the same value to all rows
        
        # Append to or create the timing CSV
        if os.path.exists('musicalinstruments_timing.csv'):
            with open('musicalinstruments_timing.csv', 'a') as f:
                batch_timing_df.to_csv(f, header=False, index=False)
        else:
            batch_timing_df.to_csv('musicalinstruments_timing.csv', index=False)

        print(f"Processed {len(results)} sentiment analyses with configuration: Workers={num_workers}, CPUs={cpu_per_worker}, Batch Size={batch_size}")
        print(f"Timing data appended to 'musicalinstruments_timing.csv'.")



Processed 86336 sentiment analyses with configuration: Workers=2, CPUs=1, Batch Size=100
Timing data appended to 'musicalinstruments_timing.csv'.
Processed 86727 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=100
Timing data appended to 'musicalinstruments_timing.csv'.
Processed 86727 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=200
Timing data appended to 'musicalinstruments_timing.csv'.
Processed 86765 sentiment analyses with configuration: Workers=14, CPUs=1, Batch Size=100
Timing data appended to 'musicalinstruments_timing.csv'.


### Video Games Amazon Reviews
We continue with the Video Games sampled data, running the sentiment analysis six times with each system configuration.
- A log of events and warnings is kept to ensure we are aware of any trouble when running (`senti_analy_videogames.log`)
- Timers are set to provide the runtimes for each system configuration (time data saved to `videogames_timing.csv`)
  - Total time, time per batch, average time per text
- Results from the sentiment analysis on each review text are collected (saved to `videogames_results.csv`)
  - Text content, sentiment, confidence

In [3]:
import ray
import os
from flair.models import TextClassifier
from flair.data import Sentence
import logging
import pandas as pd
import time

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename='senti_analy_videogames.log', filemode='w')

# Ensure Ray is shut down and then reinitialize
ray.shutdown()
ray.init(address='ray://172.31.28.176:10001')

# Set up key parameters
model_path = os.path.expanduser('~/.flair/models/sentiment-en-mix-distillbert_4.pt') # Path to the prebuilt sentiment analyzer on each worker

# Contains code for sentiment analysis, loops through different system configurations
if __name__ == "__main__":
    configurations = [
        {'num_workers': 2, 'cpu_per_worker': 1, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 100},
        {'num_workers': 7, 'cpu_per_worker': 2, 'batch_size': 200},
        {'num_workers': 14, 'cpu_per_worker': 1, 'batch_size': 100}       
    ]
    
    # Initialize DataFrame to store results
    result_df_initialized = False
    
    for config in configurations:
        num_workers = config['num_workers']
        cpu_per_worker = config['cpu_per_worker']
        batch_size = config['batch_size']
        
        @ray.remote(num_cpus=cpu_per_worker)
        class SentimentAnalyzer:
            def __init__(self, model_path):
                # Initialize logging inside actor
                self.logger = logging.getLogger(__name__)
                self.logger.setLevel(logging.INFO)
                handler = logging.FileHandler('senti_analy_videogames.log')
                handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
                self.logger.addHandler(handler)
                
                self.classifier = TextClassifier.load(model_path)
                self.logger.info("Model loaded successfully.")
        
            def analyze_sentiments(self, texts):
                start_time = time.time()
                results = []
                for text in texts:
                    self.logger.info(f"Processing text: {text[:15]}...")  # Log first 15 characters of text
                    sentence = Sentence(text)
                    try:
                        self.classifier.predict(sentence)
                        if sentence.labels:
                            results.append((sentence.text, sentence.labels[0].value, sentence.labels[0].score))
                            self.logger.info("Processed result.")
                        else:
                            results.append((sentence.text, 'No Prediction', 0))
                            self.logger.warning("No prediction made for sentence.")
                    except Exception as e:
                        error_message = str(e)
                        results.append((sentence.text, 'Error', error_message))
                        logging.error(f"Error processing text: {text[:15]}..., error: {error_message}")
                end_time = time.time()
                processing_time = end_time - start_time
                return results, processing_time, len(texts)
        
        def preprocess_texts(texts):
            # Remove empty strings and strip whitespace
            return [text.strip() for text in texts if text.strip()]
        
        def distribute_analysis(texts, model_path, num_workers, batch_size):
            texts = preprocess_texts(texts)
            chunk_size = len(texts) // num_workers
            analyzers = [SentimentAnalyzer.remote(model_path) for _ in range(num_workers)]
            futures = []
            
            start_time = time.time()
            for i in range(num_workers):
                start_index = i * chunk_size
                end_index = start_index + chunk_size if i != num_workers - 1 else len(texts)
                for j in range(start_index, end_index, batch_size):
                    batch = texts[j:j+batch_size]
                    futures.append((len(batch), analyzers[i].analyze_sentiments.remote(batch)))
        
            results = []
            time_per_batch = []
            for batch_size, future in futures:
                batch_results, batch_time, _ = ray.get(future)
                results.extend(batch_results)
                time_per_batch.append((batch_size, batch_time))
            
            total_time = time.time() - start_time
            average_time_per_text = total_time / len(texts) if texts else 0
            return results, total_time, average_time_per_text, time_per_batch, num_workers, len(analyzers) * cpu_per_worker
        
        # Distribute and process the analysis
        results, total_time, average_time_per_text, time_per_batch, _, _ = distribute_analysis(
            Video_Games_sampled_texts, model_path, num_workers, batch_size
        )
        
        # Save results to DataFrame and CSV only if not done already
        if not result_df_initialized:
            df = pd.DataFrame(results, columns=['Text', 'Sentiment', 'Confidence'])
            df.to_csv('videogames_results.csv', index=False)
            result_df_initialized = True
        
        # Prepare timing data for CSV
        timing_data = {
            'Total Time (s)': [total_time],
            'Average Time per Text (s)': [average_time_per_text],
            'Total Workers': [num_workers],
            'CPU per Worker': [cpu_per_worker],
            'Batch Size': [batch_size]
        }
        
        # Convert time_per_batch into a DataFrame and merge with timing_data
        batch_timing_df = pd.DataFrame(time_per_batch, columns=['Batch Size', 'Time per Batch'])
        for column, value in timing_data.items():
            batch_timing_df[column] = value[0]  # Add the same value to all rows
        
        # Append to or create the timing CSV
        if os.path.exists('videogames_timing.csv'):
            with open('videogames_timing.csv', 'a') as f:
                batch_timing_df.to_csv(f, header=False, index=False)
        else:
            batch_timing_df.to_csv('videogames_timing.csv', index=False)

        print(f"Processed {len(results)} sentiment analyses with configuration: Workers={num_workers}, CPUs={cpu_per_worker}, Batch Size={batch_size}")
        print(f"Timing data appended to 'videogames_timing.csv'.")

SIGTERM handler is not set because current thread is not the main thread.


Processed 68555 sentiment analyses with configuration: Workers=2, CPUs=1, Batch Size=100
Timing data appended to 'videogames_timing.csv'.
Processed 68588 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=100
Timing data appended to 'videogames_timing.csv'.
Processed 68588 sentiment analyses with configuration: Workers=7, CPUs=2, Batch Size=200
Timing data appended to 'videogames_timing.csv'.
Processed 68601 sentiment analyses with configuration: Workers=14, CPUs=1, Batch Size=100
Timing data appended to 'videogames_timing.csv'.


In [4]:
ray.shutdown()