# Projet: apports de Spark à la recommandation et au clustering de vins

On considère un dataset dans lequel est proposé différents vins, une note, un prix ainsi qu'une description donnée par une personne ayant gouté ce vin. Le but de ce projet est de proposer deux chaines de traitement, une sans Spark et une avec, afin de réaliser les deux opérations suivantes:
- recommandations de vins basés sur la cosine similarité entre les descriptions des vin.
- clustering des vins en une dizaine de cluster afin de déterminer des groupes.

**Link to data**:
https://www.kaggle.com/zynicide/wine-reviews#winemag-data-130k-v2.csv

In [1]:
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
import re
import collections
import numpy as np
import time
import pandas as pd
import nltk
from sklearn.decomposition import TruncatedSVD
from scipy.sparse import csc_matrix
from sklearn.cluster import KMeans as skKMeans
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction import FeatureHasher

import pyspark
import findspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import udf,col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans as pysparkKMeans
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Word2Vec
from pyspark.sql.types import FloatType

PATH_TO_DATA = r"C:/Users/assae/Documents/Cours CS/3A/OMA/Big Data/"

## Préparation du dataframe

In [2]:
a = time.time()

df = pd.read_csv(PATH_TO_DATA + "winemag-data-130k-v2.csv", index_col=0) 
 
todrop = ['designation', 'region_1', 'region_2', 'taster_name', 'taster_twitter_handle']    
    
df = df.drop(todrop, axis=1)
df = df.dropna().reset_index(drop=True)
df = df.drop_duplicates(subset='title').reset_index(drop=True)
df[["points", "price"]] = df[["points", "price"]].apply(pd.to_numeric)
print(df.shape)

dict_wine_description = dict(zip(list(df['title']), list(df['description'])))

df.head()

b = time.time()
print("Time to extract data: ", b-a)

(110582, 8)
Time to extract data:  1.732569932937622


# 1. Première méthode : sans Spark

## 1.1. Recommandation basée sur des vins ayant une description similaire

### 1.1.1. Nettoyage des descrptions

In [3]:
stop_words = list(stopwords.words('english'))

def stem(string):
    stemmer = SnowballStemmer("english")
    string = string.split(" ")
    chaine = ""
    for i in range(len(string)):
        chaine += " " + stemmer.stem(string[i])
    return chaine

def clean_txt(txt):
    global stop_words
    
    # lower text
    txt = txt.lower()
    ### special escaping character '...'
    txt = txt.replace(u'\u2026','.')
    txt = txt.replace(u'\u00a0',' ')
    ### remove non alphanumeric char
    txt = re.sub("[^a-zA-Z0-9 ]", '', txt)
    ### remove stop words
    txt1 = txt.split(" ")
    txt2 = [x for x in txt1 if x not in stop_words]
    txt0 = ""
    for i in range(len(txt2)):
        txt0 += " " + txt2[i]
    txt = txt0 
    ### stemming
    txt = stem(txt)
    return txt

In [4]:
# Test

title = df['title'][0]

print(dict_wine_description[title])
clean_txt(dict_wine_description[title])

This is ripe and fruity, a wine that is smooth while still structured. Firm tannins are filled out with juicy red berry fruits and freshened with acidity. It's  already drinkable, although it will certainly be better from 2016.


'  ripe fruiti wine smooth still structur firm tannin fill juici red berri fruit freshen acid  alreadi drinkabl although certain better 2016'

In [5]:
def clean_descriptions(dico):
    a = time.time()
    cleaned = []
    keys = list(dico.keys())
    values = list(dico.values())
    for i in range(len(values)):
        cleaned.append(clean_txt(values[i]))
    b = time.time()
    print("Time to clean the descriptions : ", b-a)  
    return dict(zip(keys, cleaned))

In [6]:
dict_cleaned_descriptions = clean_descriptions(dict_wine_description)

Time to clean the descriptions :  62.47929859161377


### 1.1.2. Vectorisation des descriptions

In [7]:
def one_hot_encoding(dict_wines):
    a = time.time()
    vectorizer = CountVectorizer()
    X = vectorizer.fit_transform(list(dict_wines.values()))
    b = time.time()
    print("Time to encode descriptions : ", b-a)
    return dict(zip(list(dict_wines.keys()), X)), X

def tf_idf(dict_wines):
    a = time.time()
    vectorizer = TfidfVectorizer()
    X = vectorizer.fit_transform(list(dict_wines.values()))
    b = time.time()
    print("Time to encode descriptions : ", b-a)
    return dict(zip(list(dict_wines.keys()), X)), X

