In [0]:
1. Analyze the number of users joining each year.

In [1]:


%pyspark
from pyspark.sql import HiveContext
from pyspark.sql.functions import year, to_date, col, count

hc = HiveContext(sc)

users_df = hc.table('users')

result = users_df.withColumn("join_year", year(col("user_yelping_since"))) \
                .groupBy("join_year") \
                .agg(count("*").alias("users")) \
                .orderBy("join_year") \
                .withColumnRenamed("join_year", "Joining Year") \
                .withColumnRenamed("users", "Users")

z.show(result)



In [2]:
2. Identify top reviewers based on review_count.

In [3]:


%pyspark
from pyspark.sql import HiveContext
from pyspark.sql.functions import desc

hc = HiveContext(sc)

users_df = hc.table('users')

result = users_df.select("user_name", "user_review_count") \
                .orderBy(desc("user_review_count")) \
                .withColumnRenamed("user_name", "User Name") \
                .withColumnRenamed("user_review_count", "Review Count")

z.show(result)



In [4]:
3. Identify the most popular users based on fans.

In [5]:


%pyspark
from pyspark.sql import HiveContext
from pyspark.sql.functions import desc

hc = HiveContext(sc)
users_df = hc.table('users')

result = users_df.select("user_name", "user_fans") \
                .orderBy(desc("user_fans")) \
                .withColumnRenamed("user_name", "User Name") \
                .withColumnRenamed("user_fans", "Fan Count")

z.show(result)



In [6]:
4. Calculate the ratio of elite users to regular users for each joining year.

In [7]:


%pyspark
from pyspark.sql import HiveContext
from pyspark.sql.functions import year, col, count, when

hc = HiveContext(sc)

users_df = hc.table('users')

users_with_year = users_df.withColumn("join_year", year(col("user_yelping_since")))

users_with_year = users_with_year.withColumn("is_elite", when(col("user_elite") != "", 1).otherwise(0))

result = users_with_year.groupBy("join_year") \
                        .agg(count(when(col("is_elite") == 1, 1)).alias("elite_users"),
                             count(when(col("is_elite") == 0, 1)).alias("regular_users")) \
                        .withColumn("elite_to_regular_ratio", 
                                    (col("elite_users") / col("regular_users"))) \
                        .orderBy("join_year")

result = result.withColumnRenamed("join_year", "Joining Year") \
               .withColumnRenamed("elite_users", "Elite Users") \
               .withColumnRenamed("regular_users", "Regular Users") \
               .withColumnRenamed("elite_to_regular_ratio", "Elite to Regular Ratio")

z.show(result)



In [8]:
5. For each joining year, display the total number of users, the number of silent users (users who haven't written any reviews), and the proportion of silent users.

In [9]:


%pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

df = spark.table("users")

df_with_year = df.withColumn("join_year", F.year(F.col("user_yelping_since")))

total_users = df_with_year.groupBy("join_year") \
                          .count() \
                          .withColumnRenamed("join_year", "Year") \
                          .withColumnRenamed("count", "Total Users")

silent_users = df_with_year.filter(F.col("user_review_count") == 0) \
                           .groupBy("join_year") \
                           .count() \
                           .withColumnRenamed("join_year", "Year") \
                           .withColumnRenamed("count", "Silent Users")

result = total_users.join(silent_users, "Year", "left") \
                    .na.fill({"Silent Users": 0}) \
                    .withColumn("Proportion of Silent Users", 
                                F.format_number(F.col("Silent Users") / F.col("Total Users"), 8)) \
                    .orderBy("Year")

z.show(result)



In [10]:
6. Compute yearly statistics: the number of new users, reviews, elite users, tips, and check-ins in each year.

In [11]:


%pyspark
from pyspark.sql.functions import year, to_date, split, explode, col, coalesce, lit

users_df = spark.table("users")
review_df = spark.table("review")
tip_df = spark.table("tip")
checkin_df = spark.table("checkin")

users_df = users_df.withColumn("year", year(to_date(users_df.user_yelping_since))).filter(col("year") > 0)
review_df = review_df.withColumn("year", year(to_date(review_df.rev_date))).filter(col("year") > 0)
tip_df = tip_df.withColumn("year", year(to_date(tip_df.tip_date))).filter(col("year") > 0)

checkin_df = checkin_df.withColumn("checkin_date_array", split(checkin_df.checkin_dates, ",")) \
                       .withColumn("checkin_date", explode("checkin_date_array")) \
                       .withColumn("year", year(to_date("checkin_date"))).filter(col("year") > 0)

new_users_per_year = users_df.groupBy("year").count().withColumnRenamed("count", "new_users")
reviews_per_year = review_df.groupBy("year").count().withColumnRenamed("count", "number_of_reviews")
elite_users_per_year = users_df.withColumn("elite_years", explode(split(users_df.user_elite, ","))) \
                               .withColumn("elite_year", year(to_date("elite_years"))) \
                               .filter(col("elite_year") > 0) \
                               .groupBy("elite_year").count().withColumnRenamed("count", "elite_users")
tips_per_year = tip_df.groupBy("year").count().withColumnRenamed("count", "number_of_tips")
checkins_per_year = checkin_df.groupBy("year").count().withColumnRenamed("count", "number_of_checkins")

yearly_statistics = new_users_per_year \
    .join(reviews_per_year, "year", "outer") \
    .join(elite_users_per_year.withColumnRenamed("elite_year", "year"), "year", "outer") \
    .join(tips_per_year, "year", "outer") \
    .join(checkins_per_year, "year", "outer") \
    .filter(col("year") > 0) \
    .orderBy("year")

yearly_statistics = yearly_statistics \
    .withColumn("new_users", coalesce(col("new_users"), lit(0))) \
    .withColumn("number_of_reviews", coalesce(col("number_of_reviews"), lit(0))) \
    .withColumn("elite_users", coalesce(col("elite_users"), lit(0))) \
    .withColumn("number_of_tips", coalesce(col("number_of_tips"), lit(0))) \
    .withColumn("number_of_checkins", coalesce(col("number_of_checkins"), lit(0)))

yearly_statistics = yearly_statistics \
    .withColumnRenamed("year", "Year") \
    .withColumnRenamed("new_users", "New Users") \
    .withColumnRenamed("number_of_reviews", "Number of Reviews") \
    .withColumnRenamed("elite_users", "Elite Users") \
    .withColumnRenamed("number_of_tips", "Number of Tips") \
    .withColumnRenamed("number_of_checkins", "Number of Checkins")

z.show(yearly_statistics)

