In [1]:
from datetime import datetime
import os
import numpy
# NOTE: this star import shadows Python builtins such as max, min, sum, abs and
# round with their Spark column-function equivalents; later cells rely on the
# Spark versions (e.g. max("date") as an aggregate).
from pyspark.sql.functions import *
from pyspark.sql.types import DateType
spark_home = os.environ.get('SPARK_HOME', None)

import plotly
# Plotly cloud credentials are read from the environment instead of being
# hardcoded here, where anyone who can read the notebook could reuse the key.
# NOTE(review): the key that used to be hardcoded in this cell should be revoked.
# When the variables are unset, nothing is written and plotly presumably falls
# back to previously stored credentials -- confirm for the plotly version in use.
plotly_username = os.environ.get('PLOTLY_USERNAME')
plotly_api_key = os.environ.get('PLOTLY_API_KEY')
if plotly_username and plotly_api_key:
    plotly.tools.set_credentials_file(username=plotly_username, api_key=plotly_api_key)

import plotly.plotly as py
from plotly.graph_objs import *
import plotly.figure_factory as FF
import requests
# Silence urllib3 warnings raised through requests.
requests.packages.urllib3.disable_warnings()


In [2]:
# Load the raw Yelp reviews and spread them over 300 partitions.
# NOTE(review): `spark` is assumed to be provided by the PySpark kernel.
review = (spark.read
          .json("/user/hduser1/Yelp/review.json")
          .repartition(300))

print(review.count())
review.printSchema()

4153150
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [3]:
# Load the Yelp business records, repartitioned the same way as the reviews.
business = (spark.read
            .json("/user/hduser1/Yelp/business.json")
            .repartition(300))

print(business.count())
business.printSchema()

144072
root
 |-- address: string (nullable = true)
 |-- attributes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- business_id: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- hours: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_open: long (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)
 |-- type: string (nullable = true)



In [4]:
# Businesses whose listed review_count is at least 5. The predicate is applied
# before the projection so review_count is still a column of the frame it filters.
valid_businesses = (business
                    .where(col("review_count") >= 5)
                    .select("business_id"))

print(valid_businesses.count())
valid_businesses.printSchema()

106685
root
 |-- business_id: string (nullable = true)



In [5]:
# Attach every review date to each valid business. Being a left join, a business
# with no matching review is kept with a null date.
valid_reviews = valid_businesses.join(
    review.select("business_id", "date"),
    on="business_id",
    how="left")

print(valid_reviews.count())
valid_reviews.printSchema()

4025504
root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [6]:
# Parse the 'YYYY-MM-DD' review date strings into DateType values.
# The left join that built valid_reviews can yield a null date (a business with
# no matching review). strptime(None, ...) raises inside the executor and fails
# the whole job, so null input is passed through as a null date instead.
# NOTE(review): col("date").cast(DateType()) would do the same conversion natively,
# without routing every row through a Python worker.
conv_to_date = udf(
    lambda date_str: datetime.strptime(date_str, '%Y-%m-%d') if date_str is not None else None,
    DateType())
review_dates = (valid_reviews
                .withColumn("date_obj", conv_to_date(col("date")))
                .select([col("business_id"), col("date_obj").alias("date")]))

review_dates.head(5)

[Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2015, 9, 6)),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2016, 2, 14)),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2016, 12, 28)),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2010, 4, 19)),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2013, 6, 18))]

In [7]:
# Number of non-null review dates per valid business, smallest counts first.
valid_business_date_count = (review_dates
                             .groupby("business_id")
                             .agg(count("date").alias("review_count"))
                             .orderBy("review_count"))

valid_business_date_count.head(10)

[Row(business_id=u'18ol4DIHGnqMMKfDblBLHA', review_count=5),
 Row(business_id=u'-PF_w4OxpzHTzGyULWWaSw', review_count=5),
 Row(business_id=u'1NbNo3v8XO9Sze_nqG2fyg', review_count=5),
 Row(business_id=u'2nx9qXnVkgf9ZR6RHn33TQ', review_count=5),
 Row(business_id=u'3swjoD8A_7_0nJa_r36xCA', review_count=5),
 Row(business_id=u'65iIzq80WQ1nMqj_rLWNYQ', review_count=5),
 Row(business_id=u'6A6jlv6LP0SyXf4UzfIwvA', review_count=5),
 Row(business_id=u'6UbcMDyimSHpPJGkMOQftw', review_count=5),
 Row(business_id=u'8EZZKsZ9Iuwz7Lem9NfN_w', review_count=5),
 Row(business_id=u'8W_qw1d7z2xwZyB5RU8GUg', review_count=5)]

