In [None]:
import json
import os
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import classification_report
from sqlalchemy import create_engine, text

from naivebayes import NaiveBayesTextClassifier


### Old code: load the `MODEL_PARAMETERS` table from Snowflake

In [None]:
# Load environment settings via python-dotenv (presumably from a local .env file)
# so the SNOWFLAKE_* lookups in the following cell can find them.
load_dotenv()

In [None]:
# Snowflake connection settings, read from the process environment in one pass.
# Each name is None when its variable is not set.
username, password, account, warehouse, database, schema = map(
    os.getenv,
    (
        'SNOWFLAKE_USER',
        'SNOWFLAKE_PASSWORD',
        'SNOWFLAKE_ACCOUNT',
        'SNOWFLAKE_WAREHOUSE',
        'SNOWFLAKE_DATABASE',
        'SNOWFLAKE_SCHEMA',
    ),
)

In [None]:
# Build the SQLAlchemy engine for the Snowflake connection described by the
# settings read from the environment.
# The user name and password are percent-encoded first: a raw '@', ':' or '/'
# inside either one would be read as a URL delimiter and corrupt the connection
# string.
if username is None or password is None:
    # Fail here with a clear message instead of building a URL that contains the
    # literal text "None" and failing later at connect time.
    raise RuntimeError(
        'SNOWFLAKE_USER and SNOWFLAKE_PASSWORD must be set before creating the engine'
    )

engine = create_engine(
    f'snowflake://{quote_plus(username)}:{quote_plus(password)}'
    f'@{account}/{database}/{schema}?warehouse={warehouse}'
)

In [None]:
# SQL text that selects every column and row of the MODEL_PARAMETERS table;
# it is executed through `engine` in the next cell.
query = "SELECT * FROM MODEL_PARAMETERS"

In [None]:
# Run the query on a connection scoped to this `with` block.
with engine.connect() as connection:
    result = connection.execute(text(query))
    # Pull every row into memory while the connection is still open.
    rows = result.fetchall()

In [None]:
# Wrap the fetched rows in a DataFrame.
# NOTE(review): no column names are passed explicitly — confirm the resulting
# columns are what is expected. `trained` is not referenced by any later cell
# visible in this notebook.
trained = pd.DataFrame(rows)

### End of old code

In [None]:
# Load the training split from a local parquet file; the path is relative to the
# notebook's working directory. Later cells read its 'text' and 'label' columns.
train = pd.read_parquet('yelp_review_full/yelp_review_full/train-00000-of-00001.parquet')

In [None]:
# Keep only the first 1000 training rows. Note this happens before the label
# filter below, so the final training set can be smaller than 1000.
train = train.head(1000)

In [None]:
# Load the test split from a local parquet file; the path is relative to the
# notebook's working directory. Later cells read its 'text' and 'label' columns.
test = pd.read_parquet('yelp_review_full/yelp_review_full/test-00000-of-00001.parquet')

In [None]:
# Reduce the task to two classes: keep only training rows whose label is 0 or 1.
train = train.loc[train['label'].isin([0, 1])]

In [None]:
# Apply the same two-class restriction to the test rows (labels 0 and 1 only).
test = test.loc[test['label'].isin([0, 1])]

In [None]:
# Fit the project's NaiveBayesTextClassifier (imported from the local `naivebayes`
# module) on the filtered training texts and their labels.
classifier = NaiveBayesTextClassifier()
classifier.fit(train['text'], train['label'])

In [None]:
# Display the fitted `word_probabilities` attribute for inspection.
# NOTE(review): this shows the whole object, which may be a very large output —
# consider displaying only a slice.
classifier.word_probabilities

In [None]:
# Predict on the first 100 test reviews only, so that y_pred lines up one-to-one
# with the 100 ground-truth labels collected in y_true. Predicting on the full
# test set made the two lengths differ, which classification_report rejects
# with a ValueError.
y_pred = classifier.predict(test['text'][:100])

In [None]:
# Ground-truth labels for the first 100 test rows (by position, not index label),
# materialised as a plain list for the evaluation report.
y_true = list(test['label'].iloc[:100])

In [None]:
# Print the evaluation report for the custom classifier.
# classification_report needs y_true and y_pred to have the same length.
print(classification_report(y_true, y_pred))


### Sklearn Implementation

In [None]:
from sklearn.naive_bayes import MultinomialNB

# Baseline: scikit-learn Naive Bayes on bag-of-words counts, trained on the same
# training rows and scored on the same first 100 test rows as y_true.
vectorizer = CountVectorizer()
training_data = vectorizer.fit_transform(train['text'][:1000])
test_data = vectorizer.transform(test['text'][:100])

# MultinomialNB is the Naive Bayes variant meant for discrete word counts.
# GaussianNB would model each count as a continuous normal variable and would
# force the sparse matrices to be densified with .toarray(), which is both the
# wrong event model for this data and memory-hungry for a large vocabulary.
nb_classifier = MultinomialNB()

# The sparse count matrices are passed as-is; no dense conversion is needed.
nb_classifier.fit(training_data, train['label'][:1000])

y_pred = nb_classifier.predict(test_data)

print("\nClassification Report:\n", classification_report(y_true, y_pred))


##### Naive Bayes Python implementation

In [None]:
# priors = {}
# for i,j in Counter(train['label']).items():
#     priors[i] = j/len(train['label'])

In [None]:
# vocab_per_label_dict = {}
# for label, vector in zip(list(train['label']), X.toarray()):
#     if label in vocab_per_label_dict:
#         vocab_per_label_dict[label] += vector
#     else:
#         vocab_per_label_dict[label] = vector


