<a href="https://colab.research.google.com/github/felladib/H_SentimentAnalysis_REC/blob/main/SA_modul.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset, vocabulary and
# embedding files read below live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras import metrics
from keras.layers import Embedding, Multiply, Dense, Dot, Conv2D, Input, Flatten
from keras.layers import concatenate
from keras.models import Model

In [None]:
!pip install flair

In [None]:
from sklearn.preprocessing import LabelBinarizer
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import nltk
from nltk.corpus import stopwords
from string import punctuation
from flair.data import Sentence
import pickle
# Fetch the NLTK stop-word corpus that clean_doc reads through stopwords.words('english').
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

# **collection de données**

**SA_data**

In [None]:
# Folder on the mounted Drive that holds the IMDB artefacts (single place to edit).
IMDB_DIR = '/content/drive/MyDrive/IMDB'


def _load_pickle(path):
    """Unpickle and return the object stored in the file at `path`."""
    # NOTE(review): unpickling can execute arbitrary code; only load files that
    # were produced by this project.
    with open(path, "rb") as file:
        return pickle.load(file)


# Load the review dataset
data = pd.read_csv(f'{IMDB_DIR}/IMDB_Dataset.csv')
# Load the vocabulary (later assigned to Tokenizer.word_index)
tokens = _load_pickle(f'{IMDB_DIR}/IMDB_vocab')
# Load the embedding matrix (later used as the Embedding layers' initial weights)
mat = _load_pickle(f'{IMDB_DIR}/IMDB_embd')

# **preprocessing**

In [None]:
_STOP_WORDS = {}  # language -> frozenset of stop words, filled on first use


def _stop_words_for(language='english'):
    """Return the NLTK stop-word list for `language` as a cached frozenset."""
    if language not in _STOP_WORDS:
        # Reading the corpus once instead of on every review.
        _STOP_WORDS[language] = frozenset(stopwords.words(language))
    return _STOP_WORDS[language]


def clean_doc(doc):
    """Turn one raw review into a list of cleaned, lower-cased tokens.

    Steps, in order: split on whitespace, strip ASCII punctuation from each
    token, drop tokens that are not purely alphabetic or are a single
    character, lower-case, and drop English stop words.

    Args:
        doc: the review text as a string.

    Returns:
        list of str: the surviving tokens, in their original order.
    """
    table = str.maketrans('', '', punctuation)
    stop_words = _stop_words_for('english')

    cleaned = []
    for raw in doc.split():
        word = raw.translate(table)
        if not word.isalpha() or len(word) <= 1:
            continue
        # Lower-case BEFORE the stop-word test: the NLTK list is lower-case, so
        # comparing the original casing let "The", "This", ... slip through.
        word = word.lower()
        if word not in stop_words:
            cleaned.append(word)
    return cleaned

In [None]:
# Clean every review in the dataset (raw string -> list of tokens).
# Reviews that are already token lists are left untouched so that re-running
# this cell does not crash on doc.split().
data['review'] = data['review'].apply(
    lambda review: review if isinstance(review, list) else clean_doc(review)
)
# Largest number of tokens in a cleaned review. Only the review column is
# measured: the previous DataFrame-wide applymap also took len() of the label
# strings, and applymap is deprecated in recent pandas.
MAX_SEQUENCE_LENGTH = int(data['review'].map(len).max())
MAX_SEQUENCE_LENGTH

1480

In [None]:
# Convert data['sentiment'] to 0 and 1 ==> 0: negative / 1: positive
lb=LabelBinarizer()
sentiment_data=lb.fit_transform(data['sentiment'])
sentiment_data

array([[1],
       [1],
       [1],
       ...,
       [0],
       [0],
       [0]])

In [None]:
# Split the dataset into training, test and validation sets by row position.
# NOTE(review): the split is positional, with no shuffling — confirm the CSV rows
# are not ordered by label.
# training set: rows 0..39999 (40000 rows)
X_train_rev=data.review[:40000]
y_train_rev=sentiment_data[:40000]
# test set: rows 40000..44999 (5000 rows)
X_test_rev=data.review[40000:45000]
y_test_rev=sentiment_data[40000:45000]
# validation set: row 45000 to the end of the dataset
X_val_rev=data.review[45000:]
y_val_rev=sentiment_data[45000:]