def hachage(dict_wines):
    a = time.time()
    values = list(dict_wines.values())
    tohash = []
    for sent in values:
        cnt = collections.Counter()
        for word in sent.split():
            cnt[word] += 1
        tohash.append(dict(cnt))
    h = FeatureHasher(n_features=10)
    X = h.transform(tohash)
    b = time.time()
    print("Time to encode descriptions : ", b-a)
    return dict(zip(list(dict_wines.keys()), X)), X

In [8]:
title = df['title'][0]
a,b = one_hot_encoding(dict_cleaned_descriptions)

Time to encode descriptions :  3.187100887298584


### 1.1.3. Recommandations

In [9]:
def score(v1, v2):
    cos_sim = np.dot(v1, v2) / (np.linalg.norm(v1)*np.linalg.norm(v2))
    return cos_sim


def recom_knn(wine, dict_wines, k, model):
    keys = list(dict_wines.keys())
    if model == 'one-hot':
        dict_encoding, encoding = one_hot_encoding(dict_wines)
    elif model == 'tfidf':
        dict_encoding, encoding = tf_idf(dict_wines)
    elif model == 'hash':
        dict_encoding, encoding = hachage(dict_wines)
    else:
        print("Model can be 'one-hot', 'tfidf', or 'hash' only")
        return None
    a = time.time()
    scores = []
    values = list(dict_encoding.values())
    for i in range(len(dict_encoding)):
        considered_wine = np.array(dict_encoding[wine].todense())[0]
        scores.append(score(considered_wine, np.array(values[i].todense())[0]))
    sorted_index = np.argsort(-np.array(scores))
    wines = []
    for j in range(1, k+1):
        wines.append((keys[sorted_index[j]], scores[sorted_index[j]]))
    b = time.time()
    print("Time to compute cosinus similarities : ", b-a)
    return wines

In [10]:
title = df['title'][0]
print(title)
recom_knn(title, dict_cleaned_descriptions, 10, 'one-hot')

Quinta dos Avidagos 2011 Avidagos Red (Douro)
Time to encode descriptions :  3.210298538208008
Time to compute cosinus similarities :  32.19903612136841


[('Enoport 2015 Vilalva Reserva Red (Douro)', 0.5821817364274594),
 ('Château de Pizay 2014  Régnié', 0.5455447255899809),
 ('Les Frères Perroud 2014 Vieilles Vignes  (Brouilly)', 0.5238095238095238),
 ('Château Bertinerie 2011 Grande Cuvée  (Blaye Côtes de Bordeaux)',
  0.5238095238095238),
 ('Domaine Chasselay 2010 Les Grands Eparcieux  (Beaujolais)',
  0.5238095238095238),
 ('Domaine les Pins 2012 Le Clos Cabernet Franc (Bourgueil)',
  0.5135525910130956),
 ('Schröder & Schÿler 2012 Private Réserve  (Médoc)', 0.5095101710852534),
 ('Manuel Olivier 2012  Pommard', 0.5039526306789697),
 ('Château Bélingard 2012 Réserve Red (Côtes de Bergerac)',
  0.5039526306789697),
 ('Casa Santos Lima 2012 Quinta dos Bons Ventos Red (Lisboa)',
  0.5006261743217588)]

In [11]:
title = df['title'][0]
print(title)
recom_knn(title, dict_cleaned_descriptions, 10, 'tfidf')

Quinta dos Avidagos 2011 Avidagos Red (Douro)
Time to encode descriptions :  3.6084554195404053
Time to compute cosinus similarities :  29.895278215408325


[('Enoport 2015 Vilalva Reserva Red (Douro)', 0.49097115516236123),
 ('Joseph Drouhin 2012  Moulin-à-Vent', 0.42944709961586114),
 ('Casa Santos Lima 2012 Quinta dos Bons Ventos Red (Lisboa)',
  0.4246471882115379),
 ("Fabien Collonge 2014 L'Aurore des Côtes  (Chiroubles)", 0.4010955738952229),
 ('Les Frères Perroud 2014 Vieilles Vignes  (Brouilly)', 0.39931391451266035),
 ('Château Taussin 2014 Premium  (Bordeaux)', 0.39754375425327115),
 ("Emile Beyer 2013 L'Hostellerie Pinot Gris (Alsace)", 0.39257398556680123),
 ('Antonin Rodet 2011 Château de Mercey  (Santenay)', 0.39225414809727677),
 ('Château Coutinel 2011 Red (Fronton)', 0.3916813309252866),
 ('Cave de Saumur 2014 Réserve des Vignerons  (Saumur)', 0.383444784144407)]

