#Projet : classifieur d'avis Google

##Description du projet

Ce projet a pour but de classifier des avis Google de restaurants comme positif, négatif ou neutre. Notre dataset est constitué d'avis récupérés directement sur Google Maps et nettoyés avec pour chacun d'eux : une note donnée par l'utilisateur (nombre d’étoiles), un label “positif”, “négatif” ou “neutre” et un texte d'évaluation. Nous estimons :

*   🙁 1-2 étoiles : avis négatif,
*   😐 3 étoiles : avis neutre,
*   🙂 4-5 étoiles : avis positif.

L'utilisation du modèle DistilBERT permet de classer un avis comme positif, négatif ou neutre selon le texte contenu dans l’avis posté.


##Importation de modules

In [None]:
!pip install selenium
!pip install webdriver_manager
!pip install parsel

!wget https://chromedriver.storage.googleapis.com/114.0.5735.90/chromedriver_linux64.zip
!unzip chromedriver_linux64.zip -d webdriver
!wget http://mirror.cs.uchicago.edu/google-chrome/pool/main/g/google-chrome-stable/google-chrome-stable_114.0.5735.106-1_amd64.deb
!sudo apt install libu2f-udev libvulkan1
!sudo apt --fix-broken install
!dpkg -i google-chrome-stable_114.0.5735.106-1_amd64.deb

!ls /usr/bin/google-chrome

#Pour récupérer nos données
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from urllib.parse import unquote

from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from parsel import Selector
from selenium.common.exceptions import NoSuchElementException
from urllib.parse import urljoin
import time
import json

#Pour diviser nos données
from sklearn.model_selection import train_test_split

#Pour entraîner et tester notre modèle
import torch
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

#Pour évaluer notre modèle
from sklearn.metrics import precision_score, recall_score, f1_score

##Récupération des données

Nous récupérons nos données en partant de plusieurs URLs avec la recherche "nom_de_ville restaurant" dans Google Maps.
Notre fonction crawl permet de récupérer d'autres liens à partir de ces URLs : nous allons cliquer sur chaque lien présent dans la page (liste des restaurants), puis sur l'onglet "Avis" pour chaque lien.
Notre fonction scrap récupère le nombre d'étoiles de l'avis, et le texte de l'avis.

###Installation du chromedriver

In [None]:
#installation du webdriver chrome
#from selenium import webdriver
#from selenium.webdriver.chrome.service import Service as ChromeService
service=ChromeService(executable_path='/content/webdriver/chromedriver')
options=webdriver.ChromeOptions()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--start-maximized') #Ouverture dans une fenêtre maximisée
options.add_argument('--disable-infobars') #Désactive la barre d'état
options.add_argument('--disable-extensions')
options.add_argument('--lang=fr')
options.binary_location="/usr/bin/google-chrome"

driver=webdriver.Chrome(service=service,options=options)

###Crawling et scrapping : récupération de données depuis Google

In [None]:
params={
    'start_urls': [
        {'url': 'https://www.google.fr/maps/search/restaurant+grenoble/@45.1908097,5.7247407,17z?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/restaurant+lyon/@45.7553741,4.8226548,14z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/restaurant+marseille/@44.5172086,5.1139581,8z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/restaurant+paris/@48.8618813,2.3325839,13z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/restaurant+arcachon/@44.6524523,-1.1718973,13z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/New+York+restaurant/@40.7345967,-74.0158569,14z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/Los+Angeles+restaurant/@34.0252617,-118.3896428,12z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/Kansas+City+restaurant/@39.0748862,-94.6944134,13z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/Hyderabad+restaurant/@17.4131285,78.2432366,11z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/Londres+restaurant/@51.5179866,-0.169222,13z/data=!3m1!4b1?entry=ttu'},
        {'url': 'https://www.google.fr/maps/search/%C3%89dimbourg+restaurant/@55.9528971,-3.2056526,15z/data=!3m1!4b1?entry=ttu'}
        ],
    'url_pattern': r'https://www.google.fr/maps/search/',
    'max_depth':1,
    'max_crawled':100,
    'latency':0.5,
}


