# Beer Review Classification with Hierarchical Self-Attention Networks

## 1. Setup and Dependencies

In [None]:
# Import necessary libraries
import torch
from scipy.sparse import csr_matrix
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from collections import OrderedDict
from torch.nn.modules.module import Module
from torch.utils.data import TensorDataset
import time
import numpy as np
import collections
import pickle
import argparse
from random import shuffle
import math
import numpy as np
import matplotlib.pyplot as plt
import json
import sys
import datetime
import string
import nltk
from nltk import word_tokenize, sent_tokenize
from sklearn.preprocessing import LabelEncoder, LabelBinarizer
from sklearn.model_selection import train_test_split
from operator import itemgetter
from torch.autograd import Variable
from zipfile import ZipFile

# Download NLTK tokenizer models.
# 'punkt' is what older NLTK releases load; NLTK >= 3.8.2 loads the
# 'punkt_tab' tables instead. Fetch both so word_tokenize / sent_tokenize
# work whichever NLTK version is installed.
for _nltk_resource in ('punkt', 'punkt_tab'):
    nltk.download(_nltk_resource)

# Device setup: use the first CUDA GPU when one is available, else the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## 2. Data Loading and Preprocessing

In [None]:
# Define data directory (also read by the loading helpers further down)
datadir = "/content/drive/MyDrive/ML CLASS/HW7/"

# Load the data: every archive member is parsed as JSON; if the archive
# holds more than one member, only the last one parsed is kept.
with ZipFile(datadir + 'labeled.json.zip', 'r') as archive:
    for member in archive.namelist():
        with archive.open(member, 'r') as handle:
            beer_reviews = json.load(handle)

# Group the 'overall' ratings by beer and by brewer
beer_ratings = {}
brewer_ratings = {}
for review in beer_reviews:
    beer_name = review['beer_name']
    brewer_name = review['brewer']
    overall_rating = review['overall']
    beer_ratings.setdefault(beer_name, []).append(overall_rating)
    brewer_ratings.setdefault(brewer_name, []).append(overall_rating)

# Summary statistics (mean / median / population std-dev) per beer
beer_stats = {
    beer_name: {
        'mean': np.mean(ratings),
        'median': np.median(ratings),
        'std_dev': np.std(ratings),
    }
    for beer_name, ratings in beer_ratings.items()
}

# Same summary statistics per brewer
brewer_stats = {
    brewer_name: {
        'mean': np.mean(ratings),
        'median': np.median(ratings),
        'std_dev': np.std(ratings),
    }
    for brewer_name, ratings in brewer_ratings.items()
}

# Report everything, beers first and then brewers
print("Beer Statistics:")
for beer_name, stats in beer_stats.items():
    print(f"Beer: {beer_name}, Mean: {stats['mean']}, Median: {stats['median']}, Std Dev: {stats['std_dev']}")

print("\nBrewer Statistics:")
for brewer_name, stats in brewer_stats.items():
    print(f"Brewer: {brewer_name}, Mean: {stats['mean']}, Median: {stats['median']}, Std Dev: {stats['std_dev']}")

## 3. Prepare Vocabulary and Embeddings

In [None]:
def prep_vocab_emb():
    """Load the vocabulary and build the matching GloVe embedding matrix.

    Reads ``word2idx_small.json`` and ``glove.6B.50d.txt.zip`` from the
    module-level ``datadir`` and prints the vocabulary size.

    Returns:
        tuple: ``(WordEmbeddings, w2i)``. ``w2i`` is the word -> index
        mapping loaded from the JSON file. ``WordEmbeddings`` is a NumPy
        array of shape ``(len(w2i) + 1, 50)`` whose row ``w2i[word]`` holds
        the GloVe vector of ``word``; rows of words without a GloVe vector
        stay all-zero.
    """
    vocab = 'word2idx_small'
    with open(datadir + vocab + '.json', 'r', encoding='utf-8') as f:
        w2i = json.load(f)
    num_words = len(w2i)
    print('NUM WORDS', num_words)

    # One extra row so that index 0 can stay an all-zero row.
    # NOTE(review): this assumes the indices stored in w2i start at 1 --
    # confirm against the vocabulary file.
    WordEmbeddings = np.zeros((num_words + 1, 50))

    # Stream the pre-trained vectors and keep only the words we index,
    # instead of materialising the whole GloVe table in a dict first.
    with ZipFile(datadir + 'glove.6B.50d.txt.zip', 'r') as archive:
        for member in archive.namelist():
            with archive.open(member, 'r') as f:
                for raw_line in f:
                    # ZipFile.open() yields bytes; decode so the token is a
                    # str and can match the str keys of w2i.
                    values = raw_line.decode('utf-8').split()
                    if not values:
                        continue
                    idx = w2i.get(values[0])
                    if idx is not None:
                        WordEmbeddings[idx] = np.asarray(values[1:], dtype='float32')

    return WordEmbeddings, w2i

## 4. Text Preprocessing Functions

In [None]:
def remove_punctuation(s):
    """Return *s* with every ASCII punctuation mark and newline removed."""
    unwanted = set(string.punctuation) | {"\n"}
    return "".join(ch for ch in s if ch not in unwanted)

def ConvertSentence2Word(s):
    """Split sentence *s* into lowercase word tokens.

    Punctuation and newlines are stripped first, the text is lowercased,
    and the result is tokenized with NLTK's ``word_tokenize``.
    """
    cleaned = remove_punctuation(s)
    lowered = cleaned.lower()
    return word_tokenize(lowered)

def ConvertSent2Idx(s):
    """Encode sentence *s* as a list of vocabulary indices.

    Tokens that are not keys of the module-level ``w2i`` mapping are
    dropped, so the result may be shorter than the sentence (or empty).
    """
    indices = []
    for token in ConvertSentence2Word(s):
        if token in w2i:
            indices.append(w2i[token])
    return indices

def ConvertDoc2List(doc):
    """Encode document *doc* as a list of per-sentence index lists.

    The document is split into sentences with NLTK's ``sent_tokenize``;
    each sentence is encoded with ``ConvertSent2Idx``. Sentences that
    contain no in-vocabulary word are left out, so the result may be empty.
    """
    # Encode each sentence exactly once (the encoding tokenizes the text,
    # so doing it twice per sentence doubles the preprocessing cost).
    encoded = (ConvertSent2Idx(sentence) for sentence in sent_tokenize(doc))
    return [ids for ids in encoded if ids]

def ConvertList2Array(docs):
    """Pack ragged index lists into a zero-padded 2-D float array.

    Row ``r`` of the result holds ``docs[r]`` followed by zeros up to the
    length of the longest entry. Raises ``ValueError`` when *docs* is empty.
    """
    widest = max(len(row) for row in docs)
    padded = np.zeros((len(docs), widest))
    for r, row in enumerate(docs):
        # Everything past len(row) keeps its initial zero (the padding).
        padded[r, :len(row)] = row
    return padded

def data_to_array(X_t, Y_t):
    """Turn raw documents and labels into model-ready tensors.

    Args:
        X_t: indexable collection of document strings.
        Y_t: labels, indexable in parallel with ``X_t``.

    Returns:
        tuple: ``(X_t_data, Y_t_data)``. ``X_t_data`` is a list with one
        ``LongTensor`` per kept document, of shape
        ``(num_sentences, max_words)``, zero-padded and placed on the
        module-level ``device``. ``Y_t_data`` is a long tensor of shape
        ``(num_kept_documents, 1)`` on the same device. Documents with no
        in-vocabulary word are dropped together with their label.
    """
    X_t_data = []
    Y_t_data = []
    for i, doc in enumerate(X_t):
        sentences = ConvertDoc2List(doc)
        if not sentences:
            # Nothing left after vocabulary filtering: skip doc and label.
            continue
        X_input = torch.LongTensor(ConvertList2Array(sentences))
        Y_t_data.append(Y_t[i])
        X_t_data.append(X_input.to(device))
    Y_t_data = torch.tensor(np.array(Y_t_data).reshape((-1, 1))).type(torch.long).to(device)
    return X_t_data, Y_t_data

## 5. Load and Split Data

In [None]:
def load_data(num, corpus):
    """Load raw review texts and labels for the requested corpus.

    Args:
        num: maximum number of reviews to take; only used for ``'beer'``.
        corpus: ``'beer'`` reads ``labeled.json.zip``; any other value reads
            ``yelp_review_small.npz``. Both live under ``datadir``.

    Returns:
        tuple: ``(X, Y)``. For ``'beer'`` both are lists: review texts and
        binary labels (1 when the ``'overall'`` rating is >= 14, else 0).
        Otherwise they are columns 0 and 1 of the stored array, with 1
        subtracted from the label column.
    """
    if corpus != 'beer':
        npz = np.load(datadir + 'yelp_review_small.npz', allow_pickle=True)
        data = npz['arr_0']
        X = data[:, 0]       # Text
        Y = data[:, 1] - 1   # Label, shifted down by one
        del data
        return X, Y

    with ZipFile(datadir + 'labeled.json.zip', 'r') as archive:
        for member in archive.namelist():
            with archive.open(member, 'r') as handle:
                brv = json.load(handle)

    X, Y = [], []
    for position, entry in enumerate(brv):
        if position >= num:
            break
        X.append(entry['review'])
        Y.append(1 if entry['overall'] >= 14 else 0)
    del brv
    return X, Y

def get_data(X, Y):
    """Tensorize, shuffle and split the data into train / val / test parts.

    The documents are converted with ``data_to_array``, shuffled with
    NumPy's global RNG, and then cut at 80% and 82% of the kept documents:
    the first 80% is the training set, the next 2% the validation set, and
    the remaining 18% the test set.

    Returns:
        tuple: ``(X_train, y_train, X_val, y_val, X_test, y_test)``.
    """
    docs, labels = data_to_array(X, Y)

    order = np.arange(0, len(docs), 1).astype(np.int64)
    np.random.shuffle(order)
    docs = [docs[k] for k in order]
    labels = labels[order]

    total = len(docs)
    train_end = np.int32(.8 * total)
    val_end = np.int32(.82 * total)

    return (
        docs[0:train_end], labels[0:train_end],
        docs[train_end:val_end], labels[train_end:val_end],
        docs[val_end:total], labels[val_end:total],
    )

## 6. Define the Models

In [None]:
class SelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Queries, keys and values are separate linear projections of the same
    input, each mapping ``d_model`` features to ``d_model`` features.
    """

    def __init__(self, d_model):
        super().__init__()
        self.d_model = d_model
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)

    def forward(self, x):
        """Attend over the second-to-last axis of *x*.

        *x* must have ``d_model`` features on its last axis; the result
        has the same shape as *x*.
        """
        queries = self.q_linear(x)
        keys = self.k_linear(x)
        values = self.v_linear(x)
        # Scale the similarities by sqrt(d_model) before the softmax.
        similarity = (queries @ keys.transpose(-2, -1)) / math.sqrt(self.d_model)
        weights = torch.softmax(similarity, dim=-1)
        return weights @ values

class TargetAttention(Module):
    """Attention pooling with a single learned target (query) vector.

    The learned ``target`` of shape ``(1, input_dim)`` is compared with
    every position of the input; the softmax-weighted sum of the positions
    is returned, i.e. one pooled vector per batch element.

    Args:
        input_dim: feature size of the inputs and of the target vector.
        dropout_rate: rate of the ``dropout`` layer. The layer is created
            and stored as ``self.dropout`` but the forward pass does not
            apply it.
    """

    def __init__(self, input_dim, dropout_rate):
        super(TargetAttention, self).__init__()
        self.target = nn.Parameter(torch.empty((1, input_dim)))
        nn.init.kaiming_uniform_(self.target)
        self.input_dim = input_dim
        self.sq_input_dim = np.sqrt(input_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def target_att(self, t, k, v):
        """Scaled dot-product attention of target *t* over keys/values.

        *t* is ``(batch, 1, dim)``; *k* and *v* are ``(batch, n, dim)``.
        Returns a ``(batch, 1, dim)`` tensor.
        """
        out = torch.matmul(t, k.permute(0, 2, 1)) / self.sq_input_dim
        sf = torch.softmax(out, 2)  # normalise over the n positions
        targ_att = torch.matmul(sf, v)
        return targ_att

    def forward(self, input):
        """Pool *input* of shape ``(batch, n, input_dim)`` to ``(batch, 1, input_dim)``."""
        # Expand to (batch, 1, dim). A 2-D (batch, dim) target would make
        # matmul broadcast it against every batch element and return a
        # (batch, batch, dim) tensor whenever batch > 1.
        target = self.target.expand(input.size(0), -1, -1)
        return self.target_att(target, input, input)

class ConvolutionalCELL(nn.Module):
    """Multi-width 1-D convolution encoder with max-over-time pooling.

    One ``Conv1d`` per entry of ``filter_sizes`` is run over the sequence;
    each output is ReLU-activated and max-pooled over the whole sequence.
    The pooled features are concatenated and projected back to ``d_model``.

    Args:
        d_model: feature size of the input and of the output vectors.
        filter_sizes: sequence of convolution kernel widths.
        num_filters: number of output channels of every convolution.
    """

    def __init__(self, d_model, filter_sizes, num_filters):
        super(ConvolutionalCELL, self).__init__()
        filter_sizes = list(filter_sizes)
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels=d_model, out_channels=num_filters, kernel_size=fs)
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(len(filter_sizes) * num_filters, d_model)
        # Shortest sequence every convolution can still process.
        self.min_length = max(filter_sizes, default=1)

    def forward(self, x):
        """Encode *x* of shape ``(batch, seq_len, d_model)`` to ``(batch, d_model)``."""
        x = x.permute(0, 2, 1)  # Conv1d expects (batch, channels, seq_len)
        # Conv1d raises when the sequence is shorter than its kernel, so
        # right-pad short sequences with zeros up to the widest kernel.
        shortfall = self.min_length - x.size(2)
        if shortfall > 0:
            x = F.pad(x, (0, shortfall))
        conv_outs = [F.relu(conv(x)) for conv in self.convs]
        # Max over the full remaining length -> one value per filter.
        conv_outs = [F.max_pool1d(co, co.size(2)).squeeze(2) for co in conv_outs]
        x = torch.cat(conv_outs, 1)
        x = self.fc(x)
        return x

class HierarchicalSelfAttentionNetwork(nn.Module):
    """Two-level document classifier: words -> sentences -> document.

    Each sentence is encoded from its word embeddings by a
    ``ConvolutionalCELL``; the sentence vectors then attend to each other
    with ``SelfAttention``, are averaged into one document vector, and are
    classified into two classes.

    The embedding table is read from the module-level ``WordEmbeddings``
    and, per ``nn.Embedding.from_pretrained``'s default, is frozen.

    Args:
        embedding_dim: width of the word embeddings (columns of
            ``WordEmbeddings``).
        hidden_dim: width of the sentence vectors seen by the attention
            layer and the classifier.
        filter_sizes: kernel widths for the sentence encoder.
        num_filters: number of filters per kernel width.
    """

    def __init__(self, embedding_dim, hidden_dim, filter_sizes, num_filters):
        super(HierarchicalSelfAttentionNetwork, self).__init__()
        # from_pretrained needs a float tensor; WordEmbeddings is built as
        # a float64 NumPy array, so convert it explicitly.
        pretrained = torch.as_tensor(WordEmbeddings, dtype=torch.float32)
        self.embedding = nn.Embedding.from_pretrained(pretrained)
        self.convolutions = ConvolutionalCELL(embedding_dim, filter_sizes, num_filters)
        # The sentence encoder emits embedding_dim features while the
        # attention layer works on hidden_dim features; bridge the two
        # whenever they differ.
        if embedding_dim == hidden_dim:
            self.projection = nn.Identity()
        else:
            self.projection = nn.Linear(embedding_dim, hidden_dim)
        self.attention = SelfAttention(hidden_dim)
        self.fc = nn.Linear(hidden_dim, 2)

    def forward(self, x):
        """Classify one document.

        Args:
            x: ``LongTensor`` of shape ``(num_sentences, max_words)`` with
                zero-padded word indices, as produced by ``data_to_array``.

        Returns:
            Tensor of shape ``(1, 2)`` holding the class logits.
        """
        x = self.embedding(x)       # (sentences, words, embedding_dim)
        x = self.convolutions(x)    # (sentences, embedding_dim)
        x = self.projection(x)      # (sentences, hidden_dim)
        x = x.unsqueeze(0)          # (1, sentences, hidden_dim): one document
        x = self.attention(x)       # sentences attend to each other
        x = torch.mean(x, dim=1)    # (1, hidden_dim): average over sentences
        x = self.fc(x)              # (1, 2)
        return x

## 7. Training and Evaluation

In [None]:
def train_model(model, X_train, y_train, X_val, y_val, epochs=10, lr=0.001):
    """Train *model* one document at a time with Adam and cross-entropy.

    Args:
        model: module mapping one document tensor to class logits.
        X_train: list of per-document input tensors.
        y_train: label tensor indexed in parallel with ``X_train``;
            ``y_train[i]`` is passed to the loss as the target of document i.
        X_val, y_val: validation data, handed to ``evaluate_model`` after
            every even-numbered epoch (0, 2, 4, ...).
        epochs: number of passes over the training data.
        lr: Adam learning rate.

    Prints the mean loss and training accuracy after every epoch.
    """
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    num_docs = len(X_train)

    for epoch in range(epochs):
        # evaluate_model() switches the model to eval mode, so training
        # mode has to be restored at the start of every epoch.
        model.train()
        epoch_loss = 0
        correct = 0
        total = 0

        for i, inputs in enumerate(X_train):
            labels = y_train[i]

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        # Guard the averages so an empty training set reports NaN instead
        # of raising ZeroDivisionError.
        mean_loss = epoch_loss / num_docs if num_docs else float('nan')
        train_accuracy = correct / total if total else float('nan')
        print(f"Epoch {epoch+1}/{epochs} - Loss: {mean_loss:.4f}, Accuracy: {train_accuracy:.4f}")

        if epoch % 2 == 0:
            evaluate_model(model, X_val, y_val)

def evaluate_model(model, X_val, y_val):
    """Print and return the accuracy of *model* on the validation set.

    The model is switched to eval mode (and left there) and run without
    gradient tracking, one document at a time.

    Args:
        model: module mapping one document tensor to class logits.
        X_val: list of per-document input tensors.
        y_val: label tensor indexed in parallel with ``X_val``.

    Returns:
        float: fraction of correct predictions, or NaN when no label was
        seen (empty validation set).
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for i, inputs in enumerate(X_val):
            labels = y_val[i]
            predicted = model(inputs).argmax(dim=1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    # A small dataset can leave this split empty; report NaN rather than
    # raising ZeroDivisionError.
    val_accuracy = correct / total if total else float('nan')
    print(f"Validation Accuracy: {val_accuracy:.4f}")
    return val_accuracy

def test_model(model, X_test, y_test):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for i in range(len(X_test)):
            inputs = X_test[i]
            labels = y_test[i]
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    test_accuracy = correct / total
    print(f"Test Accuracy: {test_accuracy:.4f}")

# Build the vocabulary and the embedding matrix first: the text
# preprocessing helpers read the module-level `w2i`, and the model reads
# the module-level `WordEmbeddings`.
WordEmbeddings, w2i = prep_vocab_emb()

# Load and prepare data
X, Y = load_data(num=10000, corpus='beer')
X_train, y_train, X_val, y_val, X_test, y_test = get_data(X, Y)

# Initialize and train the model
embedding_dim = 50  # matches the 50-column matrix built by prep_vocab_emb
hidden_dim = 100
filter_sizes = [3, 4, 5]
num_filters = 100

model = HierarchicalSelfAttentionNetwork(embedding_dim, hidden_dim, filter_sizes, num_filters).to(device)
train_model(model, X_train, y_train, X_val, y_val, epochs=10, lr=0.001)
test_model(model, X_test, y_test)

## 8. Conclusion
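The Hierarchical Self-Attention Network (HSAN) project demonstrates how convolutional sentence encoders and a self-attention layer can be combined for sentiment analysis of beer reviews. Each review is labeled positive when its overall rating is at least 14 and negative otherwise. The model is trained on 80% of the reviews, monitored on a 2% validation split, and evaluated on the remaining 18%; the test accuracy printed above is the measure of how promising the approach is for classifying reviews into positive or negative sentiment.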