In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q findspark

In [None]:
import os

import findspark

# Point the process at the JVM and at the Spark distribution before PySpark
# is initialised.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-2.4.2-bin-hadoop2.7",
    }
)

# Let findspark locate Spark from the environment configured above.
findspark.init()

In [None]:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Reuse an existing session if one is already running in this kernel.
spark = (
    SparkSession.builder
    .appName("Python Spark Stage four")
    .getOrCreate()
)

music_data = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz'

# Spark's DataFrame reader cannot open https:// URLs directly, so the file is
# first staged through SparkContext.addFile and then read from the local copy.
spark.sparkContext.addFile(music_data)
music_file = music_data.rsplit('/', 1)[-1]

# No schema is supplied and inferSchema is off, so every column (including
# star_rating) is loaded as a string.
musics = spark.read.csv("file://" + SparkFiles.get(music_file), header=True, sep='\t')

In [None]:
import nltk
# Fetch the Punkt sentence-tokenizer models; nltk's sent_tokenize, which this
# notebook uses to split reviews into sentences, needs them at run time.
nltk.download('punkt')

In [None]:
from pyspark.sql.functions import *
from nltk.tokenize import sent_tokenize
def sent_token(s):
    """Split one review body into a list of sentences.

    Args:
        s: The review text. Non-string values are converted with ``str``
            before tokenising.

    Returns:
        A list of sentence strings produced by ``nltk.tokenize.sent_tokenize``.
        An empty list is returned when ``s`` is ``None``.
    """
    if s is None:
        # A null review would otherwise be stringified into the literal
        # sentence "None" and leak into the data as if it were real text.
        return []
    return sent_tokenize(str(s))
# Spark UDF wrapping sent_token; it yields an array of sentence strings.
seg = udf(sent_token, ArrayType(StringType()))

# Attach the sentence array to every review (withColumn decides the column name).
sentences_col = seg(musics["review_body"])
musics = musics.withColumn('sentences', sentences_col)
musics.show()

# One output row per sentence, keeping the review id and its star rating.
musics_review = musics.select(
    musics.review_id,
    musics.star_rating,
    explode(musics.sentences).alias("sentence"),
)

In [None]:
# Keep only sentences longer than one character.
sentence_len = length(musics_review["sentence"])
musics_review = musics_review.where(sentence_len > 1)

# Number of sentences that survive the filter.
end = musics_review.count()
musics_review.show()

In [None]:
# star_rating was loaded as a string, so cast it explicitly instead of relying
# on implicit coercion, and reference the column of the DataFrame that is
# actually being filtered (musics_review) rather than its ancestor `musics`.
rating = musics_review["star_rating"].cast("int")

# Positive sentences: 4-5 stars. Negative sentences: 1-2 stars.
# 3-star reviews are left out of both sets.
musicsP = musics_review.filter(rating >= 4)
musicsN = musics_review.filter(rating <= 2)

In [None]:
from pyspark.sql import Window

# The ids are later compared with 0-based positions in the collected embedding
# lists, so they must be consecutive. monotonically_increasing_id() only
# guarantees increasing values (they jump between partitions), hence it is used
# just to preserve the current row order while row_number() supplies the
# consecutive numbering.
# NOTE: a window without partitionBy moves all rows into a single partition.
_row_order = Window.orderBy(monotonically_increasing_id())
musicsP = musicsP.withColumn("id", row_number().over(_row_order) - 1)
musicsN = musicsN.withColumn("id", row_number().over(_row_order) - 1)

In [None]:
# Word2Vec was used below without ever being imported (NameError on a fresh kernel).
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Word2Vec

# Tokenizer turns each sentence into a list of words.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

# 300-dimensional embeddings; minCount=0 keeps every word in the vocabulary.
# The model's transform() writes one vector per sentence into "result".
word2Vec = Word2Vec(vectorSize=300, minCount=0, inputCol="words", outputCol="result")

# Positive sentences: train a model on them, then embed them with it.
wordsDataP = tokenizer.transform(musicsP)
modelP = word2Vec.fit(wordsDataP)
resultP = modelP.transform(wordsDataP)

# Negative sentences: an independent model trained the same way.
wordsDataN = tokenizer.transform(musicsN)
modelN = word2Vec.fit(wordsDataN)
resultN = modelN.transform(wordsDataN)

In [None]:
# Keep only the sentence-embedding column of each result set.
review_embedding_P, review_embedding_N = (
    frame.select('result') for frame in (resultP, resultN)
)

In [None]:
# Pull every embedding row to the driver as a Python list of Row objects.
# This materialises the full result in driver memory.
list_P = review_embedding_P.collect()
list_N = review_embedding_N.collect()

In [None]:
import numpy as np

