In [1]:
from pyspark.sql import SparkSession
import os

# Service-account credentials, presumably picked up by bigquery.Client() and the
# GCS/BigQuery Spark connectors. setdefault keeps a key path that is already exported
# in the environment and only falls back to the author's local file, so others can
# run the notebook without editing this cell.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/Users/eduardchai/Workspace/NUS/eb5001-big-data-for-analytics/CA/big-data-ca-svc-acc.json",
)

Configure Spark's Hadoop settings for Google Cloud Storage access:

In [2]:
# Point the Hadoop GCS connector at the project and the service-account key.
# SparkContext.getOrCreate() returns the pyspark-shell `sc` when one already exists
# and creates one otherwise, so this cell no longer depends on a shell-injected name.
# The key path comes from the environment variable set in the first cell rather
# than being hard-coded a second time.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
hadoop_conf = sc._jsc.hadoopConfiguration()
for key, value in {
    "fs.gs.project.id": "big-data-project-272506",
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "google.cloud.auth.service.account.json.keyfile": os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
}.items():
    hadoop_conf.set(key, value)

Start the Spark session.

In [3]:
# Build (or reuse) the SparkSession using the existing SparkContext's configuration.
spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

Read the source reviews from a BigQuery table

In [15]:
from google.cloud import bigquery

In [27]:
# NOTE(review): the original author noted this "only supports queries which have at
# least 10 MB of results" - presumably a limitation of reading the query's destination
# table through the Spark connector in the next cell; confirm before using small queries.
REVIEW_DATE = "2020-04-04"  # the single day of reviews to process
QUERY = (
    "SELECT * FROM `big-data-project-272506.mock.restaurant_reviews_raw` "
    f"WHERE DATE(timestamp) = '{REVIEW_DATE}'"
)
bq = bigquery.Client()
query_job = bq.query(QUERY)
# Wait for the job to finish before the next cell reads query_job.destination.
query_job.result()

<google.cloud.bigquery.table.RowIterator at 0x12040a550>

In [28]:
# Load the table holding the query's results through the Spark BigQuery connector,
# and persist it because several later cells reuse `df`.
df = (
    spark.read.format("bigquery")
    .option("dataset", query_job.destination.dataset_id)
    .option("table", query_job.destination.table_id)
    .load()
    .persist()
)

In [29]:
# Number of reviews loaded. As the first action on `df`, this presumably also
# materialises the persisted data.
df.count()

751

## Define UDFs

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, IntegerType, FloatType

In [None]:
def remove_stopwords(words):
    """Remove English stop words from a whitespace-separated string.

    Args:
        words: Review text as a single string whose tokens are separated by
            whitespace. ``None`` (a Spark null) or any other non-string value
            is treated as empty.

    Returns:
        The tokens that are not English stop words, joined by single spaces.
        Tokens are compared in lowercase so capitalised stop words ("The")
        are removed too; kept tokens retain their original casing.
    """
    # Imported inside the function, presumably so the import resolves on the
    # Spark workers that run this as a UDF.
    from nltk.corpus import stopwords

    if not isinstance(words, str):
        # Matches the previous behaviour for nulls, without a bare except that
        # would also hide unrelated errors.
        return ''

    # stopwords.words() returns a list; a set makes each membership test O(1).
    stop = set(stopwords.words('english'))
    return ' '.join(word for word in words.split() if word.lower() not in stop)

In [None]:
def preprocess_content(sentence):
    r"""Normalise review text into lowercase word tokens.

    Args:
        sentence: Raw review text. ``None`` (a Spark null) or any other
            non-string value is treated as empty instead of raising.

    Returns:
        The ``\w+`` tokens of ``sentence`` joined by single spaces and
        lowercased, e.g. ``"Great food!"`` -> ``"great food"``.
    """
    # Local import, matching the other helpers that run as Spark UDFs.
    import re

    if not isinstance(sentence, str):
        return ''
    # Same tokenisation as nltk's RegexpTokenizer(r'\w+'): keep every maximal
    # run of word characters and drop punctuation/whitespace.
    return ' '.join(re.findall(r'\w+', sentence)).lower()

