In [3]:
import pandas as pd
import re

# Load the dataset
# NOTE(review): `data_path` is an empty string as written, which looks like a
# placeholder — point it at the real CSV file before running this cell.
# The code below reads the 'email' and 'label' columns from the loaded frame.
data_path = r""
data = pd.read_csv(data_path)

# Define the preprocess_text function
def preprocess_text(text):
    """Normalise one raw email body for tokenisation.

    The text is lower-cased and every character that is not an ASCII letter
    (a-z) or whitespace is dropped. Anything that is not a ``str`` (for
    example a missing value) is mapped to the empty string.
    """
    if not isinstance(text, str):
        return ""  # Non-string values carry no usable text
    lowered = text.lower()
    return re.sub(r'[^a-z\s]', '', lowered)

# Apply preprocessing to the 'email' column.
# preprocess_text already turns every non-string value into "", so the result
# never contains missing values and no fillna step is needed afterwards.
data['clean_text'] = data['email'].apply(preprocess_text)

# Balancing the dataset: split rows by class (1 = spam, 0 = not spam)
spam = data[data['label'] == 1]
not_spam = data[data['label'] == 0]

# Balance the data by undersampling the larger class down to the size of the
# smaller one. A fixed random_state makes the drawn rows reproducible across
# kernel restarts.
# NOTE(review): the later cells visible in this notebook keep using `data`,
# not `balanced_data` — confirm which frame is meant to be trained on.
min_count = min(len(spam), len(not_spam))
balanced_data = pd.concat([
    spam.sample(min_count, random_state=42),
    not_spam.sample(min_count, random_state=42),
]).reset_index(drop=True)

print(balanced_data.head())


                                               email  label  \
0   事業者 氏名 vip mail 突然のメール失礼いたします 今後この広告がご不要な方はその...      1   
1  there is no stumbling on to it the greatest wa...      1   
2  we thought you may be interested in our new so...      1   
3   take control of your computer with this top o...      1   
4  me and my friends have this brand new idea a l...      1   

                                          clean_text  
0     vip mail   stop vip url  vip mail vip mail ...  
1  there is no stumbling on to it the greatest wa...  
2  we thought you may be interested in our new so...  
3   take control of your computer with this top o...  
4  me and my friends have this brand new idea a l...  


In [7]:
import numpy as np
from collections import Counter

# Tokenize the clean_text column (split on runs of whitespace)
data['tokens'] = data['clean_text'].apply(str.split)

# Build the vocabulary: token -> corpus frequency, keys in first-seen order
vocab = Counter()
for sentence_tokens in data['tokens']:
    vocab.update(sentence_tokens)
vocab_size = len(vocab)

# Two-way mapping between tokens and their row number in the embedding tables
word_to_index = dict(zip(vocab, range(vocab_size)))
index_to_word = dict(enumerate(vocab))

# Hyperparameters
embedding_size = 10    # Embedding size for Word2Vec
window_size = 2        # Context window size (tokens on each side of the target)
learning_rate = 0.01
epochs = 5
negative_samples = 5   # Negative samples drawn per target word

# Initialize embeddings: one table for target words, one for context words.
# The draw order (word table first) is kept so a seeded run gives the same values.
embedding_shape = (vocab_size, embedding_size)
word_embeddings = np.random.randn(*embedding_shape)
context_embeddings = np.random.randn(*embedding_shape)

# Generate training data for Word2Vec
def generate_training_data(tokens, window_size):
    """Turn tokenised sentences into (target, context) index pairs.

    Args:
        tokens: iterable of sentences, each a list of words that are all
            present in the module-level ``word_to_index`` mapping.
        window_size: number of neighbours taken on each side of the target.

    Returns:
        A list with one ``(target_index, [context_indices])`` tuple per word
        occurrence. Context indices are ordered left to right and exclude the
        target position itself; the list is empty for a one-word sentence.
    """
    pairs = []
    for sentence in tokens:
        # Resolve every word once, then slice neighbours out of the index list
        indexed = [word_to_index[word] for word in sentence]
        sentence_length = len(indexed)
        for position, target in enumerate(indexed):
            window_start = max(0, position - window_size)
            window_end = min(sentence_length, position + window_size + 1)
            context = indexed[window_start:position] + indexed[position + 1:window_end]
            pairs.append((target, context))
    return pairs

