In [13]:
import tensorflow as tf
from keras.utils.data_utils import pad_sequences
from keras import layers
from keras.models import Sequential
from keras.layers import Conv1D, Flatten, Dense, MaxPooling1D, Input
import numpy as np
import json
import spacy
import string
nlp = spacy.load('en_core_web_sm')

limiter = 100000

def filter_sentence(sentence):
    try:
        doc = nlp(sentence)
        # Include unigrams, bigrams, and trigrams
        filtered_tokens = [token.text for token in doc if not token.is_stop and token.text not in string.punctuation and token.text.strip()]
        filtered_token = []
        filtered_token.extend([' '.join(gram) for gram in zip(filtered_tokens, filtered_tokens[1:])] + 
                              [' '.join(gram) for gram in zip(filtered_tokens, filtered_tokens[1:], filtered_tokens[2:])])
        return filtered_token
    except Exception as e:
        # Detailed logging
        print(f"Error processing sentence: {sentence}. Error: {str(e)}")
        raise  # Re-raise the exception for a more detailed traceback

def calculate_accuracy(Y_true, Y_preds):
    correct_predictions = 0
    total_samples = len(Y_true)

    for i in range(total_samples):
        true_class = np.argmax(Y_true[i])
        pred_class = np.argmax(Y_preds[i])

        if true_class == pred_class:
            correct_predictions += 1

    accuracy = correct_predictions / total_samples
    return accuracy

def train_test_split(X,Y):
    # Initialize empty lists for each category
    category_1_indices = []
    category_2_indices = []
    category_3_indices = []
   # Iterate through the indices and categorize them
    Y = np.array(Y)
    max_split_index = int(len(Y)*0.75)
    for i in range(max_split_index):
        val = Y[i]
        if val[0] ==  1:
            category_1_indices.append(i)
        elif val[1] == 1:
            category_2_indices.append(i)
        elif val[2] == 1:
            category_3_indices.append(i)
    
    neutral_len = len(category_2_indices)
    positive_len = len(category_3_indices)
    negative_len = len(category_1_indices)

    length_of_each = min(neutral_len, negative_len, positive_len)
    
    negative_indices = category_1_indices[:length_of_each]
    neutral_indices = category_2_indices[:length_of_each]
    positive_indices = category_3_indices[:length_of_each]
    X= np.array(X)
    mask = np.ones(len(X), dtype=bool)
    mask[negative_indices] = False
    mask[positive_indices] = False
    mask[neutral_indices] = False
    X_train = np.concatenate([X[negative_indices], X[neutral_indices], X[positive_indices]], axis = 0)
    Y_train = np.concatenate([Y[negative_indices], Y[neutral_indices], Y[positive_indices]], axis = 0)
    X_test = np.concatenate([X[mask], X[max_split_index:]], axis = 0)
    Y_test = np.concatenate([Y[mask], Y[max_split_index:]], axis = 0)
    return X_train.astype(np.float16), Y_train.astype(np.float16), X_test.astype(np.float16), Y_test.astype(np.float16)



# Define the CNN_Text model in TensorFlow
class CNN_Text(tf.keras.Model):
    def __init__(self, vocab_size, vector_size, n_filters, filter_sizes, output_dim, dropout, input_len):
        super(CNN_Text, self).__init__()

        # Create word embeddings from the input words
        self.embedding = layers.Embedding(input_dim=vocab_size, output_dim=vector_size, input_length=input_len, mask_zero=True)

        # Specify convolutions with filters of different sizes (fs)
        self.convs = [layers.Conv1D(filters=n_filters, kernel_size=(fs), activation='sigmoid') for fs in filter_sizes]

        # Add a fully connected layer for final predictions
        self.linear = layers.Dense(output_dim, activation = 'softmax')

        # Dropout layer
        self.dropout = layers.Dropout(dropout)

    def call(self, inputs):
        # Get word embeddings and format them for convolutions
        inputs = inputs[0]
        embedded = self.embedding(inputs)

        print('Embedding shape: {}'.format(embedded.shape))

        # Perform convolutions and apply activation functions
        conved = [conv(embedded) for conv in self.convs]

        # Pooling layer to reduce dimensionality
        pooled = [layers.Flatten()(tf.reduce_max(conv, axis=[1, 2])) for conv in conved]

        cat = self.dropout(tf.concat(pooled, axis=1))

        cat = self.linear(pooled)

        return cat