In [12]:
title = df['title'][0]
print(title)
recom_knn(title, dict_cleaned_descriptions, 10, 'hash')

Quinta dos Avidagos 2011 Avidagos Red (Douro)
Time to encode descriptions :  3.287447690963745


  


Time to compute cosinus similarities :  4.988306522369385


[('CARM 2005 Reserva Red (Douro)', 0.9772545497599154),
 ('Château Jeanguillon 2014  Bordeaux Supérieur', 0.9713237285143653),
 ('Columbia Crest 2009 Two Vines Pinot Grigio (Washington)',
  0.9607689228305228),
 ('Manciat-Poncet 2014 Les Crays  (Pouilly-Fuissé)', 0.9607689228305228),
 ('Wellington 2004 Estate Merlot (Sonoma Valley)', 0.9502621934663978),
 ('Quinta do Crasto 2012 Flor de Crasto Red (Douro)', 0.949157995752499),
 ('Château de Gaudou 2010 Grand Lignée Malbec-Merlot (Cahors)',
  0.9467292624062573),
 ('Château de Santenay 2013  Mercurey', 0.9435641951204965),
 ('Steininger 2011 Rosé Sekt Cabernet Sauvignon (Österreichischer Sekt)',
  0.9435641951204965),
 ('Canard-Duchêne NV Charles VII Grande Cuvée de la Rosé Brut  (Champagne)',
  0.9433700705169153)]

## 1.2. Clustering des vins en prenant en compte l'ensemble des informations

In [13]:
def clustering_wines(data, clean_dict_wines, model='tfidf', nb_clusters=10):
    if model == 'one-hot':
        dict_encoding, encoding = one_hot_encoding(clean_dict_wines)
        a = time.time()
        pca_subs = TruncatedSVD(n_components=50)
        reduced_encoding = pd.DataFrame(pca_subs.fit_transform(encoding))
        b = time.time()
        pca_time = b-a
    elif model == 'tfidf':
        a = time.time()
        dict_encoding, encoding = tf_idf(clean_dict_wines)
        pca_subs = TruncatedSVD(n_components=50)
        reduced_encoding = pd.DataFrame(pca_subs.fit_transform(encoding))
        b = time.time()
        pca_time = b-a
    elif model == 'hash':
        dict_encoding, encoding = hachage(clean_dict_wines)
        a = time.time()
        reduced_encoding = pd.DataFrame(encoding.todense())
        b = time.time()
        pca_time = b-a
    else:
        print("Model can be 'one-hot', 'tfidf', or 'hash' only")
        return None
    c = time.time()
    df_clustering = pd.concat([df["points"], df["price"], pd.get_dummies(df["country"]), pd.get_dummies(df["province"]), pd.get_dummies(df["variety"]), reduced_encoding], axis = 1)
    clustering = skKMeans(n_clusters=nb_clusters)
    clustered = clustering.fit_predict(df_clustering)
    df_final = pd.concat([df["title"], pd.DataFrame(clustered, columns=['cluster'])], axis=1)  
    d = time.time()
    print("Time for clustering: ", d-c+pca_time)
    return df_final        

In [14]:
df_final = clustering_wines(df, dict_cleaned_descriptions, model='one-hot', nb_clusters=10)
df_final['cluster'].value_counts()

Time to encode descriptions :  3.6558380126953125
Time for clustering:  155.63385343551636


0    46515
7    31201
2    20922
8     8960
4     2199
1      559
5      179
9       38
6        6
3        3
Name: cluster, dtype: int64

In [15]:
df_final = clustering_wines(df, dict_cleaned_descriptions, model='tfidf', nb_clusters=10)
df_final['cluster'].value_counts()

Time to encode descriptions :  4.033212661743164
Time for clustering:  151.19664549827576


0    48724
9    34472
2    18676
5     6142
1     1921
6      438
3      162
8       37
4        7
7        3
Name: cluster, dtype: int64

In [16]:
df_final = clustering_wines(df, dict_cleaned_descriptions, model='hash', nb_clusters=10)
df_final['cluster'].value_counts()

Time to encode descriptions :  3.962859630584717
Time for clustering:  158.92983031272888


1    48168
5    33748
8    18564
0     7400
4     2054
6      439
2      163
7       37
9        6
3        3
Name: cluster, dtype: int64

# 2. Deuxième méthode : avec Spark

In [17]:
findspark.init("C:/spark/spark-2.3.2-bin-hadoop2.7")
sc=pyspark.SparkContext("local[*]","Project")
sqlc=SQLContext(sc) 

In [18]:
a = time.time()