# One (target index, context indices) pair per token occurrence in data['tokens']
training_data = generate_training_data(data['tokens'], window_size)

# Negative sampling function
def negative_sampling(vocab_size, positive_indices, num_samples):
    """Draw random vocabulary indices that are not positive context indices.

    Indices are drawn uniformly from ``[0, vocab_size)`` with rejection of
    anything in ``positive_indices``. The same index may be returned more
    than once.

    Args:
        vocab_size: number of entries in the vocabulary.
        positive_indices: iterable of indices that must not be returned.
        num_samples: how many negative indices to return.

    Returns:
        A list of ``num_samples`` integer indices.

    Raises:
        ValueError: if samples are requested but every index in
            ``[0, vocab_size)`` is excluded, in which case rejection sampling
            could never terminate.
    """
    # A set gives constant-time membership checks in the rejection loop
    excluded = set(positive_indices)

    # Only exclusions that fall inside the sampling range can block a draw
    blocked_in_range = sum(1 for idx in excluded if 0 <= idx < vocab_size)
    if num_samples > 0 and blocked_in_range >= vocab_size:
        raise ValueError(
            "Cannot draw negative samples: every vocabulary index is excluded"
        )

    negatives = []
    while len(negatives) < num_samples:
        neg = np.random.randint(0, vocab_size)
        if neg not in excluded:
            negatives.append(neg)
    return negatives

# Training the Word2Vec model: each target word is pushed towards its observed
# context words (label 1) and away from randomly drawn words (label 0) using a
# logistic loss and per-pair gradient steps.
for epoch in range(epochs):
    loss = 0
    for target, context in training_data:
        positive_indices = context
        negative_indices = negative_sampling(vocab_size, positive_indices, negative_samples)

        # Calculate gradients and update weights
        for context_word in positive_indices + negative_indices:
            label = 1 if context_word in positive_indices else 0

            # Snapshot the target vector: indexing returns a view, so without
            # the copy the context update below would see the already-updated
            # target vector instead of the one the prediction was made with.
            target_vector = word_embeddings[target].copy()

            dot_product = np.dot(target_vector, context_embeddings[context_word])
            prediction = 1 / (1 + np.exp(-dot_product))
            error = label - prediction

            # Update weights (both steps use the pre-update vectors)
            word_embeddings[target] += learning_rate * error * context_embeddings[context_word]
            context_embeddings[context_word] += learning_rate * error * target_vector

            # Binary cross-entropy; the 1e-9 keeps log() away from zero
            loss += -label * np.log(prediction + 1e-9) - (1 - label) * np.log(1 - prediction + 1e-9)

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss:.4f}")

# Save the final word embeddings as a word -> vector lookup
word_to_vec = {word: word_embeddings[word_to_index[word]] for word in vocab.keys()}


Epoch 1/5, Loss: 4993843.3960
Epoch 2/5, Loss: 3688138.3942
Epoch 3/5, Loss: 2841609.8187
Epoch 4/5, Loss: 2303532.5609
Epoch 5/5, Loss: 2108654.7788


