## Prepare Spark

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")\
    .config("spark.dynamicAllocation.maxExecutors", "1")\
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/20 23:08:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Songs Dataset

In [3]:
songs_df = spark.read.load("./songs_dataset/train_triplets.txt",
                     format="csv", sep="\t", inferSchema="true", 
                     header="false")

                                                                                

In [4]:
songs_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)



In [5]:
songs_df = songs_df.withColumnRenamed("_c0", "user")\
                   .withColumnRenamed("_c1", "song")\
                   .withColumnRenamed("_c2", "play_count")

In [6]:
songs_df.printSchema()

root
 |-- user: string (nullable = true)
 |-- song: string (nullable = true)
 |-- play_count: integer (nullable = true)



## Saving to Parquet

In [7]:
songs_df.write.parquet("songs.parquet")

21/11/20 23:08:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
21/11/20 23:08:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
21/11/20 23:08:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

## Trivial Queries

In [8]:
songs_df.createOrReplaceTempView("songs")

In [9]:
play_count_10 = spark.sql("select song from songs where play_count > 10")

In [10]:
play_count_10.show()

+------------------+
|              song|
+------------------+
|SOQTRKE12A6701C596|
|SOVVNSS12A58291F72|
|SOEPZQS12A8C1436C7|
|SOWPAXV12A67ADA046|
|SOXGQEM12AB0181D35|
|SOWUVFQ12AB018740E|
|SOVFDZD12A6D4F8EAE|
|SOADQPP12A67020C82|
|SOBONKR12A58A7A7E0|
|SOPSOHT12A67AE0235|
|SOTTNZU12A6D4FA237|
|SOWNNPR12A6D4FB51B|
|SOADGFH12A8C143D89|
|SOAFOBL12AF72A25BA|
|SOAMPRJ12A8AE45F38|
|SOAUXEN12A81C23960|
|SOCFPSZ12A6D4FCA89|
|SOCJCVE12A8C13CDDB|
|SODTJFU12B0B80C9BE|
|SOFFGTH12A67AE0925|
+------------------+
only showing top 20 rows



In [11]:
play_count_10.count()

                                                                                

2043582

## Yelp Dataset

In [12]:
business = spark.read.json("./yelp-dataset/yelp_academic_dataset_business.json")
reviews = spark.read.json("./yelp-dataset/yelp_academic_dataset_review.json")
users = spark.read.json("./yelp-dataset/yelp_academic_dataset_user.json")

21/11/20 23:09:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [13]:
business.createOrReplaceTempView("business")
reviews.createOrReplaceTempView("reviews")
users.createOrReplaceTempView("users")

### Inspecting Schemas

In [14]:
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [15]:
reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [16]:
users.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



### Query 1

In [17]:
spark.sql(
"""
SELECT state, count(state) 
AS count 
FROM business 
GROUP BY state 
ORDER BY count(state) DESC
""").show()



+-----+-----+
|state|count|
+-----+-----+
|   AZ|56686|
|   NV|36312|
|   ON|33412|
|   NC|14720|
|   OH|14697|
|   PA|11216|
|   QC| 9219|
|   AB| 8012|
|   WI| 5154|
|   IL| 1932|
|   SC| 1162|
|   NY|   22|
|   CA|   19|
|   TX|    6|
|   FL|    4|
|  XGM|    4|
|   AL|    3|
|   WA|    3|
|   CT|    3|
|   VA|    2|
+-----+-----+
only showing top 20 rows



                                                                                

### Query 2

In [18]:
spark.sql(
"""
SELECT count(distinct(*)) FROM (
    SELECT explode(split(categories, \",\s*\")) 
    AS category 
    FROM business
)
""").show()

[Stage 13:>                                                         (0 + 8) / 8]

+------------------------+
|count(DISTINCT category)|
+------------------------+
|                    2468|
+------------------------+





### Query 3

In [19]:
spark.sql(
"""
SELECT category, count(category) FROM 
    (
        SELECT explode(split(categories, \",\s*\")) 
        AS category 
        FROM business WHERE city=\"Phoenix\"
    )
GROUP BY category 
ORDER BY count(category) DESC LIMIT 10
""").show()

+-----------------+---------------+
|         category|count(category)|
+-----------------+---------------+
|      Restaurants|           2815|
|         Shopping|           2416|
|    Home Services|           2302|
|             Food|           1672|
| Health & Medical|           1577|
|   Local Services|           1444|
|      Restaurants|           1184|
|       Automotive|           1164|
|    Beauty & Spas|           1115|
|    Home Services|            843|
+-----------------+---------------+



### Query 4

In [20]:
spark.sql(
"""
SELECT count(*) 
AS friend_count 
FROM users 
WHERE size(split(friends, \",\s*\")) > 1000
""").show()