In [8]:
# Latest review date per business, oldest first, so the businesses that have gone
# longest without a review are listed at the top.
# `max` is pyspark.sql.functions.max here (brought in by the star import).
most_recent_dates = (review_dates
                     .groupby("business_id")
                     .agg(max("date").alias("most_recent_review"))
                     .orderBy("most_recent_review", ascending=True))

most_recent_dates.head(20)

[Row(business_id=u'YeCZ17qKPRHg_UUBoZ8CEQ', most_recent_review=datetime.date(2006, 8, 8)),
 Row(business_id=u'X_qqvaCL9zPxDf_Xp150nQ', most_recent_review=datetime.date(2007, 5, 5)),
 Row(business_id=u'T1-kPPbyO2kDiHVqgikCjQ', most_recent_review=datetime.date(2007, 5, 17)),
 Row(business_id=u'uz77ng57t-sFnEjKbTAPrQ', most_recent_review=datetime.date(2007, 8, 10)),
 Row(business_id=u'XVj_qD6hz4OJprt1bOAjWA', most_recent_review=datetime.date(2007, 8, 22)),
 Row(business_id=u'JSxxur20MpmL1DBUdQVf1w', most_recent_review=datetime.date(2007, 8, 24)),
 Row(business_id=u'hJgAUo66s6v0ndqeMdW2mw', most_recent_review=datetime.date(2007, 10, 2)),
 Row(business_id=u'2_-1k5i1R2Rfs-mHcme8kg', most_recent_review=datetime.date(2007, 10, 6)),
 Row(business_id=u'KLJmSYpq81fy0gXri-tAAw', most_recent_review=datetime.date(2007, 10, 15)),
 Row(business_id=u'HlwbuZnJubtHBfXlxWDXpw', most_recent_review=datetime.date(2007, 10, 30)),
 Row(business_id=u'yQfiIg-HDflMzBKOfyTqQA', most_recent_review=datetime.date(200

In [9]:
review_dates.createOrReplaceTempView("review_dates")

# For every distinct review date of a business, find the next later review date
# (date2) and the number of days until it (DaysDiff).
# LEAD over the dates ordered within each business replaces the previous
# self-join on business_id, whose intermediate result grows quadratically with
# a business's number of reviews.
# Semantics kept from the self-join version:
#   * duplicate dates collapse to one row (as GROUP BY business_id, date did);
#   * each business's latest date gets null date2 / DaysDiff (as the unmatched
#     LEFT JOIN did).
# Null dates are dropped up front: they cannot produce a gap.
review_date_diffs = spark.sql("""
SELECT  business_id,
        date,
        date2,
        DATEDIFF(date2, date) AS DaysDiff
FROM (
    SELECT  business_id,
            date,
            LEAD(date) OVER (PARTITION BY business_id ORDER BY date) AS date2
    FROM    (SELECT DISTINCT business_id, date
             FROM   review_dates
             WHERE  date IS NOT NULL) distinct_dates
) with_next_date
ORDER BY business_id, date ASC
""")

review_date_diffs.head(20)

[Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2008, 8, 7), date2=datetime.date(2010, 11, 25), DaysDiff=840),
 Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2010, 11, 25), date2=datetime.date(2010, 12, 6), DaysDiff=11),
 Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2010, 12, 6), date2=datetime.date(2010, 12, 22), DaysDiff=16),
 Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2010, 12, 22), date2=datetime.date(2011, 1, 8), DaysDiff=17),
 Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2011, 1, 8), date2=datetime.date(2011, 5, 30), DaysDiff=142),
 Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2011, 5, 30), date2=datetime.date(2012, 4, 15), DaysDiff=321),
 Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2012, 4, 15), date2=datetime.date(2013, 9, 17), DaysDiff=520),
 Row(business_id=u'--6MefnULPED_I942VcFNA', date=datetime.date(2013, 9, 17), date2=datetime.date(2014, 1, 29), DaysDiff=

In [10]:
# Businesses with at least two non-null gaps between consecutive review dates.
# count() skips the null DaysDiff of each business's latest date.
# Presumably the cutoff is 2 so the sample stddev computed later is defined.
valid_business_datediff = (review_date_diffs
                           .groupby("business_id")
                           .agg(count("DaysDiff").alias("num_day_diffs"))
                           .filter(col("num_day_diffs") >= 2)
                           .orderBy("num_day_diffs")
                           .select("business_id"))

valid_business_datediff.head(10)



[Row(business_id=u'-5-YYvq8Pel6aHt7Wu_IHA'),
 Row(business_id=u'ZpKOX5N0avg5lmSi3ndc4w'),
 Row(business_id=u'2WJ4s3PcLN4PI-WUGIS9IQ'),
 Row(business_id=u'bzLSsQ0Sbxt_BjlURH7l-A'),
 Row(business_id=u'nMIkE3p4BzAA9Pd3H8yJfQ'),
 Row(business_id=u'rX93JUIxtTFC3p6Y1eaUCQ'),
 Row(business_id=u'JF1SNzrkiQdlGYOwtOjl9w'),
 Row(business_id=u'2px0fsTfKb-9iiESw9c2Lg'),
 Row(business_id=u'27nF2tLrwH5GyhojeFIs_w'),
 Row(business_id=u'rwMsL9BhGDG6r0J5zJEpYQ')]

In [11]:
# Keep only the gap rows of the businesses selected above.
# A left-semi join returns review_date_diffs' own columns (business_id, date,
# date2, DaysDiff) for rows whose business_id appears in valid_business_datediff.
# That frame has one row per business_id (it comes from a groupby), so this
# matches the former inner join plus projection.
valid_review_date_diffs = review_date_diffs.join(
    valid_business_datediff, on="business_id", how="leftsemi")
valid_review_date_diffs.printSchema()
valid_review_date_diffs.head(10)

root
 |-- business_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date2: date (nullable = true)
 |-- DaysDiff: integer (nullable = true)



[Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2005, 4, 27), date2=datetime.date(2006, 1, 7), DaysDiff=255),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2006, 1, 7), date2=datetime.date(2006, 1, 20), DaysDiff=13),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2006, 1, 20), date2=datetime.date(2006, 3, 7), DaysDiff=46),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2006, 3, 7), date2=datetime.date(2006, 3, 23), DaysDiff=16),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2006, 3, 23), date2=datetime.date(2006, 5, 27), DaysDiff=65),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2006, 5, 27), date2=datetime.date(2006, 7, 15), DaysDiff=49),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2006, 7, 15), date2=datetime.date(2006, 7, 27), DaysDiff=12),
 Row(business_id=u'--9e1ONYQuAa-CB_Rrw7Tw', date=datetime.date(2006, 7, 27), date2=datetime.date(2006, 9, 14), DaysDiff=49),
 Ro