In [None]:
# Tokenizer whose vocabulary is the pre-built word -> index mapping loaded from disk.
token = Tokenizer()
token.word_index = tokens

# Every sequence is padded to the longest cleaned review computed above,
# instead of repeating the literal 1480 at each call site.
length = int(MAX_SEQUENCE_LENGTH)


def encode_reviews(reviews, maxlen=length):
    """Map token lists to integer sequences and post-pad them to `maxlen`.

    Args:
        reviews: iterable of token lists (the output of clean_doc).
        maxlen: common length of the returned sequences.

    Returns:
        The array returned by pad_sequences, one row per review, zero-padded
        at the end of each row.
    """
    sequences = token.texts_to_sequences(reviews)
    return pad_sequences(sequences, maxlen=maxlen, padding='post')


# NOTE: the X_* names are rebound from token lists to padded arrays because the
# rest of the notebook uses them; re-run the split cell before re-running this one.
X_train_rev = encode_reviews(X_train_rev)
X_test_rev = encode_reviews(X_test_rev)
X_val_rev = encode_reviews(X_val_rev)

# Model input size: one row per vocabulary entry plus index 0, which
# pad_sequences uses as the padding value.
vocab_size = len(tokens) + 1

In [None]:
# Display the padded training sequences produced by the previous cell.
X_train_rev

array([[    1,     2,     3, ...,     0,     0,     0],
       [  150,   151,   152, ...,     0,     0,     0],
       [  221,   150,   222, ...,     0,     0,     0],
       ...,
       [   31,    27,  3868, ...,     0,     0,     0],
       [   15,    91,   302, ...,     0,     0,     0],
       [10621,  6386, 23136, ...,     0,     0,     0]], dtype=int32)

# **module d'analyse de sentiment**

In [None]:
# Metric objects shared by the compile() calls below.
from keras import metrics
rmse = metrics.RootMeanSquaredError()
# Explicit name: an unnamed Precision gets an auto-numbered name ("precision",
# "precision_1", ...) that changes every time this cell is re-run, which in turn
# changes the keys of history.history read by the plotting cells.
precision = metrics.Precision(name='precision')

## **classe LSTM**

