In [32]:
# Execute the data-preparation notebook inside this kernel.
# NOTE(review): `spark`, `review_df`, `business_df`, `friends_df`, `user_df`,
# `sampleOutputPath` and `save_spark_df_to_db` are used below but never defined
# in this notebook — presumably they all come from this %run; confirm.
%run ../data_prep/data_preperation.ipynb


Loading configs.py


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /Users/hims/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [33]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import explode, create_map
from pyspark.sql.functions import size
from pyspark.sql.types import IntegerType, MapType
from pyspark.sql.types import StringType

In [34]:
# def cache_to_disk_for_test(user_df, review_df, friends_df):
#     user_df.write.mode("overwrite").parquet("/tmp/test-1/cache/user_df")
#     user_df = spark.read.parquet("/tmp/test-1/cache/user_df")
#      
#     review_df.write.mode("overwrite").parquet("/tmp/test-1/cache/review_df")
#     review_df = spark.read.parquet("/tmp/test-1/cache/review_df")
#     
#     friends_df.write.mode("overwrite").parquet("/tmp/test-1/cache/friends_df")
#     friends_df = spark.read.parquet("/tmp/test-1/cache/friends_df")
#     
#     return user_df, review_df, friends_df
#     
    

In [35]:
# business_df = process_business_data(spark).withColumnRenamed("review_count", "bus_review_count").withColumnRenamed("stars", "bus_stars").withColumnRenamed("name", "bus_name").cache()
# 
# checkin_df = process_checkin_data(spark)
# full_review_df = process_review_data(spark)
# 
# selected_user_id = full_review_df.groupBy("user_id").count().orderBy(col("count").desc()).select("user_id").sample(0.001)
# full_review_df.groupBy("user_id").count().orderBy(col("count").desc()).show()
# selected_user_id.write.mode("overwrite").parquet("/tmp/test-1/cache/selected_user_id")
# selected_user_id = spark.read.parquet("/tmp/test-1/cache/selected_user_id")
# 
# 
# review_df = process_review_data(spark).repartition(40).select("user_id", "business_id", "sentiment", "date", "frequent_words").join(selected_user_id, on = ["user_id"])
# review_df.write.mode("overwrite").parquet("/tmp/test-1/cache/review_df")
# review_df = spark.read.parquet("/tmp/test-1/cache/review_df")
# 
# 
# friends_df = process_friends_data(spark).join(selected_user_id, on = ["user_id"])
# friends_df.write.mode("overwrite").parquet("/tmp/test-1/cache/friends_df")
# friends_df = spark.read.parquet("/tmp/test-1/cache/friends_df")
# 
# 
# tip_df = process_tip_data(spark).join(selected_user_id, on = ["user_id"])
# user_df = process_user_data(spark).join(selected_user_id, on = ["user_id"]).withColumnRenamed("review_count", "user_review_count").withColumnRenamed("useful", "user_useful").withColumnRenamed("funny", "user_funny").withColumnRenamed("cool", "user_cool")
# user_df.write.mode("overwrite").parquet("/tmp/test-1/cache/user_df")
# user_df = spark.read.parquet("/tmp/test-1/cache/user_df")
# 
# 
# 
# review_df.count()
# # user_df, review_df, friends_df = cache_to_disk_for_test(user_df, review_df, friends_df)

In [36]:
# Show the column names and types of the user frame before any features are joined on.
user_df.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fans: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- yelping_since: timestamp (nullable = true)


In [37]:
def get_customer_agg_value(spark):
    """Compute per-user review aggregates with Spark SQL.

    Side effect: registers the notebook-level ``review_df`` as the temp view
    ``review``, replacing any existing view with that name. The query reads
    the ``user_id``, ``date``, ``business_id`` and ``stars`` columns of that
    view.

    Args:
        spark: Session object whose ``sql`` method runs the query.

    Returns:
        DataFrame with one row per ``user_id`` and the columns ``first_seen``,
        ``last_seen``, ``date_diff``, ``different_business_count``,
        ``avg_rating``, ``min_stars`` and ``max_stars``.
    """
    # The view has to exist before the query text below can refer to it.
    review_df.createOrReplaceTempView("review")
    per_user_query = """
        select 
            user_id, 
            min(date) as first_seen, 
            max(date) as last_seen, 
            DATEDIFF(max(date), min(date)) as date_diff,
            count(distinct business_id) as different_business_count,
            avg(stars) as avg_rating,
            min(stars) as min_stars,
            max(stars) as max_stars
        from review
        group by user_id 
    """
    return spark.sql(per_user_query)

In [38]:

@udf(MapType(StringType(), IntegerType()))
def merge_maps_array(map_array):
    """Sum a list of ``{key: count}`` maps into one map.

    Counts that share a key are added together.

    Args:
        map_array: Iterable of dict-like maps from string key to integer
            count, or ``None`` when the input column is null.

    Returns:
        A dict mapping every key seen to the sum of its counts, or ``None``
        when ``map_array`` itself is ``None`` (null in, null out).
    """
    # A null array column reaches the Python function as None; iterating it
    # would raise TypeError and fail the whole Spark task.
    if map_array is None:
        return None

    merged = {}
    for partial in map_array:
        # Skip null or empty map elements instead of calling .items() on None.
        if not partial:
            continue
        for key, value in partial.items():
            # A null count contributes nothing; adding None would raise.
            if value is None:
                continue
            merged[key] = merged.get(key, 0) + value
    return merged


