# Part 1

In [9]:
# !pip install pyspark
%ls -la

total 2931316
drwxrwxrwx 1 root   root        4096 Nov 24 11:06 [0m[34;42m.[0m/
drwxr-xr-x 1 root   root        4096 Nov 20 11:30 [01;34m..[0m/
drwxr-xr-x 1 jovyan users       4096 Nov 24 11:00 [01;34m.cache[0m/
drwxr-xr-x 1 jovyan users       4096 Nov 24 11:04 [01;34m.ipynb_checkpoints[0m/
drwxr-xr-x 1 jovyan users       4096 Nov 24 11:04 [01;34m.ipython[0m/
drwxr-xr-x 1 jovyan users       4096 Nov 24 11:04 [01;34m.jupyter[0m/
-rw-r--r-- 1 jovyan users       1707 Nov 24 11:06 lab8.ipynb
drwxr-xr-x 1 jovyan users       4096 Nov 24 10:59 [01;34m.local[0m/
-rwxrwxrwx 1 root   root  3001659271 Dec 19  2011 [01;32mtrain_triplets.txt[0m*
drwxrwxrwx 1 root   root        4096 Nov 24 10:20 [34;42myelp-dataset[0m/


In [1]:
import pyspark


In [10]:
from pyspark.sql import SparkSession

# since everyone will be using cluster at the same time
# let's make sure that everyone has resource. that is why 
# the configuration uses dynamic resource allocation and
# maximum 1 executor 
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")\
    .config("spark.dynamicAllocation.maxExecutors", "1")\
    .getOrCreate()

In [21]:
songs_df = spark.read.load("./train_triplets.txt",
                     format="csv", sep="\t", inferSchema="true", 
                     header="false")

                                                                                

In [22]:
songs_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)



In [24]:
songs_df = songs_df.withColumnRenamed("_c0", "user")\
                   .withColumnRenamed("_c1", "song")\
                   .withColumnRenamed("_c2", "play_count")

In [25]:
songs_df.createOrReplaceTempView("songs")

In [26]:
played_more_than_10_times = spark.sql("select song from songs where play_count > 10")

In [27]:
played_more_than_10_times.count()

                                                                                

2043582

# Part 2

In [30]:
business = spark.read.json("./yelp-dataset/yelp_academic_dataset_business.json")
reviews = spark.read.json("./yelp-dataset/yelp_academic_dataset_review.json")
users = spark.read.json("./yelp-dataset/yelp_academic_dataset_user.json")
business.createOrReplaceTempView("business")
reviews.createOrReplaceTempView("reviews")
users.createOrReplaceTempView("users")

21/11/24 11:14:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [31]:
# Query 1
spark.sql("select state, count(state) as count from business group by state order by count(state) desc").show()

[Stage 20:>                                                         (0 + 8) / 8]

+-----+-----+
|state|count|
+-----+-----+
|   AZ|56686|
|   NV|36312|
|   ON|33412|
|   NC|14720|
|   OH|14697|
|   PA|11216|
|   QC| 9219|
|   AB| 8012|
|   WI| 5154|
|   IL| 1932|
|   SC| 1162|
|   NY|   22|
|   CA|   19|
|   TX|    6|
|   FL|    4|
|  XGM|    4|
|   AL|    3|
|   WA|    3|
|   CT|    3|
|   VA|    2|
+-----+-----+
only showing top 20 rows



                                                                                

In [36]:
# Query 2
spark.sql("""
select count(distinct(*)) from (
    select explode(split(categories, \",\s*\")) as category from business
)
""").show()



+------------------------+
|count(DISTINCT category)|
+------------------------+
|                    2468|
+------------------------+



                                                                                

In [33]:
# Query 3
spark.sql("""
select category, count(category) from 
    (
        select explode(split(categories, \",\s*\")) as category 
        from business where city=\"Phoenix\"
    )
group by category order by count(category) desc limit 10
""").show()

+-----------------+---------------+
|         category|count(category)|
+-----------------+---------------+
|      Restaurants|           2815|
|         Shopping|           2416|
|    Home Services|           2302|
|             Food|           1672|
| Health & Medical|           1577|
|   Local Services|           1444|
|      Restaurants|           1184|
|       Automotive|           1164|
|    Beauty & Spas|           1115|
|    Home Services|            843|
+-----------------+---------------+





In [34]:
# Query 4 
spark.sql("""
select 
    count(*) as friend_count 
from 
    users 
where 
    size(split(friends, \",\s*\")) > 1000
""").show()



+------------+
|friend_count|
+------------+
|        4166|
+------------+



                                                                                

In [35]:
# Query 5
spark.sql("""
with business_ratings as (
    select 
        business_id, year(to_date(date)) as year, avg(stars) as rating 
    from 
        reviews group by business_id, year(to_date(date))
),
business_2014 as (
    select 
        business_id, rating 
    from 
        business_ratings 
    where 
        year=2014
),
business_2017 as (
    select 
        business_id, rating 
    from 
        business_ratings where year=2017
)
select 
    business_2014.business_id, business_2014.rating, business_2017.rating 
from 
    business_2014 
inner join 
    business_2017 
on 
    business_2014.business_id=business_2017.business_id 
where 
    business_2017.rating < business_2014.rating 
""").show()

                                                                                

+--------------------+------------------+------------------+
|         business_id|            rating|            rating|
+--------------------+------------------+------------------+
|WU6mFeLp8PASoA9jd...|               4.0|               2.2|
|45rWYQPlQ4x5cFU0u...|              4.25|               3.0|
|MGsV9nuGOr9fxtzJP...|3.4285714285714284|               3.0|
|tWjfgVtTD5n01Cq9d...|3.6666666666666665|3.6451612903225805|
|JO5_Frcbp9J732VNn...|               2.2|1.2857142857142858|
|btQ4Rc7am0KWNIcgt...|               4.5| 2.857142857142857|
|FXdAittxUsIR-SWPu...|3.1666666666666665|2.3333333333333335|
|Ky67Nk2SLRRaHSYuz...|               5.0|1.3333333333333333|
|AiEKjZPj2J3MpnBZk...|               4.0|               3.5|
|Ve_RgUoXVEeNnpvmS...|               4.0|               3.5|
|p-8PgN7S4VUUXH6y5...|3.3333333333333335|2.2222222222222223|
|kZ36LGvnwetEq-seq...| 3.769230769230769|3.6666666666666665|
|px2ZZOPzA8-xG_VhE...| 2.533333333333333|1.6964285714285714|
|UoPOED2pSAQjf4Gz4...|  

In [123]:
# Query 6
spark.sql(
"""with p1 as 
    (SELECT business_id, user_id, MAX(to_date(date))
        FROM reviews GROUP BY user_id, business_id),
    p2 as (SELECT business_id FROM business WHERE categories LIKE ('%Chinese%') 
        AND categories LIKE ('%Restaurants%')
    ),
    p3 as (SELECT p1.user_id 
        from p1 inner 
        join p2 
        on p2.business_id = p1.business_id)
    SELECT COUNT(*) FROM users inner join p3 on p3.user_id = users.user_id
""").show()

                                                                                

+--------+
|count(1)|
+--------+
|  252537|
+--------+