In [None]:
class MyLSTM(tf.keras.Model):
    """Sentiment classifier: embedding -> LSTM -> self-attention -> dense head.

    Args:
        units: number of LSTM units.
        vocab_size: number of rows of the embedding table.
        mat: initial embedding weights (assumed shape (vocab_size, 768) — TODO confirm).
        l1, l2: currently unused; the regularizers they were meant for are disabled.
        droup_val: rate of the dropout applied after the attention layer.
    """

    def __init__(self,units,vocab_size,mat,l1=0,l2=0 , droup_val=0):
        super(MyLSTM, self).__init__()
        self.embedding = Embedding(vocab_size, 768, weights=[mat], trainable=True)
        # The two dropout layers used to share the explicit name 'dropout';
        # layer names must be unique within a model (e.g. for HDF5 weight saving).
        self.dropout1 = tf.keras.layers.Dropout(0.5, name='dropout1')
        self.lstm = tf.keras.layers.LSTM(units,
                                         return_sequences=True,
                                         name='lstm1')
        self.attention = tf.keras.layers.Attention()
        self.dropout = tf.keras.layers.Dropout(droup_val, name='dropout')
        self.flatten = tf.keras.layers.Flatten(name='flatten')
        self.dense1 = tf.keras.layers.Dense(128, activation='tanh', name='dense1')
        self.dense3 = tf.keras.layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the last layer."""
        x = self.embedding(inputs)
        x = self.dropout1(x)
        x = self.lstm(x)
        # Self-attention: the LSTM output is passed as both query and value.
        x = self.attention([x , x])
        x = self.dropout(x)
        x = self.flatten(x)
        x = self.dense1(x)
        return self.dense3(x)



**Entrainement LSTM**

In [None]:
# Hyper-parameters of the LSTM run.
EPOCHS = 20        # passes over the training set
BATCH_SIZE = 8     # samples per gradient step
units = 64         # size handed to the model constructor
droup_val = 0.3    # dropout rate intended for the model constructor

In [None]:
# droup_val must be passed by keyword: the 4th positional parameter of
# MyLSTM.__init__ is `l1`, so a positional value never reached the dropout layer.
model_sa = MyLSTM(units, vocab_size, mat, droup_val=droup_val)

In [None]:
# Adam optimiser with binary cross-entropy; precision, MAE and RMSE are tracked.
model_sa.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss='binary_crossentropy',
    metrics=[precision, 'mae', metrics.RootMeanSquaredError()],
)

In [None]:
# Train on the training split and score the validation split after every epoch.
history = model_sa.fit(
    X_train_rev,
    y_train_rev,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_val_rev, y_val_rev),
)

In [None]:
# Per-epoch training curves recorded by fit().
# The precision key is taken from the metric object itself: a hard-coded
# "precision_1" only exists when the metrics cell has been run twice and raises
# KeyError on a fresh kernel.
loss_curve = history.history["loss"]
acc_curve = history.history[precision.name]
rmse_curve = history.history["root_mean_squared_error"]
mae_curve = history.history["mae"]

# Matching validation curves (same keys prefixed with "val_").
loss_val = history.history["val_loss"]
acc_val = history.history["val_" + precision.name]
rmse_val = history.history["val_root_mean_squared_error"]
mae_val = history.history["val_mae"]


def ploter(title , curve , valid):
    """Plot a training curve and its validation counterpart on one figure.

    Args:
        title: figure title.
        curve: per-epoch training values.
        valid: per-epoch validation values.
    """
    plt.plot(curve, label="train")
    plt.plot(valid, label="validation")
    plt.ylim(0, 1)  # fixed y-range; values above 1 are cut off
    plt.legend(loc='upper left')
    plt.title(title)
    plt.show()


ploter('loss', loss_curve, loss_val)
ploter('rmse', rmse_curve, rmse_val)
ploter('mae', mae_curve, mae_val)
# These curves are precision (the compiled metric), not accuracy.
ploter('precision', acc_curve, acc_val)


**test LSTM**

In [None]:
# Evaluate the trained model on the held-out test split.
score = model_sa.evaluate(X_test_rev , y_test_rev)

In [None]:
# Display the evaluation result (expected order: loss, then the compiled metrics).
score

## **classe BiLstm**

In [None]:
class MyBiLstm(tf.keras.Model):
    """Sentiment classifier: embedding -> bidirectional LSTM -> self-attention -> dense head.

    Args:
        units: total recurrent width; each direction gets int(units/2) LSTM units.
        vocab_size: number of rows of the embedding table.
        mat: initial embedding weights (assumed shape (vocab_size, 768) — TODO confirm).
        l1, l2: accepted but not used by any layer.
        droup_val: rate of the dropout applied after the attention layer.
    """

    def __init__(self, units, vocab_size, mat, l1=0, l2=0, droup_val=0):
        super().__init__()
        layers = tf.keras.layers

        self.embedding = Embedding(vocab_size, 768, weights=[mat], trainable=True)
        self.dropout1 = layers.Dropout(0.5, name='dropout')

        # One LSTM definition; Bidirectional runs it in both directions.
        recurrent = layers.LSTM(int(units/2), return_sequences=True, name='bilstm1')
        self.bilstm = layers.Bidirectional(recurrent)

        self.attention = layers.Attention()
        self.dropout = layers.Dropout(droup_val)
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(128, activation='tanh')
        self.dense2 = layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the last layer."""
        embedded = self.dropout1(self.embedding(inputs))
        encoded = self.bilstm(embedded)
        # Self-attention: the same tensor is used as query and value.
        attended = self.attention([encoded, encoded])
        features = self.flatten(self.dropout(attended))
        return self.dense2(self.dense1(features))


**Entrainement BiLSTM**

In [None]:
# Hyper-parameters of the BiLSTM run.
EPOCHS = 20        # passes over the training set
BATCH_SIZE = 8     # samples per gradient step
units = 64         # size handed to the model constructor
droup_val = 0.4    # dropout rate intended for the model constructor

In [None]:
# droup_val must be passed by keyword: the 4th positional parameter of
# MyBiLstm.__init__ is `l1`, so a positional value never reached the dropout layer.
model_sa = MyBiLstm(units, vocab_size, mat, droup_val=droup_val)