def get_vocab_size(corpus):
    flattened = [item for sublist in corpus for item in sublist]
    unique_nums = set(flattened)
    vocab_size = len(unique_nums)
    return vocab_size

dicts = []
with open('AMAZON_FASHION.json', 'r') as file:
    for line in file:
        dicts.append(json.loads(line))

ratings = []
Review_content = []

for dict in dicts:
    i = 0
    data = dict
    if 'reviewText' in data and 'overall' in data:
        ratings.append(data['overall'])
        Review_content.append(data['reviewText'])

Review_content = Review_content[:limiter]
ratings = ratings[:limiter]

filtered_sentences = [filter_sentence(sentence) for sentence in Review_content]

vocab_size = get_vocab_size(filtered_sentences)

In [None]:
from gensim.models import Word2Vec
vector_size = 10
word2vec_model = Word2Vec(sentences=filtered_sentences, vector_size=vector_size, window=10, min_count=1, workers=10)

X_sequences = []
for sentence in filtered_sentences:
    sentence_vectors = [word2vec_model.wv[word] for word in sentence]
    X_sequences.append(sentence_vectors)
max_len = 0
for i in X_sequences:
    if len(i) > max_len:
        max_len = len(i)
max_words = max_len  
X_padded = pad_sequences(X_sequences, maxlen=max_words, padding='post', dtype=object)
del X_sequences
ratings = np.array(ratings)
new_np_array = np.zeros((len(ratings), 3))
for i in range(len(ratings)):
    input_arr = np.zeros(3)
    val = ratings[i]
    if val == 1:
        input_arr[0] = 1
    elif val == 2:
        input_arr[0] = 1
    elif val == 3:
        input_arr[1] = 1
    elif val == 4:
        input_arr[2] = 1
    elif val == 5:
        input_arr[2] = 1
    else:
        print('error')
    new_np_array[i,:] = input_arr
Y = new_np_array

In [None]:
# Create an instance of the CNN_Text model
X_padded = np.transpose(X_padded, axes=[0,2,1])
x_train, y_train, x_test, y_test = train_test_split(X_padded, Y)

In [18]:
model = Sequential()
model.add(Input(shape=(vector_size, max_len)))
model.add(Conv1D(filters = 200 , kernel_size = 2, padding = 'same', activation = 'sigmoid'))
model.add(MaxPooling1D(2))
model.add(Conv1D(filters = 75 , kernel_size = 3, padding = 'same', activation = 'sigmoid'))
model.add(MaxPooling1D(2))
model.add(Conv1D(filters = 50 , kernel_size = 3, padding = 'same', activation = 'sigmoid'))
model.add(Flatten())
model.add(Dense(3, activation='softmax'))

optimizer = tf.keras.optimizers.Adam(learning_rate=1)
model.compile(loss = 'categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
model.fit(x_train, y_train,batch_size=16,validation_data=(x_test, y_test), epochs = 100)

# Save the model
model.save_weights('CNN-model.h5')



Model: "sequential_8"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_12 (Conv1D)          (None, 10, 200)           433400    
                                                                 
 max_pooling1d_8 (MaxPooling  (None, 5, 200)           0         
 1D)                                                             
                                                                 
 conv1d_13 (Conv1D)          (None, 5, 75)             45075     
                                                                 
 max_pooling1d_9 (MaxPooling  (None, 2, 75)            0         
 1D)                                                             
                                                                 
 conv1d_14 (Conv1D)          (None, 2, 50)             11300     
                                                                 
 flatten_4 (Flatten)         (None, 100)              