In [None]:
import random
import torch
import torch.nn as nn
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import numpy as np 


In [None]:
# Location of the training spreadsheet
# NOTE(review): absolute, machine-specific path - consider a configurable data directory
TRAINING_DATA_PATH = '/Users/achaudhari/Desktop/Book Prediction New/out/Training.xlsx'

# Read the sheet and expand the categorical 'Genre' and 'Author' columns into
# one indicator column per distinct value
data = pd.get_dummies(pd.read_excel(TRAINING_DATA_PATH), columns=['Genre', 'Author'])

In [None]:
# Remove thousands separators so these columns can be parsed as numbers
reviews_text = data['Total Reviews'].astype(str)
data['Total Reviews'] = reviews_text.str.replace(',', '').astype(int)
length_text = data['Length'].astype(str)
data['Length'] = length_text.str.replace(',', '').astype(float)

# Feature matrix: the three numeric columns, then every column whose name
# contains 'Author_', then every column whose name contains 'Genre_'
numeric_features = ['Ratings out of 5', 'Price', 'Length']
author_features = [name for name in data.columns if 'Author_' in name]
genre_features = [name for name in data.columns if 'Genre_' in name]
X = data[numeric_features + author_features + genre_features]

# Binary target: 1.0 when 'Best Seller Rank' is 10000 or lower, otherwise 0.0
data['is_bestseller'] = (data['Best Seller Rank'] <= 10000).astype(float)
y = data['is_bestseller']

In [None]:
# Split the dataset into training and testing sets.
# A fixed random_state makes the split (and every metric computed from it)
# repeatable across kernel restarts.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=True, random_state=42
)

# Work on explicit copies: assigning scaled columns into the frames returned by
# train_test_split can otherwise trigger pandas' SettingWithCopyWarning.
X_train = X_train.copy()
X_test = X_test.copy()

# Normalize the numeric features. The scaler is fitted on the training rows only
# and then reused, unchanged, for the test rows.
scaler = StandardScaler()
features_to_scale = ['Ratings out of 5','Price','Length']
X_train[features_to_scale] = scaler.fit_transform(X_train[features_to_scale])
X_test[features_to_scale] = scaler.transform(X_test[features_to_scale])

In [None]:
def _as_float32_tensor(frame):
    """Return the values of a pandas object as a float32 torch tensor."""
    return torch.tensor(frame.to_numpy().astype(np.float32))


# Features and targets for both splits as float32 tensors
X_train_tensor, y_train_tensor = _as_float32_tensor(X_train), _as_float32_tensor(y_train)
X_test_tensor, y_test_tensor = _as_float32_tensor(X_test), _as_float32_tensor(y_test)

# Pair each feature row with its label
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)


In [None]:
# Mini-batch loaders: training batches are reshuffled, test batches keep their order
BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Define the logistic regression model
class LogisticRegressionModel(nn.Module):
    """Logistic regression: one linear layer followed by a sigmoid.

    Args:
        input_dim: number of input features per sample.
    """

    def __init__(self, input_dim):
        super(LogisticRegressionModel, self).__init__()
        self.linear = nn.Linear(input_dim, 1)

    def forward(self, x):
        """Return the predicted probability for each row of ``x``.

        A ``(batch, input_dim)`` input yields a ``(batch,)`` output and a single
        ``(input_dim,)`` vector yields a 0-d tensor.
        """
        outputs = torch.sigmoid(self.linear(x))
        # Drop only the trailing size-1 feature axis. A bare squeeze() would also
        # collapse a batch of one into a 0-d tensor, which no longer matches the
        # (1,) target in BCELoss and breaks tolist()/extend() on the predictions.
        return outputs.squeeze(-1)


In [None]:
# Build the model with one input per feature column
input_dim = len(X_train.columns)
model = LogisticRegressionModel(input_dim)

# Binary cross-entropy on the predicted probabilities, optimised with plain SGD
criterion = nn.BCELoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.005)

# Model Training

