## Load Modules

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Yelp-Project").getOrCreate()
SparkSession.builder.enableHiveSupport()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/17 20:02:44 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/03/17 20:02:44 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/03/17 20:02:44 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/03/17 20:02:44 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


<pyspark.sql.session.SparkSession.Builder at 0x7f3d5e994c40>

In [6]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, sum

## Define Functions

In [5]:
def clean_null(df):
    df_cleaned=df
    for col in df.columns:
        df_cleaned=df_cleaned.filter(F.col(col).isNotNull())
        df_cleaned=df_cleaned.filter(F.col(col)!='None')
        df_cleaned=df_cleaned.filter(F.col(col)!='null')
    return df_cleaned

## 1. First process the business dataset

In [4]:
tbl_business = spark.read.option("header",True).json("/user/lyh/yelp_data/yelp_academic_dataset_business.json")

23/03/17 16:24:34 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [14]:
tbl_business.show(10,True)

+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|          city|               hours|is_open|  latitude|   longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+----------+------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{null, null, null...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...| Santa Barbara|                null|      0|34.4266787|-119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{null, null, null...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|        Affton|{8:0-18:30, 0:0-0...|      1| 38.551126|  -90.335695

In [12]:
tbl_business.select([sum(col(c).isNull().cast("int")).alias(c) for c in tbl_business.columns]).show()



+-------+----------+-----------+----------+----+-----+-------+--------+---------+----+-----------+------------+-----+-----+
|address|attributes|business_id|categories|city|hours|is_open|latitude|longitude|name|postal_code|review_count|stars|state|
+-------+----------+-----------+----------+----+-----+-------+--------+---------+----+-----------+------------+-----+-----+
|      0|     13744|          0|       103|   0|23223|      0|       0|        0|   0|          0|           0|    0|    0|
+-------+----------+-----------+----------+----+-----+-------+--------+---------+----+-----------+------------+-----+-----+



                                                                                

In [13]:
tbl_business.count() - tbl_business.dropDuplicates().count()

                                                                                

0

In [15]:
tbl_business.count(), tbl_business.distinct().count()

                                                                                

(150346, 150346)

In [16]:
tbl_business.columns

['address',
 'attributes',
 'business_id',
 'categories',
 'city',
 'hours',
 'is_open',
 'latitude',
 'longitude',
 'name',
 'postal_code',
 'review_count',
 'stars',
 'state']

In [17]:
tbl_business_cleaned = tbl_business
for i in tbl_business.columns:
    tbl_business_cleaned = tbl_business_cleaned.filter(F.col(i).isNotNull())

In [18]:
tbl_business_cleaned.count()

                                                                                

117618

In [19]:
tbl_business_cleaned.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in tbl_business.columns]).show()



+-------+----------+-----------+----------+----+-----+-------+--------+---------+----+-----------+------------+-----+-----+
|address|attributes|business_id|categories|city|hours|is_open|latitude|longitude|name|postal_code|review_count|stars|state|
+-------+----------+-----------+----------+----+-----+-------+--------+---------+----+-----------+------------+-----+-----+
|      0|         0|          0|         0|   0|    0|      0|       0|        0|   0|          0|           0|    0|    0|
+-------+----------+-----------+----------+----+-----+-------+--------+---------+----+-----------+------------+-----+-----+



                                                                                

In [21]:
tbl_business_cleaned.write.mode('overwrite') \
    .saveAsTable("yelp_database.business")

23/03/17 19:18:51 WARN org.apache.hadoop.hive.ql.session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


## 2. process the checking dataset

In [2]:
tbl_checkin = spark.read.option("header",True).json("/user/lyh/yelp_data/yelp_academic_dataset_checkin.json")

                                                                                

In [3]:
tbl_checkin.show(10,True)

+--------------------+--------------------+
|         business_id|                date|
+--------------------+--------------------+
|---kPU91CF4Lq2-Wl...|2020-03-13 21:10:...|
|--0iUa4sNDFiZFrAd...|2010-09-13 21:43:...|
|--30_8IhuyMHbSOcN...|2013-06-14 23:29:...|
|--7PUidqRWpRSpXeb...|2011-02-15 17:12:...|
|--7jw19RH9JKXgFoh...|2014-04-21 20:42:...|
|--8IbOsAAxjKRoYsB...|2015-06-06 01:03:...|
|--9osgUCSDUWUkoTL...|2015-06-13 02:00:...|
|--ARBQr1WMsTWiwOK...|2014-12-12 00:44:...|
|--FWWsIwxRwuw9vIM...|2010-09-11 16:28:...|
|--FcbSxK1AoEtEAxO...|2017-08-18 19:43:...|
+--------------------+--------------------+
only showing top 10 rows



In [7]:
tbl_checkin_cleaned = clean_null(tbl_checkin)

In [8]:
tbl_checkin_cleaned.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)



