In [None]:
from __future__ import print_function

import json, codecs, os
import numpy as np
from timeit import default_timer as timer

from pyspark.ml.feature import IDF
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer

from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collect_list

In [None]:
# Collections to benchmark. Each name is a collection inside the
# `yelp_filtered_read` database; the names look like review counts
# (NOTE(review): confirm). Only `arr_collections_debug` is iterated by the
# timing cell; larger sizes are commented out of `arr_collections`.
arr_collections_debug = ["8000000"]
arr_collections = ["100","500","1000","5000","10000","50000","100000","500000","1000000"]#,"5000000","8000000"]

# MongoDB endpoint. The environment may override the defaults so the
# notebook does not need to be edited when the database host changes.
MONGO_HOST = os.environ.get("MONGO_HOST", "192.168.0.15")
MONGO_PORT = os.environ.get("MONGO_PORT", "27017")

# Flipped to False after the first (discarded) warm-up run, so warm-up
# happens only once per kernel session.
firstRun = True

review_df = None
spark = (
    SparkSession.builder
    .appName("tfidf_spark")
    .master("spark://spark:7077")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
# SQLContext's documented signature is SQLContext(sparkContext, sparkSession=None);
# pass the SparkContext first and the existing session explicitly instead of
# passing the SparkSession in the SparkContext slot.
sqlContext = SQLContext(spark.sparkContext, spark)

def getCollectionReadURL(collection):
    """Build the MongoDB connection URI for one collection.

    :param collection: collection name (a str) inside the
        ``yelp_filtered_read`` database.
    :return: URI of the form
        ``mongodb://<MONGO_HOST>:<MONGO_PORT>/yelp_filtered_read.<collection>?ssl=false``.
    """
    pieces = [
        "mongodb://",
        MONGO_HOST,
        ":",
        MONGO_PORT,
        "/yelp_filtered_read.",
        collection,
        "?ssl=false",
    ]
    return "".join(pieces)

def readFromCollection(collection, profile=False):
    """Load one MongoDB collection as a Spark DataFrame.

    :param collection: collection name inside ``yelp_filtered_read``.
    :param profile: accepted for compatibility; currently unused.
    :return: the DataFrame returned by the Mongo Spark connector's ``load()``.
    """
    mongo_reader = spark.read.format("com.mongodb.spark.sql.DefaultSource")
    return mongo_reader.option("uri", getCollectionReadURL(collection)).load()

In [None]:
def tokenize(review_df):
    """Add a "words" column by tokenizing the "text" column.

    Uses pyspark.ml's Tokenizer (per the Spark docs, it lower-cases and
    splits on whitespace).
    """
    return Tokenizer(inputCol="text", outputCol="words").transform(review_df)

def vectorize(review_df, min_df=3):
    """Fit a CountVectorizer on "words" and append a "vectorizer" column.

    :param review_df: DataFrame with a tokenized "words" column (as produced
        by ``tokenize``).
    :param min_df: forwarded to CountVectorizer's ``minDF``. The default of 3
        is the value that was previously hard-coded. Per the Spark docs,
        values >= 1 are an absolute document count and values < 1 are a
        fraction of documents.
    :return: ``review_df`` plus a "vectorizer" column of term counts.
    """
    # The fitted vocabulary model, not the estimator, does the transform.
    cv_model = CountVectorizer(inputCol="words", outputCol="vectorizer", minDF=min_df).fit(review_df)
    return cv_model.transform(review_df)

def idf(review_df):
    """Weight the "vectorizer" term counts by inverse document frequency.

    Fits an IDF model on ``review_df`` and returns the same rows with an
    extra "tfidf_features" column.
    """
    # Named distinctly so the local does not shadow this function's name.
    estimator = IDF(inputCol="vectorizer", outputCol="tfidf_features")
    fitted_idf = estimator.fit(review_df)
    return fitted_idf.transform(review_df)

def process(review_df):
    """Build per-business TF-IDF features from individual reviews.

    All "text" values of a business are collected and joined with single
    spaces into one document per ``business_id``. That document is then
    tokenized, count-vectorized and IDF-weighted.

    :return: one row per business_id with "text", "words", "vectorizer" and
        "tfidf_features" columns.
    """
    per_business = (
        review_df
        .groupBy("business_id")
        .agg(collect_list("text").alias("text"))
        .withColumn("text", concat_ws(" ", col("text")))
    )
    return idf(vectorize(tokenize(per_business)))

In [None]:
def _timed_process(review_df):
    """Run the TF-IDF pipeline once and return (elapsed_seconds, result_df).

    The final ``transform`` in ``process`` is lazy, so timing ``process()``
    alone never pays for computing the "tfidf_features" column. Writing the
    result to Spark's built-in "noop" sink (Spark 3.0+) forces every row and
    column to be evaluated without storing any output.
    """
    start = timer()
    result_df = process(review_df)
    result_df.write.format("noop").mode("overwrite").save()
    return timer() - start, result_df


# Mean pipeline time (seconds) per collection name.
arr_collection_timings = {}
for collection in arr_collections_debug:
    review_df = readFromCollection(collection)
    count = review_df.count()
    arr_timings = []

    for i in range(3):
        elapsed, grouped_df = _timed_process(review_df)

        # The very first run in this kernel is reported, discarded and
        # repeated, presumably to exclude one-off start-up costs.
        if firstRun:
            print("Ignored: " + str(elapsed))
            firstRun = False
            elapsed, grouped_df = _timed_process(review_df)

        arr_timings.append(elapsed)
        print("[" + "{:02d}".format(i + 1) + "] " + str(count) + ": " + str(round(elapsed, 3)) + " segundos")

    arr_collection_timings[collection] = round(np.mean(arr_timings), 3)

In [None]:
# Preview the raw reviews of the last collection read by the timing loop.
review_df.show(n=20, truncate=True)

In [None]:
# Bellagio Gallery of Fine Art
#company_df1 = grouped_df[grouped_df['business_id'] == "-MhfebM0QIsKt87iDN-FNw"]

# The Empanadas House
#company_df2 = grouped_df[grouped_df['business_id'] == "pQeaRpvuhoEqudo3uymHIQ"]

In [None]:
#company_df1.select("vectorizer").collect()

In [None]:
#company_df2.select("vectorizer").collect()

In [None]:
# Mean seconds per collection (rounded to 3 decimals), keyed by collection name.
arr_collection_timings