In [31]:
valid_review_date_diffs.createOrReplaceTempView("valid_review_date_diffs")

# Per business: average and (sample) standard deviation of the day gaps between
# consecutive review dates, smallest average gap first.
# Both aggregates ignore the null gap of each business's latest date.
review_diff_stats = spark.sql("""
SELECT   business_id,
         AVG(DaysDiff)    AS mean_days_diff,
         STDDEV(DaysDiff) AS stddev_days_diff
FROM     valid_review_date_diffs
GROUP BY business_id
ORDER BY mean_days_diff
""")

review_diff_stats.head(20)

[Row(business_id=u'sQfM2b9GmNzfJs6tiAPbzA', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'wnInrkth3_KXLxI9SniU2g', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'D9UIFXYDjvpfu0desRKXzg', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'BsDf3L7p_O4tEvllQGOq2A', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'1QU21VZX6oFQHmRCbC9Kcw', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'c5CWmwh0-WFIjn8_V2DECQ', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'Uiwgr1m6qgpVWP_AOr7-UA', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'nAE1llsVp39g_HgO0fOUuQ', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'yY_AmOiFF09Ph7EkKC9RzA', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'LGSoe8Gb7pmRmmDjqSbYIQ', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'mU3vlAVzTxgmZUu6F4XixA', mean_days_diff=1.0, stddev_days_diff=0.0),
 Row(business_id=u'MuaUtNrlJwJwZ8EGpBf6Tg',

In [13]:
# Collect the per-business gap statistics to the driver as a pandas DataFrame for plotting.
review_diff_stats_df = (review_diff_stats
                        .toPandas())


In [None]:
# Histogram of each business's average gap (in days) between consecutive reviews.
axis_title_font = dict(family='Arial, sans-serif', size=18, color='grey')

review_diff_stats_mean_data = Data([
    Histogram(x=review_diff_stats_df["mean_days_diff"])
])

layout = Layout(
    xaxis=dict(title='Mean Days Between Reviews', titlefont=dict(axis_title_font)),
    yaxis=dict(title='Number of Businesses', titlefont=dict(axis_title_font))
)
fig = Figure(data=review_diff_stats_mean_data, layout=layout)
py.iplot(fig)

In [None]:
# Histogram of each business's standard deviation of the gap (in days) between
# consecutive reviews.
axis_title_font = dict(family='Arial, sans-serif', size=18, color='grey')

review_diff_stats_stddev_data = Data([
    Histogram(x=review_diff_stats_df["stddev_days_diff"])
])
layout = Layout(
    xaxis=dict(title='Stddev of Days Between Reviews', titlefont=dict(axis_title_font)),
    yaxis=dict(title='Number of Businesses', titlefont=dict(axis_title_font))
)
fig = Figure(data=review_diff_stats_stddev_data, layout=layout)
py.iplot(fig)

In [16]:
# Businesses ordered by stddev of the gap between consecutive reviews, most
# regular review cadence first.
# Left as the cell's last expression so Jupyter renders the DataFrame as a
# truncated rich table instead of print()'s plain text.
review_diff_stats_df.sort_values(by="stddev_days_diff")

                   business_id  mean_days_diff  stddev_days_diff
0       yY_AmOiFF09Ph7EkKC9RzA        1.000000          0.000000
253     V-vwFZAlLRMXqCQKKVBM4g        2.000000          0.000000
250     jQ-kmllrSXcIJN7At3Dvkw        2.000000          0.000000
15      D9UIFXYDjvpfu0desRKXzg        1.000000          0.000000
13      nAE1llsVp39g_HgO0fOUuQ        1.000000          0.000000
12      OZEpz2S7kKaa_8vQz2ATqQ        1.000000          0.000000
11      BsDf3L7p_O4tEvllQGOq2A        1.000000          0.000000
10      G7aY-t3rcZyZEQ0ZdYMe-A        1.000000          0.000000
9       Uiwgr1m6qgpVWP_AOr7-UA        1.000000          0.000000
14      1QU21VZX6oFQHmRCbC9Kcw        1.000000          0.000000
7       MuaUtNrlJwJwZ8EGpBf6Tg        1.000000          0.000000
6       LGSoe8Gb7pmRmmDjqSbYIQ        1.000000          0.000000
5       IqXWpiyd6sAAtQrboDUe5g        1.000000          0.000000
4       yBmDNVdEzW1ARUXYlKACIw        1.000000          0.000000
1       c5CWmwh0-WFIjn8_V

In [37]:
# Attach each business's listed review_count to its gap statistics.
# The comprehension variable is `c`, not `col`. This kernel runs Python 2 (see
# the u'' strings in the outputs), where list-comprehension variables leak into
# the enclosing scope. `for col in ...` therefore rebound the global `col`
# (pyspark.sql.functions.col) to a column-name string, breaking any subsequent
# col(...) call -- including a re-run of this very cell.
review_diff_stats_counts = (business.select(["business_id", "review_count"]).alias("t1")
                            .join(review_diff_stats.alias("t2"), col("t1.business_id") == col("t2.business_id"))
                            .select(["t2." + c for c in review_diff_stats.columns] + ["t1.review_count"]))

review_diff_stats_counts_df = review_diff_stats_counts.toPandas()

In [None]:
# Scatter of each business's listed review count against its average gap (in
# days) between consecutive reviews.
axis_title_font = dict(family='Arial, sans-serif', size=18, color='grey')

trace = Scatter(
    x=review_diff_stats_counts_df["review_count"],
    y=review_diff_stats_counts_df["mean_days_diff"],
    mode='markers'
)
data = [trace]

layout = Layout(
    xaxis=dict(title='Review Count', titlefont=dict(axis_title_font)),
    yaxis=dict(title='Mean Days Between Reviews', titlefont=dict(axis_title_font))
)
fig = Figure(data=data, layout=layout)

py.iplot(fig)

In [None]:
# Scatter of each business's listed review count against the standard deviation
# of its gap (in days) between consecutive reviews.
axis_title_font = dict(family='Arial, sans-serif', size=18, color='grey')

trace = Scatter(
    x=review_diff_stats_counts_df["review_count"],
    y=review_diff_stats_counts_df["stddev_days_diff"],
    mode='markers'
)
data = [trace]

layout = Layout(
    xaxis=dict(title='Review Count', titlefont=dict(axis_title_font)),
    yaxis=dict(title='Stddev of Days Between Reviews', titlefont=dict(axis_title_font))
)
fig = Figure(data=data, layout=layout)

py.iplot(fig)