In [9]:
# Neural Network
class NeuralNetwork:
    """Feed-forward network with one hidden layer and sigmoid activations.

    ``forward`` caches the layer activations on the instance and ``backward``
    uses them to take one gradient step on the squared error between the
    targets and the network output.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create small random weights (standard normal * 0.01) and zero biases."""
        # Weight draw order is input->hidden first, then hidden->output
        self.weights_input_hidden = 0.01 * np.random.randn(input_size, hidden_size)
        self.weights_hidden_output = 0.01 * np.random.randn(hidden_size, output_size)
        self.bias_hidden = np.zeros((1, hidden_size))
        self.bias_output = np.zeros((1, output_size))

    def sigmoid(self, x):
        """Logistic function applied element-wise."""
        return 1.0 / (1.0 + np.exp(-x))

    def sigmoid_derivative(self, x):
        """Derivative of the sigmoid, given a value ``x`` that is already a sigmoid output."""
        return (1 - x) * x

    def forward(self, x):
        """Run a forward pass and return the output-layer activations.

        The pre-activations and activations of both layers are stored on the
        instance (``hidden_input``, ``hidden_output``, ``final_input``,
        ``final_output``).
        """
        hidden_pre_activation = np.dot(x, self.weights_input_hidden) + self.bias_hidden
        self.hidden_input = hidden_pre_activation
        self.hidden_output = self.sigmoid(hidden_pre_activation)

        output_pre_activation = np.dot(self.hidden_output, self.weights_hidden_output) + self.bias_output
        self.final_input = output_pre_activation
        self.final_output = self.sigmoid(output_pre_activation)
        return self.final_output

    def backward(self, x, y, output, learning_rate):
        """Apply one gradient step using the activations cached by ``forward``.

        Args:
            x: the inputs that were passed to ``forward``.
            y: target values, same shape as ``output``.
            output: the value returned by ``forward`` for ``x``.
            learning_rate: step size applied to every parameter update.
        """
        # Both deltas are computed before any parameter is modified, so the
        # hidden-layer delta is based on the pre-update output weights.
        delta_output = (y - output) * self.sigmoid_derivative(output)
        error_at_hidden = np.dot(delta_output, self.weights_hidden_output.T)
        delta_hidden = error_at_hidden * self.sigmoid_derivative(self.hidden_output)

        # Update weights and biases (gradients are summed over the batch)
        self.weights_hidden_output += learning_rate * np.dot(self.hidden_output.T, delta_output)
        self.weights_input_hidden += learning_rate * np.dot(x.T, delta_hidden)
        self.bias_output += learning_rate * np.sum(delta_output, axis=0, keepdims=True)
        self.bias_hidden += learning_rate * np.sum(delta_hidden, axis=0, keepdims=True)

# Prepare data for the neural network
def prepare_input_data(tokens, word_to_vec, max_words=12, embedding_size=10):
    """Build one fixed-length feature row per sentence from word vectors.

    Words missing from ``word_to_vec`` are skipped. The remaining vectors are
    padded with zero vectors (or truncated) to exactly ``max_words`` entries
    and concatenated into a single flat row.

    Args:
        tokens: iterable of sentences, each a list of words.
        word_to_vec: mapping from word to its embedding vector.
        max_words: number of word slots kept per sentence.
        embedding_size: length of the zero vector used for padding.

    Returns:
        A NumPy array with one flattened row per sentence.
    """
    padding_vector = np.zeros(embedding_size)
    rows = []
    for sentence in tokens:
        known_vectors = []
        for word in sentence:
            if word in word_to_vec:
                known_vectors.append(word_to_vec[word])
        # A non-positive repeat count yields no padding at all
        padded = known_vectors + [padding_vector] * (max_words - len(known_vectors))
        rows.append(np.array(padded[:max_words]).flatten())
    return np.array(rows)

# Feature matrix (one flattened row of word vectors per email) and column
# vector of labels
X = prepare_input_data(data['tokens'], word_to_vec)
y = data['label'].values.reshape(-1, 1)

# Split data into train and test sets (80% / 20%).
# This is done with NumPy because `train_test_split` is not imported in any
# visible cell of this notebook, so calling it raises NameError on a fresh
# kernel. A dedicated seeded generator keeps the split reproducible without
# touching the global NumPy random state.
split_order = np.random.default_rng(42).permutation(len(X))
train_size = int(0.8 * len(X))
train_indices, test_indices = split_order[:train_size], split_order[train_size:]