def crawl(params):
    start_urls=params['start_urls']
    max_depth=params['max_depth']
    max_crawled=params['max_crawled']

    for start_url in params['start_urls']:
      start_url['url']=unquote(start_url['url'])


    if not start_urls:
        print('No start URLs specified in actor input, exiting...')

    default_queue=[]
    crawled_data=[]
    already_crawled={}
    nb_crawled=0

    for start_url in start_urls:
        url=start_url.get('url')
        print(f'Enqueuing {url} ...')
        default_queue.append({
            'url': url,
            'userData': {'depth': 0},
        })

        print('Launching Chrome WebDriver...')

        chrome_options=ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')

        driver=webdriver.Chrome(options=chrome_options)


        #On gère la page de cookies si y'en a une
        try:
          accept_button=driver.find_element(By.XPATH, '//button[contains(@aria-label, "Accept all")]')
          accept_button.click()
          print("Clicked on 'Accept all' button")
        except NoSuchElementException:
          print("No 'Accept all' button found")

        time.sleep(2)

        try:
          driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'end', inline: 'nearest'});", driver.find_element(By.CLASS_NAME, "k7jAl miFGmb lJ3Kh PLbyfe"))
          print("Page scrolled")
        except NoSuchElementException:
          print("Couldn't scroll")

        time.sleep(2)


        while len(default_queue)>0:
            request=default_queue.pop()
            nb_crawled+=1
            if nb_crawled>max_crawled:
                print(f"{max_crawled=} limit has been reached. Crawling is finished")
                break
            time.sleep(params['latency'])
            url=request['url']
            depth=request['userData']['depth']

            #On commence à crawl depuis les URL de départ
            print(f'Scraping {url} ...')


            try:
                driver.get(url)

                if depth<max_depth:
                  #on cherche ici à récupérer les liens sur la page (pages de resto)
                  for link in driver.find_elements(By.XPATH, '//a[contains(@class, "hfpxzc")]'):
                      link_href=link.get_attribute('href')
                      link_url=urljoin(url, link_href)
                      if link_url.startswith(('http://www.google.fr/', 'https://www.google.fr/','google.fr/','http://www.google.com/','https://www.google.com/')):
                          if url not in already_crawled:
                              print(f'Enqueuing {link_url} ...')
                              default_queue.append({
                                  'url': link_url,
                                  'userData':{'depth': depth+1},
                              })

                #On récup la page des avis des pages de resto
                try:
                    avis_button=driver.find_element(By.XPATH, '//button[contains(@aria-label, "Reviews")]')
                    #avis_bouton=driver.find_element(By.XPATH, '//button[contains(@class,"hh2c6 ")]')
                    avis_button.click()
                    #avis_bouton.click()
                    print("Clicked on 'Reviews'")

                    time.sleep(3)

                    reviews_url=driver.current_url

                    if url!=start_url['url']:
                        crawled_data.append({'restaurant_url': url, 'reviews_url': reviews_url})

                except NoSuchElementException:
                    print("No 'Reviews' button found")

            except Exception as e:
                print(f'Cannot extract data from {url}. Error: {e}')
            finally:
                already_crawled[url]=1

        driver.quit()

    return crawled_data

In [None]:
restos=crawl(params)
#for resto in restos:
  #print(resto)

reviews_urls=[resto['reviews_url'] for resto in restos]
print(reviews_urls)
len(reviews_urls)

In [None]:
#print(restos)
reviews_urls=[resto['reviews_url'] for resto in restos]
print(reviews_urls)
print("Nombres d'URLs récupérées : ",len(reviews_urls))

In [None]:
def scrap(driver,url_list):
    all_results=[]

    print('Launching Chrome WebDriver...')

    chrome_options=webdriver.ChromeOptions()
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')

    driver = webdriver.Chrome(options=chrome_options)

    print('Successfully launched')

    #On initie un set ici pour ne pas avoir de reviews en double
    seen_reviews = set()

    for url in url_list:
        driver.get(url)
        print(f'Start scraping {url} ...')
        try:
            accept_button = driver.find_element(By.XPATH, '//button[contains(@aria-label, "Accept all")]')
            accept_button.click()
            time.sleep(2)
            print("Clicked on 'Accept all' button")
        except NoSuchElementException:
            print("No 'Accept all' button found")

        try:
            driver.execute_script("arguments[0].scrollIntoView({behavior: 'smooth', block: 'end', inline: 'nearest'});", driver.find_element(By.CLASS_NAME, "bJzME"))
            print("Page scrolled")
            time.sleep(2)
        except NoSuchElementException:
            print("Couldn't scroll")

        page_content=driver.page_source
        response=Selector(text=page_content)

        results=[]

        for el in response.xpath('//div[contains(@class, "m6QErb ")]'):
            try:
                more_button=driver.find_element(By.XPATH, '//button[contains(@aria-label,"See more")]')
                more_button.click()
                print("Clicked on 'See more'")
                time.sleep(2)
            except NoSuchElementException:
                print("No 'See more' button found")

            rating_list=el.xpath('.//span[contains(@class, "kvMYJc")]/@aria-label').extract()
            body_list=el.xpath('.//span[contains(@class, "wiI7pd")]/text()').extract()
            if rating_list and body_list:
                for rating,body in zip(rating_list,body_list):
                    #ici on va vérifier si la review n'a pas déjà été scrap pour éviter les doublons
                    review_key=f'{rating}-{body}'
                    if review_key not in seen_reviews:
                        results.append({
                            'rating': rating.replace('stars', '').replace('star', '').strip(),
                            'body': body.strip(),
                        })
                        seen_reviews.add(review_key)

        all_results.extend(results)

    print('Closing Chrome WebDriver...')
    driver.quit()
    print('Successfully closed')

    print("Nombre d'avis récupérés : ", len(all_results))
    return all_results

