## Load the yelp_user_gender.tsv Data
* Insert a new cell
* From the `Insert to code` drop-down for the `yelp_user_gender.tsv` file select `Insert SparkSession Setup`
* Remove the line for setting the path
* Modify the name of the Object Storage container specified for `path_gender` in the next code cell if needed:
``` python
path_gender = bmos.url('Fall2017', 'yelp_user_gender.tsv')
```

### Additional files required as data assets from Round 10 of the Yelp Dataset Challenge
* business.json.gz
* review.json.gz

In [21]:
# Path to the gender-annotated user file on your Object Storage.
# NOTE(review): `bmos` is not defined in this cell; presumably it is created by
# the "Insert SparkSession Setup" cell mentioned above -- confirm before running.
path_gender = bmos.url('Fall2017', 'yelp_user_gender.tsv')

# The file is tab-delimited with a header row. No schema is supplied or
# inferred here; the printed schema below shows every column as a string.
df_yelp_gender = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load(path_gender)
)
df_yelp_gender.printSchema()

# Single pre-formatted argument: prints the same text under a Python 2 or a
# Python 3 kernel (the bare `print "...", x` statement is Python 2 only).
print("number of users:  %d" % df_yelp_gender.count())

# Register the DataFrame so later cells can query it with Spark SQL.
df_yelp_gender.createOrReplaceTempView("yelp_user_gender")

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_gender: string (nullable = true)
 |-- gender_ratio: string (nullable = true)

number of users:  1183362


In [3]:

# Please read the PySpark documentation to learn more about the possibilities to load data files.
# PySpark documentation: https://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# The SparkSession object (`spark`) is already initialized for you.
# The following variable contains the path to the reviews file on your Object Storage.
path_reviews = bmos.url('Fall2017', 'review.json.gz')

# Read the reviews file as JSON into a DataFrame.
df_reviews = spark.read.json(path_reviews)

# Single pre-formatted argument: prints the same text under a Python 2 or a
# Python 3 kernel (the bare `print "...", x` statement is Python 2 only).
print("Number of reviews:  %d" % df_reviews.count())
df_reviews.printSchema()


Number of reviews:  4736897
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [27]:
# Expose only the identifier columns and the star rating of each review
# as the Spark SQL view `yelp_reviews`.
(
    df_reviews
    .selectExpr("review_id", "business_id", "user_id", "stars")
    .createOrReplaceTempView("yelp_reviews")
)

# Preview the view without truncating the long id strings.
spark.sql("SELECT * FROM yelp_reviews").show(truncate=False)

+----------------------+----------------------+----------------------+-----+
|review_id             |business_id           |user_id               |stars|
+----------------------+----------------------+----------------------+-----+
|VfBHSwC5Vz_pbFluy07i9Q|uYHaNptLzDLoV_JZ_MuzUA|cjpdDjZyprfyDG3RlkVG3w|5    |
|3zRpneRKDsOPq92tq7ybAA|uYHaNptLzDLoV_JZ_MuzUA|bjTcT8Ty4cJZhEOEo01FGA|3    |
|ne5WhI1jUFOcRn-b-gAzHA|uYHaNptLzDLoV_JZ_MuzUA|AXgRULmWcME7J6Ix3I--ww|3    |
|llmdwOgDReucVoWEry61Lw|uYHaNptLzDLoV_JZ_MuzUA|oU2SSOmsp_A8JYI7Z2JJ5w|4    |
|DuffS87NaSMDmIfluvT83g|uYHaNptLzDLoV_JZ_MuzUA|0xtbPEna2Kei11vsU-U2Mw|5    |
|GvLmUkjUrOyFH8KFnmT1uw|uYHaNptLzDLoV_JZ_MuzUA|rW8q706dz5-NnXDzMwVkiw|5    |
|lGEl24NGj2HVBJrodeXcjg|uYHaNptLzDLoV_JZ_MuzUA|yx8vNXUL0D0HS8rUIC7AFA|4    |
|cUgvEy5wj7zYE68v1BzzVg|uYHaNptLzDLoV_JZ_MuzUA|zXnH6W74FAJQ7q7b-NuBsA|4    |
|FSB_BnvysBgH3JYrbFNcgw|uYHaNptLzDLoV_JZ_MuzUA|c5yp5hxwC1N98MjbV2LyWQ|4    |
|dhl3ZW9aAEX_T7_um5tfaQ|uYHaNptLzDLoV_JZ_MuzUA|xJisL5w4wOgiYLokGMT_IA|4    |