In [None]:
# Fit the model with mini-batch gradient descent
num_epochs = 1000
for epoch in range(num_epochs):
    for inputs, y_true in train_loader:
        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and loss for this mini-batch
        y_pred = model(inputs)
        loss = criterion(y_pred, y_true)

        # Backpropagate and update the parameters
        loss.backward()
        optimizer.step()

    # Progress report every 100 epochs; the value shown is the loss of the
    # last mini-batch of that epoch
    epoch_number = epoch + 1
    if epoch_number % 100 == 0:
        print(f'Epoch [{epoch_number}/{num_epochs}], Loss: {loss.item():.4f}')


# Testing Model

In [None]:
# Test the model: fraction of test samples whose thresholded prediction matches the label
model.eval()  # evaluation mode, consistent with get_predictions
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)

        # Probabilities at or above 0.5 count as the positive class
        predicted = (outputs >= 0.5).float()

        correct += (predicted == labels).sum().item()
        total += labels.size(0)

# Guard against an empty test loader instead of raising ZeroDivisionError
accuracy = correct / total if total else float('nan')
print(f'Accuracy of the model on the test set: {accuracy:.4f}')


# Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Function to get predictions from the PyTorch model
def get_predictions(model, data_loader):
    """Return the thresholded (0.0 / 1.0) predictions of ``model`` as a flat list.

    Args:
        model: module mapping a batch of inputs to probabilities.
        data_loader: iterable of ``(inputs, labels)`` pairs; labels are ignored.

    Returns:
        A list of floats, one per sample, in the order the loader yields them.
    """
    model.eval()  # Set the model to evaluation mode
    predictions = []
    with torch.no_grad():
        for inputs, _ in data_loader:
            outputs = model(inputs)
            predicted = (outputs >= 0.5).float()  # Apply threshold to get binary predictions
            # Flatten first: a model that squeezes a single-row batch returns a 0-d
            # tensor, whose tolist() is a bare float that extend() cannot iterate.
            predictions.extend(predicted.reshape(-1).tolist())
    return predictions

# Hard 0/1 predictions for the test set, collected into a NumPy array
y_pred = np.array(get_predictions(model, test_loader))

# Confusion matrix of the true test labels against the predictions
cm = confusion_matrix(y_test, y_pred)

# Annotated heatmap of the matrix
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('Truth')
ax.set_title('Confusion Matrix')
plt.show()


# Distribution of Books and Authors

In [None]:
import matplotlib.pyplot as plt

# Author indicator columns, i.e. columns whose name contains 'Author_'
author_columns = [col for col in data.columns if 'Author_' in col]

# Summing each indicator column gives the number of rows (books) per author
author_counts = data[author_columns].sum()

# How many authors have 1 book, 2 books, ... value_counts() orders by frequency,
# so sort by the index to put the "Number of Books" axis in numeric order.
freq_distribution = author_counts.value_counts().sort_index()

# Plotting
plt.figure(figsize=(12, 6))
freq_distribution.plot(kind='bar')
plt.title('Frequency Distribution of Number of Books per Author')
plt.xlabel('Number of Books')
plt.ylabel('Number of Authors')
plt.show()


# Training and Testing Loss Curve

In [None]:
import matplotlib.pyplot as plt

# Define the logistic regression model with dropout
class LogisticRegressionModel(nn.Module):
    """Logistic regression with dropout applied to the input features.

    Args:
        input_dim: number of input features per sample.
    """

    def __init__(self, input_dim):
        super(LogisticRegressionModel, self).__init__()
        self.linear = nn.Linear(input_dim, 1)
        self.dropout = nn.Dropout(p=0.5)  # 50% dropout

    def forward(self, x):
        """Return the predicted probability for each row of ``x``.

        A ``(batch, input_dim)`` input yields a ``(batch,)`` output and a single
        ``(input_dim,)`` vector yields a 0-d tensor.
        """
        x = self.dropout(x)
        outputs = torch.sigmoid(self.linear(x))
        # Drop only the trailing size-1 feature axis. A bare squeeze() would also
        # collapse a batch of one into a 0-d tensor, which no longer matches the
        # (1,) target in BCELoss and breaks tolist()/extend() on the predictions.
        return outputs.squeeze(-1)