In [None]:
scrapped_reviews=scrap(driver,reviews_urls)

for result in scrapped_reviews:
    print(f"Note : {result['rating']}")
    print(f"Texte : {result['body']}")
    print("="*50)

###Division de nos données en partition train et test

In [None]:
def split_train_test_data(data,test_size=0.2,random_state=None):
    """
    Cette fonction divise les données en ensembles d'entraînement et de test.

    Entrée:
        data (list): liste des données à diviser
        test_size (float): proportion des données à inclure dans test
        random_state (int): graine pour la génération de nombres aléatoires pour la reproductibilité

    Sortie:
        tuple contenant les ensembles d'entraînement et de test (X_train, X_test)
    """
    X=[review['body'] for review in data]
    y=[review['rating'] for review in data]

    X_train, X_test,y_train,y_test=train_test_split(X,y,test_size=test_size,random_state=random_state)

    print("Les données ont bien été divisées.")
    print(f"Taille de l'ensemble d'entraînement : {len(X_train)}")
    print(f"Taille de l'ensemble de test : {len(X_test)}")

    return (X_train,X_test,y_train,y_test)

###Labellisation de nos données
Pour chaque avis récupéré, on lui donne un label "positif", "négatif" ou "neutre" selon son nombre d'étoiles.

In [None]:
def label_reviews(reviews:list,ratings:list):
    """
    Cette fonction va donner un label à chaque review récupérée :
      - 1-2 étoiles : avis négatif,
      - 3 étoiles : avis neutre,
      - 4-5 étoiles : avis positif.

    Entrées :
      reviews : liste des reviews à labelliser
      ratings : liste des notes (1 à 5) associées aux reviews

    Sortie :
      les reviews labellisées

    Nécessite les modules suivants :
    """
    labeled_reviews=[]
    num_negatif=0
    num_neutre=0
    num_positif=0

    for review,rating in zip(reviews,ratings):
        stars=rating
        if stars in ['1','2']:
            label='negatif'
            num_negatif+=1
        elif stars=='3':
            label='neutre'
            num_neutre+=1
        elif stars in ['4','5']:
            label='positif'
            num_positif+=1
        else:
            label='unknown'  #au cas où on a mal ou pas récup le nb d'étoiles

        labeled_reviews.append({'rating': stars, 'label': label, 'body': review})

    print("Nombre d'avis négatifs :",num_negatif)
    print("Nombre d'avis neutres :",num_neutre)
    print("Nombre d'avis positifs :",num_positif)
    print(labeled_reviews)

    return labeled_reviews

##Entraînement du modèle

###Chargement du modèle

In [None]:
#On charge le modèle et le tokenizer DistilBERT
modele_nom='distilbert-base-uncased'
tokenizer=DistilBertTokenizer.from_pretrained(modele_nom)
modele=DistilBertForSequenceClassification.from_pretrained(modele_nom,num_labels=3)  #3 classes : positif, neutre, négatif

###Prétraitement des données : tokenisation et encodage

