In [2]:
from pyspark.sql import SparkSession

# since everyone will be using cluster at the same time
# let's make sure that everyone has resource. that is why 
# the configuration uses dynamic resource allocation and
# maximum 1 executor 
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")\
    .config("spark.dynamicAllocation.maxExecutors", "1")\
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/23 07:00:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
songs_df = spark.read.load("./train_triplets.txt",
                     format="csv", sep="\t", inferSchema="true", 
                     header="false")


                                                                                

In [3]:
songs_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)



In [6]:
songs_df.createOrReplaceTempView("songs")
songs_df = songs_df.withColumnRenamed("_c0", "user")\
                   .withColumnRenamed("_c1", "song")\
                   .withColumnRenamed("_c2", "play_count")
songs_df.printSchema()

root
 |-- user: string (nullable = true)
 |-- song: string (nullable = true)
 |-- play_count: integer (nullable = true)



In [7]:
played_more_than_10_times = spark.sql("select song from songs where play_count > 10")

In [8]:
played_more_than_10_times.count()

                                                                                

2043582

In [17]:
username = "b80344d063b5ccb3212f76538f3d9e43d87dca9e"
played_by_user = spark.sql(f"select song from songs where user=\"{username}\"")
played_by_user.count()

                                                                                

104

In [16]:
first_ten_entries = spark.sql(f"select user from songs limit 10")
print(first_ten_entries.collect()[0])

Row(user='b80344d063b5ccb3212f76538f3d9e43d87dca9e')


In [18]:
username = "b80344d063b5ccb3212f76538f3d9e43d87dca9e"
played_by_user_more_than_10_times = spark.sql(f"select song from songs where user=\"{username}\" and play_count > 10")
played_by_user_more_than_10_times.count()

                                                                                

0

## Yelp dataset

In [3]:
business = spark.read.json("./yelp-dataset/yelp_academic_dataset_business.json")
reviews = spark.read.json("./yelp-dataset/yelp_academic_dataset_review.json")
users = spark.read.json("./yelp-dataset/yelp_academic_dataset_user.json")
business.createOrReplaceTempView("business")
reviews.createOrReplaceTempView("reviews")
users.createOrReplaceTempView("users")

21/11/23 07:02:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [4]:
spark.sql("select state, count(state) as count from business group by state order by count(state) desc").show()



+-----+-----+
|state|count|
+-----+-----+
|   AZ|56686|
|   NV|36312|
|   ON|33412|
|   NC|14720|
|   OH|14697|
|   PA|11216|
|   QC| 9219|
|   AB| 8012|
|   WI| 5154|
|   IL| 1932|
|   SC| 1162|
|   NY|   22|
|   CA|   19|
|   TX|    6|
|   FL|    4|
|  XGM|    4|
|   AL|    3|
|   WA|    3|
|   CT|    3|
|   VA|    2|
+-----+-----+
only showing top 20 rows



                                                                                

In [7]:
spark.sql("""
select count(distinct(*)) from (
    select explode(split(categories, \",\s*\")) as category from business
)
""").show()

+------------------------+
|count(DISTINCT category)|
+------------------------+
|                    2468|
+------------------------+



                                                                                

In [36]:
spark.sql("""
select category, count(category) from 
    (
        select explode(split(categories, \",\s*\")) as category 
        from business where city=\"Phoenix\"
    )
group by category order by count(category) desc limit 10
""").show()



+-----------------+---------------+
|         category|count(category)|
+-----------------+---------------+
|      Restaurants|           2815|
|         Shopping|           2416|
|    Home Services|           2302|
|             Food|           1672|
| Health & Medical|           1577|
|   Local Services|           1444|
|      Restaurants|           1184|
|       Automotive|           1164|
|    Beauty & Spas|           1115|
|    Home Services|            843|
+-----------------+---------------+



                                                                                

In [18]:
spark.sql("""
with business_category (select *, explode(split(categories, \",\s*\")) as category from business)
select categories from business where categories like '%Restaurant%' and categories like '%Chinese%'
""").show()