In [4]:

# Please read the PySpark documentation to learn more about the possibilities to load data files.
# PySpark documentation: https://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# The SparkSession object (`spark`) is already initialized for you.
# The following variable contains the path to the business file on your Object Storage.
path_business = bmos.url('Fall2017', 'business.json.gz')

# Read the business file as JSON into a DataFrame.
df_business = spark.read.json(path_business)

# Single pre-formatted argument: prints the same text under a Python 2 or a
# Python 3 kernel (the bare `print "...", x` statement is Python 2 only).
print("Number of businesses:  %d" % df_business.count())
df_business.printSchema()

Number of businesses:  156639
root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: boolean (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: struct (nullable = true)
 |    |    |-- casual: boolean (nullable = true)
 |    |    |-- classy: boolean (nullable = true)
 |    |    |-- divey: boolean (nullable = true)
 |    |    |-- hipster: boolean (nullable = true)
 |    |    |-- intimate: boolean (nullable = true)
 |    |    |-- romantic: boolean (nullable = true)
 |    |    |-- touristy: boolean (nullable = true)
 |    |    |-- trendy: boolean (nullable = true)
 |    |    |-- upscale: boolean (nullable = true)
 |    |-- BYOB: boolean (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: struct (nullable = true)
 |    |    |-- friday: boolean (nullable = true)
 |    |    |-- monday: boolean (nullable = true)
 |    |   

In [23]:
# Make the business data queryable from Spark SQL.
df_business.createOrReplaceTempView("yelp_business")

# Keep only the businesses whose `categories` column contains 'Restaurants',
# retaining just the id and the star rating. Adjacent string literals build
# exactly the same query text as the original backslash-continued literal.
df_restaurants = spark.sql(
    "SELECT business_id, stars "
    "FROM yelp_business "
    "WHERE array_contains(categories,'Restaurants')"
)

# Single pre-formatted argument: prints the same text under a Python 2 or a
# Python 3 kernel (the bare `print "...", x` statement is Python 2 only).
print("Number of restaurants:  %d" % df_restaurants.count())
df_restaurants.show(5, truncate=False)

# Register the restaurant subset for the joins in the following cells.
df_restaurants.createOrReplaceTempView("yelp_restaurants")

Number of restaurants:  51613
+----------------------+-----+
|business_id           |stars|
+----------------------+-----+
|mLwM-h2YhXl2NCgdS84_Bw|4.5  |
|duHFBe87uNSXImQmvBh87Q|4.5  |
|SDMRxmcKPNt1AHPBKqO64Q|2.0  |
|iFEiMJoEqyB9O8OUNSdLzA|3.0  |
|HmI9nhgOkrXlUr6KZGZZew|3.0  |
+----------------------+-----+
only showing top 5 rows



## Join the Three Views
* yelp_user_gender
* yelp_reviews
* yelp_restaurants


In [24]:
# Average star rating per business, rounded to two decimals, computed over
# every row of the `yelp_reviews` view.
df_business_ratings = spark.sql(
    "SELECT business_id, ROUND(AVG(stars),2) AS average_stars "
    "FROM yelp_reviews "
    "GROUP BY business_id"
)
df_business_ratings.show(10, truncate=False)

# Register the aggregate so it can be joined by name in later cells.
df_business_ratings.createOrReplaceTempView('business_ratings')


+----------------------+-------------+
|business_id           |average_stars|
+----------------------+-------------+
|VbxdWrQ7WHSN7S0RUL7Pqg|3.5          |
|_vj8nh6MceJLInLoMRA38g|2.86         |
|VDRlaNF0iohnYTZzmEKzBQ|3.5          |
|OGQ_6nIn4QQL2U6t0XeHtA|4.58         |
|nkh7rzfDVuyKncuKvnUx3Q|3.4          |
|ymsmO5Vy6xKZayua9R0qgA|1.5          |
|AdPuYsduU1o1FAqlDwh8Dw|3.0          |
|rWxAgLSI0Sk1MCpQGd5HDQ|3.92         |
|19m3NtbbP2VX-tDFJB1lKQ|4.0          |
|yGjuMgFdYML350jYX42qvg|2.6          |
+----------------------+-------------+
only showing top 10 rows



In [29]:
# Step 1 of the join: keep only the reviews whose business_id appears in the
# `yelp_restaurants` view. Adjacent string literals build exactly the same
# query text as the original backslash-continued literal.
df_restaurant_reviews = spark.sql(
    "SELECT R.business_id, R.user_id, R.stars "
    "FROM yelp_reviews AS R, yelp_restaurants AS B "
    "WHERE R.business_id = B.business_id"
)

# Single pre-formatted argument: prints the same text under a Python 2 or a
# Python 3 kernel (the bare `print "...", x` statement is Python 2 only).
print("restaurant review count: %d" % df_restaurant_reviews.count())
df_restaurant_reviews.show(10, truncate=False)


restaurant review count: 2927731
+----------------------+----------------------+-----+
|business_id           |user_id               |stars|
+----------------------+----------------------+-----+
|jQsNFOzDpxPmOurSWCg1vQ|kzyLOqiJvyw_FWFTw2rjiQ|1    |
|jQsNFOzDpxPmOurSWCg1vQ|WZXp9-V2dqRRJqhGgRqueA|4    |
|jQsNFOzDpxPmOurSWCg1vQ|XylT12exfdLiI_3uDLVIpw|5    |
|jQsNFOzDpxPmOurSWCg1vQ|Ji9PeffxjwqPLO7pEfSpKQ|3    |
|jQsNFOzDpxPmOurSWCg1vQ|TLIWzAJPrET0zX4_vgvLhg|3    |
|jQsNFOzDpxPmOurSWCg1vQ|JZEiTNWBwmv6MOOXYCAaMQ|1    |
|jQsNFOzDpxPmOurSWCg1vQ|E56sVQT5-OWfSejJrma8_w|5    |
|jQsNFOzDpxPmOurSWCg1vQ|4WYICo4emecA9r7sPYQkBw|4    |
|jQsNFOzDpxPmOurSWCg1vQ|P8mVj7AZwJTFFH5FXbbmUg|1    |
|jQsNFOzDpxPmOurSWCg1vQ|7Y4NEBQqWg7j-TvrQi6UZQ|2    |
+----------------------+----------------------+-----+
only showing top 10 rows



In [30]:
# Step 2 of the join: restaurant reviews plus the per-business average from
# the `business_ratings` view. This rebinds `df_restaurant_reviews`, replacing
# the narrower result of the previous cell.
df_restaurant_reviews = spark.sql(
    "SELECT R.business_id, R.user_id, R.stars, A.average_stars "
    "FROM yelp_reviews AS R, yelp_restaurants AS B, business_ratings AS A "
    "WHERE R.business_id = B.business_id AND "
    "R.business_id = A.business_id"
)

# Single pre-formatted argument: prints the same text under a Python 2 or a
# Python 3 kernel (the bare `print "...", x` statement is Python 2 only).
print("restaurant review count: %d" % df_restaurant_reviews.count())
df_restaurant_reviews.show(10, truncate=False)


restaurant review count: 2927731
+----------------------+----------------------+-----+-------------+
|business_id           |user_id               |stars|average_stars|
+----------------------+----------------------+-----+-------------+
|--9e1ONYQuAa-CB_Rrw7Tw|4LxKRRIikhr65GfPDW626w|5    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|96aWRa-gy1RrsrFQURagvg|2    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|FQTfUYsCnBryER6L8NFyYA|3    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|nT8zgjoc-PbdBoQsFEXFLw|5    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|7RlyCglsIzhBn081inwvcg|5    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|V6VsbyKsbP555ZbqEJ96Lw|3    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|XBDdatICQD6IL9jfvuWBPw|5    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|vXvsRujK6L8K_aozSVmt_g|4    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|rOIrilMC7VFwFVBeQNiKMw|3    |4.09         |
|--9e1ONYQuAa-CB_Rrw7Tw|Vkj42md-5O3VoOD9pBg6Sw|4    |4.09         |
+----------------------+----------------------+-----+-------------+
only showing to

In [32]:
# Step 3 of the join: restaurant reviews, the per-business average, and the
# reviewer's gender columns from the `yelp_user_gender` view. This rebinds
# `df_restaurant_reviews` to the final, widest result, which the export cell uses.
df_restaurant_reviews = spark.sql(
    "SELECT R.business_id, R.user_id, R.stars, A.average_stars, U.user_gender, U.gender_ratio "
    "FROM yelp_reviews AS R, yelp_restaurants AS B, business_ratings AS A, yelp_user_gender AS U "
    "WHERE R.business_id = B.business_id AND "
    "R.business_id = A.business_id AND "
    "R.user_id = U.user_id"
)

# Single pre-formatted argument: prints the same text under a Python 2 or a
# Python 3 kernel (the bare `print "...", x` statement is Python 2 only).
print("restaurant review count: %d" % df_restaurant_reviews.count())
df_restaurant_reviews.show(10, truncate=False)


restaurant review count: 2927731
+----------------------+----------------------+-----+-------------+-----------+------------------+
|business_id           |user_id               |stars|average_stars|user_gender|gender_ratio      |
+----------------------+----------------------+-----+-------------+-----------+------------------+
|pmrHuQiy25xKB86tbOLBlA|-0Ji0nOyFe-4yo8BK4aRLA|4    |4.12         |3          |1.0               |
|L772e6l2Yd0DJEyCBxBNng|-1KKYzibGPyUX-MwkBTlrg|4    |2.95         |1          |0.6869830213321724|
|aNe8ofTYrealxqv7VtFTuw|-1zQA2f_syMAdA04PUWNNw|5    |3.37         |2          |1.0               |
|DyYS-JKXlW2PCr_Gt5JOvQ|-1zQA2f_syMAdA04PUWNNw|5    |3.46         |2          |1.0               |
|HhVmDybpU7L50Kb5A0jXTg|-2Pb5d2WBPtbyGT_be6NDQ|5    |3.45         |1          |0.9983999617901324|
|q3dJQtwZQrrurNT-1bNKgQ|-3bsS2i9xqjNnIA1fRnzIQ|1    |4.06         |1          |0.8373119564000149|
|yNVcnSaMSrTmjJBMrHTy9w|-3i9bhfvrM3F1wsC9XIB8g|3    |3.46         |1        

## Exporting a DataFrame as a CSV File

In [1]:
# Prerequisite: `df_restaurant_reviews` must already exist in the kernel, i.e.
# the join cells above have to be run first. The recorded run of this cell
# raised a NameError because that name was not defined.

# Collapse the result into a single partition before writing.
df_restaurant_reviews_out = df_restaurant_reviews.repartition(1)

# Destination of the export on Object Storage.
path_out = bmos.url('Fall2017', 'export_restaurant_reviews.csv')

# Write CSV with a header row, replacing any previous export at that path.
(
    df_restaurant_reviews_out.write
    .mode("overwrite")
    .option("header", "true")
    .csv(path_out)
)

NameError: name 'df_restaurant_reviews' is not defined