def get_customer_category_counts():
    """Build a per-user map of business category to row count.

    Joins the notebook-level ``review_df`` to ``business_df`` on
    ``business_id``, explodes ``categories`` into one row per category, counts
    rows per (``user_id``, ``category``) and folds the counts into a single
    ``category_map`` column per user.

    Returns:
        DataFrame with the columns ``user_id`` and ``category_map``.
    """
    user_business = review_df.select("user_id", "business_id")
    business_categories = business_df.select("business_id", "categories")

    # One row per (user, category) pair together with its row count.
    per_category_counts = (
        user_business
        .join(business_categories, on=["business_id"])
        .select("user_id", explode("categories").alias("category"))
        .groupBy("user_id", "category")
        .count()
    )

    # Wrap each count in a single-entry map so the maps can be collected and merged.
    single_entry_maps = per_category_counts.withColumn(
        "category_map", create_map(col("category"), col("count"))
    )

    return (
        single_entry_maps
        .groupBy("user_id")
        .agg(collect_list(col("category_map")).alias("category_map"))
        .withColumn("category_map", merge_maps_array(col("category_map")))
    )

# get_customer_category_counts().show(5, False)


In [39]:
def get_friends_count():
    """Return each user's ``friends`` collection size as ``friends_count``.

    Reads the notebook-level ``friends_df``.

    Returns:
        DataFrame with the columns ``user_id`` and ``friends_count``.
    """
    # NOTE(review): depending on Spark configuration, size() of a null value
    # may yield -1 rather than null — confirm `friends` is never null.
    friends_count = size(col("friends")).alias("friends_count")
    return friends_df.select("user_id", friends_count)

In [40]:
def get_sentiments_count():
    """Build a per-user map of ``sentiment`` value to row count.

    Counts rows of the notebook-level ``review_df`` per (``user_id``,
    ``sentiment``) and folds the counts into one ``sentiment_map`` column per
    user.

    Returns:
        DataFrame with the columns ``user_id`` and ``sentiment_map``.
    """
    per_sentiment_counts = (
        review_df
        .select("user_id", "sentiment")
        .groupBy("user_id", "sentiment")
        .count()
        .withColumnRenamed("count", "sentiment_count")
    )

    # Wrap each count in a single-entry map so the maps can be collected and merged.
    single_entry_maps = per_sentiment_counts.withColumn(
        "sentiment_map", create_map(col("sentiment"), col("sentiment_count"))
    )

    return (
        single_entry_maps
        .groupBy("user_id")
        .agg(collect_list(col("sentiment_map")).alias("sentiment_map"))
        .withColumn("sentiment_map", merge_maps_array(col("sentiment_map")))
    )

# get_sentiments_count().show()

In [41]:
def most_frequent_words():
    """Build a per-user map of frequent word to occurrence count.

    Explodes the ``frequent_words`` column of the notebook-level ``review_df``
    into one row per word, counts rows per (``user_id``, word) and folds the
    counts into one ``frequent_words_map`` column per user.

    Returns:
        DataFrame with the columns ``user_id`` and ``frequent_words_map``.
    """
    exploded_words = review_df.select(
        "user_id", explode("frequent_words").alias("frequent_words")
    )
    per_word_counts = exploded_words.groupBy("user_id", "frequent_words").count()

    # Wrap each count in a single-entry map so the maps can be collected and merged.
    single_entry_maps = per_word_counts.withColumn(
        "frequent_words_map", create_map(col("frequent_words"), col("count"))
    )

    return (
        single_entry_maps
        .groupBy("user_id")
        .agg(collect_list(col("frequent_words_map")).alias("frequent_words_map"))
        .withColumn("frequent_words_map", merge_maps_array(col("frequent_words_map")))
    )

#     return df

# Merging all the dataframes

In [45]:

# Build the individual per-user feature frames.
user_agg_df = get_customer_agg_value(spark)
user_category_df = get_customer_category_counts()
friends_count_df = get_friends_count()
sentiment_count_df = get_sentiments_count()
# NOTE(review): frequent_words_df is built here but never joined into
# complete_user_df below — confirm whether leaving it out is intentional.
frequent_words_df = most_frequent_words()

# No `how` is passed to any join, so each one uses DataFrame.join's default
# join type; all of them are keyed on user_id.
complete_user_df = (
    user_df
    .join(user_agg_df, on=["user_id"])
    .join(user_category_df, on=["user_id"])
    .join(friends_count_df, on=["user_id"])
    .join(sentiment_count_df, on=["user_id"])
    .cache()
)

complete_user_df.printSchema()
# Not the last expression of the cell, so the count is not displayed.
complete_user_df.count()

complete_user_df.write.mode("overwrite").parquet(f"{sampleOutputPath}/combined")
# save_spark_df_to_db(complete_user_df, "users")
# complete_user_df.count()

root
 |-- user_id: string (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fans: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- yelping_since: timestamp (nullable = true)
 |-- first_seen: timestamp (nullable = true)
 |-- last_seen

In [43]:
# Hand the merged per-user frame to save_spark_df_to_db under the name "users".
# The helper is defined outside this notebook, so its exact write behaviour
# should be confirmed there.
# NOTE(review): this cell's execution count (43) is lower than that of the
# merge cell that builds complete_user_df (45), so it was last run against an
# earlier version of the frame — re-run it after the merge cell.
save_spark_df_to_db(complete_user_df, "users")
