In [1]:
from bs4 import BeautifulSoup
import requests
import pandas as pd
from tqdm import tqdm
from imblearn.under_sampling import RandomUnderSampler
from spellchecker import SpellChecker
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem.snowball import FrenchStemmer
from unidecode import unidecode
import json
import random
import ast
from sklearn.metrics import mean_squared_error, mean_absolute_error
from transformers import pipeline

  from .autonotebook import tqdm as notebook_tqdm





In [2]:
# get all compagny url
page = 1
result_len = 0
compagnies_url = []
while True:
    url = f"https://fr.trustpilot.com/categories/car_dealer?page={page}"
    response = requests.get(url)
    web_page = response.text
    soup = BeautifulSoup(web_page, "html.parser")

    resp = soup.select("a[data-business-unit-card-link]")
    for res in resp:
        url = res["href"].replace("/review/", "")
        compagnies_url.append(url)
    if result_len == 0:
        result_len = len(resp)
    elif result_len != len(resp):
        break
    print(page)
    page += 1

1
2
3


In [3]:
def compagny_url(compagny_url):
    # Initialize lists
    review_titles = []
    review_ratings = []
    review_texts = []
    review_locations = []
    page_number = []

    # Set Trustpilot page numbers to scrape here
    from_page = 1
    to_page = 500

    for i in range(from_page, to_page + 1):
        response = requests.get(f"https://fr.trustpilot.com/review/{compagny_url}?page={i}")
        web_page = response.text
        soup = BeautifulSoup(web_page, "html.parser")

        if soup.find_all("article") == []:
            break

        for review in soup.find_all("article"):
            # Review titles
            review_title = review.select_one("a[data-review-title-typography]")
            if review_title == None:
                review_titles.append("")
            else:
                review_titles.append(review_title.getText())

            # Review text
            review_text = review.select_one("p[data-service-review-text-typography]")
            if review_text == None:
                review_texts.append("")
            else:
                review_texts.append(review_text.getText())

            # Review ratings
            review_rating = review.select_one("div[data-service-review-rating]")
            review_ratings.append(review_rating["data-service-review-rating"])
                
            review_location = review.select_one("div[data-consumer-country-typography]")
            if review_location == None:
                review_locations.append("")
            else:
                review_locations.append(review_location.getText())
            # Trustpilot page number
            page_number.append(i)

    # Create final dataframe from lists
    return pd.DataFrame(list(zip([compagny_url for i in range(len(review_titles))], review_titles, review_ratings, review_texts, review_locations)),
                    columns =['compagny', 'review_title', 'review_rating', 'review_text', 'review_location'])

In [4]:
df = pd.DataFrame()
for compagny in tqdm(compagnies_url):
    result = compagny_url(compagny)
    df = pd.concat([df, result])

df.to_csv("reviews.csv", index=False)

  0%|          | 0/79 [00:00<?, ?it/s]

100%|██████████| 79/79 [09:13<00:00,  7.01s/it] 


# Init functions

In [14]:
try:
    useless_words = pd.read_csv("most_frequent_words_mixed.csv", header=None)[0].tolist()[:100]
except:
    useless_words = []

In [15]:
STEMMER = FrenchStemmer()
spell = SpellChecker(language='fr')
pipe = pipeline("text-classification", model="tblard/tf-allocine")