In [None]:
def compute_content_length(review):
    """Count the whitespace-separated words in a review.

    Args:
        review: Review text. ``None`` (a Spark null) or any other non-string
            value counts as 0 words.

    Returns:
        The number of words as an ``int``. Empty or blank text yields 0
        (``''.split(' ')`` would report 1), and repeated spaces are not
        counted as extra words.
    """
    if not isinstance(review, str):
        return 0
    return len(review.split())

In [None]:
def compute_rating_deviation(user_rating, restaurant_rating, scale=4.0):
    """Normalised absolute gap between a user's rating and the restaurant's rating.

    Args:
        user_rating: The reviewer's rating (any value ``float()`` accepts).
        restaurant_rating: The restaurant's rating (any value ``float()`` accepts).
        scale: Divisor applied to the gap. Defaults to 4.0, presumably the
            widest possible gap on a 1-5 star scale - TODO confirm.

    Returns:
        ``abs(user_rating - restaurant_rating) / scale`` as a ``float``, or
        ``None`` (a Spark null) when either rating is missing instead of
        failing the whole Spark task.
    """
    if user_rating is None or restaurant_rating is None:
        return None
    return abs(float(user_rating) - float(restaurant_rating)) / float(scale)

In [None]:
def compute_cosine_similarity(reviews):
    """Highest TF-IDF cosine similarity between any two distinct reviews.

    Args:
        reviews: The review texts to compare, as a list (or other iterable) of
            strings. Non-string entries (Spark nulls) are ignored. A bare
            string, ``None``, or fewer than two texts has no pair to compare.

    Returns:
        The maximum pairwise cosine similarity as a ``float``. Returns 0.0 when
        there is no pair, and when vectorisation fails with ``ValueError``
        (e.g. no document contains any token).
    """
    if reviews is None or isinstance(reviews, str):
        return 0.0
    docs = [doc for doc in reviews if isinstance(doc, str)]
    if len(docs) < 2:
        # With one document the only entry is the masked diagonal, so the max
        # below would be -inf.
        return 0.0

    # Imported after the cheap exits; local imports presumably so they resolve
    # on the Spark workers that run this.
    import numpy as np
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity

    try:
        # Default min_df keeps every term; an explicit int min_df=0 may be
        # rejected by newer scikit-learn parameter validation.
        tfidf = TfidfVectorizer().fit_transform(docs)
    except ValueError:
        # e.g. "empty vocabulary" when no document contains a token.
        return 0.0

    similarity = cosine_similarity(tfidf)
    # Ignore each review's similarity with itself.
    np.fill_diagonal(similarity, -np.inf)
    return float(similarity.max())

In [None]:
# Spark UDF wrappers around the Python helpers. pyspark's udf() defaults to a
# string return type; that default is spelled out here as "string".
remove_stopwords_udf = udf(remove_stopwords, returnType="string")
preprocess_content_udf = udf(preprocess_content, returnType="string")
compute_content_length_udf = udf(compute_content_length, returnType=IntegerType())
compute_rating_deviation_udf = udf(compute_rating_deviation, returnType=FloatType())
# NOTE(review): no visible cell uses this UDF; cosine similarity is computed via the RDD API.
compute_cosine_similarity_udf = udf(compute_cosine_similarity, returnType=FloatType())

### Start preprocessing

In [None]:
# Tokenise and lowercase first, so stop-word removal sees clean lowercase tokens.
# Removing stop words first would keep "The" or "it," because they do not match
# the stop list verbatim. Null reviews are filled with "" up front, so the text
# UDFs never receive None; this keeps the old result ("") for nulls.
df_processed = (
    df.na.fill("", subset=["review_content"])
    .withColumn("review_content", preprocess_content_udf("review_content"))
    .withColumn("review_content", remove_stopwords_udf("review_content"))
    .withColumn("review_length", compute_content_length_udf("review_content"))
    .withColumn("rating_deviation", compute_rating_deviation_udf("rating", "restaurant_rating"))
)