In [None]:
# Adam optimiser with binary cross-entropy; precision, MAE and RMSE are tracked.
model_sa.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
    loss='binary_crossentropy',
    metrics=[precision, 'mae', metrics.RootMeanSquaredError()],
)

In [None]:
# Train on the training split and score the validation split after every epoch.
history = model_sa.fit(
    X_train_rev,
    y_train_rev,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_val_rev, y_val_rev),
)

**test BiLstm**

In [None]:
# Evaluate the trained model on the held-out test split, without the progress output.
scores= model_sa.evaluate(X_test_rev,y_test_rev,verbose=0)

In [None]:
# Display the evaluation result (expected order: loss, then the compiled metrics).
scores

In [None]:
# Per-epoch training curves recorded by fit().
# The precision key comes from the metric object, so it stays correct even if
# Keras auto-numbered the metric name ("precision_1", ...).
loss_curve = history.history["loss"]
acc_curve = history.history[precision.name]
rmse_curve = history.history["root_mean_squared_error"]
mae_curve = history.history["mae"]

# Matching validation curves (same keys prefixed with "val_").
loss_val = history.history["val_loss"]
acc_val = history.history["val_" + precision.name]
rmse_val = history.history["val_root_mean_squared_error"]
mae_val = history.history["val_mae"]


def ploter(title , curve , valid):
    """Plot a training curve and its validation counterpart on one figure.

    Args:
        title: figure title.
        curve: per-epoch training values.
        valid: per-epoch validation values.
    """
    plt.plot(curve, label="train")
    plt.plot(valid, label="validation")
    plt.legend(loc='upper left')
    plt.title(title)
    plt.show()


ploter('loss', loss_curve, loss_val)
ploter('rmse', rmse_curve, rmse_val)
ploter('mae', mae_curve, mae_val)
# These curves are precision (the compiled metric), not accuracy.
ploter('precision', acc_curve, acc_val)



## **classe BiLstmCnn**

