In [7]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pandas as pd
from bs4 import BeautifulSoup
import requests
from lxml import html

pd.set_option('display.max_rows', None)

def ClearText(s):
    for elem in ['[', ']', ',', '.', '/', '\\', '"', "'", '!', '?', '”', '“', ':', ';', '--', '(', ')', '’', '…', '}', '{']:
        for _ in range(s.count(elem)):
            s = s.replace(elem, '')
    for _ in range(s.count(' - ')):
            s = s.replace(' - ', ' ')
    return s.lower()

def ConverScore(s):
    leters = "FDCBA";
    s = s.replace(" / ", "/")
    s = s.replace(" of ", "/")
    s = s.replace("'", "/")
    s = s.replace("'", "/")
    if s[0] in leters:
        return leters.find(s[0]) + 1
    elif s == "1-5 stars" or s == "Recommended": #Some extra cases
        return 5
    elif s.find('/') == -1:
        return round(float(s) / 2 + 0.01)
    elif s[-4:] == "/100":
        return round(float(s[:s.find('/')]) / 20 + 0.01)
    elif s[-3:] == "/20":
        return round(float(s[:s.find('/')]) / 4 + 0.01)
    elif s[-3:] == "/10":
        return round(float(s[:s.find('/')]) / 2 + 0.01)
    elif s[-2:] == "/4":
        return round(float(s[:s.find('/')]) * 1.25 + 0.01)
    else:
        return round(float(s[:s.find('/')]) + 0.01)

def GetDataFromTomato(cur_url):
    driver = webdriver.Chrome()
    driver.get(cur_url)
    wait = WebDriverWait(driver, 10)
    
    for i in range(100):
        try:
            load_more_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//*[@id='reviews']/div[2]/rt-button")))
            load_more_button.click()
            time.sleep(2)
        except:
            break
    
    review_elements = driver.find_elements(By.CLASS_NAME, "review-text-container")
    text_data = []
    score_data = []
    
    for review in review_elements:
        soup = str(BeautifulSoup(review.text, 'html.parser'))
        text = soup[:soup.find('\n')]
        score = soup[soup.find('|') + 18 :soup.find('|', soup.find('|') + 1) - 1]
        if (score != ""):
            text_data.append(ClearText(text))
            score_data.append(ConverScore(score))
    
    df = pd.DataFrame({
        "Score": score_data,
        "Text": text_data
    })
    
    return df

first_DF = GetDataFromTomato("https://www.rottentomatoes.com/m/star_wars_the_last_jedi/reviews")
second_DF = GetDataFromTomato("https://www.rottentomatoes.com/m/star_wars_the_rise_of_skywalker/reviews")
third_DF = GetDataFromTomato("https://www.rottentomatoes.com/m/star_wars_episode_vii_the_force_awakens/reviews")
fourth_DF = GetDataFromTomato("https://www.rottentomatoes.com/m/rogue_one_a_star_wars_story/reviews")

total_DF = pd.concat([first_DF, second_DF, third_DF, fourth_DF], ignore_index=True)
total_DF.to_csv('TextDF.csv')

total_DF.loc[:]

  soup = str(BeautifulSoup(review.text, 'html.parser'))


Unnamed: 0,Score,Text
0,4,star wars the last jedi delivers something tha...
1,4,the last jedi has some early pacing issues and...
2,3,johnsons film supplies the requisite spectacle...
3,5,star wars the last jedi was everything i hoped...
4,5,rian johnson has given us a star wars film tha...
5,5,as much into de-mythologizing the force as it ...
6,4,not all its risks pay off but its biggest wins...
7,4,the last jedi doesnt entirely detach from the ...
8,3,much of this picture involves space battles wh...
9,4,a film that will make you want to watch it aga...


In [79]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
import pandas as pd

