In [2]:
import json
import re
import requests
from bs4 import BeautifulSoup as bs
from fake_useragent import UserAgent
import nltk
from nltk.corpus import stopwords
import spacy
from collections import Counter
import time
import math
from string import punctuation

ua = UserAgent()
headers = {'User-Agent': ua.random}
stops = stopwords.words('english')
nlp = spacy.load('en_core_web_sm', disable=['ner'])


parser part

In [4]:
reviews = {}

def write_in_dict(url) -> None:
    headers_new = {'User-Agent': ua.random}

    while True:
        try:
            page_new = requests.get(url, headers=headers_new)
            break
        except requests.exceptions.ConnectionError:  # server is disconnecting me for a safety purposes
            print("Server is unreachable, one more try in 5 seconds.")  # but we'll try to sneak in
            time.sleep(5)

    soup_new = bs(page_new.text, "html.parser")
    for i in soup_new.findAll('div', class_='review_area'):
        if i is not None:
            text = i.text
            rate = re.findall(r"Score: .*\s", text)
            rev = re.findall(r"\d\d\d\d\s*(.*)", text)
            if rate:
                reviews[rev[0].strip()] = rate[0].strip()


def get_films(url, num_pages=0) -> None:
    page = requests.get(url, headers=headers)
    soup = bs(page.text, "html.parser")

    for i in soup.findAll('a'):
        link = i.get('href')
        if link is not None and link[:2] == '/m':
            # print(link)
            write_in_dict(f"https://www.rottentomatoes.com{link}/reviews")

In [57]:
print("Gettin' good rated films")
get_films("https://www.rottentomatoes.com/browse/movies_at_home/audience:upright?page=100")
print("Gettin' bad rated films")
get_films('https://www.rottentomatoes.com/browse/movies_at_home/audience:spilled~critics:rotten?page=200')

with open('reviews_raw.json', 'w', encoding="utf-8") as f:
    f.write(json.dumps(reviews, ensure_ascii=False, indent=4))

Gettin' good rated films
Gettin' bad rated films


preprocessing part

In [32]:
def normalize_rate(rate: str) -> int:
    """
    :param rate: raw rate (thing starts with "Score: ...")
    :return: 0 if rate is bad, 1 if rate is good
    """

    rate = rate.strip("Score:").replace(' ', '').strip('-').strip('+').replace('stars', '').replace('3.5.5', '3.5').replace('of', '/')
    try:
        letter_rates = {'F': 0,
                        'E': 0,
                        'D': 0,
                        'C': 0,  # could be fine-tuned
                        'B': 1,
                        'A': 1}

        if '/' in rate:  # in case if rating has digits in it
            rate = int(round(float(rate.split('/')[0]) / float(rate.split('/')[1])))
            if rate > 1:
                return 1
            return rate
        elif rate.upper() in letter_rates:  # in case if rating letter-based
            return letter_rates[rate.upper()]
        else:
            return int(round(float(rate) / 10.))
    except ValueError:
        print(rate) # bruh, completely weird rating
        return 0

In [25]:
def lemmatize_sentence(sentence: str) -> str:
    # sent = ' '.join(nltk.word_tokenize(sentence))
    doc = nlp(sentence)
    sent = [token.lemma_.lower() for token in doc]
    sent = ' '.join([w for w in sent if w not in punctuation and w not in stops])
    sent = pos_filter(sent, allowed=['ADJ', 'NOUN', 'VERB'])
    return sent

In [26]:
def pos_filter(sentence: str, allowed: list) -> str:
    doc = nlp(sentence)
    return ' '.join([token.text for token in doc if token.pos_ in allowed])

In [42]:
def compute_tf(text) -> Counter:
    tf_text = Counter(text)
    for i in tf_text:
        tf_text[i] = tf_text[i]/float(len(text))
    return tf_text

def compute_idf(word, corpus) -> float:
    return math.log10(len(corpus)/sum([1.0 for i in corpus if word in i]))