In [None]:
class MyBiLstmCnn(tf.keras.Model):
    """Sentiment classifier: embedding -> bidirectional LSTM -> Conv1D -> self-attention -> dense head.

    Args:
        units: total recurrent width; each direction gets int(units/2) LSTM units.
        vocab_size: number of rows of the embedding table.
        mat: initial embedding weights (assumed shape (vocab_size, 768) — TODO confirm).
        l1, l2: accepted but not used by any layer.
        droup_val: rate of the dropout applied after the attention layer.
    """

    def __init__(self, units, vocab_size, mat, l1=0, l2=0, droup_val=0):
        super().__init__()
        layers = tf.keras.layers

        self.embedding = Embedding(vocab_size, 768, weights=[mat], trainable=True)
        self.dropout1 = layers.Dropout(0.5)

        # One LSTM definition; Bidirectional runs it in both directions.
        recurrent = layers.LSTM(int(units/2), return_sequences=True, name='bilstm1')
        self.bilstm = layers.Bidirectional(recurrent)

        # 64 filters of width 3 over the recurrent outputs, no padding.
        self.cnn = layers.Conv1D(64, 3, activation="relu", padding="valid")
        self.attention = layers.Attention()
        self.dropout2 = layers.Dropout(droup_val)
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(128, activation='tanh')
        self.dense3 = layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the last layer."""
        embedded = self.dropout1(self.embedding(inputs))
        convolved = self.cnn(self.bilstm(embedded))
        # Self-attention: the same tensor is used as query and value.
        attended = self.attention([convolved, convolved])
        features = self.flatten(self.dropout2(attended))
        return self.dense3(self.dense1(features))


**Entrainement BiLSTM CNN**

In [None]:
# Hyper-parameters of the BiLSTM + CNN run.
EPOCH = 10         # passes over the training set (this run reads EPOCH, not EPOCHS)
BATCH_SIZE = 8     # samples per gradient step
units = 64         # size handed to the model constructor
droup_val = 0.4    # dropout rate intended for the model constructor

In [None]:
# droup_val must be passed by keyword: the 4th positional parameter of
# MyBiLstmCnn.__init__ is `l1`, so a positional value never reached the dropout layer.
model_sa = MyBiLstmCnn(units, vocab_size, mat, droup_val=droup_val)

In [None]:
# Adam optimiser with binary cross-entropy; precision, MAE and RMSE are tracked.
model_sa.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
    loss='binary_crossentropy',
    metrics=[
        precision,
        'mae',
        metrics.RootMeanSquaredError(),
    ],
)

In [None]:
# Train on the training split and score the validation split after every epoch.
history = model_sa.fit(
    X_train_rev,
    y_train_rev,
    epochs=EPOCH,
    batch_size=BATCH_SIZE,
    validation_data=(X_val_rev, y_val_rev),
)

In [None]:
# Per-epoch training curves recorded by fit().
# The precision key comes from the metric object, so it stays correct even if
# Keras auto-numbered the metric name ("precision_1", ...).
loss_curve = history.history["loss"]
acc_curve = history.history[precision.name]
rmse_curve = history.history["root_mean_squared_error"]
mae_curve = history.history["mae"]

# Matching validation curves (same keys prefixed with "val_").
loss_val = history.history["val_loss"]
acc_val = history.history["val_" + precision.name]
rmse_val = history.history["val_root_mean_squared_error"]
mae_val = history.history["val_mae"]


def ploter(title , curve , valid):
    """Plot a training curve and its validation counterpart on one figure.

    Args:
        title: figure title.
        curve: per-epoch training values.
        valid: per-epoch validation values.
    """
    plt.plot(curve, label="train")
    plt.plot(valid, label="validation")
    plt.legend(loc='upper left')
    plt.ylim(0,1)  # fixed y-range; values above 1 are cut off
    plt.title(title)
    plt.show()


ploter('loss', loss_curve, loss_val)
ploter('rmse', rmse_curve, rmse_val)
ploter('mae', mae_curve, mae_val)
# These curves are precision (the compiled metric), not accuracy.
ploter('precision', acc_curve, acc_val)

**test BiLstm_Cnn**

In [None]:
# Evaluate the trained model on the held-out test split.
score = model_sa.evaluate(X_test_rev , y_test_rev)

In [None]:
# Display the evaluation result (expected order: loss, then the compiled metrics).
score

## **classe BiLstmRnn**

In [None]:
class MyBiLstmRnn(tf.keras.Model):
    """Sentiment classifier: embedding -> bidirectional LSTM -> SimpleRNN -> self-attention -> dense head.

    Args:
        units: total recurrent width; each LSTM direction and the SimpleRNN get
            int(units/2) units.
        vocab_size: number of rows of the embedding table.
        mat: initial embedding weights (assumed shape (vocab_size, 768) — TODO confirm).
        l1, l2: accepted but not used by any layer.
        droup_val: rate of the dropout applied after the attention layer.
    """

    def __init__(self, units, vocab_size, mat, l1=0, l2=0, droup_val=0):
        super().__init__()
        layers = tf.keras.layers

        self.embedding = Embedding(vocab_size, 768, weights=[mat], trainable=True)
        self.dropout1 = layers.Dropout(0.5)

        # One LSTM definition; Bidirectional runs it in both directions.
        recurrent = layers.LSTM(int(units/2), return_sequences=True, name='bilstm1')
        self.bilstm = layers.Bidirectional(recurrent)

        self.rnn = layers.SimpleRNN(int(units/2), return_sequences=True, name='rnn1')
        self.attention = layers.Attention()
        self.dropout = layers.Dropout(droup_val)
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(128, activation='tanh')
        self.dense3 = layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the last layer."""
        embedded = self.dropout1(self.embedding(inputs))
        encoded = self.rnn(self.bilstm(embedded))
        # Self-attention: the same tensor is used as query and value.
        attended = self.attention([encoded, encoded])
        features = self.flatten(self.dropout(attended))
        return self.dense3(self.dense1(features))



In [None]:
# Hyper-parameters of the BiLSTM + RNN run.
EPOCHS = 10        # passes over the training set
BATCH_SIZE = 8     # samples per gradient step
units = 64         # size handed to the model constructor
droup_val = 0.3    # dropout rate intended for the model constructor

In [None]:
# droup_val must be passed by keyword: the 4th positional parameter of
# MyBiLstmRnn.__init__ is `l1`, so a positional value never reached the dropout layer.
model_sa = MyBiLstmRnn(units, vocab_size, mat, droup_val=droup_val)

