In [0]:
%run "../Yelp Data Analysis/ADLS Gen2 Config"

In [0]:
%run "../Yelp Data Analysis/Convert JSON to Parquet"

In [0]:
%run "../Yelp Data Analysis/Load Data"

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import broadcast, col, count, desc, month, rank, row_number, to_date, year
from pyspark.sql.functions import sum as spark_sum  # aliased so Python's builtin sum() is not shadowed

##### Convert JSON to Delta

In [0]:
# Read the raw Yelp business JSON and persist it as a Delta table.
# The previous version called .parquet(...), which produced plain Parquet files
# (no _delta_log) under a path that is named and documented as Delta.
df_business = spark.read.json("abfss://raw@sadataanalysisyelp.dfs.core.windows.net/yelp_academic_dataset_business.json")
# NOTE(review): if this path still holds Parquet files from earlier runs, clear it
# once before the first Delta write so stale non-Delta files are not left behind.
df_business.write.format("delta").mode("overwrite").save("abfss://raw@sadataanalysisyelp.dfs.core.windows.net/json_to_delta/yelp_academic_dataset_business.delta")

#### Partition tip dataset by year and month using date column

In [0]:
# Parse the `date` column once and derive both partition columns from it.
tip_date = to_date(col("date"))
df_tip = (
    df_tip
    .withColumn("tip_year", year(tip_date))
    .withColumn("tip_month", month(tip_date))
)

# Write the tips partitioned by year and month (one directory per year/month pair).
(
    df_tip.write
    .partitionBy("tip_year", "tip_month")
    .mode("overwrite")
    .parquet("abfss://raw@sadataanalysisyelp.dfs.core.windows.net/tip_partitioned_by_year_and_month/")
)


#### repartition vs coalesce

In [0]:
# coalesce(): asks Spark for at most 10 partitions by merging existing ones.
df_reduced_partitions = df_user.coalesce(numPartitions=10)

# repartition(): redistributes the rows into exactly 30 partitions.
# NOTE(review): this only "increases" the count if df_user currently has fewer
# than 30 partitions — confirm with df_user.rdd.getNumPartitions().
df_increased_paritions = df_user.repartition(numPartitions=30)

In [0]:
# Write the same user data twice, each time with 10 partitions, once via
# coalesce() and once via repartition(), so the output layouts can be compared.
coalesce_target = "abfss://raw@sadataanalysisyelp.dfs.core.windows.net/coalesce/user.parquet"
repartition_target = "abfss://raw@sadataanalysisyelp.dfs.core.windows.net/repartition/user.parquet"

df_user.coalesce(10).write.parquet(coalesce_target, mode="overwrite")

df_user.repartition(10).write.parquet(repartition_target, mode="overwrite")

#### Find the top 3 users based on their total number of reviews.

In [0]:
# Register df_user as the temp view `user` so the %sql cells below can query it.
df_user.createOrReplaceTempView("user")

In [0]:
%sql
-- Three users with the highest review_count, read from the `user` temp view.
SELECT user_id, name, review_count
FROM user
ORDER BY review_count DESC
LIMIT 3;

user_id,name,review_count
Hi10sGSZNxQH3NLyWSZ1oA,Fox,17473
8k3aO-mPeyhbR5HUucA5aA,Victor,16978
hWDybu_KvYLSdEFzGrniTw,Bruce,16567


#### Find the top 10 users with the most fans

In [0]:
# Ten users with the most fans. The previous version sorted but never limited,
# so df_top_10 held every user; limit(10) matches the equivalent %sql cell.
df_top_10 = df_user.select("user_id","name","fans").orderBy(desc(col("fans"))).limit(10)

In [0]:
%sql
-- Ten users with the highest fan count, read from the `user` temp view.
SELECT user_id, name, fans
FROM user
ORDER BY fans DESC
LIMIT 10

user_id,name,fans
37cpUoM8hlkSQfReIEBd-Q,Mike,12497
hizGc5W1tBHPghM5YKCAtg,Katie,3642
Hi10sGSZNxQH3NLyWSZ1oA,Fox,3493
JjXuiru1_ONzDkYVrHN0aw,Richard,3243
j14WgRoU_-2ZE1aw1dXrJg,Daniel,3138
VHdY6oG2JPVNjihWhOooAQ,Jessica,2627
iLjMdZi0Tm7DQxX1C1_2dg,Ruggy,2547
lt7bNHl-TXziny4FETu8nA,Megan,2451
fgwI3rYHOv1ipfVfCSx7pg,Emi,2424
ITa3vh5ERI90G_WP4SmGUQ,Peter,2388


#### Analyse the top 10 categories by number of reviews.