In [None]:
def tokenize_reviews(reviews:list):
    """
    Cette fonction tokenise les avis et prépare les masques d'attention.

    Entrées:
        reviews : Liste des avis à tokenizer
        labels : Liste des étiquettes associées aux avis

    Sortie:
        Les entrées, les masques d'attention et les étiquettes d'entraînement et de test encodés.
    """

    #Pour récup les identifiants des tokens d'entrée
    input_ids=[]
    #Pour stocker les masques d'attention
    attention_masks=[]
    #Pour stocker les étiquettes
    labels=[]

    #Sur chaque avis, on fait une boucle
    for review in tqdm(reviews):

        body=review['body']  #On récup le corps de l'avis
        label=review['label']  #On récup l'étiquette de l'avis

        encoded_dict=tokenizer.encode_plus( #On encode l'avis avec le tokenizer
            body,
            add_special_tokens=True, #Ajout des tokens spéciaux [CLS] (début de la séquence) et [SEP] (fin de séquence)
            max_length=128,  #Taille max des séquences
            padding='max_length', #Remplissage des séquences à la taille max
            truncation=True, #Si la séquence dépasse la taille max, on la tronque
            return_attention_mask=True, #On active masque d'attention
            return_tensors='pt' #Retour des tensors Pytorch
        )

        #Pour chaque review, on ajoute les identifiants des tokens d'entrée à la liste et les masques d'attention également
        input_ids.append(encoded_dict['input_ids'])
        attention_masks.append(encoded_dict['attention_mask'])

        #On ajoute l'étiquette
        labels.append(label)

    #On concatène les identifiants, puis les masques d'attention en un seul tensor
    input_ids=torch.cat(input_ids, dim=0)
    attention_masks=torch.cat(attention_masks, dim=0)

    #Ici on convertit les étiquettes textuelles en identifiants numériques en utilisant le dictionnaire
    label_dict={"positif":0,"negatif":1,"neutre":2}
    labels=[label_dict[label] for label in labels]
    #On convertit nos étiquettes en tensor PyTorch
    labels=torch.tensor(labels)


    return input_ids,attention_masks,labels

In [None]:
#Utilisation de la fonction pour diviser les données
X_train,X_test,y_train,y_test=split_train_test_data(scrapped_reviews)

#Étiquetage des données d'entraînement et de test
labeled_train_reviews=label_reviews(X_train,y_train)
labeled_test_reviews=label_reviews(X_test,y_test)

#Utilisation des données étiquetées pour la tokenization et encodage
train_inputs,train_masks,train_labels=tokenize_reviews(labeled_train_reviews)
test_inputs,test_masks,test_labels=tokenize_reviews(labeled_test_reviews)

###Entraînement du modèle

In [None]:
def train_model(model,train_dataloader,optimizer,epochs=3):
    """
    La fonction train_model entraîne notre modèle sur les données d'entraînement que nous avons récupérées.

    Entrées :
      model : le modèle à entraîner
      train_dataloader : le dataloader contenant nos données d'entraînement
      optimizer : l'optimiseur qui est utilisé pour mettre à jour les poids du modèle
      epochs : le nombre d'époques pour lesquelles entraîner le modèle

    Sortie :
      end_train, chaîne de caractères indiquant l'état de l'entraînement
    """

    try:
        #On vérifie si le GPU est dispo (plus rapide), sinon on utilise le CPU
        device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        #On déplace le modèle sur le GPU ou le CPU
        model.to(device)

        #On boucle sur le nombre d'époques donné en entrée :
        for epoch in range(epochs):

            #On met le modèle en mode "entraînement"
            model.train()

            #Ici on initialise la perte totale pour l'époque en cours
            perte_totale=0

            #On boucle sur les lots dans le dataloader d'entraînement
            for batch in tqdm(train_dataloader):
                b_input_ids=batch[0].to(device)
                b_input_mask=batch[1].to(device)
                b_labels=batch[2].to(device)

                #Réinitialisation des gradients du modèle
                model.zero_grad()

                #On passe les données au modèle pour obtenir des sorties
                outputs=model(b_input_ids,attention_mask=b_input_mask,labels=b_labels)
                loss=outputs.loss #On obtient ici la perte à partir des sorties obtenues

                #Ajout de cette perte à la perte totale
                perte_totale+=loss.item()

                #On calcule les gradients
                loss.backward()

                #On met à jour le poids du modèle avec l'optimiseur
                optimizer.step()

            #Retour sur la perte totale pour cette époque
            print(f'Epoque : {epoch+1}/{epochs}, Perte : {perte_totale}')

        end_train="Entraînement terminé !"


    except Exception as e:
      end_train=f"Entraînement impossible : {str(e)}"

    return end_train

In [None]:
#On crée les dataloaders
train_data=TensorDataset(train_inputs,train_masks,train_labels)
train_dataloader=DataLoader(train_data,batch_size=32,shuffle=True)

#Optimiseur
optimizer=torch.optim.AdamW(modele.parameters(),lr=2e-5)

#Entraînement du modèle
train_model(modele,train_dataloader,optimizer)

##Test du modèle

