In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz  
!tar xf spark-2.3.0-bin-hadoop2.7.tgz  
!pip install -q findspark

In [None]:
import os

# Locations of the JDK and the unpacked Spark distribution from the setup cell;
# findspark.init() in the next cell locates Spark through SPARK_HOME.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.3.0-bin-hadoop2.7",
})

In [None]:
import findspark

# findspark.init() must run before pyspark is imported so that the Spark
# installation named by SPARK_HOME is importable.
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
import itertools
import json
import sys
import time

import numpy as np
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.clustering import LDA, LDAModel, LocalLDAModel
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions  import date_format
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
from wordcloud import WordCloud


# NOTE(review): an earlier cell already created a session, so getOrCreate()
# returns that one and the appName below may not take effect.
spark = SparkSession \
    .builder \
    .appName("Content Module") \
    .getOrCreate()

# SQLContext's first argument is a SparkContext (optionally followed by the
# SparkSession), not the SparkSession itself.
sqlContext = SQLContext(spark.sparkContext, spark)

# Expose the SparkContext under the conventional PySpark name for RDD work.
sc = spark.sparkContext

In [None]:
from google.colab import drive

# Mount Google Drive so the '/content/drive/MyDrive/...' paths used below resolve.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
class HybridRecommender:
  """Loads and joins the user, business and review data used by the recommenders.

  ``inputDataLoading`` stores the prepared DataFrames as *class* attributes
  (``HybridRecommender.business_df``, ``.user_df``, ``.review_df``) so that
  module-level helper functions in later cells can reach them.
  """

  # NOTE(review): output_path and model_path are not read by any code visible here.
  output_path='/content/drive/MyDrive/'
  model_path='/content/drive/MyDrive/'
  # Directory the user/business JSON inputs are read from.
  data_path='/content/drive/MyDrive/'

  def __init__(self, maxCount=15):
    """Create a recommender.

    Args:
      maxCount: stored as ``self.maxCount``. NOTE(review): nothing visible in
        this notebook reads it yet; presumably a cap on recommendations.
    """
    self.maxCount = maxCount

  @staticmethod
  def _appendColumnFrom(left, right, key, column):
    """Inner-join ``left`` with ``right`` on ``key`` and append ``right[column]`` to ``left``'s columns."""
    a = left.alias("a")
    b = right.alias("b")
    return a.join(b, col("a." + key) == col("b." + key), 'inner') \
            .select([col('a.' + name) for name in a.columns] + [col('b.' + column)])

  @staticmethod
  def _addIntegerId(df, key, id_col):
    """Give every distinct ``key`` value a dense integer id (0..n-1) in a new ``id_col`` column.

    ALS collaborative filtering works on integer ids, so the string ids are
    numbered with ``zipWithIndex``. Keys are de-duplicated first so that each
    one maps to exactly one id.
    """
    pairs = df.select(key).distinct().rdd.map(lambda row: row[0]).zipWithIndex()
    schema = StructType([StructField(key, StringType(), True),
                         StructField(id_col, IntegerType(), True)])
    ids = spark.createDataFrame(pairs, schema)
    return HybridRecommender._appendColumnFrom(df, ids, key, id_col)

  @staticmethod
  def _loadWithAverageStars(json_path, avg_path, key):
    """Read ``json_path`` and inner-join it with the per-id averages stored in ``avg_path``.

    The first line of ``avg_path`` is parsed as a JSON object mapping each
    ``key`` value to its average stars; rows whose id is absent from that
    object are dropped by the inner join.
    """
    df = spark.read.json(json_path)
    averages = json.loads(spark.sparkContext.textFile(avg_path).first())
    avg_df = spark.createDataFrame([[k, v] for k, v in averages.items()], [key, "average_stars"])
    return df.join(avg_df, on=key, how='inner')

  def inputDataLoading(self):
    """Load users, businesses and reviews, add integer ids, and register temp views.

    Side effects:
      * sets ``HybridRecommender.business_df`` (with ``businessId``),
        ``HybridRecommender.user_df`` (with ``userId``) and
        ``HybridRecommender.review_df`` (with both ids);
      * registers the temp views ``businesses``, ``users`` and ``reviews`` on
        the global ``spark`` session.
    """
    user_df = self._loadWithAverageStars(self.data_path + 'user.json',
                                         self.data_path + 'user_avg.json', 'user_id')
    business_df = self._loadWithAverageStars(self.data_path + 'business.json',
                                             self.data_path + 'business_avg.json', 'business_id')
    # NOTE(review): unlike the other inputs this path is relative to the
    # working directory; confirm it should not live under data_path.
    review_df = spark.read.json('train_review.json')

    # Integer ids for ALS collaborative filtering.
    business_df = self._addIntegerId(business_df, 'business_id', 'businessId')
    user_df = self._addIntegerId(user_df, 'user_id', 'userId')
    review_df = self._appendColumnFrom(review_df, user_df, 'user_id', 'userId')
    review_df = self._appendColumnFrom(review_df, business_df, 'business_id', 'businessId')

    HybridRecommender.business_df = business_df
    HybridRecommender.user_df = user_df
    HybridRecommender.review_df = review_df

    business_df.createOrReplaceTempView("businesses")
    user_df.createOrReplaceTempView("users")
    review_df.createOrReplaceTempView("reviews")