def preprocess_text(text):
    # Suppression des accents
    text = unidecode(text)
    # Suppression du code HTML
    text = re.sub(re.compile("<.*?>"), "", text)
    text = re.sub(r'[^a-zA-Z0-9/s]', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    # Suppresssion des nombres
    text = re.sub(r'[0-9]+', ' ', text)
    # Supprimer les lignes vides
    text = text.split('\n')
    text = [line.strip() for line in text if len(line) > 0]
    text = ' '.join(text)
    # Supprimer les liens
    text = re.sub(r'^https?:\/\/.*[\r\n]*', '', text, flags=re.MULTILINE)
    # Lemmatiser les mots
    tokens = word_tokenize(text.lower(), language='french')
    return tokens

n_grams = lambda tokens, n: [" ".join(tokens[i:i+n]) for i in range(len(tokens)-n+1)]

def text2Token(text, spelling = True, stem = True, len_word_min = 2, spell = spell, useless_words = useless_words):
    stopword = stopwords.words('french')
    word_tokens = preprocess_text(text)
    word_tokens = [word for word in word_tokens if word not in stopword and word not in useless_words and len(word) > len_word_min]
    if spelling:
        word_tokens = [spell.correction(word) for word in word_tokens]
        word_tokens = [word for word in word_tokens if word != None]
    if stem:
        word_tokens = [STEMMER.stem(token) for token in word_tokens]
    word_tokens_with_n_grams = word_tokens + n_grams(word_tokens, 2) + n_grams(word_tokens, 3)
    return word_tokens_with_n_grams

def getMostFrequentWords(documents, top=10):
    # Compter les mots
    word_count = {}
    for doc in documents:
        for word in doc:
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1

    # Trier les mots par fréquence décroissante
    word_count = sorted(word_count.items(), key=lambda x: x[1], reverse=True)

    if top == float('inf'):
        return word_count
    
    return word_count[:top]




All model checkpoint layers were used when initializing TFCamembertForSequenceClassification.

All the layers of TFCamembertForSequenceClassification were initialized from the model checkpoint at tblard/tf-allocine.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFCamembertForSequenceClassification for predictions without further training.


# Data Cleaning

In [4]:
# Preprocess data
print("Preprocess data")
df = pd.read_csv("reviews.csv")
df = df.dropna(subset=['review_text'])
df = df[df["review_location"]  == "FR"]
df = df.drop_duplicates(subset=['review_text'])
df = df.reset_index(drop=True)
df = df[['review_text', 'review_rating']]

Preprocess data


## Get most frequent words list

In [5]:
df_prepared_data = []
for i in tqdm(range(len(df))):
    df_prepared_data.append(text2Token(df.at[i, "review_text"], stem=False, useless_words=[]))

  0%|          | 0/16933 [00:00<?, ?it/s]

  0%|          | 19/16933 [00:09<2:19:33,  2.02it/s]


KeyboardInterrupt: 

In [10]:
most_frequent_words = getMostFrequentWords(df_prepared_data, top=1000)
pd.DataFrame(most_frequent_words, columns=["word", "count"]).to_csv("most_frequent_words.csv", index=False)

## Traslate most frequent words list to english (for better performance of the pipe)

In [2]:
most_frequent_words = pd.read_csv("most_frequent_words.csv")

In [91]:
if "en" not in most_frequent_words.columns:
    most_frequent_words["en"] = [None for i in range(len(most_frequent_words))]

In [107]:
import translators as ts

def translate(word):
    try:
        return ts.tencent(word, to_language="en", from_language="fr")
    except Exception as e:
        return None

In [108]:
for i in tqdm(range(len(most_frequent_words))):
    if most_frequent_words.at[i, "en"] == None:
        most_frequent_words.at[i, "en"] = translate(most_frequent_words.at[i, "word"])

100%|██████████| 1000/1000 [47:31<00:00,  2.85s/it] 


In [109]:
most_frequent_words.to_csv("most_frequent_words.csv", index=False)

## Get the sentiment of the words

In [7]:
most_frequent_words = pd.read_csv("most_frequent_words.csv")

In [8]:
if "sentiment" in most_frequent_words.columns:
    most_frequent_words["sentiment"] = [None for i in range(len(most_frequent_words))]

In [24]:
for i in tqdm(range(len(most_frequent_words))):
    if most_frequent_words.at[i, "sentiment"] == None and str(most_frequent_words.at[i, "word"]) != "nan":
        pipe_res = pipe(most_frequent_words.at[i, "word"])[0]
        most_frequent_words.at[i, "sentiment"] = pipe_res["label"] if pipe_res["score"] > 0.65 else "MIXED"

100%|██████████| 1000/1000 [04:29<00:00,  3.71it/s]


In [25]:
most_frequent_words.to_csv("most_frequent_words.csv", index=False)

In [27]:
most_frequent_words[most_frequent_words["sentiment"] == "MIXED"][["word"]].to_csv("most_frequent_words_mixed.csv", index=False, header=False)

In [3]:
useless_words = most_frequent_words[most_frequent_words["sentiment"] == "MIXED"]["word"].tolist()[100:]

## Generate train/test

In [31]:
# Generate train dataset
print("Generate train dataset")
train = df.copy()
rus = RandomUnderSampler(random_state=0)
X = train[["review_text"]]
y = train["review_rating"]
X_resampled, y_resampled = rus.fit_resample(X, y)
train = pd.concat([X_resampled, y_resampled], axis=1)
train_idx = train.index.tolist()
train = train.reset_index(drop=True)

# Spell check for train dataset
print("Spell check for train dataset")
for i in tqdm(range(len(train))):
    train.at[i, "review_text"] = json.dumps(text2Token(train.at[i, "review_text"]))

# Save train dataset
print("Save train dataset")
train.to_csv("reviews_train.csv", index=False)

# Generate test dataset
print("Generate test dataset")
test = df.copy()
test = test.drop(train_idx)
rus = RandomUnderSampler(random_state=0)
X = test[["review_text"]]
y = test["review_rating"]
X_resampled, y_resampled = rus.fit_resample(X, y)
test = pd.concat([X_resampled, y_resampled], axis=1)

# Save test dataset
print("Save test dataset")
test.to_csv("reviews_test.csv", index=False)

Generate train dataset
Spell check for train dataset


  0%|          | 0/1905 [00:00<?, ?it/s]

100%|██████████| 1905/1905 [17:02<00:00,  1.86it/s] 

Save train dataset
Generate test dataset
Save test dataset





# Test ML

In [4]:
train = pd.read_csv("reviews_train.csv")

In [5]:
train["review_text"] = train["review_text"].apply(lambda x: ast.literal_eval(x))

In [6]:
train["review_rating"].value_counts()

review_rating
1    381
2    381
3    381
4    381
5    381
Name: count, dtype: int64

In [7]:
from rank_bm25 import BM25Okapi

# Liste d'avis
documents = train["review_text"].tolist()

# Listes de scores
ratings = train["review_rating"].tolist()

# Créer un modèle BM25
bm25 = BM25Okapi(documents)


In [8]:
def getTopDocs(bm25, query, documents, ratings, top=5):
    # Calculer les scores de similarité
    scores = bm25.get_scores(query)

    # Associer chaque avis à son score
    doc_scores = list(zip(documents, scores, ratings))

    # Trier les avis par score décroissant
    return sorted(doc_scores, key=lambda x: x[1], reverse=True)[:top]

In [57]:
""" Not used
# Obtenir les mots positifs et négatifs en utilisant bert
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")
model = AutoModelForSequenceClassification.from_pretrained("nlptown/bert-base-multilingual-uncased-sentiment")

def getSentiment(text):
    # Tokenize the text
    tokens = tokenizer.encode(text, return_tensors='pt')
    # Reduce the tokens size to 512
    tokens = tokens[:, :512]
    # Get the prediction
    result = model(tokens)
    # Return the label
    return torch.argmax(result.logits).item()
"""

In [58]:
""" Not used
def getRevelantWords(most_freq, pos_nb = 5, neg_nb = 5):
    pos_words = []
    neg_words = []
    for word, nb in most_freq:
        if type(word) == tuple:
            word = " ".join(word)
        if getSentiment(word) == 0 and len(neg_words) < neg_nb:
            neg_words.append(word)
        elif getSentiment(word) == 4 and len(pos_words) < pos_nb:
            pos_words.append(word)

        if len(neg_words) == neg_nb and len(pos_words) == pos_nb:
            break

    return pos_words, neg_words
"""

In [9]:
def separer_phrase(phrase):
    # On ajoute des points aux sauts de ligne
    phrase = phrase.replace('\n', '.')

    # Divise d'abord la phrase en utilisant les points, points d'interrogation, points d'exclamation.
    pattern = r'(?<=[.!?])(?=\s|[A-Z"\'(])'
    groupes = re.split(pattern, phrase)

    groupes_fins = []
    for groupe in groupes:
        # Combinaison des motifs de virgule et "et" en une seule expression régulière
        # Sépare sur les virgules (en évitant les nombres décimaux), sur les ; et : et sur les conjonctions de coordinations (avec un contexte spécifique)
        pattern_combined = r'(?<=.{20},)\s(?!\d)|[;:]|\b(mais|ou|et|donc|or|ni|car)\b(?=.{15,})'
        sous_groupes = re.split(pattern_combined, groupe)
        groupes_fins.extend(sous_groupes)

    return [groupe.strip() for groupe in groupes_fins if groupe is not None and groupe.strip() and groupe.strip() not in ['.', ',', 'mais', 'ou', 'et', 'donc', 'or', 'ni', 'car']]

In [26]:
def estimate_score(top_docs, origin_query = None, use_bm25 = True, FIABILITY_THRESHOLD = 0.6):
    # Calculer la note moyenne des avis
    stars_bm25 = None
    if use_bm25:
        stars_bm25 = sum([int(doc[2]) for doc in top_docs]) / len(top_docs)
        if origin_query is None:
            return stars_bm25
    try:
        score_pipe = pipe(origin_query)
        stars_pipe = 2.5 + 2.5 * FIABILITY_THRESHOLD if score_pipe[0]["label"] == "POSITIVE" else (2.5 - 2.5 * FIABILITY_THRESHOLD if score_pipe[0]["label"] == "NEGATIVE" else 2.5)
        return (stars_bm25 + stars_pipe) / 2
    except:
        return stars_bm25

In [35]:
def getRevelantSentences(origin_query, most_freq, documents, ratings, top=5, use_bm25 = True, use_pipe = True):

    # Appel de la fonction
    groupes = separer_phrase(origin_query)

    # Obtenir les scores de chaque groupe
    scores = []
    for groupe in groupes:
        if use_pipe and use_bm25:
            scores.append(estimate_score(getTopDocs(bm25, text2Token(groupe), documents, ratings), origin_query))
        elif use_pipe:
            scores.append(estimate_score(None, origin_query, use_bm25=False))
        elif use_bm25:
            scores.append(estimate_score(getTopDocs(bm25, text2Token(groupe), documents, ratings)))
            
    pos_list = []
    neg_list = []
    for groupe, score in zip(groupes, scores):
        group_tokens = text2Token(groupe)
        sumFreq = sum([freq for word, freq in most_freq if word in group_tokens])
        if score >= 3.5:
            pos_list.append((groupe, sumFreq))
        elif score <= 1.5:
            neg_list.append((groupe, sumFreq))

    pos_list = [sentence[0] for sentence in sorted(pos_list, key=lambda x: x[1], reverse=True)[:top]]
    neg_list = [sentence[0] for sentence in sorted(neg_list, key=lambda x: x[1], reverse=True)[:top]]

    return pos_list, neg_list

In [29]:
def main(origin_query, bm25=bm25, documents=documents, ratings=ratings, spell=spell, use_bm25 = True, use_pipe = True):
    query = text2Token(origin_query)
    if use_bm25:
        top_docs = getTopDocs(bm25, query, documents, ratings, top=5)
    most_freq = getMostFrequentWords([doc[0] for doc in top_docs], top=50)
    """  Not used
    relevants = getRevelantWords(most_freq, pos_nb = 5, neg_nb = 5)
    """

    pos_list, neg_list = getRevelantSentences(origin_query, most_freq, documents, ratings, top=5, use_bm25 = True, use_pipe = True)
    if not use_pipe:
        origin_query = None
    return estimate_score(top_docs, origin_query, use_bm25=use_bm25), pos_list, neg_list

# Test Model

## Use data unsed and balanced the new dataset

In [17]:
test = pd.read_csv("reviews_test.csv")

test["review_rating"].value_counts()

review_rating
1    152
3    152
4    152
5    152
Name: count, dtype: int64

## Get results

In [24]:
def get_score_only(origin_query):
    query = text2Token(origin_query)
    top_docs = getTopDocs(bm25, query, documents, ratings, top=5)
    return estimate_score(top_docs, origin_query)

In [25]:
X_test, y_test = test["review_text"], test["review_rating"]

y_pred = [get_score_only(query) for query in tqdm(X_test, total=len(X_test))]
print(f"MAE: {mean_absolute_error(y_test, y_pred)}")
print(f"MSE: {mean_squared_error(y_test, y_pred)}")

  0%|          | 0/608 [00:00<?, ?it/s]

100%|██████████| 608/608 [09:06<00:00,  1.11it/s]

MAE: 0.7905030231971882
MSE: 1.128923385959617





## Example on test dataset

In [22]:
elmt = test.iloc[random.randint(0, len(test))]
origin_query = elmt["review_text"]
score, pos_list, neg_list = main(origin_query)

print(f"Query: {origin_query}")
print(f"Vrai score: {elmt['review_rating']}")

print(f"Score: {score}")
print(f"Séquences positives: {pos_list}")
print(f"Séquences négatives: {neg_list}")

Query: Voiture achetée le 19 août, tombée en panne 1 semaine après (voyant défaut moteur rouge). Restée 2 semaines sur place pour la récupérer et retombée en panne au bout de 10 km. Véhicule immobilisé à nouveau 1 semaine chez eux. Pour retomber en panne au bout de 500km. Envoyée chez le concessionnaire Nissan car plus confiance aux compétences des mécaniciens. En espérant ne plus retomber en panne. Malgré tout, heureusement que le service commercial rattrape un peu les choses...
Vrai score: 1
Score: 1.7
Séquences positives: []
Séquences négatives: ['retombée en panne au bout de 10 km.', 'Pour retomber en panne au bout de 500km.', 'En espérant ne plus retomber en panne.', 'Véhicule immobilisé à nouveau 1 semaine chez eux.']


# Streamlit

In [32]:
import streamlit as st

# Importez ici vos fonctions de prédiction (prediction_1, prediction_2, prediction_3)

def prediction_1(origin_query):
    score, pos_list, neg_list = main(origin_query, use_pipe=False)
    return {
        "nombre d étoile sur 5": score,
        "liste phrases positives": pos_list,
        "liste phrases négatives": neg_list
    }

def prediction_2(origin_query):
    score, pos_list, neg_list = main(origin_query, use_bm25=False)
    return {
        "nombre d étoile sur 5": score,
        "liste phrases positives": pos_list,
        "liste phrases négatives": neg_list
    }

def prediction_3(origin_query):
    score, pos_list, neg_list = main(origin_query)
    return {
        "nombre d étoile sur 5": score,
        "liste phrases positives": pos_list,
        "liste phrases négatives": neg_list
    }

def afficher_resultats(resultats):
    st.subheader("Résultats de la prédiction :")
    st.write(f"Nombre d'étoiles sur 5 : {'🌟' * resultats['nombre d étoile sur 5']}")
    st.subheader("Liste de phrases positives :")
    for phrase in resultats["liste phrases positives"]:
        st.write(f"👍 {phrase}")
    st.subheader("Liste de phrases négatives :")
    for phrase in resultats["liste phrases négatives"]:
        st.write(f"👎 {phrase}")

def run():
    st.title("Analyse d'avis Internet")

    avis_utilisateur = st.text_area("Entrez votre avis ici :")

    if st.button("Prédiction BM 25"):
        resultats_1 = prediction_1(avis_utilisateur)
        afficher_resultats(resultats_1)

    if st.button("Prédiction Transformers"):
        resultats_2 = prediction_2(avis_utilisateur)
        afficher_resultats(resultats_2)

    if st.button("Prédiction BM 25 + Transformers"):
        resultats_3 = prediction_3(avis_utilisateur)
        afficher_resultats(resultats_3)

In [34]:
run()