In [9]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [10]:
tbl_checkin_cleaned = tbl_checkin_cleaned.withColumn("datetime",F.to_timestamp(F.col('date'),'yyyy-MM-dd HH:mm:ss'))

In [11]:
tbl_checkin_cleaned.show(10,True)

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+--------------------+-------------------+
|         business_id|                date|           datetime|
+--------------------+--------------------+-------------------+
|---kPU91CF4Lq2-Wl...|2020-03-13 21:10:...|2020-03-13 21:10:56|
|--0iUa4sNDFiZFrAd...|2010-09-13 21:43:...|2010-09-13 21:43:09|
|--30_8IhuyMHbSOcN...|2013-06-14 23:29:...|2013-06-14 23:29:17|
|--7PUidqRWpRSpXeb...|2011-02-15 17:12:...|2011-02-15 17:12:00|
|--7jw19RH9JKXgFoh...|2014-04-21 20:42:...|2014-04-21 20:42:11|
|--8IbOsAAxjKRoYsB...|2015-06-06 01:03:...|2015-06-06 01:03:19|
|--9osgUCSDUWUkoTL...|2015-06-13 02:00:...|2015-06-13 02:00:57|
|--ARBQr1WMsTWiwOK...|2014-12-12 00:44:...|2014-12-12 00:44:23|
|--FWWsIwxRwuw9vIM...|2010-09-11 16:28:...|2010-09-11 16:28:39|
|--FcbSxK1AoEtEAxO...|2017-08-18 19:43:...|2017-08-18 19:43:50|
+--------------------+--------------------+-------------------+
only showing top 10 rows



                                                                                

In [12]:
tbl_checkin_cleaned.write.mode('overwrite') \
    .saveAsTable("yelp_database.checkin")

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
23/03/17 20:06:19 WARN org.apache.hadoop.hive.ql.session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


## 3. process the review dataset, this table contains the checking time of the customer for specific business_id

In [13]:
tbl_review = spark.read.option("header",True).json("/user/lyh/yelp_data/yelp_academic_dataset_review.json")

                                                                                

In [17]:
# tbl_review_cleaned  = clean_null(tbl_review)

In [18]:
# tbl_review_cleaned.count(), tbl_review.count()

                                                                                

(0, 6990280)

In [19]:
tbl_review.show(10)
#tbl_review_cleaned.show(10,True)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [20]:
tbl_review.write.mode('overwrite') \
    .saveAsTable("yelp_database.review")

                                                                                

## 4. Process the tip data

In [21]:
tbl_tip = spark.read.option("header",True).json("/user/lyh/yelp_data/yelp_academic_dataset_tip.json")

                                                                                

In [22]:
tbl_tip.show(10)

[Stage 18:>                                                         (0 + 1) / 1]