#Coded all word in all reviews into numbers. Also collected total vocabulary of used words
def RreprocessText(dataframe):
    reviews = dataframe["Text"].tolist()
    scores = dataframe["Score"].tolist()
    
    #Create token vocabulary 
    all_tokens = [token for review in reviews for token in review.split()]
    vocab = {word: idx + 1 for idx, (word, _) in enumerate(Counter(all_tokens).items())}
    vocab['<PAD>'] = 0
    
    #Coded all text into number sequence 
    encoded_reviews = [
        torch.tensor([vocab[token] for token in review.split()])
        for review in reviews
    ]
    return encoded_reviews, scores, vocab

#Conver data into useable format for DataLoader
class ReviewDataset:
    def __init__(self, encoded_reviews, scores):
        self.encoded_reviews = encoded_reviews
        self.scores = scores
    def __len__(self):
        return len(self.encoded_reviews)
    def __getitem__(self, idx):
        return self.encoded_reviews[idx], torch.tensor(self.scores[idx] - 1)  # Сдвиг рейтинга для индексации

#Split all DataFrame to train and test parts (80%/20%)
df = pd.read_csv("TextDF.csv")
train_data, test_data = train_test_split(df, test_size=0.2, random_state=44)
train_reviews, train_scores, vocab = preprocess_text(train_data)
test_reviews, test_scores, _ = RreprocessText(test_data)
train_dataset = ReviewDataset(encoded_reviews = train_reviews, scores = train_scores)
test_dataset = ReviewDataset(encoded_reviews = test_reviews, scores = test_scores)

#Forming batch with same length for DataLoader
def collate_fn(batch):
    reviews, ratings = zip(*batch)
    padded_reviews = pad_sequence(reviews, batch_first=True, padding_value=vocab['<PAD>'])
    return padded_reviews, torch.tensor(ratings)

train_loader = DataLoader(train_dataset, batch_size=128, collate_fn=collate_fn, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, collate_fn=collate_fn)

#Create model by parent nn.Model from torch
class PredictModel(nn.Module):
    #Creation model with base params for all steps
    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim):
        super(PredictModel, self).__init__() #Parent constructor
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.batchnorm = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(0.1)
        self.fc = nn.Linear(hidden_dim, output_dim)

    #Function for comfortable swaping steps in sequence
    def forward(self, x):
        x = self.embedding(x)
        _, (hidden, _) = self.lstm(x)  # Get last hidden layer
        hidden = hidden[-1]
        hidden = self.dropout(hidden)
        hidden = self.batchnorm(hidden)
        output = self.fc(hidden)
        return output

#Params for model
vocab_size = len(vocab)
embed_dim = 100
hidden_dim = 8
output_dim = 5

model = PredictModel(vocab_size, embed_dim, hidden_dim, output_dim)

#Loss function
criterion = nn.CrossEntropyLoss()

#Optimizer for overfitting based on gradient modification
optimizer = optim.Adam(model.parameters(), lr=0.001)

#Train model by bacths from loader with using optimizer and loss CrossEntropy functions for some epochs number
def Trainmodel(model, loader, criterion, optimizer, epochs=10):
    for epoch in range(epochs):
        model.train() #Swicth state model to train
        total_loss = 0
        for inputs, targets in loader:
            optimizer.zero_grad() #Clear old gradient
            outputs = model(inputs) #Count results
            loss = criterion(outputs, targets) #Count model loss
            loss.backward() #Count loss function gradient
            optimizer.step() #Update model weights
            total_loss += loss.item() #Count total loss

def Evaluatemodel(model, loader, dataset_type="Test"):
    model.eval() #Swicth state model to eval
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in loader:
            outputs = model(inputs) #Count results
            _, predicted = torch.max(outputs, 1) #Get class with highest probability
            total += targets.size(0) 
            correct += (predicted == targets).sum().item() #Compair model answers with correct answers
    accuracy = correct / total
    print(f"{dataset_type} Accuracy: {accuracy:.2%} ({accuracy})")
    return accuracy

#Check results
Trainmodel(model, train_loader, criterion, optimizer)
train_accuracy = Evaluatemodel(model, train_loader, dataset_type="Train")
test_accuracy = Evaluatemodel(model, test_loader, dataset_type="Test")

Train Accuracy: 51.26% (0.5125968992248062)
Test Accuracy: 52.33% (0.5232558139534884)