#### Compute maximum number of reviews per user

In [None]:
# Largest number of reviews any reviewer has for a single timestamp.
# agg({"count": "max"}) targets the count column explicitly. A bare .max() would
# aggregate every numeric column, and [0][0] would then pick the first of them.
# NOTE(review): grouping is on the exact timestamp, not the calendar day - confirm intended.
max_review_count = (
    df_processed.groupby('reviewer_id', 'timestamp')
    .count()
    .agg({"count": "max"})
    .collect()[0][0]
)

In [None]:
# One row per (reviewer_id, timestamp) pair, with the number of reviews in "count".
maximum_review_per_user_df = (
    df_processed
    .groupBy('reviewer_id', 'timestamp')
    .count()
)

In [None]:
# Attach each review's (reviewer_id, timestamp) count.
# eqNullSafe keeps reviews whose reviewer_id or timestamp is null. groupby forms
# a group for null keys, but a plain == condition never matches null, so the
# inner join would silently drop those reviews.
review_count_join_cond = (
    df_processed.reviewer_id.eqNullSafe(maximum_review_per_user_df.reviewer_id)
    & df_processed.timestamp.eqNullSafe(maximum_review_per_user_df.timestamp)
)
df_processed = df_processed.join(
    maximum_review_per_user_df,
    review_count_join_cond,
).select(df_processed["*"], maximum_review_per_user_df["count"])

In [None]:
# Normalise each review's per-(reviewer, timestamp) count by the global maximum.
# A native column expression replaces the Python UDF, avoiding a per-row round
# trip to Python. The cast keeps the FloatType the UDF returned.
df_processed = (
    df_processed
    .withColumn(
        "maximum_review_per_user",
        (df_processed["count"] / max_review_count).cast("float"),
    )
    .drop("count")
)

In [None]:
# Peek at the first two processed rows (take(n) is limit(n).collect()).
df_processed.take(2)

#### Compute maximum cosine similarity between each reviewer's reviews

In [None]:
# Collect every review of each reviewer into a list, then score the list.
# groupByKey replaces the previous reduceByKey combiner. That combiner returned
# list.append's result (None) once a reviewer had 3+ reviews, and produced a
# bare string for reviewers with one review. Reviewers with fewer than two
# reviews have no pair to compare and get 0.0.
cos_sim_df = (
    df_processed.select("reviewer_id", "review_content")
    .rdd
    .groupByKey()
    .mapValues(list)
    .mapValues(lambda texts: compute_cosine_similarity(texts) if len(texts) > 1 else 0.0)
    .toDF(["reviewer_id", "cos_sim"])
)

In [None]:
# Attach each reviewer's maximum review similarity. eqNullSafe keeps reviews with
# a null reviewer_id, which the grouping step still produced a row for; a plain ==
# condition never matches null and would drop them.
df_processed = df_processed.join(
    cos_sim_df,
    df_processed.reviewer_id.eqNullSafe(cos_sim_df.reviewer_id),
).select(df_processed["*"], cos_sim_df["cos_sim"])

In [None]:
# Inspect the final column names and types before writing to BigQuery.
df_processed.printSchema()

#### Save data to BigQuery

In [None]:
# Overwrite the target BigQuery table with the engineered features.
# NOTE(review): data is presumably staged through the temporary GCS bucket, and
# CREATE_NEVER presumably requires the target table to already exist - confirm.
bigquery_write_options = {
    "table": "big-data-project-272506:yelp_dataset.restaurant_reviews_final",
    "temporaryGcsBucket": "big-data-project-272506-temp",
    "createDisposition": "CREATE_NEVER",
}
(
    df_processed.repartition(4)
    .write.format("bigquery")
    .mode("Overwrite")
    .options(**bigquery_write_options)
    .save()
)

In [None]:
# Scratch check. NOTE(review): the source query only loads reviews dated
# 2020-04-04, so this pre-2014 filter presumably matches no rows.
test = df.where("timestamp < '2014-04-05 00:00:00'")

In [None]:
test.