In [None]:
# Adam optimiser with binary cross-entropy; precision, MAE and RMSE are tracked.
model_sa.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
    loss='binary_crossentropy',
    metrics=[precision, 'mae', metrics.RootMeanSquaredError()],
)

In [None]:
# Train on the training split and score the validation split after every epoch.
history = model_sa.fit(
    X_train_rev,
    y_train_rev,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_val_rev, y_val_rev),
)

**test BiLstm_Rnn**

In [None]:
# Evaluate the trained model on the held-out test split.
scores = model_sa.evaluate(X_test_rev , y_test_rev)

In [None]:
# Display the evaluation result (expected order: loss, then the compiled metrics).
scores

In [None]:
# Per-epoch training curves recorded by fit().
# The precision key comes from the metric object, so it stays correct even if
# Keras auto-numbered the metric name ("precision_1", ...).
loss_curve = history.history["loss"]
acc_curve = history.history[precision.name]
rmse_curve = history.history["root_mean_squared_error"]
mae_curve = history.history["mae"]

# Matching validation curves (same keys prefixed with "val_").
loss_val = history.history["val_loss"]
acc_val = history.history["val_" + precision.name]
rmse_val = history.history["val_root_mean_squared_error"]
mae_val = history.history["val_mae"]


def ploter(title , curve , valid):
    """Plot a training curve and its validation counterpart on one figure.

    Args:
        title: figure title.
        curve: per-epoch training values.
        valid: per-epoch validation values.
    """
    plt.plot(curve, label="train")
    plt.plot(valid, label="validation")
    plt.ylim(0, 1)  # fixed y-range; values above 1 are cut off
    plt.legend(loc='upper left')
    plt.title(title)
    plt.show()


ploter('loss', loss_curve, loss_val)
ploter('rmse', rmse_curve, rmse_val)
ploter('mae', mae_curve, mae_val)
# These curves are precision (the compiled metric), not accuracy.
ploter('precision', acc_curve, acc_val)

## **classe LstmCnn**

