## Parallel Logistic Regression Model for Spam Detection of Amazon "Sports and Outdoors" Product Reviews

### This notebook adds task parallelism (for preprocessing) to the existing data-parallel implementation (for training)

In [15]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from scipy.sparse import hstack, csr_matrix
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
import time


In [2]:
import warnings

# Suppress all FutureWarnings
warnings.filterwarnings("ignore", category=FutureWarning)

In [None]:
# Load dataset
data = pd.read_json('~/Documents/Sports_and_Outdoors/Sports_and_Outdoors.json', lines=True)
data.head()

Here, the input features are `reviewText`, `overall`, `summary`, and `helpful`.
The target is `class`, which indicates whether the review is spam (1) or not spam (0).

Only the first element of the `helpful` list is kept: the number of users who found the review helpful. Missing or empty values are set to 0.

In [None]:
# Extract the relevant columns (.copy() avoids pandas' SettingWithCopyWarning when 'helpful' is modified below)
data = data[['reviewText', 'overall', 'summary', 'helpful', 'class']].copy()

# Clean the 'helpful' column: extract the first element of the list - num of helpful votes
data['helpful'] = data['helpful'].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else 0)

# Check cleaned data
data.head()

In [5]:
# Split dataset into training and testing
X_train, X_test, y_train, y_test = train_test_split(data[['reviewText', 'overall', 'summary', 'helpful']], 
                                                    data['class'], test_size=0.2, random_state=42, shuffle=True)

Machine learning models require numerical inputs, so the text features (`reviewText` and `summary`) must be converted into numerical vectors.

Here, we define three separate functions: two that convert the text features into TF-IDF vectors, and one that standardizes the numerical features (`overall` and `helpful`).
Because these tasks are independent of one another, they can be run in parallel.
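Each vectorizer is fit on the training split only and then reused to transform the test split, so the vocabulary and IDF weights never see test data. The sketch below reproduces, in plain Python, what `TfidfVectorizer` computes with its default settings as we understand them (lowercasing, tokens of two or more word characters, raw term counts, smoothed IDF `ln((1 + n) / (1 + df)) + 1`, L2 row normalization). The helper names `fit_tfidf` and `transform_tfidf` are hypothetical, and breaking `max_features` ties alphabetically is an assumption of this sketch, not scikit-learn's documented behavior.

```python
import math
import re
from collections import Counter

# Approximation of TfidfVectorizer's default tokenizer: lowercase, then tokens of 2+ word characters
TOKEN_RE = re.compile(r"(?u)\b\w\w+\b")


def tokenize(text):
    return TOKEN_RE.findall(text.lower())


def fit_tfidf(train_docs, max_features=None):
    """Hypothetical helper: learn a vocabulary and smoothed IDF weights from training documents only."""
    n_docs = len(train_docs)
    tokenized = [tokenize(doc) for doc in train_docs]
    term_counts = Counter(t for toks in tokenized for t in toks)    # corpus-wide term counts
    doc_freq = Counter(t for toks in tokenized for t in set(toks))  # number of documents containing t
    # Keep the max_features most frequent terms (ties broken alphabetically: an assumption of this sketch)
    kept = sorted(term_counts, key=lambda t: (-term_counts[t], t))
    if max_features is not None:
        kept = kept[:max_features]
    vocab = {t: i for i, t in enumerate(sorted(kept))}  # one column per term, in alphabetical order
    # Smoothed IDF: ln((1 + n) / (1 + df)) + 1
    idf = {t: math.log((1 + n_docs) / (1 + doc_freq[t])) + 1 for t in vocab}
    return vocab, idf


def transform_tfidf(docs, vocab, idf):
    """Hypothetical helper: raw counts times IDF, L2-normalised per document; unseen terms are ignored."""
    rows = []
    for doc in docs:
        counts = Counter(t for t in tokenize(doc) if t in vocab)
        row = [0.0] * len(vocab)
        for term, count in counts.items():
            row[vocab[term]] = count * idf[term]
        norm = math.sqrt(sum(v * v for v in row))
        rows.append([v / norm for v in row] if norm > 0 else row)
    return rows


train_docs = ["great tent, great price", "cheap knockoff, do not buy"]
test_docs = ["great knockoff from a new seller"]

vocab, idf = fit_tfidf(train_docs)                     # fit on training text only
X_train_demo = transform_tfidf(train_docs, vocab, idf)
X_test_demo = transform_tfidf(test_docs, vocab, idf)   # reuses the training vocabulary and IDF
print(sorted(vocab))
print(X_test_demo[0])
```

Words that appear only in the test review (`from`, `new`, `seller`) get no column at all, which is exactly why fitting on the training split alone keeps test information out of the features.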

In [6]:

# Function to vectorize 'reviewText' feature
def vectorize_review_text(X_train, X_test):
    vectorizer_review = TfidfVectorizer(max_features=5000)
    X_train_review_tfidf = vectorizer_review.fit_transform(X_train['reviewText'])
    X_test_review_tfidf = vectorizer_review.transform(X_test['reviewText'])
    return X_train_review_tfidf, X_test_review_tfidf

# Function to vectorize 'summary' feature
def vectorize_summary(X_train, X_test):
    vectorizer_summary = TfidfVectorizer(max_features=1000)
    X_train_summary_tfidf = vectorizer_summary.fit_transform(X_train['summary'])
    X_test_summary_tfidf = vectorizer_summary.transform(X_test['summary'])
    return X_train_summary_tfidf, X_test_summary_tfidf

# Function to scale 'overall' and 'helpful' features
def scale_numerical_features(X_train, X_test):
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train[['overall', 'helpful']])
    X_test_scaled = scaler.transform(X_test[['overall', 'helpful']])
    return X_train_scaled, X_test_scaled

Task parallelism is implemented here: `ThreadPoolExecutor` runs the three independent preprocessing tasks concurrently.
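The preprocessing cell follows a submit-then-collect pattern: all tasks are submitted first so they can overlap, and `.result()` is called afterwards. A minimal generalized sketch of that pattern is shown here; `run_tasks_concurrently` and `slow_square` are hypothetical names. One caveat worth hedging: threads only overlap work that releases Python's GIL (as `time.sleep` does here). Much of TF-IDF tokenization runs as Python-level code, so the real speedup from threads may be modest; `concurrent.futures.ProcessPoolExecutor` offers the same `submit`/`result` interface but pickles arguments and results between processes.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_tasks_concurrently(tasks, max_workers=None):
    """Run independent tasks in a thread pool.

    tasks: dict mapping a task name to (callable, tuple_of_args).
    Returns a dict mapping each task name to its result.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every task first so they can run at the same time...
        futures = {name: executor.submit(fn, *args) for name, (fn, args) in tasks.items()}
        # ...then collect; .result() blocks and re-raises any exception raised inside the task
        return {name: future.result() for name, future in futures.items()}


def slow_square(x, delay):
    time.sleep(delay)  # time.sleep releases the GIL, so these calls overlap across threads
    return x * x


start = time.perf_counter()
results = run_tasks_concurrently({
    "a": (slow_square, (2, 0.3)),
    "b": (slow_square, (3, 0.3)),
    "c": (slow_square, (4, 0.3)),
})
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")  # roughly 0.3 s rather than 0.9 s
```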

In [None]:
# Start timer for parallel data preprocessing
start_time = time.time()

# Execute the three preprocessing tasks in parallel using ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
    future_review = executor.submit(vectorize_review_text, X_train, X_test)
    future_summary = executor.submit(vectorize_summary, X_train, X_test)
    future_scaling = executor.submit(scale_numerical_features, X_train, X_test)


    X_train_review_tfidf, X_test_review_tfidf = future_review.result()
    X_train_summary_tfidf, X_test_summary_tfidf = future_summary.result()
    X_train_scaled, X_test_scaled = future_scaling.result()

# Stop timer
end_time = time.time()
print(f"Data preprocessing (task parallelism) completed in: {end_time - start_time:.2f} seconds")


The preprocessed feature blocks are then concatenated column-wise, and the shapes are printed to confirm that the training and test matrices have the same number of columns.
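Column-wise stacking keeps the number of rows and adds up the column counts, so the combined width here is at most 5000 + 1000 + 2 = 6002 columns (fewer if a text field has fewer distinct terms than its `max_features`). The toy sketch below uses small hypothetical blocks in place of the real matrices; passing `format="csr"` to `scipy.sparse.hstack` is an optional alternative to calling `.tocsr()` afterwards, and the `offsets` array is an illustrative extra for locating each block's columns.

```python
import numpy as np
from scipy.sparse import csr_matrix, hstack

# Hypothetical toy blocks standing in for review TF-IDF, summary TF-IDF and scaled numeric features
review_block = csr_matrix(np.array([[0.6, 0.0, 0.8], [0.0, 1.0, 0.0]]))
summary_block = csr_matrix(np.array([[1.0, 0.0], [0.0, 1.0]]))
scaled_block = np.array([[1.0, -1.0], [-1.0, 1.0]])  # StandardScaler returns a dense array

# Column-wise concatenation; format="csr" returns a row-sliceable matrix directly
combined = hstack([review_block, summary_block, csr_matrix(scaled_block)], format="csr")

# Starting column of each block, handy for mapping model coefficients back to features
widths = [review_block.shape[1], summary_block.shape[1], scaled_block.shape[1]]
offsets = np.cumsum([0] + widths)

print(combined.shape)     # (2, 7): rows unchanged, columns add up
print(offsets.tolist())   # [0, 3, 5, 7]
```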

In [None]:

# Combine training and testing features
X_train_combined = hstack([X_train_review_tfidf, X_train_summary_tfidf, csr_matrix(X_train_scaled)])
X_test_combined = hstack([X_test_review_tfidf, X_test_summary_tfidf, csr_matrix(X_test_scaled)])


# Check final shapes to make sure dimensions are compatible
print(f"Shape of X_train_combined: {X_train_combined.shape}")
print(f"Shape of X_test_combined: {X_test_combined.shape}")


# Convert combined matrix to CSR format for slicing
X_train_combined = X_train_combined.tocsr()

Here, we split the training data row-wise into `num_chunks` chunks. By default, `num_chunks` equals the number of CPU cores, but it can also be set manually.
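Because `train_test_split` already shuffled the rows, contiguous chunks are effectively random samples of the training set. Two details are worth checking, sketched below with NumPy under hypothetical helper names. First, the notebook's scheme (equal chunks, remainder appended to the last one) can make the last chunk noticeably larger, whereas `np.array_split` keeps chunk sizes within one row of each other. Second, scikit-learn's `LogisticRegression` will not fit data containing a single class, so on an imbalanced spam dataset it may be worth confirming every chunk contains both labels.

```python
import numpy as np


def notebook_chunk_sizes(n_rows, num_chunks):
    """Sizes from the notebook's scheme: equal chunks, with the remainder added to the last one."""
    chunk_size = n_rows // num_chunks
    return [chunk_size] * (num_chunks - 1) + [n_rows - (num_chunks - 1) * chunk_size]


def balanced_chunk_bounds(n_rows, num_chunks):
    """Hypothetical helper: (start, stop) ranges whose sizes differ by at most one row."""
    pieces = np.array_split(np.arange(n_rows), num_chunks)
    return [(int(p[0]), int(p[-1]) + 1) for p in pieces if len(p) > 0]


def chunks_missing_a_class(y, bounds):
    """Hypothetical helper: indices of chunks whose labels contain fewer than two classes."""
    return [i for i, (start, stop) in enumerate(bounds) if np.unique(y[start:stop]).size < 2]


print(notebook_chunk_sizes(10, 4))   # [2, 2, 2, 4]
bounds = balanced_chunk_bounds(10, 4)
print(bounds)                        # [(0, 3), (3, 6), (6, 8), (8, 10)]

y = np.array([0, 0, 0, 0, 1, 1, 0, 0, 0, 1])  # toy labels, mostly "not spam"
print(chunks_missing_a_class(y, bounds))       # [0, 2]
```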

In [None]:
# Define number of data chunks
num_chunks = mp.cpu_count()
print(f"Number of chunks utilized: {num_chunks}")

# Calculate chunk size
chunk_size = X_train_combined.shape[0] // num_chunks

# First num_chunks-1 chunks of equal size (.iloc makes the label slicing positional, like the sparse-matrix slicing)
X_train_chunks = [X_train_combined[i*chunk_size:(i+1)*chunk_size] for i in range(num_chunks-1)]
y_train_chunks = [y_train.iloc[i*chunk_size:(i+1)*chunk_size] for i in range(num_chunks-1)]

# The last chunk takes all remaining rows, so no rows are dropped when the split is uneven
X_train_chunks.append(X_train_combined[(num_chunks-1)*chunk_size:])
y_train_chunks.append(y_train.iloc[(num_chunks-1)*chunk_size:])

The function below trains a Logistic Regression model on a single chunk of data. It is called once per chunk, in parallel, producing one model per chunk.

In [18]:
# Define a function to train Logistic Regression on a chunk of data
def train_on_chunk(X_chunk, y_chunk):
    # Ensure data is writable
    X_chunk = X_chunk.copy()
    y_chunk = y_chunk.copy()

    model = LogisticRegression(max_iter=1000, solver='lbfgs')
    model.fit(X_chunk, y_chunk)
    return model

Below, we train one model per chunk in parallel and measure the wall-clock training time.
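`pool.starmap(f, zip(A, B))` calls `f(A[i], B[i])` for each pair and returns the results in input order, so `models[i]` corresponds to chunk `i`. To illustrate the call pattern without spawning processes, the sketch uses `multiprocessing.dummy.Pool`, a thread-backed pool with the same interface; `describe_chunk` is a hypothetical stand-in for `train_on_chunk`. With the real process-based `mp.Pool`, each chunk is pickled and sent to a worker and each fitted model is pickled back, so that transfer cost is included in the measured training time.

```python
from multiprocessing.dummy import Pool  # thread-backed Pool with the same interface as multiprocessing.Pool


def describe_chunk(X_chunk, y_chunk):
    """Hypothetical stand-in for train_on_chunk: returns (number of rows, number of spam labels)."""
    return len(X_chunk), sum(y_chunk)


X_chunks = [[[1.0], [2.0]], [[3.0]], [[4.0], [5.0], [6.0]]]
y_chunks = [[0, 1], [1], [1, 0, 1]]

with Pool(processes=3) as pool:
    # starmap unpacks each (X_chunk, y_chunk) pair into describe_chunk(X_chunk, y_chunk)
    summaries = pool.starmap(describe_chunk, zip(X_chunks, y_chunks))

print(summaries)  # [(2, 1), (1, 1), (3, 2)]: one result per chunk, in chunk order
```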

In [21]:
start_time = time.time()


# Create a multiprocessing pool
with mp.Pool(processes=num_chunks) as pool:
    # Train models in parallel on each chunk of data
    models = pool.starmap(train_on_chunk, zip(X_train_chunks, y_train_chunks))

end_time = time.time()

In [None]:
# Calculate the parallel training time
parallel_training_time = end_time - start_time
print(f"Parallel Training Time: {parallel_training_time:.2f} seconds")
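On its own, the parallel training time does not show how much the parallelism helped. One common way to summarize it is speedup `S = T_seq / T_par` and efficiency `E = S / p` for `p` workers, which needs a baseline such as a single `LogisticRegression` fitted sequentially on all of `X_train_combined`. The helper below is a hypothetical sketch, and the timings passed to it are illustrative, not results from this notebook. A hedged caveat: each chunk model sees only `1/p` of the data, so the parallel run does different work and may reach a different accuracy than the sequential model, and the speedup should be read alongside the accuracy comparison.

```python
def speedup_and_efficiency(sequential_seconds, parallel_seconds, workers):
    """Speedup S = T_seq / T_par and parallel efficiency E = S / p."""
    speedup = sequential_seconds / parallel_seconds
    return speedup, speedup / workers


# Illustrative timings only: 12 s sequentially vs 4 s with 4 worker processes
s, e = speedup_and_efficiency(12.0, 4.0, 4)
print(f"speedup {s:.2f}x, efficiency {e:.0%}")  # speedup 3.00x, efficiency 75%
```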