In [1]:
import numpy as np
import nltk
import json
import re
import string
from collections import defaultdict, Counter

In [2]:
# Number of rating classes. The model code indexes classes as label - 1,
# so labels are expected to be the integers 1..K.
K = 5
# Paths of the training and test review files; extract_data parses each
# line of these files as one JSON object.
TRAIN_DATA = "reviews_Digital_Music_5.json/Music_Review_train.json"
TEST_DATA = "reviews_Digital_Music_5.json/Music_Review_test.json"

In [3]:
def extract_data(file: str, split):
	"""Load reviews from a JSON-lines file as bag-of-words counts.

	Every non-blank line of ``file`` is parsed as a JSON object. Its
	``reviewText`` is tokenised with ``split`` and its ``overall`` rating,
	truncated to ``int``, is used as the label.

	Args:
		file: Path of the JSON-lines file to read.
		split: Callable mapping a review text to an iterable of tokens.

	Returns:
		A pair ``(X, Y)``: ``X`` is an object array holding one
		``defaultdict(int)`` of token counts per review, ``Y`` is the array
		of integer ratings in the same order.

	Raises:
		OSError: If ``file`` cannot be opened.
		json.JSONDecodeError: If a non-blank line is not valid JSON.
		KeyError: If a record lacks ``reviewText`` or ``overall``.
	"""
	X = []
	Y = []
	# Read as UTF-8 explicitly: relying on the platform default encoding can
	# fail on non-ASCII review text.
	with open(file, encoding="utf-8") as f:
		for line in f:
			# Tolerate blank lines (e.g. a trailing newline) instead of
			# letting json.loads fail on an empty string.
			if not line.strip():
				continue
			review = json.loads(line)
			counts = defaultdict(int)
			for word in split(review['reviewText']):
				counts[word] += 1
			X.append(counts)
			Y.append(int(review['overall']))
	# dtype=object keeps X a 1-D array of dicts, including when it is empty.
	return np.array(X, dtype=object), np.array(Y)

In [4]:
def train_model(X: np.ndarray, Y: np.ndarray):
	"""Fit a multinomial naive Bayes model with add-one smoothing.

	Args:
		X: Iterable of per-document mappings ``word -> count``.
		Y: Iterable of labels in ``1..K``, aligned with ``X``.

	Returns:
		A triple ``(vocab, unseen, log_prior)``:
		``vocab`` maps each training word to a length-``K`` array of log
		likelihoods ``log((count + 1) / (tokens_in_class + |V|))``;
		``unseen`` is the length-``K`` array
		``log(1 / (1 + tokens_in_class + |V|))`` used for words outside the
		vocabulary; ``log_prior`` is the log of the class frequencies.
	"""
	class_counts = np.zeros(K, np.float64)
	token_totals = np.zeros(K, np.int32)
	# Every word starts at one occurrence per class (add-one smoothing).
	vocab = defaultdict(lambda: np.ones(K))
	for counts, label in zip(X, Y):
		cls = label - 1  # labels are 1-based, arrays are 0-based
		class_counts[cls] += 1
		for word, count in counts.items():
			vocab[word][cls] += count
			token_totals[cls] += count
	# Smoothing denominator: one pseudo-count per vocabulary entry.
	token_totals += len(vocab)
	for word, smoothed in vocab.items():
		vocab[word] = np.log(smoothed / token_totals)
	unseen = np.log(1 / (1 + token_totals))
	log_prior = np.log(class_counts / np.sum(class_counts))
	return vocab, unseen, log_prior

In [5]:
def predict_nb(vocab: defaultdict, n: np.ndarray, Phi: np.ndarray, x: defaultdict):
	"""Return the most probable class (1-based) for one document.

	Args:
		vocab: Mapping ``word -> array`` of per-class log likelihoods.
		n: Per-class log likelihood used for words missing from ``vocab``.
		Phi: Per-class log priors; its length fixes the number of classes.
		x: Mapping ``word -> count`` describing the document.

	Returns:
		The 1-based index of the class with the highest log posterior. Ties
		go to the lowest class. ``-1`` is returned when no class scores
		above ``-inf`` (including when ``Phi`` is empty).
	"""
	pred = -1
	best = -float("inf")
	for y, log_prior in enumerate(Phi):
		score = log_prior
		for word, count in x.items():
			if word in vocab:
				score += vocab[word][y] * count
			else:
				# An unseen word that occurs `count` times contributes
				# `count` factors to the likelihood, exactly like a known
				# word; previously it was only counted once.
				score += n[y] * count
		if score > best:
			best = score
			pred = y + 1
	return pred