In [None]:
class MyLstmCnn(tf.keras.Model):
    """Sentiment classifier: embedding -> LSTM -> Conv1D -> self-attention -> dense head.

    Args:
        units: number of LSTM units.
        vocab_size: number of rows of the embedding table.
        mat: initial embedding weights (assumed shape (vocab_size, 768) — TODO confirm).
        l1, l2: currently unused; the regularizers they were meant for are disabled.
        droup_val: rate of the dropout applied after the attention layer.
    """

    def __init__(self,units,vocab_size,mat,l1=0,l2=0 , droup_val=0):
        super(MyLstmCnn, self).__init__()
        self.embedding = Embedding(vocab_size, 768, weights=[mat], trainable=True)
        self.dropout1 = tf.keras.layers.Dropout(0.5)
        self.lstm = tf.keras.layers.LSTM(units,
                                         return_sequences=True,
                                         dropout=0.2,
                                         name='lstm1')
        # 64 filters of width 3 over the LSTM outputs, no padding.
        self.cnn = tf.keras.layers.Conv1D(64, 3, activation="relu", padding="valid")
        self.attention = tf.keras.layers.Attention()
        self.dropout2 = tf.keras.layers.Dropout(droup_val)
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(128, activation='tanh')
        # Was ' output' (leading space): inconsistent with the sibling models and
        # not a well-formed layer/scope name.
        self.dense3 = tf.keras.layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the last layer."""
        x = self.embedding(inputs)
        x = self.dropout1(x)
        x = self.lstm(x)
        x = self.cnn(x)
        # Self-attention: the convolution output is passed as both query and value.
        x = self.attention([x, x])
        x = self.dropout2(x)
        x = self.flatten(x)
        x = self.dense1(x)
        return self.dense3(x)

**Entrainement LSTM CNN**

In [None]:
# Hyper-parameters of the LSTM + CNN run.
EPOCH = 10         # passes over the training set (this run reads EPOCH, not EPOCHS)
BATCH_SIZE = 8     # samples per gradient step
units = 64         # size handed to the model constructor
droup_val = 0.3    # dropout rate intended for the model constructor

In [None]:
# droup_val must be passed by keyword: the 4th positional parameter of
# MyLstmCnn.__init__ is `l1`, so a positional value never reached the dropout layer.
model_sa = MyLstmCnn(units, vocab_size, mat, droup_val=droup_val)


In [None]:
# Adam optimiser with binary cross-entropy; precision, MAE and RMSE are tracked.
model_sa.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
    loss='binary_crossentropy',
    metrics=[
        precision,
        'mae',
        metrics.RootMeanSquaredError(),
    ],
)

In [None]:
# Train on the training split and score the validation split after every epoch.
history = model_sa.fit(
    X_train_rev, y_train_rev,
    epochs=EPOCH, batch_size=BATCH_SIZE,
    validation_data=(X_val_rev, y_val_rev),
)

In [None]:
# Per-epoch training curves recorded by fit().
# The precision key comes from the metric object, so it stays correct even if
# Keras auto-numbered the metric name ("precision_1", ...).
loss_curve = history.history["loss"]
acc_curve = history.history[precision.name]
rmse_curve = history.history["root_mean_squared_error"]
mae_curve = history.history["mae"]

# Matching validation curves (same keys prefixed with "val_").
loss_val = history.history["val_loss"]
acc_val = history.history["val_" + precision.name]
rmse_val = history.history["val_root_mean_squared_error"]
mae_val = history.history["val_mae"]


def ploter(title , curve , valid):
    """Plot a training curve and its validation counterpart on one figure.

    Args:
        title: figure title.
        curve: per-epoch training values.
        valid: per-epoch validation values.
    """
    plt.plot(curve, label="train")
    plt.plot(valid, label="validation")
    plt.legend(loc='upper left')
    plt.title(title)
    plt.show()


ploter('loss', loss_curve, loss_val)
ploter('rmse', rmse_curve, rmse_val)
ploter('mae', mae_curve, mae_val)
# These curves are precision (the compiled metric), not accuracy.
ploter('precision', acc_curve, acc_val)

**test Lstm_Cnn**

In [None]:
# Evaluate the trained model on the held-out test split.
scores = model_sa.evaluate(X_test_rev , y_test_rev)

In [None]:
# Display the evaluation result (expected order: loss, then the compiled metrics).
scores

## **classe LstmRnn**

In [None]:
class MyLstmRnn(tf.keras.Model):
    """Sentiment classifier: embedding -> LSTM -> SimpleRNN -> self-attention -> dense head.

    Args:
        units: number of LSTM units; the SimpleRNN gets int(units/2) units.
        vocab_size: number of rows of the embedding table.
        mat: initial embedding weights (assumed shape (vocab_size, 768) — TODO confirm).
        l1, l2: accepted but not used by any layer.
        droup_val: rate of the dropout applied after the attention layer.
    """

    def __init__(self, units, vocab_size, mat, l1=0, l2=0, droup_val=0):
        super().__init__()
        layers = tf.keras.layers

        self.embedding = Embedding(vocab_size, 768, weights=[mat], trainable=True)
        self.dropout1 = layers.Dropout(0.5, name='dropout')
        self.lstm = layers.LSTM(units, return_sequences=True, name='lstm1')
        self.rnn = layers.SimpleRNN(int(units/2), return_sequences=True, name='rnn1')
        self.attention = layers.Attention()
        self.dropout = layers.Dropout(droup_val)
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(128, activation='tanh')
        self.dense3 = layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the last layer."""
        embedded = self.dropout1(self.embedding(inputs))
        encoded = self.rnn(self.lstm(embedded))
        # Self-attention: the same tensor is used as query and value.
        attended = self.attention([encoded, encoded])
        features = self.flatten(self.dropout(attended))
        return self.dense3(self.dense1(features))

**Entrainement LSTM RNN**

In [None]:
# Hyper-parameters of the LSTM + RNN run.
EPOCHS = 10        # passes over the training set
BATCH_SIZE = 8     # samples per gradient step
units = 64         # size handed to the model constructor
droup_val = 0.3    # dropout rate intended for the model constructor

In [None]:
# droup_val must be passed by keyword: the 4th positional parameter of
# MyLstmRnn.__init__ is `l1`, so a positional value never reached the dropout layer.
model_sa = MyLstmRnn(units, vocab_size, mat, droup_val=droup_val)