# Fresh model built from the dropout variant of LogisticRegressionModel
model = LogisticRegressionModel(input_dim)

# SGD with L2 regularization via weight_decay; the loss object (`criterion`)
# is not redefined in this cell
sgd_settings = {'lr': 0.005, 'weight_decay': 1e-4}
optimizer = torch.optim.SGD(model.parameters(), **sgd_settings)

# Per-epoch loss history, plus the bookkeeping used for early stopping
train_losses = []
test_losses = []
early_stopping_patience = 10  # epochs without improvement tolerated before stopping
# np.inf rather than the np.Inf alias, which NumPy 2.0 removed
min_test_loss = np.inf
patience_counter = 0

# Train the model with early stopping.
# NOTE(review): the test set doubles as the early-stopping signal here, so the
# reported test loss is optimistically biased; a separate validation split
# would avoid that.
num_epochs = 1000
best_state = None  # parameters captured at the lowest test loss seen so far
for epoch in range(num_epochs):
    # --- optimisation pass over the training batches (dropout active) ---
    model.train()
    total_train_loss = 0
    for inputs, y_true in train_loader:
        y_pred = model(inputs)
        loss = criterion(y_pred, y_true)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()
    # Mean of the per-batch losses for this epoch
    avg_train_loss = total_train_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # --- evaluation pass over the test batches (dropout disabled, no gradients) ---
    model.eval()
    total_test_loss = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_test_loss += loss.item()
    avg_test_loss = total_test_loss / len(test_loader)
    test_losses.append(avg_test_loss)

    if avg_test_loss < min_test_loss:
        min_test_loss = avg_test_loss
        patience_counter = 0  # reset counter if test loss has decreased
        # Snapshot the parameters so the best model can be restored after training
        best_state = {name: value.detach().clone() for name, value in model.state_dict().items()}
    else:
        patience_counter += 1  # increase counter if test loss has not decreased

    # >= rather than == so a counter that is already past the limit still stops
    stop_now = patience_counter >= early_stopping_patience
    if (epoch+1) % 100 == 0 or stop_now:
        print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {avg_train_loss:.4f}, Testing Loss: {avg_test_loss:.4f}')

    if stop_now:
        print("Early stopping triggered.")
        break

# Without this, the model left in memory is the one from the last epoch run,
# i.e. after the test loss had already stopped improving.
if best_state is not None:
    model.load_state_dict(best_state)

# Loss history per epoch for both splits, drawn on one set of axes
fig, ax = plt.subplots(figsize=(10, 5))
for history, curve_label in ((train_losses, 'Training Loss'), (test_losses, 'Testing Loss')):
    ax.plot(history, label=curve_label)
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Training and Testing Loss Over Epochs')
ax.legend()
plt.show()


# Model Prediction

In [None]:
# New book data (example)
new_book_data = pd.DataFrame([{
    'Total Reviews': 30308,
    'Ratings out of 5': 4.5,
    'Price': 21.14,
    'Length': 368,
    # Add appropriate 'Genre_' and 'Author_' columns with 0/1 values
    # based on the genre and author of the new book
    'Genre_Womens Fiction': 1,
    # ... other genre columns set to 0
    'Author_Andrzej Sapkowski': 1,
    'Author_Danusia Stok': 1
    # ... other author columns set to 0
}])

# Supplied columns that are not model features are dropped by the reindex below.
# Report them so a misspelled or unseen 'Genre_'/'Author_' name does not silently
# turn into an all-zero encoding.
ignored_columns = [column for column in new_book_data.columns if column not in X_train.columns]
if ignored_columns:
    print(f"Ignoring columns not used by the model: {ignored_columns}")

# Align with the training layout in one step: same columns, same order, and 0 for
# every feature not supplied above (replaces inserting the columns one at a time)
new_book_data = new_book_data.reindex(columns=X_train.columns, fill_value=0)

# Normalize features with the scaler fitted on the training data
new_book_data[features_to_scale] = scaler.transform(new_book_data[features_to_scale])