sdf = sqlc.read.csv(PATH_TO_DATA + "winemag-data-130k-v2.csv", header=True)

todrop = ['designation', 'region_1', 'region_2', 'taster_name', 'taster_twitter_handle']

for c in todrop:
    sdf = sdf.drop(c)
    
sdf = sdf.na.drop()
sdf = sdf.dropDuplicates(['title'])
sdf.count()

b = time.time()

print("Time to extract data:", b-a)

Time to extract data: 23.030930757522583


## 2.1. Recommandation basée sur des vins ayant une description similaire

### 2.1.1. Traitement du texte

In [19]:
def stem(liste):
    stemmer = SnowballStemmer("english")
    l2 = []
    for i in range(len(liste)):
        l2.append(stemmer.stem(liste[i]))
    return l2

def clean_txt(df):
    a = time.time()
    regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W")
    df = regexTokenizer.transform(df)
    df = df.drop('description')
    remover = StopWordsRemover(inputCol="words", outputCol="desc")
    df = remover.transform(df)
    df = df.drop('words')
    fct = udf(lambda x: stem(x), ArrayType(StringType()))
    df = df.withColumn("description", fct("desc"))
    df = df.drop('desc')
    b = time.time()
    print("Time to clean the descriptions : ", b-a)
    return df
    

In [20]:
sdfc = clean_txt(sdf)

Time to clean the descriptions :  1.8364968299865723


### 2.1.2. Vectorisation des descriptions

In [21]:
def tfidf(df):
    a = time.time()
    hashingTF = HashingTF(inputCol="description", outputCol="rawFeatures", numFeatures=10)
    df = hashingTF.transform(df)
    idf = IDF(inputCol="rawFeatures", outputCol="features")
    idfModel = idf.fit(df)
    df = idfModel.transform(df)
    df = df.drop('description')
    df = df.drop('rawFeatures')
    b = time.time()
    print("Time to encode the descriptions : ", b-a)
    return df

def word2vec(df):
    a = time.time()
    word2Vec = Word2Vec(vectorSize=10, minCount=0, inputCol="description", outputCol="features")
    model = word2Vec.fit(df)
    df = model.transform(df)
    df = df.drop('description')
    b = time.time()
    print("Time to encode the descripions : ", b-a)
    return df
    

In [22]:
sdf_tfidf = tfidf(sdfc)

Time to encode the descriptions :  285.72354221343994


In [23]:
sdf_w2v = word2vec(sdfc)

Time to encode the descripions :  599.2327582836151


### 2.1.3. Recommandations

In [28]:
def score(v1, v2):
    cos_sim = np.dot(v1, v2) / (np.linalg.norm(v1)*np.linalg.norm(v2))
    return cos_sim

def reco_spark(wine_name, sdf, k=10, model='tfidf'):
    a = time.time()
    try:
        sdf.createTempView(model)
        df = sqlc.sql("SELECT title, features FROM " + model).toPandas()
    except:
        df = sqlc.sql("SELECT title, features FROM " + model).toPandas()
    b = time.time()
    print("Time to select the data :", b-a)
    c  = time.time()
    l = zip(list(df['title']), list(df['features']))
    rdd = sc.parallelize(l)
    wine_encoding = list(df[df['title'] == wine_name].features)[0]
    rdd1 = rdd.map(lambda s: (s[0], score(s[1], wine_encoding)))
    d  = time.time()
    print("Time for similiraty calculations :", d-c)
    rdd2 = rdd1.sortBy(lambda x: -x[1])
    e = time.time()
    print("Time to sort results :", e-d)
    final = rdd2.take(k)
    f = time.time()
    print("Time to collect results :", f-e)
    return final

In [29]:
wine = 'Quinta dos Avidagos 2011 Avidagos Red (Douro)'
reco_spark(wine, sdf_tfidf, k=10, model='tfidf')

Time to select the data : 287.59312176704407
Time for similiraty calculations : 0.6919095516204834
Time to sort results : 70.39353370666504
Time to collect results : 34.79569673538208


[('Quinta dos Avidagos 2011 Avidagos Red (Douro)', 0.9999999999999999),
 ('Edmeades 2011 Zinfandel (Mendocino County)', 0.9643488391799788),
 ('Winzerkeller Andau 2013 St. Laurent (Burgenland)', 0.9622234035643586),
 ('Manuel Olivier 2011  Bourgogne', 0.9608204941230747),
 ('Casal do Conde 2012 Quinta da Arrancosa Moscatel Graúdo (Tejo)',
  0.9591004951560843),
 ('F X Pichler 2006 Von de Terrassen Federspiel Riesling (Wachau)',
  0.9590022053156014),
 ('Maryhill 2009 Cabernet Sauvignon (Columbia Valley (WA))',
  0.956376213374129),
 ('Apsara 2014 Kick Ranch Sauvignon Blanc (Sonoma County)',
  0.9527419069308315),
 ('Harlow Ridge 2011 Chardonnay (Lodi)', 0.9520437122175613),
 ('Quinta do Casal Branco 2015 Tinto Red (Tejo)', 0.9508761648408962)]