def tf_idf(corpus) -> list:
    documents = []    
    for text in corpus:
        tf_idf_words = {}
        computed_tf = compute_tf(text)

        for word in computed_tf:
            tf_idf_words[word] = computed_tf[word] * compute_idf(word, corpus)
        
        documents.append(Counter(tf_idf_words).most_common(5))
    
    return documents

actual working code

In [58]:
with open('reviews_raw.json', 'r', encoding="utf-8") as f,\
        open('reviews_normalized.json', 'w', encoding="utf-8") as n:
    reviews = json.load(f)
    goods = 0
    new_revs = {}
    for r in reviews:
        old_rate = reviews[r]
        lemm_sent = lemmatize_sentence(r)
        goods += normalize_rate(old_rate)
        new_revs[lemm_sent] = normalize_rate(old_rate)

    n.write(json.dumps(new_revs, ensure_ascii=False, indent=4))

    print(f"Reviews: {len(new_revs)}")
    print(f"Good rates: {goods}, bad rates: {len(new_revs) - goods}")

    bad_rates = []
    good_rates = []

    for i in new_revs:
        if new_revs[i] == 0:
            bad_rates.append(nltk.word_tokenize(i))
        else:
            good_rates.append(nltk.word_tokenize(i))

B-plus
B-plus
BigScreenWatch
BigScreenWatch
ReadABook
ReadABook
Reviews: 4504
Good rates: 2768, bad rates: 1736


In [59]:
bad_words = set()
for sentence in tf_idf(bad_rates):
    for word in sentence:
        bad_words.add(str(word[0]))

good_words = set()
for sentence in tf_idf(good_rates):
    for word in sentence:
        good_words.add(str(word[0]))

Получаем слова находящиеся только в одном списке

In [96]:
b = bad_words.difference(good_words)
g = good_words.difference(bad_words)

In [92]:
def check_sentence(sent: str) -> str:
    sent = lemmatize_sentence(sent)
    sent = set(nltk.word_tokenize(sent))
    if len(sent.intersection(g)) > len(sent.intersection(b)):  # just check words from which set are more often
        return "Positive"
    else:
        return "Negative"

In [94]:
test_batch = [
    ("I thought this film brought a new concept to screen, wild turns and dark themed. Loved it.", "Positive"),
    ("My god this movie is so bad and boring and people say this movie is great your wrong is a terrible action movie and it totally rip off 1999 action hit film The Matrix.", "Negative"),
    ("I thought torture was outlawed by the Geneva Convention in 1949 but Ballistic: Ecks vs. Sever proves that atrocities like these are far from over.", "Negative"),
    ("What do you mean? I loved it!", "Positive"),
    ("Very dissapointing held together by a decent cast.", "Negative"),
    ("The entire movie is dumb. The long, boring, unnecessary intro is big foreshadowing of what's to come: more boring, unnecessarily long scenes filled with a whole lot of nothingness. The end is dumb, too.", "Negative"),
    ("Windfall is being touted as a pedestrian thriller when the attention rightfully belongs to its astute satire and triangular approach to social perspective.", "Positive")
    ]


In [95]:
for i in test_batch:
    print(f"Prediction: {check_sentence(i[0])}  Correct: {i[1]}")

Prediction: Negative  Correct: Positive
Prediction: Negative  Correct: Negative
Prediction: Positive  Correct: Negative
Prediction: Negative  Correct: Positive
Prediction: Negative  Correct: Negative
Prediction: Negative  Correct: Negative
Prediction: Positive  Correct: Positive


Результаты не очень впечатляют. Мои предложения по улучшению:
1. Возможно есть корреляция положительности/отрицательности отзыва и длинны этого отзыва (плохие фильмы больше ругают)
2. Искать биграммы вроде "terrible movie" или "great work" и проверять есть ли отрицание этой биграммы
3. Построить вектора для каждого слова и сравнивать общий вектор предложения с каким-то "общим негативным/позитивным" вектором