In [1]:
#Cell A
try:
    spark.stop()
except:
    pass

from pyspark.sql import SparkSession
JAR = "/home/jovyan/jars/postgresql-42.7.4.jar"

spark = (
    SparkSession.builder
    .appName("yelp-queries")
    # make the driver & executors see the jar no matter what
    .config("spark.jars", JAR)
    .config("spark.driver.extraClassPath", JAR)
    .config("spark.executor.extraClassPath", JAR)
    .config("spark.sql.files.maxPartitionBytes", "268435456")
    .config("spark.sql.shuffle.partitions", "64")
    .getOrCreate()
)

# sanity: this should NOT raise
spark._jvm.java.lang.Class.forName("org.postgresql.Driver")
print("Postgres JDBC driver is loaded ✅")




Postgres JDBC driver is loaded ✅


In [2]:
q1 = spark.read.parquet("/data/parquet/_tmp_q1")
q2 = spark.read.parquet("/data/parquet/_tmp_q2")
q3 = spark.read.parquet("/data/parquet/_tmp_q3")
q4 = spark.read.parquet("/data/parquet/_tmp_q4")

for name, df in {"q1": q1, "q2": q2, "q3": q3, "q4": q4}.items():
    print(name, df.count())


q1 397
q2 257
q3 10193
q4 485


In [3]:
jdbc = {
    "url": "jdbc:postgresql://db:5432/yelp",
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver",
    "batchsize": "5000",
}

for name, df in {"q1": q1, "q2": q2, "q3": q3, "q4": q4}.items():
    (df.write.format("jdbc")
       .options(**jdbc)
       .option("dbtable", f"public.{name}")
       .mode("overwrite")
       .save())
print("Wrote q1..q4 to Postgres ✅")


Wrote q1..q4 to Postgres ✅


In [None]:
#Cell B
biz = spark.read.parquet("/data/parquet/business")
tip = spark.read.parquet("/data/parquet/tip")
rev = spark.read.json("/data/raw/yelp_academic_dataset_review.json")
usr = spark.read.json("/data/raw/yelp_academic_dataset_user.json")
chk = spark.read.json("/data/raw/yelp_academic_dataset_checkin.json")

for v in ["business","tip","review","yelp_user","checkin_raw"]:
    try: spark.catalog.dropTempView(v)
    except: pass

biz.createOrReplaceTempView("business")
tip.createOrReplaceTempView("tip")
rev.createOrReplaceTempView("review")
usr.createOrReplaceTempView("yelp_user")
chk.createOrReplaceTempView("checkin_raw")





In [None]:
# Cell C
from pyspark.sql import functions as F

# Row counts
for t in ["business","review","yelp_user","checkin_raw","tip"]:
    spark.sql(f"SELECT '{t}' AS tbl, COUNT(*) AS n FROM {t}").show()

# Categories presence
cats = spark.sql("""
SELECT
  SUM(CASE WHEN categories IS NULL OR length(categories)=0 THEN 1 ELSE 0 END) AS null_or_empty,
  SUM(CASE WHEN categories IS NOT NULL AND length(categories)>0 THEN 1 ELSE 0 END) AS nonnull
FROM business
""")
cats.show()
assert cats.collect()[0]["nonnull"] > 0, "business.categories are all null/empty"

# Checkin strings presence
ckn = spark.sql("""
SELECT COUNT(*) AS nonempty_checkins
FROM checkin_raw
WHERE date IS NOT NULL AND length(date)>0
""")
ckn.show()
assert ckn.collect()[0]["nonempty_checkins"] > 0, "checkin_raw.date is empty"

# Review ↔ business join reachability
jcnt = spark.sql("SELECT COUNT(*) AS joined FROM review r JOIN business b USING (business_id)")
jcnt.show()
assert jcnt.collect()[0]["joined"] > 0, "review × business join produced zero rows"




In [None]:
# Cell D
rev_small = spark.table("review").sample(False, 0.05, seed=42)
rev_small.cache(); rev_small.count()
rev_small.createOrReplaceTempView("review")  # override with sample




In [None]:
#Cell E
from pyspark.sql import functions as F

# Business → category rows
(spark.table("business")
 .select("business_id","name","city","state","stars","review_count","is_open","categories")
 .withColumn("category", F.explode(F.split(F.col("categories"), ",\\s*")))
 .withColumn("category", F.trim(F.lower(F.col("category"))))
 .dropna(subset=["category"])
).createOrReplaceTempView("business_category")

# Checkin counts per business
(spark.table("checkin_raw")
 .withColumn("ts", F.explode(F.split(F.col("date"), ",\\s*")))
 .withColumn("checkin_ts", F.to_timestamp(F.col("ts")))
 .drop("ts","date")
 .groupBy("business_id").count().withColumnRenamed("count","checkin_events")
).createOrReplaceTempView("checkin_counts")

# User elite flag
spark.sql("""
  SELECT *, CASE WHEN elite IS NOT NULL AND length(elite)>0 THEN 1 ELSE 0 END AS is_elite
  FROM yelp_user
""").createOrReplaceTempView("yelp_user_flag")