In [None]:
# total_per_label = {}
# for i, j in vocab_per_label_dict.items():
#     total_per_label[i] = np.sum(j)  

In [None]:
# word_probabilities = {}
# for i,j in vocab_per_label_dict.items():
#     word_prob_list = []
#     for k in j:
#         if k != 0:
#             prob = k/total_per_label[i]
#             word_prob_list.append(prob)
#         else:
#             word_prob_list.append(0)
#     word_probabilities[i] = np.array(word_prob_list)

In [None]:
# results = dict()
# for label in word_probabilities.keys():
#     test = []
#     for i,j in zip(word_probabilities[label],y.toarray()[0]):
#         if i == 0:
#             test.append(1)
#         else:
#             test.append(i**j)
#     results[label] = priors[label]*math.prod(test)

In [None]:
# max_key = max(results, key=lambda k: results[k])

In [None]:
# class NaiveBayesTextClassifier:
#     def __init__(self):
#         self.vectorizer = CountVectorizer()
#         self.priors = {}
#         self.vocab_per_label_dict = {}
#         self.total_per_label = {}
#         self.word_probabilities = {}
    
#     def fit(self, train_texts, train_labels):
#         X = self.vectorizer.fit_transform(train_texts)
#         self.feature_names = self.vectorizer.get_feature_names_out()
        
#         label_counts = Counter(train_labels)
#         self.priors = {label: count / len(train_labels) for label, count in label_counts.items()}
        
#         for label, vector in zip(train_labels, X.toarray()):
#             if label in self.vocab_per_label_dict:
#                 self.vocab_per_label_dict[label] += vector
#             else:
#                 self.vocab_per_label_dict[label] = vector
        
#         self.total_per_label = {label: np.sum(vector) for label, vector in self.vocab_per_label_dict.items()}
        
#         self.word_probabilities = {}
#         for label, vector in self.vocab_per_label_dict.items():
#             word_prob_list = [count / self.total_per_label[label] if count != 0 else 0 for count in vector]
#             self.word_probabilities[label] = np.array(word_prob_list)
    
#     def predict(self, test_texts):
#             # Vectorize the test data
#             X_test = self.vectorizer.transform(test_texts)
#             predictions = []
            
#             # Iterate over each test document
#             for test_vector in X_test.toarray():
#                 results = {}
                
#                 # For each label, compute the likelihood for the current test document
#                 for label in self.word_probabilities.keys():
#                     class_probabilities = []
#                     for i, word_count in enumerate(test_vector):
#                         word_prob = self.word_probabilities[label][i]
#                         if word_prob == 0:
#                             class_probabilities.append(1)
#                         else:
#                             class_probabilities.append(word_prob ** word_count)
                    
#                     # Multiply all word probabilities and store the result for the label
#                     results[label] = math.prod(class_probabilities)
                
#                 # Select the label with the highest likelihood
#                 max_key = max(results, key=lambda k: results[k])
#                 predictions.append(max_key)
            
#             return predictions

In [None]:
# classifier = NaiveBayesTextClassifier()
# classifier.fit(train['text'], train['label'])

In [None]:
# prediction = classifier.predict(test['text'])

In [None]:
# prediction

In [None]:
# import math
# import numpy as np
# from collections import Counter
# from sklearn.feature_extraction.text import CountVectorizer
# from functools import reduce
# import operator

# class NaiveBayesTextClassifier:
#     def __init__(self):
#         self.vectorizer = CountVectorizer()
#         self.priors = {}
#         self.vocab_per_label_dict = {}
#         self.total_per_label = {}
#         self.word_probabilities = {}
    
#     def fit(self, train_texts, train_labels):
#         X = self.vectorizer.fit_transform(train_texts)
#         self.feature_names = self.vectorizer.get_feature_names_out()
        
#         label_counts = Counter(train_labels)
#         self.priors = {label: count / len(train_labels) for label, count in label_counts.items()}
        
#         for label, vector in zip(train_labels, X.toarray()):
#             if label in self.vocab_per_label_dict:
#                 self.vocab_per_label_dict[label] += vector
#             else:
#                 self.vocab_per_label_dict[label] = vector
        
#         self.total_per_label = {label: np.sum(vector) for label, vector in self.vocab_per_label_dict.items()}
        
#         self.word_probabilities = {}
#         for label, vector in self.vocab_per_label_dict.items():
#             word_prob_list = [(count + 1) / (self.total_per_label[label] + len(self.feature_names)) 
#                               for count in vector]  # Laplace Smoothing

#             # word_prob_list = [count / self.total_per_label[label] if count != 0 else 0 for count in vector]

#             self.word_probabilities[label] = np.array(word_prob_list)
    
#     def predict(self, test_texts):
#         X_test = self.vectorizer.transform(test_texts)
#         predictions = []
        
#         for test_vector in X_test.toarray():
#             results = {}
            
#             for label in self.word_probabilities.keys():
#                 class_probabilities = []
#                 for i, word_count in enumerate(test_vector):
#                     word_prob = self.word_probabilities[label][i]
#                     if word_prob > 0:
#                         class_probabilities.append(word_count * math.log(word_prob))                
#                 results[label] = math.log(self.priors[label]) + sum(class_probabilities)
            
#             max_key = max(results, key=lambda k: results[k])
#             predictions.append(max_key)
        
#         return predictions


In [None]:
# classifier = NaiveBayesTextClassifier()
# classifier.fit(train['text'], train['label'])

In [None]:
# prediction = classifier.predict(test['text'])

In [None]:
# prediction