X_train, X_test = X[train_indices], X[test_indices]
y_train, y_test = y[train_indices], y[test_indices]

# Train the neural network
input_size = X_train.shape[1]
hidden_size, output_size = 8, 1
learning_rate = 0.01
epochs = 50

nn = NeuralNetwork(input_size, hidden_size, output_size)

# Full-batch training: every epoch is one forward pass and one update over the
# whole training set.
for epoch in range(epochs):
    output = nn.forward(X_train)
    # Mean squared error of the predictions made before this epoch's update
    loss = np.mean(np.square(y_train - output))
    nn.backward(X_train, y_train, output, learning_rate)
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss:.4f}")


Epoch 1/50, Loss: 0.2520
Epoch 2/50, Loss: 0.1677
Epoch 3/50, Loss: 0.1673
Epoch 4/50, Loss: 0.1664
Epoch 5/50, Loss: 0.1640
Epoch 6/50, Loss: 0.1574
Epoch 7/50, Loss: 0.1480
Epoch 8/50, Loss: 0.1440
Epoch 9/50, Loss: 0.1426
Epoch 10/50, Loss: 0.1422
Epoch 11/50, Loss: 0.1421
Epoch 12/50, Loss: 0.1420
Epoch 13/50, Loss: 0.1420
Epoch 14/50, Loss: 0.1420
Epoch 15/50, Loss: 0.1419
Epoch 16/50, Loss: 0.1419
Epoch 17/50, Loss: 0.1419
Epoch 18/50, Loss: 0.1419
Epoch 19/50, Loss: 0.1418
Epoch 20/50, Loss: 0.1418
Epoch 21/50, Loss: 0.1417
Epoch 22/50, Loss: 0.1417
Epoch 23/50, Loss: 0.1417
Epoch 24/50, Loss: 0.1416
Epoch 25/50, Loss: 0.1416
Epoch 26/50, Loss: 0.1415
Epoch 27/50, Loss: 0.1415
Epoch 28/50, Loss: 0.1414
Epoch 29/50, Loss: 0.1413
Epoch 30/50, Loss: 0.1413
Epoch 31/50, Loss: 0.1412
Epoch 32/50, Loss: 0.1411
Epoch 33/50, Loss: 0.1410
Epoch 34/50, Loss: 0.1408
Epoch 35/50, Loss: 0.1407
Epoch 36/50, Loss: 0.1405
Epoch 37/50, Loss: 0.1403
Epoch 38/50, Loss: 0.1401
Epoch 39/50, Loss: 0.

In [25]:
import pandas as pd
import numpy as np
import re
from collections import Counter
from math import log2

# Function to preprocess and clean the text
def preprocess_text(text):
    """Return ``text`` lower-cased with only a-z letters and whitespace kept.

    Values that are not strings (e.g. NaN or other floats) produce ''.
    """
    if isinstance(text, str):
        # Lower-case first so that upper-case letters survive the filter
        return re.sub(r'[^a-z\s]', '', text.lower())
    return ''

# Load the dataset
data = pd.read_csv(r"")  # Update with your actual path

# Clean the raw text of the 'email' column into a new 'clean_text' column
data['clean_text'] = data['email'].apply(preprocess_text)

# Features (cleaned text) and target labels (0 for ham, 1 for spam)
X, y = data['clean_text'], data['label']

# Manual 80/20 train/test split: shuffle the row positions with a fixed seed,
# then cut the shuffled order at the 80% mark.
np.random.seed(42)
indices = np.random.permutation(len(X))
train_size = int(0.8 * len(X))
train_indices = indices[:train_size]
test_indices = indices[train_size:]

X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
X_test, y_test = X.iloc[test_indices], y.iloc[test_indices]

# Manual Text Vectorization: Bag of Words
def build_vocab(corpus):
    """Count whitespace-separated words over every text in ``corpus``.

    Returns a ``Counter`` mapping each word to its total number of
    occurrences, with keys in order of first appearance.
    """
    return Counter(word for text in corpus for word in text.split())