In [None]:
def test_model(model,dataloader):
    """
    Cette fonction teste notre modèle sur des données test.

    Entrées :
      model : le modèle à tester
      dataloader : le dataloader contenant les données de test

    Sortie :
      les étiquettes prédites par notre modèle
    """
    try:
        #On vérifie si le GPU est dispo (plus rapide), sinon on utilise le CPU
        device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        #On bouge le modèle sur le GPU ou CPU
        model.to(device)

        #On met le modèle en mode "eval" (pas de maj des gradients)
        model.eval()

        #Pour stocker les prédictions faites par notre modèle
        predictions=[]

        #On boucle sur les lots des données dans le dataloader test
        with torch.no_grad():
            for batch in dataloader:
                inputs=batch[0].to(device)
                masks=batch[1].to(device)

                #On passe les données au modèle pour obtenir des logits
                outputs=model(inputs,attention_mask=masks)
                logits=outputs.logits
                _,predicted_labels=torch.max(logits, 1) #fct argmax pour obtenir étiquettes prédites

                #On ajoute les labels prédits à notre liste
                predictions.extend(predicted_labels.tolist())

        #print("Indices avant conversion : ",predictions)

        try:
          #Ici on convertit les indices prédits (0, 1, 2) en étiquettes (positif, négatif et neutre)
          label_dict={"positif":0,"negatif":1,"neutre":2}
          label_map_inverse={v: k for k, v in label_dict.items()} #on inverse le dico
          predicted_labels=[label_map_inverse[prediction] for prediction in predictions]

        except Exception as e:
          print("La conversion des indices en labels est impossible : ",str(e))


        end_test="Test terminé !"
        #print("Labels prédits : ",predicted_labels)

        count_positif=predicted_labels.count("positif")
        count_negatif=predicted_labels.count("negatif")
        count_neutre=predicted_labels.count("neutre")

    except Exception as e:
        end_test=f"Test impossible : {str(e)}"
        predicted_labels=0
        count_positif=0
        count_negatif=0
        count_neutre=0

    print(end_test)
    print("Avis positifs prédits :",count_positif)
    print("Avis négatifs prédits :",count_negatif)
    print("Avis neutres prédits :",count_neutre)

    return predicted_labels

In [None]:
#On crée un dataloader pour les données de test
test_data=TensorDataset(test_inputs, test_masks, test_labels)
test_dataloader=DataLoader(test_data, batch_size=32)

#Utilisation de ce dataloader pour tester le modèle
test_predictions=test_model(modele,test_dataloader)

##Evaluation du modèle

In [None]:
def evaluate(true_labels:list,predicted_labels:list):
    """
    Cette fonction évalue les performances du modèle sur ses prédictions par rapport aux vraies étiquettes.

    Entrées :
      true_labels : vraies étiquettes des données
      predicted_labels : étiquettes prédites par le modèle

    Sortie :
      les métriques : précision, rappel et Fmesure
    """
    try:
      #Calcul de la précision, du rappel et de la Fmesure
      precision=precision_score(true_labels,predicted_labels, average='weighted',zero_division=0)
      rappel=recall_score(true_labels,predicted_labels,average='weighted',zero_division=0)
      fmesure=f1_score(true_labels,predicted_labels,average='weighted',zero_division=0)

      #print(true_labels)
      #print(predicted_labels)

      count_positif_pred=predicted_labels.count("positif")
      count_negatif_pred=predicted_labels.count("negatif")
      count_neutre_pred=predicted_labels.count("neutre")
      count_positif_true=true_labels.count("positif")
      count_negatif_true=true_labels.count("negatif")
      count_neutre_true=true_labels.count("neutre")

      print("PREDICTIONS :")
      print("\tAvis positifs prédits :",count_positif_pred)
      print("\tAvis négatifs prédits :",count_negatif_pred)
      print("\tAvis neutres prédits :",count_neutre_pred)
      print("\tTOTAL : ",len(predicted_labels),"\n")
      print("="*50)
      print("LABELS REELS :")
      print("\tAvis positifs réels :",count_positif_true)
      print("\tAvis négatifs réels :",count_negatif_true)
      print("\tAvis neutres réels :",count_neutre_true)
      print("\tTOTAL : ",len(true_labels),"\n")
      print("="*50)

      metriques=print(f"EVALUATION DU MODELE :\n\tPrécision : {precision}\n\tRappel : {rappel}\n\tFmesure : {fmesure}\n")

    except Exception as e:
      metriques=f"Evaluation impossible : {str(e)}"

    return metriques

In [None]:
true_labels=[review['label'] for review in labeled_test_reviews]
evaluate(true_labels,test_predictions)

In [None]:
print(true_labels)
print(test_predictions)