In [0]:
# Total number of reviews per `categories` value.
# The previous version used count(review_count), which counts how many businesses
# share a categories value rather than adding up their reviews; sum() gives the
# review total that the ranking is meant to be based on.
# NOTE(review): `categories` appears to be a single string that can hold several
# categories (a later cell uses .contains("Italian") on it), so this groups by the
# full combination. Split/explode it first if per-category totals are wanted — confirm.
df_business_review = df_business.groupBy(col("categories")).agg(
    spark_sum(col("review_count")).alias("total_reviews_count")
)

# Rank by total reviews (global window: the aggregated frame is ranked as one partition).
df_business_review = df_business_review.withColumn("rnk", row_number().over(Window.orderBy(desc("total_reviews_count"))))
df_business_top_10 = df_business_review.filter(col("rnk")<=10)



#### Analyse top businesses which have over 1000 reviews.

In [0]:
# Businesses with more than 1000 reviews, most-reviewed first.
# The previous version filtered df_business_review, which is aggregated per
# `categories` value, so it returned category groups instead of businesses.
df_business_1000 = df_business.filter(col("review_count") > 1000).orderBy(desc("review_count"))

#### Analyse Business Data: Number of restaurants per state.

In [0]:
# Row count per state, largest first.
# NOTE(review): this counts every row of df_business; no restaurant-only filter
# is applied here — confirm whether df_business is already restricted to restaurants.
df_restraunts_per_state = (
    df_business
    .groupBy("state")
    .count()
    .sort(col("count").desc())
)

#### Analyze the top 3 restaurants in each state.

In [0]:
# Rank businesses within each state by review_count, highest first.
# The previous version ordered ascending, so rnk <= 3 selected the three
# least-reviewed businesses per state instead of the top three.
df_business_rnk = df_business.withColumn("rnk", row_number().over(Window.partitionBy("state").orderBy(desc("review_count"))))
df_top_3_restraunts = df_business_rnk.filter(col("rnk")<=3)

#### List the top restaurants in a state by the number of reviews.

In [0]:
# Per-state ranking by review_count (descending), then keep the top 10 rows for AZ.
state_review_window = Window.partitionBy("state").orderBy(col("review_count").desc())
df_restraunts = df_business.withColumn("rnk", row_number().over(state_review_window))

is_top_ten = col("rnk") <= 10
is_arizona = col("state") == 'AZ'
df_top_restraunts = df_restraunts.where(is_top_ten & is_arizona)

#### Numbers of restaurants in Arizona state per city.

In [0]:
# Keep only the rows whose state is AZ.
df_business_AZ = df_business.where(col("state") == 'AZ')

# Row count per AZ city, largest first.
df_business_AZ_city = (
    df_business_AZ
    .groupBy("city")
    .count()
    .sort(col("count").desc())
)

#### Select the city with the most restaurants in AZ

In [0]:
# rank() over the per-city counts; rnk == 1 keeps the city (or tied cities)
# with the highest count.
city_count_window = Window.orderBy(col("count").desc())
df_city_restraunt_az = (
    df_business_AZ_city
    .withColumn("rnk", rank().over(city_count_window))
    .where(col("rnk") == 1)
)

#### Broadcast join: restaurants by review rating in the top AZ city.

In [0]:
# Join AZ businesses to the small top-city frame, broadcasting the small side.
# The previous version joined the full df_business on `city` alone, so businesses
# in same-named cities in other states were pulled in; df_business_AZ restricts
# the join to Arizona.
df_best_restraunts = df_business_AZ.join(broadcast(df_city_restraunt_az),"city","inner")

# Total reviews per (name, stars). The previous version used count(review_count),
# which counts rows per group rather than adding up reviews, so the ">= 10" filter
# below was applied to a location count instead of a review count.
df_best_restraunts = df_best_restraunts.groupBy("name","stars").agg(spark_sum(col("review_count")).alias("review_count"))

# Keep groups with at least 10 reviews and a rating of at least 3 stars.
df_best_restraunts = df_best_restraunts.filter(col("review_count")>=10)
df_best_restraunts = df_best_restraunts.filter(col("stars")>=3)

#### Highest-rated Italian restaurant in AZ.

In [0]:
# AZ rows whose categories string contains "Italian", ranked by stars (descending).
# rank() gives tied rows the same rank, so rnk == 1 returns every row sharing
# the highest star value.
stars_window = Window.orderBy(col("stars").desc())
df_Italian = (
    df_business_AZ
    .where(col("categories").contains("Italian"))
    .withColumn("rnk", rank().over(stars_window))
)

df_Italian = df_Italian.where(col("rnk") == 1).select("name", "city", "state", "stars")