+--------------------+----------------+-------------------+--------------------+--------------------+
|         business_id|compliment_count|               date|                text|             user_id|
+--------------------+----------------+-------------------+--------------------+--------------------+
|3uLgwr0qeCNMjKenH...|               0|2012-05-18 02:17:21|Avengers time wit...|AGNUgVwnZUey3gcPC...|
|QoezRbYQncpRqyrLH...|               0|2013-02-05 18:35:10|They have lots of...|NBN4MgHP9D3cw--Sn...|
|MYoRNLb5chwjQe3c_...|               0|2013-08-18 00:56:08|It's open even wh...|-copOvldyKh1qr-vz...|
|hV-bABTK-glh5wj31...|               0|2017-06-27 23:05:38|Very decent fried...|FjMQVZjSqY8syIO-5...|
|_uN0OudeJ3Zl_tf6n...|               0|2012-10-06 19:43:09|Appetizers.. plat...|ld0AperBXk1h6Ubqm...|
|7Rm9Ba50bw23KTA8R...|               0|2012-03-13 04:00:52|Chili Cup + Singl...|trf3Qcz8qvCDKXiTg...|
|kH-0iXqkL7b8UXNpg...|               0|2013-12-03 23:42:15|Saturday, Dec 7th...|SM

                                                                                

In [23]:
# tbl_tip_cleaned = clean_null(tbl_tip)

In [26]:
tbl_tip.count()

                                                                                

908915

In [27]:
tbl_tip.write.mode('overwrite') \
    .saveAsTable("yelp_database.tip")

                                                                                

## 5. Process the user data

In [28]:
tbl_user = spark.read.option("header",True).json("/user/lyh/yelp_data/yelp_academic_dataset_user.json")

                                                                                

In [29]:
tbl_user.show(10)

+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+--------+------------+------+--------------------+-------------------+
|average_stars|compliment_cool|compliment_cute|compliment_funny|compliment_hot|compliment_list|compliment_more|compliment_note|compliment_photos|compliment_plain|compliment_profile|compliment_writer| cool|               elite|fans|             friends|funny|    name|review_count|useful|             user_id|      yelping_since|
+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+--------+------------+------+--------------------+-------------------+
|         3.9

[Stage 29:>                                                         (0 + 1) / 1]                                                                                

In [30]:
# tbl_user_cleaned = clean_null(tbl_user)

In [32]:
# tbl_user_cleaned.show(10)

In [33]:
tbl_user.write.mode('overwrite') \
    .saveAsTable("yelp_database.user")

                                                                                

## Load data from Hive

In [38]:
df = spark.read.table("yelp_database.tip")
df.show(10,True)

[Stage 33:>                                                         (0 + 1) / 1]

+--------------------+----------------+-------------------+--------------------+--------------------+
|         business_id|compliment_count|               date|                text|             user_id|
+--------------------+----------------+-------------------+--------------------+--------------------+
|3uLgwr0qeCNMjKenH...|               0|2012-05-18 02:17:21|Avengers time wit...|AGNUgVwnZUey3gcPC...|
|QoezRbYQncpRqyrLH...|               0|2013-02-05 18:35:10|They have lots of...|NBN4MgHP9D3cw--Sn...|
|MYoRNLb5chwjQe3c_...|               0|2013-08-18 00:56:08|It's open even wh...|-copOvldyKh1qr-vz...|
|hV-bABTK-glh5wj31...|               0|2017-06-27 23:05:38|Very decent fried...|FjMQVZjSqY8syIO-5...|
|_uN0OudeJ3Zl_tf6n...|               0|2012-10-06 19:43:09|Appetizers.. plat...|ld0AperBXk1h6Ubqm...|
|7Rm9Ba50bw23KTA8R...|               0|2012-03-13 04:00:52|Chili Cup + Singl...|trf3Qcz8qvCDKXiTg...|
|kH-0iXqkL7b8UXNpg...|               0|2013-12-03 23:42:15|Saturday, Dec 7th...|SM

                                                                                

In [39]:
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)