def vectorize_text(text, vocab):
    """Convert ``text`` into a bag-of-words count vector.

    Position ``i`` of the result holds how often the ``i``-th word of
    ``vocab`` (in iteration order) occurs in ``text``. Words of ``text``
    that are not in ``vocab`` are ignored.

    Returns:
        A float NumPy array of length ``len(vocab)``.
    """
    occurrences = Counter(text.split())
    # Looking up a missing key in a Counter yields 0 without inserting it
    return np.array([occurrences[word] for word in vocab], dtype=float)

# Build vocabulary on the training data
# (only training texts contribute words, so words that appear solely in the
# test texts get no column and are ignored when vectorizing)
vocab = build_vocab(X_train)

# Vectorize the training and test data
# Each text becomes one dense row of length len(vocab); both matrices share the
# same column order because they are built from the same `vocab`.
X_train_vec = np.array([vectorize_text(text, vocab) for text in X_train])
X_test_vec = np.array([vectorize_text(text, vocab) for text in X_test])

# Implementing a simple Decision Tree Classifier (for binary classification)
class DecisionTree:
    """Decision tree classifier with threshold splits chosen by Gini impurity.

    The fitted tree is stored in ``self.tree``. Internal nodes are dicts with
    the keys ``'feature'``, ``'value'``, ``'left'`` and ``'right'``; a sample
    goes left when ``x[feature] <= value``. Leaves are plain class labels.

    Labels must be non-negative integers because the majority vote relies on
    ``np.bincount``.
    """

    def __init__(self, max_depth=5):
        """Create an unfitted tree that will grow to at most ``max_depth`` levels."""
        self.max_depth = max_depth
        self.tree = None

    def fit(self, X, y):
        """Grow the tree from a feature matrix and its labels.

        Args:
            X: 2-D array-like of shape (n_samples, n_features).
            y: labels, one per row of ``X``; a pandas Series, a NumPy array or
                a list are all accepted.

        Raises:
            ValueError: if there are no samples or the lengths differ.
        """
        X = np.asarray(X)
        # Work on a flat positional array so pandas index labels play no role
        y = np.asarray(y).ravel()
        if len(y) == 0:
            raise ValueError("Cannot fit a decision tree on an empty dataset")
        if len(X) != len(y):
            raise ValueError("X and y must contain the same number of samples")
        self.tree = self._build_tree(X, y)

    def _build_tree(self, X, y, depth=0):
        """Recursively build the subtree for the samples in ``X`` / ``y``."""
        y = np.asarray(y)

        if np.unique(y).size == 1:  # All labels are the same: pure leaf
            return y[0]

        if depth >= self.max_depth:  # Max depth reached
            return self._majority_vote(y)

        best_split = self._find_best_split(X, y)
        if best_split is None:  # No threshold separates the samples
            return self._majority_vote(y)

        left_indices = X[:, best_split['feature']] <= best_split['value']
        right_indices = ~left_indices

        left_tree = self._build_tree(X[left_indices], y[left_indices], depth + 1)
        right_tree = self._build_tree(X[right_indices], y[right_indices], depth + 1)

        return {
            'feature': best_split['feature'],
            'value': best_split['value'],
            'left': left_tree,
            'right': right_tree
        }

    def _find_best_split(self, X, y):
        """Return the split with the lowest weighted Gini impurity.

        Only splits that leave at least one sample on each side are
        considered. Returns ``None`` when no such split exists (for example
        when all rows are identical), otherwise a dict with the keys
        ``'feature'`` and ``'value'``. Ties keep the first candidate found.
        """
        y = np.asarray(y)
        best_gini = float('inf')
        best_split = None
        n_features = X.shape[1]

        for feature in range(n_features):
            column = X[:, feature]
            # np.unique returns sorted values; thresholding at the largest one
            # would send every sample left, so that candidate is dropped.
            for value in np.unique(column)[:-1]:
                left_indices = column <= value
                left_y, right_y = y[left_indices], y[~left_indices]

                # Guard against one-sided splits (e.g. NaN feature values)
                if len(left_y) == 0 or len(right_y) == 0:
                    continue

                gini_left = self._gini_impurity(left_y)
                gini_right = self._gini_impurity(right_y)

                gini = (len(left_y) * gini_left + len(right_y) * gini_right) / len(y)

                if gini < best_gini:
                    best_gini = gini
                    best_split = {'feature': feature, 'value': value}

        return best_split

    def _gini_impurity(self, y):
        """Gini impurity of ``y``: 1 minus the sum of squared class shares."""
        classes = np.unique(y)
        impurity = 1
        for c in classes:
            prob = np.sum(y == c) / len(y)
            impurity -= prob**2
        return impurity

    def _majority_vote(self, y):
        """Most frequent label in ``y`` (the smallest label wins a tie)."""
        return np.bincount(y).argmax()

    def predict(self, X):
        """Return a list with the predicted label for every row of ``X``."""
        return [self._predict_single(x, self.tree) for x in X]

    def _predict_single(self, x, tree):
        """Walk ``tree`` for one sample until a leaf label is reached."""
        if isinstance(tree, dict):
            if x[tree['feature']] <= tree['value']:
                return self._predict_single(x, tree['left'])
            else:
                return self._predict_single(x, tree['right'])
        return tree