In [6]:
def accuracy(X: np.ndarray, Y: np.ndarray, prediction):
	"""Return the fraction of labels in ``Y`` matched by ``prediction(X)``.

	Args:
		X: Inputs handed unchanged to ``prediction``.
		Y: Array of true labels.
		prediction: Callable returning the predicted labels for ``X``.
	"""
	hits = prediction(X) == Y
	total = Y.shape[0]
	return sum(hits) / total

In [7]:
def confusion_matrix(Y: np.ndarray, pred_Y: np.ndarray):
	"""Build the ``K x K`` confusion matrix for 1-based labels.

	Row index is the true label minus one, column index is the predicted
	label minus one; each cell counts the pairs seen in ``zip(Y, pred_Y)``.
	"""
	counts = np.zeros((K, K), np.int32)
	for actual, predicted in zip(Y, pred_Y):
		counts[actual - 1, predicted - 1] += 1
	return counts

In [8]:
def gen_train_test(split_fn):
	"""Load the training and test sets, tokenising reviews with ``split_fn``.

	Returns:
		``(training_data, test_data)``, each being the ``(X, Y)`` pair
		produced by ``extract_data`` for ``TRAIN_DATA`` and ``TEST_DATA``.
	"""
	print("Extracting data...")
	# Training file first, then test file.
	datasets = tuple(extract_data(path, split_fn) for path in (TRAIN_DATA, TEST_DATA))
	print("Data extracted!")
	return datasets

In [9]:
def nb_util(training_data, test_data, output, extra=False):
	"""Train naive Bayes, evaluate it and write a text report to ``output``.

	Args:
		training_data: ``(X, Y)`` pair used to fit the model.
		test_data: ``(X, Y)`` pair used for evaluation.
		output: Path of the report file; it is overwritten.
		extra: When true, also report the random-guess and majority-class
			baselines on the test set and the training confusion matrix.

	The report always contains the training accuracy, the test accuracy
	and the test confusion matrix.
	"""
	print("Training model...")
	vocab, n, Phi = train_model(*training_data)
	print("Model trained!\nMaking predictions and writing output to file...")
	naive_bayes = np.vectorize(lambda x: predict_nb(vocab, n, Phi, x))
	train_X, train_Y = training_data
	test_X, test_Y = test_data
	m = test_Y.shape[0]
	training_pred = naive_bayes(train_X)
	test_pred = naive_bayes(test_X)
	# The report is only written, never read back, so plain 'w' is enough.
	with open(output, 'w') as f:
		f.write("train_accuracy   = {}\n".format(accuracy(*training_data, lambda X: training_pred)))
		f.write("test_accuracy    = {}\n".format(accuracy(*test_data, lambda X: test_pred)))
		if extra:
			# randint's upper bound is exclusive: K + 1 yields labels 1..K
			# and stays correct if K changes.
			random_pred = np.random.randint(1, K + 1, m)
			# The majority class must come from the training labels; taking
			# it from the test labels would leak the answers into the baseline.
			majority = Counter(train_Y).most_common(1)[0][0]
			f.write("random_accuracy  = {}\n".format(accuracy(*test_data, lambda X: random_pred)))
			f.write("mode_accuracy    = {}\n".format(accuracy(*test_data, lambda X: np.full(m, majority))))
			f.write("confusion_matrix (training) =\n{}\n".format(confusion_matrix(train_Y, training_pred)))
		f.write("confusion_matrix (test) =\n{}".format(confusion_matrix(test_Y, test_pred)))
	print("Output written!")

In [10]:
# Stop words are stored in a set because every token of every review is
# tested for membership; a list would cost a linear scan per token.
stop_words = set(nltk.corpus.stopwords.words('english'))
porter = nltk.stem.PorterStemmer()
# Cache word -> Porter stem, filled lazily by stem().
stemmed = dict()
def stem(word: str):
	"""Return the Porter stem of ``word``, memoised in ``stemmed``."""
	try:
		return stemmed[word]
	except KeyError:
		# First sighting of this word: compute once and remember it.
		result = stemmed[word] = porter.stem(word)
		return result
