# `CORPUS NY TIMES`

In [299]:
from time import sleep
import os
import time
from pathlib import Path
import pandas as pd
import numpy as np
import pyspark 
import pyspark.sql.functions as fn
from pyspark.sql.functions import window, column, col
from pyspark.sql.functions import expr
from pyspark.sql.functions import approx_count_distinct
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.catalog import Catalog
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import RegexTokenizer
import spacy
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer
from pyspark.ml import Estimator 
from pyspark.ml import Transformer
from pyspark.ml.feature import MinHashLSH 
from pyspark.ml.feature import MinHashLSHModel
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.feature import BucketedRandomProjectionLSHModel
from pyspark.ml.evaluation import Evaluator
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors

In [54]:
conf = SparkConf().setAppName("Spark")
sc = SparkContext(conf=conf)

spark = (SparkSession
    .builder
    .appName("Spark")
    .getOrCreate()
)

# Loading as a Spark DataFrame

In [55]:
df = spark.read.json("./JSON/")

In [5]:
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- title: string (nullable = true)



In [6]:
df.show(10)

+--------------------+--------+-------------------+--------------------+
|                text|   topic|               date|               title|
+--------------------+--------+-------------------+--------------------+
|
In the spring of...|magazine|2016-06-12 00:00:00|choosing-a-school...|
|
Picture him as a...|magazine|2016-05-08 00:00:00|the-aspiring-nove...|
|
For the last hou...|magazine|2016-05-22 00:00:00|doctors-with-enem...|
|
Last November, M...|magazine|2016-05-08 00:00:00|should-prostituti...|
|
Nakuru is a lake...|magazine|2016-06-26 00:00:00|international-cri...|
|
‘Have you seen t...|magazine|2016-05-22 00:00:00|donald-trump-prim...|
|
On the morning o...|magazine|2016-05-29 00:00:00|behind-the-barric...|
|
Every week or so...|magazine|2016-06-26 00:00:00|will-trump-swallo...|
|
Hillary Clinton ...|magazine|2016-04-24 00:00:00|how-hillary-clint...|
|
One night in ear...|magazine|2016-06-19 00:00:00|can-netflix-survi...|
+--------------------+--------+-------------------+

## Queries

### - Average length (number of tokens) per post

In [99]:
df_nb_tokens_post = df.withColumn('nb_tokens_post', fn.size(fn.split(fn.col('text'),' ')))
df_nb_tokens_post.show(10)

+--------------------+--------+-------------------+--------------------+--------------+
|                text|   topic|               date|               title|nb_tokens_post|
+--------------------+--------+-------------------+--------------------+--------------+
|
In the spring of...|magazine|2016-06-12 00:00:00|choosing-a-school...|          9668|
|
Picture him as a...|magazine|2016-05-08 00:00:00|the-aspiring-nove...|          9181|
|
For the last hou...|magazine|2016-05-22 00:00:00|doctors-with-enem...|          8181|
|
Last November, M...|magazine|2016-05-08 00:00:00|should-prostituti...|          7436|
|
Nakuru is a lake...|magazine|2016-06-26 00:00:00|international-cri...|          7223|
|
‘Have you seen t...|magazine|2016-05-22 00:00:00|donald-trump-prim...|          6863|
|
On the morning o...|magazine|2016-05-29 00:00:00|behind-the-barric...|          6954|
|
Every week or so...|magazine|2016-06-26 00:00:00|will-trump-swallo...|          6533|
|
Hillary Clinton ...|magazine|2

In [420]:
df_nb_tokens_post = df.select("text").withColumn('nb_tokens_post', fn.size(fn.split(fn.col('text'),' ')))                 
df_nb_tokens_post.show(10)