# Convert the collected rows into NumPy arrays for the similarity maths below.
list_P, list_N = (np.array(rows) for rows in (list_P, list_N))

In [None]:
# Flatten to one 300-value row per sentence (300 is the Word2Vec vectorSize).
list_P = np.reshape(list_P, (-1, 300))
list_N = np.reshape(list_N, (-1, 300))

In [None]:
def calculate_similarity(rew1,rew2):
    """Return the cosine similarity of two vectors.

    Args:
        rew1: First vector (1-D array-like of numbers).
        rew2: Second vector, same length as ``rew1``.

    Returns:
        ``dot(rew1, rew2) / (|rew1| * |rew2|)``, or ``0.0`` when either vector
        has zero length.
    """
    norm = np.linalg.norm(rew1) * np.linalg.norm(rew2)
    if norm == 0:
        # Cosine similarity is undefined for a zero vector; returning 0.0
        # avoids the NaN from 0/0, which would otherwise poison every sum and
        # comparison that uses this value.
        return 0.0
    return np.dot(rew1, rew2) / norm
    

In [None]:
def min_sim(List):
    """Find the vector with the smallest total cosine distance to all vectors.

    For every row ``i`` the quantity ``sum_j (1 - cos(List[i], List[j]))`` is
    evaluated (the row itself is included in the sum), and the row with the
    smallest total is selected.

    Args:
        List: Non-empty 2-D array-like, one vector per row.

    Returns:
        A tuple ``(min_sum, index)``: the smallest total distance and the row
        index that attains it (the first such row on ties).

    Raises:
        ValueError: If ``List`` is empty or not two-dimensional.
    """
    vectors = np.asarray(List, dtype=float)
    if vectors.ndim != 2 or vectors.shape[0] == 0:
        raise ValueError("min_sim needs a non-empty 2-D collection of vectors")

    # Normalise every row once. Zero rows are left as zeros so their
    # similarity to anything is 0 instead of NaN.
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    unit = vectors / np.where(norms == 0, 1.0, norms)

    # sum_j cos(i, j) == unit[i] . (sum_j unit[j]), which turns the pairwise
    # O(n^2) double loop into a single matrix-vector product.
    total_similarity = unit.dot(unit.sum(axis=0))
    total_distance = vectors.shape[0] - total_similarity

    index = int(np.argmin(total_distance))
    return float(total_distance[index]), index

In [None]:
# For each class, find the embedding with the smallest total cosine distance
# to all embeddings of that class, and the row index where it occurs.
min_P,index_P = min_sim(list_P)
min_N,index_N = min_sim(list_N)

In [None]:
# Show (total distance, row index) for the positive set, then the negative set.
for _summary in ((min_P, index_P), (min_N, index_N)):
    print(*_summary)

In [None]:
def TenNeighbors(List , index, k=10):
    """Return the row indexes closest to ``List[index]`` by cosine distance.

    Args:
        List: 2-D array-like, one vector per row.
        index: Row index of the reference vector.
        k: Number of neighbours to return in addition to the closest row
            (normally the reference row itself). Defaults to 10, i.e. up to 11
            indexes are returned, as before.

    Returns:
        A NumPy array of at most ``k + 1`` row indexes, ordered from the
        smallest to the largest cosine distance to the reference row.
    """
    vectors = np.asarray(List, dtype=float)

    # Normalise rows; zero rows stay zero so their similarity is 0, not NaN.
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    unit = vectors / np.where(norms == 0, 1.0, norms)

    # Cosine distance of every row to the reference row in one product.
    distances = 1 - unit.dot(unit[index])

    # A stable sort keeps equal distances in row order, so results are deterministic.
    return np.argsort(distances, kind="stable")[:k + 1]

In [None]:
# Row indexes of the sentences nearest to each class's central sentence.
indexes_P, indexes_N = (
    TenNeighbors(vectors, center)
    for vectors, center in ((list_P, index_P), (list_N, index_N))
)
print(indexes_P, indexes_N)

In [None]:
# Convert to plain Python ints for use in Column.isin().
# np.asarray makes the cell safe to re-run: calling .tolist() directly on an
# already-converted list would raise AttributeError.
indexes_P = np.asarray(indexes_P).tolist()
indexes_N = np.asarray(indexes_N).tolist()

In [None]:
# Look up the positive sentences whose id is among the selected neighbours.
(
    musicsP
    .filter(musicsP.id.isin(indexes_P))
    .select('review_id', 'sentence')
    .collect()
)

In [None]:
# Look up the negative sentences whose id is among the selected neighbours.
(
    musicsN
    .filter(musicsN.id.isin(indexes_N))
    .select('review_id', 'sentence')
    .collect()
)