In [30]:
wine = 'Quinta dos Avidagos 2011 Avidagos Red (Douro)'
reco_spark(wine, sdf_w2v, k=10, model='w2v')

Time to select the data : 230.6880075931549
Time for similiraty calculations : 0.3878154754638672
Time to sort results : 8.374927282333374
Time to collect results : 5.355997800827026


[('Quinta dos Avidagos 2011 Avidagos Red (Douro)', 1.0),
 ('Wines & Winemakers 2011 Companhia das Lezírias Catapereiro Red (Tejo)',
  0.9959854052114829),
 ("Poças 2013 Coroa d'Ouro Red (Douro)", 0.9938831789197227),
 ('Château du Perier 2011  Médoc', 0.993788320104824),
 ('Château Lestage 2013  Listrac-Médoc', 0.9931340478901978),
 ('Antonin Rodet 2011 Château de Mercey  (Santenay)', 0.99267374176346),
 ('Cave du Marmandais 2014 Château Campot Lafont  (Bordeaux)',
  0.9924307669793289),
 ('Château Peneau 2015 Cuvée Tradition  (Côtes de Bordeaux)',
  0.9922441143723999),
 ('Château Beauséjour 2014  Fronsac', 0.9915758324828349),
 ('Louis Max 2014 Domaine la Marche  (Mercurey)', 0.9914636322257657)]

## 2.2. Clustering des vins

In [31]:
def encode_categorical_features(sdf):
    a = time.time()
    sdf = sdf.drop('winery')
    sdf = sdf.drop('title')
    sdf = sdf.drop('_c0')
    to_encode = ['country', 'province', 'variety']
    for cat in to_encode:
        stringIndexer = StringIndexer(inputCol=cat, outputCol=cat + "Index")
        model = stringIndexer.fit(sdf)
        indexed = model.transform(sdf)
        encoder = OneHotEncoder(inputCol=cat + "Index", outputCol=cat + "Vec")
        sdf = encoder.transform(indexed)
        sdf = sdf.drop(cat)
        sdf = sdf.drop(cat+'Index')
    sdf = sdf.withColumn("points", sdf["points"].cast(FloatType()))
    sdf = sdf.withColumn("price", sdf["price"].cast(FloatType()))
    sdf = sdf.na.drop()
    assembler = VectorAssembler(inputCols=["provinceVec", "countryVec", "varietyVec", "features", "points", "price"], outputCol="feat")
    output = assembler.transform(sdf)
    for cat in ["provinceVec", "countryVec", "varietyVec", "features", "points", "price"]:
        output = output.drop(cat)
    b = time.time()
    print("Time to prepare the dataset for clustering : ", b-a)
    return output

In [32]:
sdf_enc_tfidf = encode_categorical_features(sdf_tfidf)

Time to prepare the dataset for clustering :  22.97869563102722


In [33]:
sdf_enc_w2v = encode_categorical_features(sdf_w2v)

Time to prepare the dataset for clustering :  20.17354416847229


In [34]:
def clustering(sdf):
    a = time.time()
    kmeans = pysparkKMeans(featuresCol='feat', predictionCol='prediction', k=10)
    model = kmeans.fit(sdf)
    predictions = model.transform(sdf)
    b = time.time()
    print("Time for clustering : ", b-a)
    return predictions

In [35]:
predictions_tfidf = clustering(sdf_enc_tfidf)
predictions_tfidf.groupBy('prediction').count().show()

Time for clustering :  280.6621174812317
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    7|
|         6|  140|
|         3|32840|
|         5| 1281|
|         9|12276|
|         4|  350|
|         8| 4039|
|         7|  120|
|         2|   33|
|         0|59471|
+----------+-----+



In [36]:
predictions_w2v = clustering(sdf_enc_w2v)
predictions_w2v.groupBy('prediction').count().show()

Time for clustering :  273.58593583106995
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    7|
|         6|  140|
|         3|32840|
|         5| 1281|
|         9|12276|
|         4|  350|
|         8| 4039|
|         7|  120|
|         2|   33|
|         0|59471|
+----------+-----+