# Train the DecisionTree model on the bag-of-words training matrix
model = DecisionTree(max_depth=10)
model.fit(X_train_vec, y_train)

# Make predictions on the test data
y_pred = model.predict(X_test_vec)

# Evaluate the model's performance: share of test samples predicted correctly
is_correct = y_pred == y_test
accuracy = np.mean(is_correct)
print(f"Accuracy: {accuracy * 100:.2f}%")

# Print a simple classification report
from collections import Counter
def classification_report_manual(y_true, y_pred):
    """Compute per-class precision, recall, F1 score and support.

    Both inputs are compared position by position. They are converted to
    NumPy arrays first, so pandas index labels are ignored and plain lists
    work as well.

    Args:
        y_true: true labels (list, NumPy array or pandas Series).
        y_pred: predicted labels, same length as ``y_true``.

    Returns:
        A dict keyed by each label that occurs in ``y_true``. Every value is
        a dict with the keys 'precision', 'recall', 'f1_score' and 'support'.
        A metric whose denominator is zero is reported as 0.

    Raises:
        ValueError: if the two inputs differ in length.
    """
    true_labels = np.asarray(y_true).ravel()
    predicted_labels = np.asarray(y_pred).ravel()
    if true_labels.shape != predicted_labels.shape:
        raise ValueError("y_true and y_pred must have the same length")

    report = {}
    for label in np.unique(true_labels):
        is_true = true_labels == label
        is_predicted = predicted_labels == label

        tp = int(np.count_nonzero(is_true & is_predicted))
        fp = int(np.count_nonzero(~is_true & is_predicted))
        fn = int(np.count_nonzero(is_true & ~is_predicted))

        precision = tp / (tp + fp) if tp + fp > 0 else 0
        recall = tp / (tp + fn) if tp + fn > 0 else 0
        f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0

        report[label] = {
            'precision': precision,
            'recall': recall,
            'f1_score': f1_score,
            'support': tp + fn
        }
    return report

# Compute the per-class metrics on the test set and print them label by label
report = classification_report_manual(y_test, y_pred)
print("\nClassification Report:")
for label in report:
    print(f"Label {label}:")
    metrics = report[label]
    for metric in metrics:
        value = metrics[metric]
        print(f"  {metric}: {value:.4f}")


Accuracy: 96.00%

Classification Report:
Label 0:
  precision: 0.9619
  recall: 0.9897
  f1_score: 0.9756
  support: 485.0000
Label 1:
  precision: 0.9505
  recall: 0.8348
  f1_score: 0.8889
  support: 115.0000