+--------------------+--------------+
|                text|nb_tokens_post|
+--------------------+--------------+
|
In the spring of...|          9668|
|
Picture him as a...|          9181|
|
For the last hou...|          8181|
|
Last November, M...|          7436|
|
Nakuru is a lake...|          7223|
|
‘Have you seen t...|          6863|
|
On the morning o...|          6954|
|
Every week or so...|          6533|
|
Hillary Clinton ...|          6490|
|
One night in ear...|          6542|
+--------------------+--------------+
only showing top 10 rows



In [427]:
df_avg_tokens = df_text.agg({"nb_tokens_post": "avg"})
df_avg_tokens.show()

+-------------------+
|avg(nb_tokens_post)|
+-------------------+
|  798.4488287468729|
+-------------------+



### - Average number of Spark stopwords per post 

In [425]:
regexTokenizer = RegexTokenizer()\
        .setInputCol('text')\
        .setOutputCol('text_tokenized')\
        .setPattern(" ")\
        .setToLowercase(True)
df_tokenized = regexTokenizer.transform(df_nb_tokens_post)
df_tokenized.show(5)

+--------------------+--------------+--------------------+
|                text|nb_tokens_post|      text_tokenized|
+--------------------+--------------+--------------------+
|
In the spring of...|          9668|[
in, the, spring...|
|
Picture him as a...|          9181|[
picture, him, a...|
|
For the last hou...|          8181|[
for, the, last,...|
|
Last November, M...|          7436|[
last, november,...|
|
Nakuru is a lake...|          7223|[
nakuru, is, a, ...|
+--------------------+--------------+--------------------+
only showing top 5 rows



In [426]:
stopwords = StopWordsRemover.loadDefaultStopWords('english')
remover = StopWordsRemover()\
            .setStopWords(stopwords)\
            .setInputCol('text_tokenized')\
            .setOutputCol('filtered')
df_filtered = remover.transform(df_tokenized)
df_filtered.show(5)

+--------------------+--------------+--------------------+--------------------+
|                text|nb_tokens_post|      text_tokenized|            filtered|
+--------------------+--------------+--------------------+--------------------+
|
In the spring of...|          9668|[
in, the, spring...|[
in, spring, 201...|
|
Picture him as a...|          9181|[
picture, him, a...|[
picture, young,...|
|
For the last hou...|          8181|[
for, the, last,...|[
for, last, hour...|
|
Last November, M...|          7436|[
last, november,...|[
last, november,...|
|
Nakuru is a lake...|          7223|[
nakuru, is, a, ...|[
nakuru, lakesid...|
+--------------------+--------------+--------------------+--------------------+
only showing top 5 rows



In [424]:
df_stop_post = df_filtered\
        .withColumn('nb_stopwords',fn.col('nb_tokens_post') - fn.size(fn.col('filtered')))\
        .select(['text','nb_tokens_post','nb_stopwords'])
df_stop_post.show(5)

+--------------------+--------------+------------+
|                text|nb_tokens_post|nb_stopwords|
+--------------------+--------------+------------+
|
In the spring of...|          9668|        4063|
|
Picture him as a...|          9181|        4146|
|
For the last hou...|          8181|        3536|
|
Last November, M...|          7436|        2999|
|
Nakuru is a lake...|          7223|        3065|
+--------------------+--------------+------------+
only showing top 5 rows



In [429]:
df_avg_stop = df_stop_post.agg({"nb_stopwords": "avg"})
df_avg_stop.show()

+------------------+
| avg(nb_stopwords)|
+------------------+
|325.97725722083237|
+------------------+



### - Number of post per days

In [139]:
df_post_day = df.groupBy('date').count()
df_post_day.show(10)

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2016-05-01 00:00:00|  176|
|2016-05-19 00:00:00|  145|
|2016-05-03 00:00:00|  109|
|2016-06-10 00:00:00|  130|
|2016-05-14 00:00:00|   96|
|2016-04-17 00:00:00|    1|
|2016-05-04 00:00:00|  128|
|2016-05-05 00:00:00|  149|
|2016-06-17 00:00:00|  115|
|2016-06-14 00:00:00|  112|
+-------------------+-----+
only showing top 10 rows



### - Number of post per topics and subtopics

In [141]:
df_post_topic = df.groupBy('topic').count()
df_post_topic.show(10)

+--------------------+-----+
|               topic|count|
+--------------------+-----+
|              travel|  140|
| sports/horse-racing|   31|
|t-magazine/entert...|   15|
|       sports/hockey|  119|
|       sports/soccer|  180|
|sports/ncaabasket...|    8|
|                  us|  585|
|  arts/international|   33|
|     sports/olympics|   49|
|     arts/television|  126|
+--------------------+-----+
only showing top 10 rows



# Loading as Spark Streaming DataFrame

One file per trigger

In [431]:
spark.conf.set("spark.sql.shuffle.partitions",5)
df_streaming = spark.readStream.schema(df.schema).option("maxFilesPerTrigger",1).json("./JSON/")

In [432]:
type(df_streaming)

pyspark.sql.dataframe.DataFrame

## Streaming Queries

### - Average length (number of tokens) per post

In [439]:
nb_tokens_post_query = df_streaming\
.withColumn('nb_tokens', fn.size(fn.split(fn.col('text'),' ')))\
.writeStream\
.queryName("nb_tokens_post")\
.format("memory")\
.outputMode("append")\
.start()

In [436]:
for x in range(5):
    spark.sql("SELECT * FROM nb_tokens_post")\
    .show(10) 
    sleep(1)

+--------------------+--------+-------------------+--------------------+---------+
|                text|   topic|               date|               title|nb_tokens|
+--------------------+--------+-------------------+--------------------+---------+
|
How does a young...|  movies|2016-05-29 00:00:00|the-fits-its-all-...|     1055|
|
Humans may be qu...|nyregion|2016-05-29 00:00:00|time-for-horsesho...|      541|
+--------------------+--------+-------------------+--------------------+---------+

+--------------------+--------+-------------------+--------------------+---------+
|                text|   topic|               date|               title|nb_tokens|
+--------------------+--------+-------------------+--------------------+---------+
|
How does a young...|  movies|2016-05-29 00:00:00|the-fits-its-all-...|     1055|
|
Humans may be qu...|nyregion|2016-05-29 00:00:00|time-for-horsesho...|      541|
|
SAN FRANCISCO — ...|  movies|2016-05-29 00:00:00|duncan-jones-bowi...|     1552|
+--

In [440]:
nb_tokens_post_query.stop()

### - Average number of stopwords (Spark stopwords) per post

In [109]:
nb_stopwords_post_query = remover\
    .transform(regexTokenizer.transform(df_streaming.withColumn('nb_tokens', fn.size(fn.split(fn.col('text'),' ')))))\
    .withColumn('nb_stop',fn.col('nb_tokens') - fn.size(fn.col('filtered')))\
    .select(['text','topic','date','title','nb_tokens','nb_stop'])\
    .withColumn('nb_tokens', fn.size(fn.split(fn.col('text'),' ')))\
    .writeStream\
    .queryName("nb_stopwords")\
    .format("memory")\
    .outputMode("append")\
    .start()           
               

In [110]:
for x in range(5):
    spark.sql("SELECT * FROM nb_stopwords")\
    .show(10) 
    sleep(1)

+--------------------+----------------+-------------------+--------------------+---------+-------+
|                text|           topic|               date|               title|nb_tokens|nb_stop|
+--------------------+----------------+-------------------+--------------------+---------+-------+
|
How does a young...|          movies|2016-05-29 00:00:00|the-fits-its-all-...|     1055|    439|
|
Humans may be qu...|        nyregion|2016-05-29 00:00:00|time-for-horsesho...|      541|    214|
|
SAN FRANCISCO — ...|          movies|2016-05-29 00:00:00|duncan-jones-bowi...|     1552|    650|
|
Kamala Harris wa...|        magazine|2016-05-29 00:00:00|kamala-harris-a-t...|     3849|   1572|
|
Brittany Anne Ca...|fashion/weddings|2016-05-29 00:00:00|brittany-campbell...|      232|     89|
|
The title track ...|      arts/music|2016-05-29 00:00:00|ariana-grande-fif...|      794|    301|
|
One of the lates...|        business|2016-05-29 00:00:00|a-worrisome-pileu...|      954|    371|
|
PARIS — 

In [111]:
nb_stopwords_post_query.stop()

### - Number of post per days

In [15]:
post_day = df_streaming.groupby('date').count()

In [16]:
post_day_query = post_day.writeStream\
    .queryName("post_day")\
    .format("memory")\
    .outputMode("complete")\
    .start()

In [18]:
for i in range(5):
    spark.sql("SELECT * FROM post_day").show(10)
    sleep(1)

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2016-05-29 00:00:00|   23|
+-------------------+-----+

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2016-05-29 00:00:00|   24|
+-------------------+-----+

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2016-05-29 00:00:00|   25|
+-------------------+-----+

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2016-05-29 00:00:00|   26|
+-------------------+-----+

+-------------------+-----+
|               date|count|
+-------------------+-----+
|2016-05-29 00:00:00|   27|
+-------------------+-----+



In [21]:
post_day_query.stop()

### - Number of post per topics and subtopics

In [10]:
post_topic = df_streaming.groupby('topic').count()

In [11]:
post_topic_query = post_topic.writeStream\
    .queryName("post_topic")\
    .format("memory")\
    .outputMode("complete")\
    .start()

In [12]:
for i in range(5):
    spark.sql("SELECT * FROM post_topic").show(10)
    sleep(1)

+--------+-----+
|   topic|count|
+--------+-----+
|nyregion|    1|
|  movies|    1|
+--------+-----+

+--------+-----+
|   topic|count|
+--------+-----+
|nyregion|    1|
|  movies|    2|
+--------+-----+

+--------+-----+
|   topic|count|
+--------+-----+
|nyregion|    1|
|  movies|    2|
|magazine|    1|
+--------+-----+

+----------------+-----+
|           topic|count|
+----------------+-----+
|        nyregion|    1|
|fashion/weddings|    1|
|          movies|    2|
|        magazine|    1|
+----------------+-----+

+----------------+-----+
|           topic|count|
+----------------+-----+
|        nyregion|    1|
|fashion/weddings|    1|
|      arts/music|    1|
|          movies|    2|
|        magazine|    1|
+----------------+-----+



In [14]:
post_topic_query.stop()

# Loading as Pandas DataFrame

In [8]:
df_pd = df.toPandas()

In [10]:
df_pd

Unnamed: 0,text,topic,date,title
0,"\nIn the spring of 2014, when our daughter, Na...",magazine,2016-06-12,choosing-a-school-for-my-daughter-in-a-segrega...
1,"\nPicture him as a young man, standing on the ...",magazine,2016-05-08,the-aspiring-novelist-who-became-obamas-foreig...
2,"\nFor the last hour, the American gunship had ...",magazine,2016-05-22,doctors-with-enemies-did-afghan-forces-target-...
3,"\nLast November, Meg Muñoz went to Los Angeles...",magazine,2016-05-08,should-prostitution-be-a-crime.html
4,\nNakuru is a lakeside city in Kenya’s Rift Va...,magazine,2016-06-26,international-criminal-court-moreno-ocampo-the...
...,...,...,...,...
8789,"\nCheeky twists on an old classic, straight fr...",t-magazine/fashion,2016-05-09,fashion-nautical-stripes.html
8790,\n\n,world/middleeast,2016-05-03,submit-your-questions-about-syria-for-declan-w...
8791,\n\n,arts/design,2016-04-18,labor-protesters-to-resume-guggenheim-demonstr...
8792,\n\n,arts/design,2016-05-28,court-denies-extradition-request-for-spaniard-...


In [9]:
df_pd.text = df_pd.text.apply(lambda x : x.replace('\n',''))
df_pd.title = df_pd.title.apply(lambda x : x.replace('-',' ').replace('.html',''))
df_pd.date = pd.to_datetime(df_pd.date)
df_pd

Unnamed: 0,text,topic,date,title
0,"In the spring of 2014, when our daughter, Najy...",magazine,2016-06-12,choosing a school for my daughter in a segrega...
1,"Picture him as a young man, standing on the wa...",magazine,2016-05-08,the aspiring novelist who became obamas foreig...
2,"For the last hour, the American gunship had be...",magazine,2016-05-22,doctors with enemies did afghan forces target ...
3,"Last November, Meg Muñoz went to Los Angeles t...",magazine,2016-05-08,should prostitution be a crime
4,Nakuru is a lakeside city in Kenya’s Rift Vall...,magazine,2016-06-26,international criminal court moreno ocampo the...
...,...,...,...,...
8789,"Cheeky twists on an old classic, straight from...",t-magazine/fashion,2016-05-09,fashion nautical stripes
8790,,world/middleeast,2016-05-03,submit your questions about syria for declan w...
8791,,arts/design,2016-04-18,labor protesters to resume guggenheim demonstr...
8792,,arts/design,2016-05-28,court denies extradition request for spaniard ...


## Annotate the corpus

In [4]:
nlp = spacy.load('en_core_web_sm')

In [82]:
def noun_nent_verb(row):
    doc = nlp(row) 
    list_nnv =[]
    for token in doc:  
        if token.pos_ in ['NOUN','NENT','VERB']:
                list_nnv.append(token.text)
    return list_nnv

In [83]:
df_pd['token'] = df_pd['text'].apply(lambda row : noun_nent_verb(row))

In [88]:
df_pd.head(5)

Unnamed: 0,text,topic,date,title,token
0,"In the spring of 2014, when our daughter, Najy...",magazine,2016-06-12,choosing a school for my daughter in a segrega...,"[spring, daughter, turning, husband, found, fa..."
1,"Picture him as a young man, standing on the wa...",magazine,2016-05-08,the aspiring novelist who became obamas foreig...,"[Picture, man, standing, waterfront, polling, ..."
2,"For the last hour, the American gunship had be...",magazine,2016-05-22,doctors with enemies did afghan forces target ...,"[hour, gunship, circling, city, observing, tar..."
3,"Last November, Meg Muñoz went to Los Angeles t...",magazine,2016-05-08,should prostitution be a crime,"[went, speak, conference, months, meeting, att..."
4,Nakuru is a lakeside city in Kenya’s Rift Vall...,magazine,2016-06-26,international criminal court moreno ocampo the...,"[lakeside, city, destination, tourists, part, ..."


In [89]:
df_pd.to_csv('./data_ny.csv', index=True, header=True)

# Build a Spark dataframe with annotations

- One row per post topic
- Columns : topic, title, date, text, Sparse vector with entries denoting the indices of tokens in the vocabulary Spacy's Vocab) for tokens tagged as NOUNs, VERBs and NENTs (Named Entities) and their number of occurrences

In [90]:
df_ann = spark.read\
             .option("header", "true")\
             .format('csv')\
             .load("./data_ny.csv")

In [93]:
df_ann.show(5)

+---+--------------------+--------+----------+--------------------+--------------------+
|_c0|                text|   topic|      date|               title|               token|
+---+--------------------+--------+----------+--------------------+--------------------+
|  0|In the spring of ...|magazine|2016-06-12|choosing a school...|['spring', 'daugh...|
|  1|Picture him as a ...|magazine|2016-05-08|the aspiring nove...|['Picture', 'man'...|
|  2|For the last hour...|magazine|2016-05-22|doctors with enem...|['hour', 'gunship...|
|  3|Last November, Me...|magazine|2016-05-08|should prostituti...|['went', 'speak',...|
|  4|Nakuru is a lakes...|magazine|2016-06-26|international cri...|['lakeside', 'cit...|
+---+--------------------+--------+----------+--------------------+--------------------+
only showing top 5 rows



In [94]:
tokenizer = Tokenizer()\
        .setInputCol('token')\
        .setOutputCol('tokenized')
tokenized = tokenizer.transform(df_ann).select(['_c0','text','topic','date','title', 'tokenized'])

In [96]:
tokenized.show(5)

+---+--------------------+--------+----------+--------------------+--------------------+
|_c0|                text|   topic|      date|               title|           tokenized|
+---+--------------------+--------+----------+--------------------+--------------------+
|  0|In the spring of ...|magazine|2016-06-12|choosing a school...|[['spring',, 'dau...|
|  1|Picture him as a ...|magazine|2016-05-08|the aspiring nove...|[['picture',, 'ma...|
|  2|For the last hour...|magazine|2016-05-22|doctors with enem...|[['hour',, 'gunsh...|
|  3|Last November, Me...|magazine|2016-05-08|should prostituti...|[['went',, 'speak...|
|  4|Nakuru is a lakes...|magazine|2016-06-26|international cri...|[['lakeside',, 'c...|
+---+--------------------+--------+----------+--------------------+--------------------+
only showing top 5 rows



In [182]:
cv = CountVectorizer()\
    .setInputCol("tokenized")\
    .setOutputCol("countVec")\
    .setVocabSize(262144)\
    .setMinTF(1)\
    .setMinDF(2)

fittedCV = cv.fit(tokenized)
df_cv = fittedCV.transform(tokenized)
df_cv.show(5)

+---+--------------------+--------+----------+--------------------+--------------------+--------------------+
|_c0|                text|   topic|      date|               title|           tokenized|            countVec|
+---+--------------------+--------+----------+--------------------+--------------------+--------------------+
|  0|In the spring of ...|magazine|2016-06-12|choosing a school...|[['spring',, 'dau...|(33753,[0,1,2,3,4...|
|  1|Picture him as a ...|magazine|2016-05-08|the aspiring nove...|[['picture',, 'ma...|(33753,[0,1,2,3,4...|
|  2|For the last hour...|magazine|2016-05-22|doctors with enem...|[['hour',, 'gunsh...|(33753,[0,1,2,3,4...|
|  3|Last November, Me...|magazine|2016-05-08|should prostituti...|[['went',, 'speak...|(33753,[0,1,2,3,4...|
|  4|Nakuru is a lakes...|magazine|2016-06-26|international cri...|[['lakeside',, 'c...|(33753,[0,1,2,3,4...|
+---+--------------------+--------+----------+--------------------+--------------------+--------------------+
only showi

# LSH model

Random split of the corpus in a training set and testing set (the training set is used to train LSH models and the testing set is used to evaluate the LSH models).

In [129]:
df_train, df_test = df_cv.select('topic','text','title','date', 'countVec').randomSplit([0.75, 0.25])

## MinHash model

### Training

In [375]:
mh1 = MinHashLSH(inputCol="countVec", outputCol="hashes", seed=12345) #numHashTables = 1 par défaut
model_mh1 = mh1.fit(df_train)
df_mh1 = model1.transform(df_train)

In [None]:
key = df_train.filter("_c0 == 1")
model_mh1.approxNearestNeighbors(df_train, key, 1)

In [386]:
approxsim1 = model_mh1.approxSimilarityJoin(df_train, df_train, 0.8 , distCol="JaccardDistance")\
    .select(col("datasetA.countVec").alias("cvA"), col("datasetB.countVec").alias("cvB"),col("JaccardDistance"))

In [388]:
approxsim1.filter("JaccardDistance != 0").show()

In [381]:
mh2 = MinHashLSH(inputCol="countVec", outputCol="hashes", seed=12345, numHashTables=8)
model_mh2 = mh2.fit(df_train)
df_mh2 = model_mh2.transform(df_train)

In [383]:
model_mh2.approxNearestNeighbors(df_train, key, 1)

In [389]:
approxsim2 = model_mh2.approxSimilarityJoin(df_train, df_train, 0.8 , distCol="JaccardDistance")\
    .select(col("datasetA.countVec").alias("cvA"), col("datasetB.countVec").alias("cvB"),col("JaccardDistance"))

In [391]:
approxsim2.filter("JaccardDistance != 0").show()

### Evaluate

In [393]:
df_eval_mh1 = mh1.fit(df_test)\
    .transform(df_test)

In [395]:
evaluator = Evaluator()
evaluator.evaluate(df_eval_mh1)

In [396]:
df_eval_mh2 = mh2.fit(df_test)\
    .transform(df_test)

In [None]:
evaluator.evaluate(df_eval_mh2)

### With Pipeline

In [399]:
pipeline = Pipeline(stages=[tokenizer, cv, mh1])
pipelineFit = pipeline.fit(df_ann)
df_final = pipelineFit.transform(df_ann)

### Saving and loading

In [400]:
mh1Path = temp_path + "/mh1" 
mh1.write().overwrite().save(mh1Path) 
mh1bis = MinHashLSH.load(mh1Path) 
mh1bis.getOutputCol() == mh1.getOutputCol()

True

In [401]:
model1Path = temp_path + "/mh1-model"
model_mh1.save(model1Path)
model_mh1bis = MinHashLSHModel.load(model1Path) 
model_mh1.transform(df_train).head().hashes == model_mh1bis.transform(df_train).head().hashes

True

## BucketedRandomProjection

### Training

In [402]:
brp = BucketedRandomProjectionLSH( inputCol="countVec", outputCol="hashes", seed=12345, bucketLength=1.0)
isinstance(brp, Estimator) & isinstance(brp, BucketedRandomProjectionLSH)

True

In [404]:
model_brp = brp.fit(df_train)
df_brp = model_brp.transform(df_train)

In [405]:
type(model_brp)

pyspark.ml.feature.BucketedRandomProjectionLSHModel

In [407]:
model_brp.explainParams()

'bucketLength: the length of each hash bucket, a larger bucket lowers the false negative rate. (current: 1.0)\ninputCol: input column name (current: countVec)\nnumHashTables: number of hash tables, where increasing number of hash tables lowers the false negative rate, and decreasing it improves the running performance (default: 1)\noutputCol: output column name (default: BucketedRandomProjectionLSH_62598a18d510__output, current: hashes)'

In [412]:
model_brp.approxNearestNeighbors(df_train, key, 1)

In [410]:
approxsim3 = model_mh1.approxSimilarityJoin(df_train, df_train, 0.8 , distCol="JaccardDistance")\
    .select(col("datasetA.countVec").alias("cvA"), col("datasetB.countVec").alias("cvB"),col("JaccardDistance"))

In [413]:
approxsim3.filter("JaccardDistance != 0").show()

### With Pipeline

In [414]:
pipeline2 = Pipeline(stages=[tokenizer, cv, brp])
pipelineFit2 = pipeline.fit(df_ann)
df_final2 = pipelineFit.transform(df_ann)

### Saving and loading

In [415]:
temp_path = os.getcwd()
brpPath = temp_path + "/brp" 
brp.write().overwrite().save(brpPath)
brp2 = BucketedRandomProjectionLSH.load(brpPath) 
brp2.getBucketLength() == brp.getBucketLength()

True

In [417]:
modelPath = temp_path + "/brp-model" 
model_brp.write().overwrite().save(modelPath)
modelSaved = BucketedRandomProjectionLSHModel.load(modelPath) 
modelSaved.transform(df_train).head().hashes == modelSaved.transform(df_train).head().hashes

True

In [53]:
spark.stop()