In [None]:
#Cell F
q1 = spark.sql("""
WITH r AS (
  SELECT business_id, avg(stars) AS avg_star, count(*) AS n_reviews
  FROM review GROUP BY business_id
)
SELECT bc.category,
       round(avg(r.avg_star),3) AS avg_stars_across_biz,
       sum(r.n_reviews) AS total_reviews,
       count(*) AS n_businesses
FROM r JOIN business_category bc ON r.business_id = bc.business_id
GROUP BY bc.category
HAVING sum(r.n_reviews) >= 300 AND count(*) >= 8
ORDER BY avg_stars_across_biz DESC, total_reviews DESC
""")
q1.show(20, truncate=False)



In [None]:
#Cell G
q2 = spark.sql("""
WITH rb AS (
  SELECT r.user_id, b.city, b.state, r.stars
  FROM review r JOIN business b ON r.business_id = b.business_id
)
SELECT rb.city, rb.state, u.is_elite,
       count(*) AS n_reviews,
       round(avg(stars),3) AS avg_stars,
       round(stddev_pop(stars),3) AS sd_stars
FROM rb JOIN yelp_user_flag u USING(user_id)
GROUP BY rb.city, rb.state, u.is_elite
HAVING count(*) >= 120
ORDER BY rb.city, rb.state, u.is_elite DESC
""")
q2.show(20, truncate=False)



In [None]:
#Cell H
q3 = spark.sql("""
WITH agg AS (
  SELECT business_id, avg(stars) AS avg_star, count(*) AS n_reviews
  FROM review GROUP BY business_id
)
SELECT b.business_id, b.city, b.state, b.name,
       ck.checkin_events, agg.avg_star, agg.n_reviews
FROM agg
JOIN checkin_counts ck USING(business_id)
JOIN business b USING(business_id)
WHERE agg.n_reviews >= 8 AND ck.checkin_events >= 1
ORDER BY agg.n_reviews DESC
""")
q3.show(20, truncate=False)


In [None]:
#Cell I
q4 = spark.sql("""
WITH r AS (
  SELECT business_id, avg(stars) AS avg_star, count(*) AS n_reviews
  FROM review GROUP BY business_id
)
SELECT bc.category, b.is_open,
       count(*) AS n_businesses,
       round(avg(r.avg_star),3) AS avg_star_across_biz,
       sum(r.n_reviews) AS total_reviews
FROM business b
JOIN r USING(business_id)
JOIN business_category bc USING(business_id)
GROUP BY bc.category, b.is_open
HAVING sum(r.n_reviews) >= 300 AND count(*) >= 8
ORDER BY bc.category, b.is_open DESC
""")
q4.show(20, truncate=False)



In [None]:
# Cell J
q1.limit(50).write.mode("overwrite").parquet("/data/parquet/q1")
q2.limit(200).write.mode("overwrite").parquet("/data/parquet/q2")
q3.limit(200).write.mode("overwrite").parquet("/data/parquet/q3")
q4.limit(200).write.mode("overwrite").parquet("/data/parquet/q4")

q1.limit(50).coalesce(1).write.mode("overwrite").option("header","true").csv("/data/csv/q1")
q2.limit(200).coalesce(1).write.mode("overwrite").option("header","true").csv("/data/csv/q2")
q3.limit(200).coalesce(1).write.mode("overwrite").option("header","true").csv("/data/csv/q3")
q4.limit(200).coalesce(1).write.mode("overwrite").option("header","true").csv("/data/csv/q4")


In [None]:
# TEMP: persist current results so we don't lose work on restart
q1.write.mode("overwrite").parquet("/data/parquet/_tmp_q1")
q2.write.mode("overwrite").parquet("/data/parquet/_tmp_q2")
q3.write.mode("overwrite").parquet("/data/parquet/_tmp_q3")
q4.write.mode("overwrite").parquet("/data/parquet/_tmp_q4")
print("Saved q1..q4 to /data/parquet/_tmp_*")


In [None]:
# Cell: write q1..q4 to Postgres
jdbc = {
    "url": "jdbc:postgresql://db:5432/yelp",
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver",
    "batchsize": "5000",
}

for name, df in {"q1": q1, "q2": q2, "q3": q3, "q4": q4}.items():
    (df.write.format("jdbc")
       .options(**jdbc)
       .option("dbtable", f"public.{name}")
       .mode("overwrite")
       .save())


In [None]:
# Create helpful indexes (safe to re-run)
import os
os.system(r"""
docker exec -i yelp-db psql -U postgres -d yelp <<'SQL'
CREATE INDEX IF NOT EXISTS idx_business_city_state ON business(city, state);
CREATE INDEX IF NOT EXISTS idx_q1_cat        ON q1(category);
CREATE INDEX IF NOT EXISTS idx_q2_city_state ON q2(city, state);
CREATE INDEX IF NOT EXISTS idx_q3_biz        ON q3(business_id);
CREATE INDEX IF NOT EXISTS idx_q4_cat_open   ON q4(category, is_open);
SQL
""")