In [None]:
# Adam optimiser with binary cross-entropy; precision, MAE and RMSE are tracked.
model_sa.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
    loss='binary_crossentropy',
    metrics=[precision, 'mae', metrics.RootMeanSquaredError()],
)

In [None]:
# Train on the training split and score the validation split after every epoch.
history = model_sa.fit(
    X_train_rev,
    y_train_rev,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_val_rev, y_val_rev),
)

**test LSTM RNN**

In [None]:
# Evaluate the trained model on the held-out test split.
score = model_sa.evaluate(X_test_rev , y_test_rev)

In [None]:
# Display the evaluation result (expected order: loss, then the compiled metrics).
score

In [None]:
# Per-epoch training curves recorded by fit().
# The precision key comes from the metric object, so it stays correct even if
# Keras auto-numbered the metric name ("precision_1", ...).
loss_curve = history.history["loss"]
acc_curve = history.history[precision.name]
rmse_curve = history.history["root_mean_squared_error"]
mae_curve = history.history["mae"]

# Matching validation curves (same keys prefixed with "val_").
loss_val = history.history["val_loss"]
acc_val = history.history["val_" + precision.name]
rmse_val = history.history["val_root_mean_squared_error"]
mae_val = history.history["val_mae"]


def ploter(title , curve , valid):
    """Plot a training curve and its validation counterpart on one figure.

    Args:
        title: figure title.
        curve: per-epoch training values.
        valid: per-epoch validation values.
    """
    plt.plot(curve, label="train")
    plt.plot(valid, label="validation")
    plt.ylim(0, 1)  # fixed y-range; values above 1 are cut off
    plt.legend(loc='upper left')
    plt.title(title)
    plt.show()


ploter('loss', loss_curve, loss_val)
ploter('rmse', rmse_curve, rmse_val)
ploter('mae', mae_curve, mae_val)
# These curves are precision (the compiled metric), not accuracy.
ploter('precision', acc_curve, acc_val)


## ***save sentiment model***

In [None]:
# Save the current `model_sa` to Drive in the TensorFlow SavedModel format.
# NOTE(review): `model_sa` is rebound by every model section above, so what gets
# saved is whichever model was built last in this kernel — confirm it is the intended one.
model_sa.save('/content/drive/MyDrive/concat_modelCASA', save_format='tf')

In [None]:
# Load the Yelp dataset whose 'text' column is scored with the saved sentiment model below.
dataset = pd.read_csv('/content/drive/MyDrive/Our_Datasets/Yelp/yelp_normalized_dataset(55738).csv')

In [None]:
# Preview the first rows of the Yelp dataset.
dataset.head()

In [None]:
# Reload the sentiment model from the same Drive path it was saved to above.
loaded_model_SA = tf.keras.models.load_model('/content/drive/MyDrive/concat_modelCASA')

In [None]:
# The model was trained on padded integer sequences, not on raw strings, so the
# Yelp texts go through the same pipeline as the IMDB reviews:
# clean -> word indices -> post-padding to the training length.
# NOTE(review): missing texts are treated as empty reviews — confirm that is acceptable.
yelp_tokens = dataset['text'].fillna('').apply(clean_doc)
yelp_sequences = pad_sequences(token.texts_to_sequences(yelp_tokens),
                               maxlen=length, padding='post')
sentiment = loaded_model_SA.predict(yelp_sequences)

In [None]:
# Display the raw model predictions.
sentiment

In [None]:
# Flatten the predictions to one value per row before storing them as a column,
# so the assignment does not depend on how pandas handles a 2-D array.
dataset['sentiment'] = np.ravel(sentiment)

In [None]:
# NOTE(review): this rebinds `dataset`, discarding the 'sentiment' column added in
# the previous cell, and reads from a different folder than the earlier load of
# the same file name — confirm which path is intended and whether this cell is needed.
dataset = pd.read_csv('/content/drive/MyDrive/yelp_normalized_dataset(55738).csv')

In [None]:
import numpy as np
# NOTE(review): `matrice_de_confiance` is not defined anywhere in the visible part
# of this notebook; on a fresh kernel this cell raises NameError unless it is
# created elsewhere.
values , counts = np.unique( matrice_de_confiance , return_counts = True) # check that the matrix has been filled in correctly
print(values , counts)