In [None]:
def textpreprocessing(self):
  """Fit the review-text feature pipeline and save it to ``savedpipeLine_txt``.

  The reviews of each business are joined, space-separated, into one document.
  The pipeline then:
  * tokenizes on word-character runs;
  * drops stop words;
  * builds CountVectorizer + IDF term weights (``idfVector``);
  * builds 100-dimensional Word2Vec document vectors (``wordVectors``);
  * assembles both into ``combinedVectors``.

  The fitted PipelineModel is written to ``savedpipeLine_txt`` (relative to
  the working directory), overwriting any earlier copy.

  Args:
    self: not used; reviews come from ``HybridRecommender.review_df``, which
      ``HybridRecommender.inputDataLoading`` must have set.
  """
  business_docs = (HybridRecommender.review_df.rdd
                   .map(lambda row: (row['business_id'], row['text']))
                   # Reviews without text would break the string concatenation below.
                   .filter(lambda pair: pair[1] is not None)
                   # Join with a space: plain ``+`` fuses the last word of one
                   # review with the first word of the next.
                   .reduceByKey(lambda left, right: left + ' ' + right)
                   .toDF(["business_id", "reviewText"]))

  regexTokenizer = RegexTokenizer(gaps = False, pattern = r'\w+', inputCol = 'reviewText', outputCol = 'generatedTokens')
  stopWordsRemover = StopWordsRemover(inputCol = 'generatedTokens', outputCol = 'stopwordsRemoved')
  countVectorizer = CountVectorizer(inputCol="stopwordsRemoved", outputCol="VectorizedFeatures")
  iDF = IDF(inputCol="VectorizedFeatures", outputCol="idfVector")
  word2Vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = 'stopwordsRemoved', outputCol = 'wordVectors', seed=123)
  vectorAssembler = VectorAssembler(inputCols=['idfVector', 'wordVectors'], outputCol='combinedVectors')

  pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, iDF, word2Vec, vectorAssembler])
  pipeline_mdl = pipeline.fit(business_docs)
  pipeline_mdl.write().overwrite().save('savedpipeLine_txt')

In [None]:
def transformReviewText(self):
  """Vectorize each business's reviews with the pipeline saved in ``savedpipeLine_txt``.

  The reviews in ``HybridRecommender.review_df`` are joined, space-separated,
  into one document per business, and that document is transformed with the
  loaded PipelineModel. The ``(business_id, wordVectors)`` pairs are:
  * collected into ``HybridRecommender.itemVectors``;
  * written, overwriting, to ``businessVectors.parquet``.

  Args:
    self: not used; kept so the signature matches the other recommender steps.
  """
  pipeline_mdl = PipelineModel.load('savedpipeLine_txt')
  business_docs = (HybridRecommender.review_df.rdd
                   .map(lambda row: (row['business_id'], row['text']))
                   # Reviews without text would break the string concatenation below.
                   .filter(lambda pair: pair[1] is not None)
                   # Join with a space: plain ``+`` fuses the last word of one
                   # review with the first word of the next.
                   .reduceByKey(lambda left, right: left + ' ' + right)
                   .toDF(["business_id", "reviewText"]))
  transformedReviewDF = pipeline_mdl.transform(business_docs)
  business_vectors = transformedReviewDF.select('business_id', 'wordVectors')
  # Collected to the driver so similarities can be computed locally.
  HybridRecommender.itemVectors = business_vectors.rdd.map(lambda row: (row['business_id'], row['wordVectors'])).collect()
  business_vectors.write.mode('overwrite').parquet('businessVectors.parquet')

In [None]:
def cosineSimilarity(v1,v2):
    '''Cosine similarity of two equal-length numeric vectors.

    Args:
        v1, v2: array-likes that ``numpy.asarray`` can convert to float arrays
            (lists, numpy arrays, ...).

    Returns:
        float: dot(v1, v2) / (|v1| * |v2|), between -1 and 1 up to rounding.
        Returns 0.0 when either vector has zero length, instead of the nan
        (and RuntimeWarning) that a 0/0 division would produce.
    '''
    a = np.asarray(v1, dtype=float)
    b = np.asarray(v2, dtype=float)
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    if norm_product == 0.0:
        return 0.0
    return float(np.dot(a, b) / norm_product)