# Convert to PyTorch tensor
new_book_tensor = torch.tensor(new_book_data.to_numpy().astype(np.float32))

# Prediction (evaluation mode so dropout, if the model has it, is disabled)
model.eval()
with torch.no_grad():
    predicted_prob = model(new_book_tensor)
    predicted_class = (predicted_prob >= 0.5).float()

print(f"Predicted Probability: {predicted_prob.item()}")
print(f"Predicted Class (1: Bestseller, 0: Not Bestseller): {predicted_class.item()}")


# Sensitivity Analysis when prices increase

In [None]:
def plot_price_sensitivity(model, X_base, feature_name='Price', range_values=(0, 100), steps=100):
    """Plot the predicted probability as one scaled feature is swept over a range.

    Args:
        model: module mapping a feature vector to a probability.
        X_base: pandas Series of already-scaled feature values, indexed by
            feature name, used for every feature other than ``feature_name``.
        feature_name: feature to vary; must be one the module-level ``scaler``
            was fitted on.
        range_values: ``(start, stop)`` of the sweep, in unscaled units.
        steps: number of evenly spaced values in the sweep.

    Raises:
        ValueError: if ``feature_name`` is not one of the scaled features.
    """
    price_values = np.linspace(*range_values, steps)

    # The scaler was fitted on the scaled-feature subset only, so the position
    # must be looked up there rather than in X_train.columns.
    scaled_names = list(getattr(scaler, 'feature_names_in_', features_to_scale))
    scaler_index = scaled_names.index(feature_name)
    price_mean = scaler.mean_[scaler_index]
    price_std = scaler.scale_[scaler_index]

    model.eval()  # disable dropout, if the model has it, for the sweep
    predictions = []
    for price in price_values:
        X_temp = X_base.copy()
        # Manually apply the same standardisation the scaler would
        X_temp[feature_name] = (price - price_mean) / price_std
        # Go through NumPy: building a tensor directly from a string-indexed
        # Series relies on positional integer lookups on the Series
        features = torch.tensor(np.asarray(X_temp, dtype=np.float32))
        with torch.no_grad():
            predictions.append(model(features).item())

    plt.plot(price_values, predictions)
    plt.xlabel(feature_name)
    plt.ylabel('Probability of Being Bestseller')
    plt.title(f'Sensitivity Analysis for {feature_name}')
    plt.show()

# Base input - mean values for all features except 'Price'
X_base = X_train.mean()
plot_price_sensitivity(model, X_base)


# Sensitivity Analysis for Genre

In [None]:
def plot_genre_sensitivity_line(model, X_base, genre_columns):
    """Plot the predicted probability with each genre indicator switched on in turn.

    Args:
        model: module mapping a feature vector to a probability.
        X_base: pandas Series of feature values, indexed by feature name, used
            for every non-genre feature.
        genre_columns: names of the genre indicator features; for each one, all
            genre indicators are set to 0 and only that one is set to 1.
    """
    predictions = []
    genre_indices = list(range(len(genre_columns)))  # For x-axis

    model.eval()  # disable dropout, if the model has it, for the sweep
    for genre in genre_columns:
        X_temp = X_base.copy()
        X_temp[genre_columns] = 0  # Reset all genres to 0
        X_temp[genre] = 1  # Activate current genre
        # Go through NumPy: building a tensor directly from a string-indexed
        # Series relies on positional integer lookups on the Series
        features = torch.tensor(np.asarray(X_temp, dtype=np.float32))
        with torch.no_grad():
            predictions.append(model(features).item())

    plt.plot(genre_indices, predictions, marker='o')
    plt.xlabel('Genre')
    plt.ylabel('Probability of Being Bestseller')
    plt.title('Sensitivity Analysis for Genre')
    plt.xticks(genre_indices, genre_columns, rotation=90)
    plt.show()

# Base input - mean values for all features except genres
X_base = X_train.mean()
genre_columns = [col for col in X_train.columns if 'Genre_' in col]
plot_genre_sensitivity_line(model, X_base, genre_columns)