def stem_split(s: str):
	"""Tokenise ``s``, drop stop words and return the stemmed tokens.

	Punctuation is replaced by spaces, the text is tokenised with NLTK's
	``word_tokenize``, tokens found in ``stop_words`` (compared in lower
	case) are discarded and the rest are passed through ``stem``.

	Args:
		s: Raw review text.

	Returns:
		List of stemmed tokens in their original order.
	"""
	# re.escape is required here: dropped unescaped into a character class,
	# the `\]` inside string.punctuation is read as an escaped bracket, so
	# backslashes were never actually stripped.
	no_punct = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
	# Compare in lower case so capitalised stop words ("The", "And") are
	# removed too (assumes the stop-word list is lower-case, as NLTK's
	# English list is -- confirm if another list is substituted).
	return [
		stem(word)
		for word in nltk.tokenize.word_tokenize(no_punct)
		if word.lower() not in stop_words
	]

In [11]:
def bigram_split(s: str):
	"""Return the bigrams of the stemmed, stop-word-free tokens of ``s``.

	Each bigram is the two adjacent tokens joined by a single space. The
	separator keeps distinct pairs distinct: without it ("a", "bc") and
	("ab", "c") would both become "abc".

	Args:
		s: Raw review text.

	Returns:
		List of bigram strings; empty when ``s`` yields fewer than two tokens.
	"""
	# Named `tokens` so the module-level `stemmed` cache is not shadowed.
	tokens = stem_split(s)
	return [first + ' ' + second for first, second in zip(tokens, tokens[1:])]

In [12]:
def bigram_split_alter(s: str):
	"""Return the bigrams of the raw whitespace-separated words of ``s``.

	Punctuation is replaced by spaces first; no stemming or stop-word
	removal is applied. Each bigram is the two adjacent words joined by a
	single space, so that distinct pairs such as ("a", "bc") and
	("ab", "c") do not collapse into the same feature.

	Args:
		s: Raw review text.

	Returns:
		List of bigram strings; empty when ``s`` has fewer than two words.
	"""
	# re.escape is required: unescaped, the `\]` inside string.punctuation
	# is parsed as an escaped bracket and backslashes are left in the text.
	words = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s).split()
	return [first + ' ' + second for first, second in zip(words, words[1:])]

In [13]:
def trigram_split(s: str):
	"""Return the trigrams of the stemmed, stop-word-free tokens of ``s``.

	Each trigram is three adjacent tokens joined by single spaces. The
	separator prevents different token triples from concatenating into
	the same string.

	Args:
		s: Raw review text.

	Returns:
		List of trigram strings; empty when ``s`` yields fewer than three
		tokens.
	"""
	# Named `tokens` so the module-level `stemmed` cache is not shadowed.
	tokens = stem_split(s)
	return [' '.join(triple) for triple in zip(tokens, tokens[1:], tokens[2:])]

In [14]:
# Baseline run: plain whitespace tokenisation, with the extra baselines
# and training confusion matrix included in the report.
training_default, test_default = gen_train_test(split_fn=str.split)
nb_util(training_default, test_default, output="output/default", extra=True)

Extracting data...
Data extracted!
Training model...
Model trained!
Making predictions and writing output to file...
Output written!


In [15]:
# Unigram run on stemmed tokens with stop words removed.
training_clean, test_clean = gen_train_test(split_fn=stem_split)
nb_util(training_clean, test_clean, output="output/clean")

Extracting data...
Data extracted!
Training model...
Model trained!
Making predictions and writing output to file...
Output written!


In [16]:
# Bigram run on stemmed tokens with stop words removed.
training_bigram_clean, test__bigram_clean = gen_train_test(split_fn=bigram_split)
nb_util(training_bigram_clean, test__bigram_clean, output="output/bigram")


Extracting data...
Data extracted!
Training model...
Model trained!
Making predictions and writing output to file...
Output written!


In [17]:
# Bigram run on the raw words (punctuation stripped, no stemming).
training_bigram, test_bigram = gen_train_test(split_fn=bigram_split_alter)
nb_util(training_bigram, test_bigram, output="output/bigram_og")

Extracting data...
Data extracted!
Training model...
Model trained!
Making predictions and writing output to file...
Output written!


In [18]:
# Trigram run on stemmed tokens with stop words removed.
training_trigram_clean, test__trigram_clean = gen_train_test(split_fn=trigram_split)
nb_util(training_trigram_clean, test__trigram_clean, output="output/trigram")

Extracting data...
Data extracted!
Training model...
Model trained!
Making predictions and writing output to file...
Output written!