+------------+
|friend_count|
+------------+
|        4166|
+------------+



                                                                                

### Query 5

In [21]:
spark.sql(
"""
WITH business_ratings AS (
    SELECT business_id, year(to_date(date)) AS year, avg(stars) AS rating 
    FROM reviews GROUP BY business_id, year(to_date(date))
),
business_2014 AS (
    SELECT business_id, rating 
    FROM business_ratings
    WHERE year=2014
),
business_2017 AS (
    SELECT business_id, rating 
    FROM business_ratings WHERE year=2017
)
SELECT business_2014.business_id, business_2014.rating, business_2017.rating 
FROM business_2014 
INNER JOIN business_2017 
ON business_2014.business_id=business_2017.business_id 
WHERE business_2017.rating < business_2014.rating 
""").show()

                                                                                

+--------------------+------------------+------------------+
|         business_id|            rating|            rating|
+--------------------+------------------+------------------+
|VHsNB3pdGVcRgs6C3...|3.3461538461538463|2.7857142857142856|
|avljb14OB8UkFTHVo...|               4.0|               3.0|
|dk1MV0MP32Xq-iBxz...|2.1379310344827585|2.0392156862745097|
|GGxnlrfvWy7LFvjN5...| 4.214285714285714| 4.097560975609756|
|GJ2TXArxyuF8f79Wb...|               5.0|               4.8|
|vnNRBq0zVIH-k1BA9...|               4.0|               3.5|
|M4D-cZ9_9Bw-gMi0d...|               5.0| 4.923076923076923|
|RMjCnixEY5i12Ciqn...|3.3333333333333335|               2.7|
|jfdUtdkXogP2kjK5K...|              3.25|               3.0|
|XgX0JhqleOnH-ezSe...|               4.0|2.3333333333333335|
|yMKisHBS_Ia8Dr27A...|               3.5|               2.6|
|C5H-eZfnxBkYN40xc...| 4.333333333333333| 2.111111111111111|
|9HG09ZNqzrEUz-ipS...|               2.6|               2.0|
|yNp0G1G4-iYNnuC7V...|  

### HW: Query 6

The goal is to get the list of people whose friends last time wrote a review for a chinese restaurant.

**The outline**: 

You will need to look at `reviews`, group by `user_id`. You are interested only in the last review that a person wrote. The `date` is stored as a string. Convert it to date format using `to_date`. Find the most recent date for a user using aggregation function `max`. Note that a person can write several reviews on the same day. Treat all of them as most recent. If the review was written for a Chinese restaurant, we should find who has this person in their friend list. The list of `friends` in stored in table `users`. The field `friends` is also stored as string, and you should split it first.

In [22]:
spark.sql(
"""
WITH 
last_reviews AS (
    SELECT user_id, business_id
    FROM reviews
    WHERE EXISTS (
        SELECT user_id, max(to_date(date)) AS last 
        FROM reviews
        GROUP BY user_id
    )
),
user_friends AS (
    SELECT user_id, explode(split(friends, \",\s*\")) 
    AS friend
    FROM users
),
business_categories AS (
    SELECT business_id, explode(split(categories, \",\s*\")) 
    AS category
    FROM business
),
chinese_businesses AS (
    SELECT *
    FROM business_categories
    WHERE category='Chinese'
)
SELECT friend as user_id
FROM last_reviews
LEFT JOIN user_friends ON last_reviews.user_id=user_friends.user_id 
INNER JOIN chinese_businesses ON last_reviews.business_id=chinese_businesses.business_id
""").show()

[Stage 46:>                                                         (0 + 1) / 1]

+--------------------+
|             user_id|
+--------------------+
|AfuFQIWXz50GE4TZ5...|
| ivm6bLIy2kGx-tuj...|
| L6wUxmUErIR7FJij...|
| kNuQsqcqTOy_5lQ_...|
| Xxvz5g67eaCr3emn...|
| hk8o_YrQY31c9Sm5...|
| SeWZYXztsqDvuMgF...|
| HMfEpwxTmlxxw0Zo...|
| leGhN-KSfIdIho2I...|
| Z_ZLQ9mj03sVZ9G7...|
| mzwmFGcLqaR0IHqd...|
| OSG63vuflUELLIJT...|
| 7qyzGvU8NN3psfQi...|
| DKsGJpKOjRWSeWbs...|
| ugDuOj80BPxXsPSP...|
| _hm8U41mh5uhd4t6...|
| INbtAUNKtZSXr20_...|
| x3Z6mJPgtnF5ni1e...|
|0MUXAy_zbclghJe6F...|
| fb9sMy7ixsOQyW4B...|
+--------------------+
only showing top 20 rows



                                                                                

To determine number of rows returned, we can call `count` of a dataframe. However, for me it yields memory error as well as count in SQL query.