+--------------------+
|          categories|
+--------------------+
|Specialty Food, R...|
|Chinese, Dim Sum,...|
|Chinese, Restaurants|
|Restaurants, Chinese|
|Chinese, Restaurants|
|Chinese, Restaurants|
|Dim Sum, Chinese,...|
|Chinese, Restaurants|
|Local Flavor, Chi...|
|Restaurants, Hawa...|
|Sushi Bars, Buffe...|
|Chinese, Restaura...|
|Hakka, Indian, As...|
|Buffets, Chinese,...|
|Seafood, Chinese,...|
|Chinese, Restaurants|
|Do-It-Yourself Fo...|
|Chinese, Restaurants|
|Chinese, Seafood,...|
|Restaurants, Fast...|
+--------------------+
only showing top 20 rows



In [11]:
spark.sql("""
select 
    count(*) as friend_count 
from 
    users 
where 
    size(split(friends, \",\s*\")) > 1000
""").show()



+------------+
|friend_count|
+------------+
|        4166|
+------------+



                                                                                

In [37]:
spark.sql("""
with business_ratings as (
    select 
        business_id, year(to_date(date)) as year, avg(stars) as rating 
    from 
        reviews group by business_id, year(to_date(date))
),
business_2014 as (
    select 
        business_id, rating 
    from 
        business_ratings 
    where 
        year=2014
),
business_2017 as (
    select 
        business_id, rating 
    from 
        business_ratings where year=2017
)
select 
    business_2014.business_id, business_2014.rating, business_2017.rating 
from 
    business_2014 
inner join 
    business_2017 
on 
    business_2014.business_id=business_2017.business_id 
where 
    business_2017.rating < business_2014.rating 
""").show()



+--------------------+------------------+------------------+
|         business_id|            rating|            rating|
+--------------------+------------------+------------------+
|WU6mFeLp8PASoA9jd...|               4.0|               2.2|
|45rWYQPlQ4x5cFU0u...|              4.25|               3.0|
|MGsV9nuGOr9fxtzJP...|3.4285714285714284|               3.0|
|tWjfgVtTD5n01Cq9d...|3.6666666666666665|3.6451612903225805|
|JO5_Frcbp9J732VNn...|               2.2|1.2857142857142858|
|btQ4Rc7am0KWNIcgt...|               4.5| 2.857142857142857|
|FXdAittxUsIR-SWPu...|3.1666666666666665|2.3333333333333335|
|Ky67Nk2SLRRaHSYuz...|               5.0|1.3333333333333333|
|AiEKjZPj2J3MpnBZk...|               4.0|               3.5|
|Ve_RgUoXVEeNnpvmS...|               4.0|               3.5|
|p-8PgN7S4VUUXH6y5...|3.3333333333333335|2.2222222222222223|
|kZ36LGvnwetEq-seq...| 3.769230769230769|3.6666666666666665|
|px2ZZOPzA8-xG_VhE...| 2.533333333333333|1.6964285714285714|
|UoPOED2pSAQjf4Gz4...|  

                                                                                

## Last query

In [35]:
# business.printSchema()
# reviews.printSchema()
# users.printSchema()

spark.sql("""



with last_reviews(
    with chinese_restaurant(
    select * from business where categories like '%Chinese%' and categories like '%Restaurant%'
    )
    select user_id, max(to_date(date)) 
    from reviews join chinese_restaurant on reviews.business_id=chinese_restaurant.business_id
    group by user_id
)

select distinct(explode(split(friends, \",\s*\"))) as friend
from last_reviews join users on last_reviews.user_id=users.user_id

""").show()



+--------------------+
|              friend|
+--------------------+
| -00Egx3njsVJcffb...|
| -04Jyy-WuPWpllQW...|
| -0LF0KoZLL8OiDU4...|
| -0oIA-m3Wc11-jc0...|
| -0zUpn_6kTWXHWzB...|
| -1A3SCg5pLrHw348...|
| -1vHfy-zKYr0BEyv...|
| -2_quDPr8JZezokU...|
| -2ng4a1kUcsyuqa-...|
| -2vxL7OSMUAuQl2c...|
| -42EDLVIoALtrtBM...|
| -50r1_gkfzZKKeTZ...|
| -5LnQ_ArAn0o_RCc...|
| -6DoXmdXEy_P5N-Q...|
| -6uXqbL1TvTHLyfs...|
| -7jaf-ejcyTX_JTD...|
| -8Rj4t1fFxpWNCs9...|
| -9Eg_UzsR8YiV3UY...|
| -9Ji22rM656HohYE...|
| -9nBr-QuSuHVHT6l...|
+--------------------+
only showing top 20 rows



                                                                                