In [None]:
def businessDetails(input_business, business_df=None):
    '''Attach descriptive business columns to a DataFrame of business ids.

    Args:
        input_business: DataFrame with a ``business_id`` column; all of its
            columns are kept in the result.
        business_df: DataFrame to take the details from. Defaults to
            ``HybridRecommender.business_df`` (set by ``inputDataLoading``).

    Returns:
        ``input_business`` inner-joined on ``business_id`` with the
        ``business_name``, ``categories``, ``stars``, ``review_count``,
        ``latitude`` and ``longitude`` columns of ``business_df``. Ids that are
        missing from ``business_df`` are dropped.
    '''
    if business_df is None:
        business_df = HybridRecommender.business_df
    inputDF = input_business.alias("inputDF")
    businessDF = business_df.alias("businessDF")
    # NOTE(review): confirm business.json really has a ``business_name`` column
    # (the public Yelp dataset calls it ``name``).
    detailColumns = ['business_name', 'categories', 'stars', 'review_count', 'latitude', 'longitude']
    df = inputDF.join(businessDF, col("inputDF.business_id") == col("businessDF.business_id"), 'inner')
    df = df.select([col('inputDF.' + xx) for xx in inputDF.columns] +
                   [col('businessDF.' + name) for name in detailColumns])
    return df

In [None]:
def contentbasedRecommendations(self, userId, recommendationsCount=15, seedCount=5, sampleFraction=0.5, seed=None):
  """Recommend businesses whose review text resembles businesses the user liked.

  Seeds are a random sample (at most ``seedCount``) of the businesses that
  ``userId`` rated 3 stars or more. Each business with a review-text vector is
  scored by its highest cosine similarity to any seed. Businesses the user has
  already reviewed are excluded, and the ``recommendationsCount`` best-scoring
  ones are returned.

  Args:
    self: not used; kept so the signature matches the other recommender steps.
    userId: ``user_id`` string to recommend for.
    recommendationsCount: maximum number of businesses returned.
    seedCount: maximum number of liked businesses used as seeds.
    sampleFraction: fraction passed to ``DataFrame.sample`` when choosing seeds.
    seed: optional random seed for that sample, for reproducible output.

  Returns:
    A DataFrame with ``business_id`` and ``score`` plus the detail columns
    added by ``businessDetails``, ordered by descending ``score``. It is empty
    when no sampled seed has a known vector.

  Requires ``HybridRecommender.review_df`` (from ``inputDataLoading``) and
  ``HybridRecommender.itemVectors`` (from ``transformReviewText``).
  """
  # Filtering through the DataFrame API, rather than formatting userId into a
  # SQL string, keeps a crafted userId from altering the query.
  user_reviews = HybridRecommender.review_df.filter(col('user_id') == userId)
  liked = user_reviews.filter(col('stars') >= 3.0).select('business_id').distinct()
  # DataFrame.sample requires a float fraction.
  sampled = liked.sample(False, float(sampleFraction), seed).limit(seedCount)
  seedIds = [row['business_id'] for row in sampled.collect()]

  # Exclude every business the user has reviewed, not only the sampled seeds.
  alreadyReviewed = {row['business_id'] for row in user_reviews.select('business_id').distinct().collect()}

  vectorsById = dict(HybridRecommender.itemVectors)
  bestScore = {}
  for seedId in seedIds:
    seedVector = vectorsById.get(seedId)
    if seedVector is None:
      # No review-text vector for this seed, so it cannot be compared.
      continue
    for candidateId, candidateVector in vectorsById.items():
      if candidateId == seedId or candidateId in alreadyReviewed:
        continue
      score = float(cosineSimilarity(seedVector, candidateVector))
      # A candidate close to several seeds keeps its best score.
      if candidateId not in bestScore or score > bestScore[candidateId]:
        bestScore[candidateId] = score

  top = sorted(bestScore.items(), key=lambda item: item[1], reverse=True)[:recommendationsCount]
  # An explicit schema lets an empty result still become a DataFrame.
  schema = StructType([StructField('business_id', StringType(), True),
                       StructField('score', DoubleType(), True)])
  scored = spark.createDataFrame(top, schema)
  # Joins do not preserve row order, so sort after attaching the details.
  return businessDetails(scored).orderBy(col('score').desc())