# Part 1

In [3]:
!pip install pyspark
%ls -la

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 80 kB/s  eta 0:00:01
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 9.0 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=13c3cfbe84957ea59257c1737513e745e08f76875d0b60622ef12e05f108a1ae
  Stored in directory: /home/laggy/.cache/pip/wheels/23/f6/d3/110e53bd43baeb8d7d38049733d48e39cbecd056f01dba7ee8
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
total 7458644
drwxr-xr-x 44 laggy laggy        4096 Dec  1 16:24  [0m[01;34m.[0m/
drwxr-xr-x  3 root  root         4096 Jun 10 17:28  [01;34m..[0m/
-rw-------  1 laggy laggy         104

In [4]:
import pyspark


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")\
    .config("spark.dynamicAllocation.maxExecutors", "1")\
    .getOrCreate()

In [21]:
songs_df = spark.read.load("/train_triplets.txt",
                     format="csv", sep="\t", inferSchema="true", 
                     header="false")

In [14]:
songs_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)



In [15]:
songs_df = songs_df.withColumnRenamed("_c0", "user")\
                   .withColumnRenamed("_c1", "song")\
                   .withColumnRenamed("_c2", "play_count")

In [16]:
songs_df.createOrReplaceTempView("songs")

In [17]:
played_more_than_10_times = spark.sql("select song from songs where play_count > 10")

In [18]:
played_more_than_10_times.count()

2043582

# Part 2

In [25]:
business = spark.read.json("yelp-dataset/yelp_academic_dataset_business.json")
reviews = spark.read.json("yelp-dataset/yelp_academic_dataset_review.json")
users = spark.read.json("yelp-dataset/yelp_academic_dataset_user.json")
business.createOrReplaceTempView("business")
reviews.createOrReplaceTempView("reviews")
users.createOrReplaceTempView("users")

In [26]:
spark.sql("select state, count(state) as count from business group by state order by count(state) desc").show()

+-----+-----+
|state|count|
+-----+-----+
|   AZ|56686|
|   NV|36312|
|   ON|33412|
|   NC|14720|
|   OH|14697|
|   PA|11216|
|   QC| 9219|
|   AB| 8012|
|   WI| 5154|
|   IL| 1932|
|   SC| 1162|
|   NY|   22|
|   CA|   19|
|   TX|    6|
|   FL|    4|
|  XGM|    4|
|   AL|    3|
|   WA|    3|
|   CT|    3|
|   VA|    2|
+-----+-----+
only showing top 20 rows



In [27]:
spark.sql("""
select count(distinct(*)) from (
    select explode(split(categories, \",\s*\")) as category from business
)
""").show()

+------------------------+
|count(DISTINCT category)|
+------------------------+
|                    2468|
+------------------------+



In [28]:
spark.sql("""
select category, count(category) from 
    (
        select explode(split(categories, \",\s*\")) as category 
        from business where city=\"Phoenix\"
    )
group by category order by count(category) desc limit 10
""").show()

+-----------------+---------------+
|         category|count(category)|
+-----------------+---------------+
|      Restaurants|           2815|
|         Shopping|           2416|
|    Home Services|           2302|
|             Food|           1672|
| Health & Medical|           1577|
|   Local Services|           1444|
|      Restaurants|           1184|
|       Automotive|           1164|
|    Beauty & Spas|           1115|
|    Home Services|            843|
+-----------------+---------------+



In [29]:
spark.sql("""
select 
    count(*) as friend_count 
from 
    users 
where 
    size(split(friends, \",\s*\")) > 1000
""").show()

+------------+
|friend_count|
+------------+
|        4166|
+------------+



In [30]:
spark.sql("""
with business_ratings as (
    select 
        business_id, year(to_date(date)) as year, avg(stars) as rating 
    from 
        reviews group by business_id, year(to_date(date))
),
business_2014 as (
    select 
        business_id, rating 
    from 
        business_ratings 
    where 
        year=2014
),
business_2017 as (
    select 
        business_id, rating 
    from 
        business_ratings where year=2017
)
select 
    business_2014.business_id, business_2014.rating, business_2017.rating 
from 
    business_2014 
inner join 
    business_2017 
on 
    business_2014.business_id=business_2017.business_id 
where 
    business_2017.rating < business_2014.rating 
""").show()

+--------------------+------------------+------------------+
|         business_id|            rating|            rating|
+--------------------+------------------+------------------+
|VHsNB3pdGVcRgs6C3...|3.3461538461538463|2.7857142857142856|
|avljb14OB8UkFTHVo...|               4.0|               3.0|
|dk1MV0MP32Xq-iBxz...|2.1379310344827585|2.0392156862745097|
|GGxnlrfvWy7LFvjN5...| 4.214285714285714| 4.097560975609756|
|GJ2TXArxyuF8f79Wb...|               5.0|               4.8|
|vnNRBq0zVIH-k1BA9...|               4.0|               3.5|
|M4D-cZ9_9Bw-gMi0d...|               5.0| 4.923076923076923|
|RMjCnixEY5i12Ciqn...|3.3333333333333335|               2.7|
|jfdUtdkXogP2kjK5K...|              3.25|               3.0|
|XgX0JhqleOnH-ezSe...|               4.0|2.3333333333333335|
|yMKisHBS_Ia8Dr27A...|               3.5|               2.6|
|C5H-eZfnxBkYN40xc...| 4.333333333333333| 2.111111111111111|
|9HG09ZNqzrEUz-ipS...|               2.6|               2.0|
|yNp0G1G4-iYNnuC7V...|  

In [35]:
spark.sql(
"""
with 
last_reviews AS (
    SELECT user_id, business_id
    FROM reviews
    WHERE EXISTS (
        SELECT user_id, max(to_date(date)) AS last 
        FROM reviews
        GROUP BY user_id
    )
),
user_friends AS (
    SELECT user_id, explode(split(friends, \",\s*\")) 
    AS friend
    FROM users
),
business_categories AS (
    SELECT business_id, explode(split(categories, \",\s*\")) 
    AS category
    FROM business
),
chinese_businesses AS (
    SELECT *
    FROM business_categories
    WHERE category='Chinese'
)
SELECT friend as user_id
FROM last_reviews
LEFT JOIN user_friends ON last_reviews.user_id=user_friends.user_id 
INNER JOIN chinese_businesses ON last_reviews.business_id=chinese_businesses.business_id
""").show()

+--------------------+
|             user_id|
+--------------------+
|AfuFQIWXz50GE4TZ5...|
| ivm6bLIy2kGx-tuj...|
| L6wUxmUErIR7FJij...|
| kNuQsqcqTOy_5lQ_...|
| Xxvz5g67eaCr3emn...|
| hk8o_YrQY31c9Sm5...|
| SeWZYXztsqDvuMgF...|
| HMfEpwxTmlxxw0Zo...|
| leGhN-KSfIdIho2I...|
| Z_ZLQ9mj03sVZ9G7...|
| mzwmFGcLqaR0IHqd...|
| OSG63vuflUELLIJT...|
| 7qyzGvU8NN3psfQi...|
| DKsGJpKOjRWSeWbs...|
| ugDuOj80BPxXsPSP...|
| _hm8U41mh5uhd4t6...|
| INbtAUNKtZSXr20_...|
| x3Z6mJPgtnF5ni1e...|
|0MUXAy_zbclghJe6F...|
| fb9sMy7ixsOQyW4B...|
+--------------------+
only showing top 20 rows

