In [100]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import when
from pyspark.sql.functions import min as spark_min, max as spark_max, round, desc
from pyspark.sql.functions import col, collect_set, concat_ws, to_date, current_date, date_sub
from pyspark.sql.functions import row_number


spark = SparkSession.builder.appName("FirstApp").getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Dataset1

In [129]:
# Read all CSV files into a single DataFrame
df1 = spark.read.csv("s3://emr-project/output/dataset1/" + "*.csv", header=True, inferSchema=True)

df1 = df1.withColumn("score", col("score").cast("double"))
df1 = df1.filter(df1.score.isNotNull())
df1 = df1.filter(df1.restaurant_id.isNotNull())

min_score, max_score = df1.select(spark_min(col("score")), spark_max(col("score"))).first()

df1 = df1.withColumn("normalized_score", (col("score") - min_score) / (max_score - min_score))
df1 = df1.withColumn("normalized_score", round(col("normalized_score") * 100, 3))

df1 = df1.drop("score")

df1.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+----------------+----------------+--------------------+--------------------+----------------+
|inspection_date|   restaurant_id| restaurant_name|     inspection_type|         description|normalized_score|
+---------------+----------------+----------------+--------------------+--------------------+----------------+
|     02/29/2024|#807 TUTTA BELLA|#807 TUTTA BELLA|Routine Inspectio...|3400 - Wiping clo...|          10.638|
|     02/29/2024|#807 TUTTA BELLA|#807 TUTTA BELLA|Routine Inspectio...|4200 - Food-conta...|          10.638|
|     03/02/2023|#807 TUTTA BELLA|#807 TUTTA BELLA|Routine Inspectio...|2120 - Proper col...|          15.957|
|     03/02/2023|#807 TUTTA BELLA|#807 TUTTA BELLA|Routine Inspectio...|1300 - Food conta...|          15.957|
|     08/31/2022|#807 TUTTA BELLA|#807 TUTTA BELLA|Routine Inspectio...|3200 - Insects, r...|          10.638|
|     08/31/2022|#807 TUTTA BELLA|#807 TUTTA BELLA|Routine Inspectio...|0200 - Food Worke...|          10.638|
|

In [130]:
count_df1 = df1.groupBy("restaurant_id").count()
result_df1 = count_df1.join(
    df1.select("restaurant_id", "restaurant_name").distinct(), 
    on="restaurant_id", 
    how="left"
)
sorted_df1 = result_df1.orderBy(col("count").desc()).limit(10)
sorted_df1.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+--------------------+
|       restaurant_id|count|     restaurant_name|
+--------------------+-----+--------------------+
|           TACO TIME| 1127|           TACO TIME|
|              SUBWAY| 1026|        SUBWAY #3146|
|              SUBWAY| 1026|              SUBWAY|
|              SUBWAY| 1026|       SUBWAY #10558|
|              SUBWAY| 1026|MAPLE VALLEY SUBW...|
| PAGLIACCI PIZZA INC|  687| PAGLIACCI PIZZA INC|
|    TOSHI'S TERIYAKI|  412|    TOSHI'S TERIYAKI|
|TAQUERIA EL RINCO...|  408|TAQUERIA EL RINCO...|
|         CAFFE LADRO|  397|         CAFFE LADRO|
|          MCDONALD'S|  333|   MCDONALD'S #11027|
+--------------------+-----+--------------------+

In [131]:
count_df1 = df1.groupBy("inspection_type").count()
sorted_df1 = count_df1.orderBy(col("count").desc()).limit(10)
sorted_df1.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+
|     inspection_type| count|
+--------------------+------+
|Routine Inspectio...|218396|
|Consultation/Educ...| 38718|
|   Return Inspection|  6764|
+--------------------+------+

In [132]:
# Convert the Inspection_date column to date type
df1 = df1.withColumn("inspection_date", to_date(col("inspection_date"), "MM/dd/yyyy"))

# Calculate the date 2 years ago from today
two_years_ago = date_sub(current_date(), 2 * 365)

old_inspections_df1 = df1.filter(col("inspection_date") < two_years_ago)
old_inspections_df1 = old_inspections_df1.drop("restaurant_id")

# Show the filtered DataFrame
old_inspections_df1.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+----------------+--------------------+--------------------+----------------+
|inspection_date| restaurant_name|     inspection_type|         description|normalized_score|
+---------------+----------------+--------------------+--------------------+----------------+
|     2022-01-13|#807 TUTTA BELLA|Routine Inspectio...|                NULL|           5.319|
|     2021-01-06|#807 TUTTA BELLA|Routine Inspectio...|                NULL|           5.319|
|     2022-07-13|       +MAS CAFE|   Return Inspection|                NULL|           5.319|
|     2022-06-29|       +MAS CAFE|Routine Inspectio...|3400 - Wiping clo...|          30.319|
|     2022-06-29|       +MAS CAFE|Routine Inspectio...|4800 - Physical f...|          30.319|
|     2022-06-29|       +MAS CAFE|Routine Inspectio...|3200 - Insects, r...|          30.319|
|     2022-06-29|       +MAS CAFE|Routine Inspectio...|1600 - Proper coo...|          30.319|
|     2022-06-29|       +MAS CAFE|Routine Inspectio...|2110 

In [134]:
# Convert the Inspection_date column to date type
df1 = df1.withColumn("inspection_date", to_date(col("inspection_date"), "MM/dd/yyyy"))

# Define a window specification to partition by restaurant_id and order by Inspection_date descending
window_spec = Window.partitionBy("restaurant_id").orderBy(desc("inspection_date"))

# Add a row number to each row within the partition
df1_with_row_num = df1.withColumn("row_num", row_number().over(window_spec))

# Filter to get only the latest record for each restaurant_id
latest_records_df1 = df1_with_row_num.filter(col("row_num") == 1).drop("row_num")

# Calculate the date two years ago from today
two_years_ago = date_sub(current_date(), 2 * 365)

# Filter records that are older than two years from latest_records_df
old_records_df = latest_records_df1.filter(col("inspection_date") < two_years_ago)

old_records_df = old_records_df.drop("restaurant_id")

# Show the filtered DataFrame
old_records_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+--------------------+--------------------+--------------------+----------------+
|inspection_date|     restaurant_name|     inspection_type|         description|normalized_score|
+---------------+--------------------+--------------------+--------------------+----------------+
|     2022-03-26|  ADAMS BENCH WINERY|Routine Inspectio...|                NULL|           5.319|
|     2022-03-16|AMAZON - INVENT C...|Routine Inspectio...|                NULL|           5.319|
|     2022-03-16|AMAZON S - MARKET...|Routine Inspectio...|                NULL|           5.319|
|     2022-03-16|   AMAZON S- KITCHEN|Routine Inspectio...|                NULL|           5.319|
|     2022-04-02|AMBASSADOR WINES ...|Routine Inspectio...|                NULL|           5.319|
|     2019-03-12|AMERICAN LEGION P...|Routine Inspectio...|                NULL|           5.319|
|     2022-05-21|ANCESTRY CELLARS ...|Routine Inspectio...|0600 - Adequate h...|          10.638|
|     2022-03-26|ANC

# Dataset 2

In [135]:
# Read all CSV files into a single DataFrame
df2 = spark.read.csv("s3://emr-project/output/dataset2/" + "*.csv", header=True, inferSchema=True)

df2 = df2.withColumn("score", col("score").cast("double"))
df2 = df2.filter(df2.score.isNotNull())

df2 = df2.withColumnRenamed("score", "normalized_score")

df2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-------------+--------------------+--------------------+----------------+--------------------+
|inspection_date|restaurant_id|     restaurant_name|     inspection_type|normalized_score|         description|
+---------------+-------------+--------------------+--------------------+----------------+--------------------+
|     06/08/2018|      2522268|         JET'S PIZZA|Canvass Re-Inspec...|           33.33|45. FOOD HANDLER ...|
|     06/12/2018|      2222369|SIMPLY SOUPS SALA...|             Canvass|           33.33|2. FACILITIES TO ...|
|     06/29/2018|      2418662|COLUMBUS MANOR RE...|Canvass Re-Inspec...|           33.33|33. FOOD AND NON-...|
|     07/24/2018|      2147911|MEZZA GRILLED WRA...|             Canvass|           33.33|3. MANAGEMENT, FO...|
|     08/14/2018|      2609804|GATELY SEAFOOD MA...|             License|           33.33|                NULL|
|     08/07/2018|      2398548|BISMILLAH RESTAURANT|Canvass Re-Inspec...|           33.33|58. ALLERGEN T

In [136]:
aggregated_names_df2 = df2.groupBy("restaurant_id").agg(concat_ws(", ", collect_set("restaurant_name")).alias("restaurant_names"))
count_df2 = df2.groupBy("restaurant_id").count()
result_df2 = count_df2.join(aggregated_names_df2, on="restaurant_id", how="left")
sorted_df2 = result_df2.orderBy(col("count").desc()).limit(10)

sorted_df2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+-----+--------------------+
|restaurant_id|count|    restaurant_names|
+-------------+-----+--------------------+
|            0|  673|EPIPHANY PARISH/C...|
|      1354323|  198|SPORTS SERVICE SO...|
|        14616|  183|ILLINOIS SPORTSER...|
|      1574001|   87|LEVY RESTAURANTS ...|
|        60184|   64|TAQUERIA EL RANCHITO|
|      2083833|   62|MARIANO'S FRESH M...|
|      1142451|   60|JEWEL FOOD  STORE...|
|      1974745|   59|LEVY RESTAURANTS ...|
|        25152|   56|Chavez Upper Grad...|
|      1884255|   56|FOOD 4 LESS, FOOD...|
+-------------+-----+--------------------+

In [137]:
count_df2 = df2.groupBy("inspection_type").count()
sorted_df2 = count_df2.orderBy(col("count").desc()).limit(10)
sorted_df2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+
|     inspection_type| count|
+--------------------+------+
|             Canvass|143364|
|             License| 36772|
|Canvass Re-Inspec...| 30598|
|           Complaint| 26020|
|License Re-Inspec...| 11706|
|Complaint Re-Insp...| 10919|
|Short Form Complaint|  8440|
|      Non-Inspection|  3248|
|Suspected Food Po...|   973|
|        Consultation|   674|
+--------------------+------+

In [138]:
# Convert the Inspection_date column to date type
df2 = df2.withColumn("inspection_date", to_date(col("inspection_date"), "MM/dd/yyyy"))

# Calculate the date 2 years ago from today
two_years_ago = date_sub(current_date(), 2 * 365)

old_inspections_df2 = df2.filter(col("inspection_date") < two_years_ago)
old_inspections_df2 = old_inspections_df2.drop("restaurant_id")

# Show the filtered DataFrame
old_inspections_df2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+--------------------+--------------------+----------------+--------------------+
|inspection_date|     restaurant_name|     inspection_type|normalized_score|         description|
+---------------+--------------------+--------------------+----------------+--------------------+
|     2018-06-08|         JET'S PIZZA|Canvass Re-Inspec...|           33.33|45. FOOD HANDLER ...|
|     2018-06-12|SIMPLY SOUPS SALA...|             Canvass|           33.33|2. FACILITIES TO ...|
|     2018-06-29|COLUMBUS MANOR RE...|Canvass Re-Inspec...|           33.33|33. FOOD AND NON-...|
|     2018-07-24|MEZZA GRILLED WRA...|             Canvass|           33.33|3. MANAGEMENT, FO...|
|     2018-08-14|GATELY SEAFOOD MA...|             License|           33.33|                NULL|
|     2018-08-07|BISMILLAH RESTAURANT|Canvass Re-Inspec...|           33.33|58. ALLERGEN TRAI...|
|     2018-06-05|    FRANCESCA'S CAFE|License Re-Inspec...|           33.33|                NULL|
|     2018-06-21|PRE

In [139]:
# Convert the Inspection_date column to date type
df2 = df2.withColumn("inspection_date", to_date(col("inspection_date"), "MM/dd/yyyy"))

# Define a window specification to partition by restaurant_id and order by Inspection_date descending
window_spec = Window.partitionBy("restaurant_id").orderBy(desc("inspection_date"))

# Add a row number to each row within the partition
df2_with_row_num = df2.withColumn("row_num", row_number().over(window_spec))

# Filter to get only the latest record for each restaurant_id
latest_records_df2 = df2_with_row_num.filter(col("row_num") == 1).drop("row_num")

# Calculate the date two years ago from today
two_years_ago = date_sub(current_date(), 2 * 365)

# Filter records that are older than two years from latest_records_df
old_records_df = latest_records_df2.filter(col("inspection_date") < two_years_ago)

old_records_df = old_records_df.drop("restaurant_id")

# Show the filtered DataFrame
old_records_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+--------------------+--------------------+----------------+--------------------+
|inspection_date|     restaurant_name|     inspection_type|normalized_score|         description|
+---------------+--------------------+--------------------+----------------+--------------------+
|     2017-02-24|CAL'S 400 LIQUORS...|             Canvass|           100.0|                NULL|
|     2012-07-25|        PIZZA LOUNGE|             Canvass|           100.0|                NULL|
|     2012-10-04|FRANCES COCKTAIL ...|             Canvass|           100.0|                NULL|
|     2011-09-08| LLOYDS LOUNGE, INC.|Complaint Re-Insp...|           100.0|33. FOOD AND NON-...|
|     2019-04-29|TREASURE ISLAND F...|             Canvass|           33.33|                NULL|
|     2019-06-28|TREASURE ISLAND F...|             Canvass|           33.33|                NULL|
|     2016-11-07| MIRABELL RESTAURANT|             Canvass|           33.33|                NULL|
|     2013-08-20|   

# Dataset 3

In [140]:
# Read all CSV files into a single DataFrame
df3 = spark.read.csv("s3://emr-project/output/dataset3/" + "*.csv", header=True, inferSchema=True)

df3 = df3.withColumn("score", col("score").cast("double"))
df3 = df3.filter(df3.score.isNotNull())

min_score, max_score = df3.select(spark_min(col("score")), spark_max(col("score"))).first()

df3 = df3.withColumn("normalized_score", (col("score") - min_score) / (max_score - min_score))
df3 = df3.withColumn("normalized_score", round(col("normalized_score") * 100, 3))

df3 = df3.drop("score")

df3.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-------------+--------------------+--------------------+--------------------+----------------+
|inspection_date|restaurant_id|     restaurant_name|     inspection_type|         description|normalized_score|
+---------------+-------------+--------------------+--------------------+--------------------+----------------+
|     09/23/2019|     41683279|INTREPID SEA-AIR-...|Cycle Inspection ...|                NULL|             0.0|
|     02/18/2022|     50013704|ACAPULCO BAR REST...|Inter-Agency Task...|                NULL|             0.0|
|     01/10/2023|     41482806|              DUNKIN|Cycle Inspection ...|Thawing procedure...|           6.548|
|     10/13/2021|     41374469|              DUNKIN|Cycle Inspection ...|Food contact surf...|           7.143|
|     01/30/2023|     41671540|              DUNKIN|Cycle Inspection ...|Live roaches in f...|           6.548|
|     11/18/2022|     50000606|            VEKSLERS|Cycle Inspection ...|Hot TCS food item...|          

In [141]:
count_df3 = df3.groupBy("restaurant_id").count()
result_df3 = count_df3.join(
    df3.select("restaurant_id", "restaurant_name").distinct(), 
    on="restaurant_id", 
    how="left"
)
sorted_df3 = result_df3.orderBy(col("count").desc()).limit(10)
sorted_df3.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+-----+--------------------+
|restaurant_id|count|     restaurant_name|
+-------------+-----+--------------------+
|     40365904|   64|        MEE SUM CAFE|
|     50111296|   63| BIG WONG RESTAURANT|
|     40398688|   60|          MASTER WOK|
|     41406895|   59|SUN SAI GAI RESTA...|
|     50089474|   59|    THE COPPOLA CAFE|
|     50105561|   55|   DAVIDOVICH BAKERY|
|     50079599|   52|     NICE ONE BAKERY|
|     50123073|   52|    PI GREEK BAKERIE|
|     50044250|   47|FUJI JAPANESE CUI...|
|     41658324|   46|  MI CASA RESTAURANT|
+-------------+-----+--------------------+

In [142]:
count_df3 = df3.groupBy("inspection_type").count()
sorted_df3 = count_df3.orderBy(col("count").desc()).limit(10)
sorted_df3.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+
|     inspection_type| count|
+--------------------+------+
|Cycle Inspection ...|132395|
|Cycle Inspection ...| 41967|
|Pre-permit (Opera...| 34625|
|Pre-permit (Opera...|  9883|
|Pre-permit (Non-o...|  3092|
|Pre-permit (Opera...|  1648|
|Cycle Inspection ...|  1442|
|Cycle Inspection ...|   968|
|Pre-permit (Opera...|   733|
|Inter-Agency Task...|   381|
+--------------------+------+

In [143]:
# Convert the Inspection_date column to date type
df3 = df3.withColumn("inspection_date", to_date(col("inspection_date"), "MM/dd/yyyy"))

# Calculate the date 2 years ago from today
two_years_ago = date_sub(current_date(), 2 * 365)

old_inspections_df3 = df3.filter(col("inspection_date") < two_years_ago)
old_inspections_df3 = old_inspections_df3.drop("restaurant_id")

# Show the filtered DataFrame
old_inspections_df2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+--------------------+--------------------+----------------+--------------------+
|inspection_date|     restaurant_name|     inspection_type|normalized_score|         description|
+---------------+--------------------+--------------------+----------------+--------------------+
|     2018-06-08|         JET'S PIZZA|Canvass Re-Inspec...|           33.33|45. FOOD HANDLER ...|
|     2018-06-12|SIMPLY SOUPS SALA...|             Canvass|           33.33|2. FACILITIES TO ...|
|     2018-06-29|COLUMBUS MANOR RE...|Canvass Re-Inspec...|           33.33|33. FOOD AND NON-...|
|     2018-07-24|MEZZA GRILLED WRA...|             Canvass|           33.33|3. MANAGEMENT, FO...|
|     2018-08-14|GATELY SEAFOOD MA...|             License|           33.33|                NULL|
|     2018-08-07|BISMILLAH RESTAURANT|Canvass Re-Inspec...|           33.33|58. ALLERGEN TRAI...|
|     2018-06-05|    FRANCESCA'S CAFE|License Re-Inspec...|           33.33|                NULL|
|     2018-06-21|PRE

In [144]:
# Convert the Inspection_date column to date type
df3 = df3.withColumn("inspection_date", to_date(col("inspection_date"), "MM/dd/yyyy"))

# Define a window specification to partition by restaurant_id and order by Inspection_date descending
window_spec = Window.partitionBy("restaurant_id").orderBy(desc("inspection_date"))

# Add a row number to each row within the partition
df3_with_row_num = df3.withColumn("row_num", row_number().over(window_spec))

# Filter to get only the latest record for each restaurant_id
latest_records_df3 = df3_with_row_num.filter(col("row_num") == 1).drop("row_num")

# Calculate the date two years ago from today
two_years_ago = date_sub(current_date(), 2 * 365)

# Filter records that are older than two years from latest_records_df
old_records_df = latest_records_df3.filter(col("inspection_date") < two_years_ago)

old_records_df = old_records_df.drop("restaurant_id")

# Show the filtered DataFrame
old_records_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+--------------------+--------------------+--------------------+----------------+
|inspection_date|     restaurant_name|     inspection_type|         description|normalized_score|
+---------------+--------------------+--------------------+--------------------+----------------+
|     2019-05-22|SOMBA VILLAGE (BA...|Cycle Inspection ...|Cold food item he...|           4.762|
|     2022-03-11|        CAFE CARDINI|Cycle Inspection ...|Food contact surf...|           7.143|
|     2018-10-24|    BROADWAY THEATRE|Cycle Inspection ...|Non-food contact ...|           7.143|
|     2019-06-21|  TOTONNO'S PIZZERIA|Cycle Inspection ...|Food not cooled b...|           4.167|
|     2022-04-26|    ACME BAR & GRILL|Cycle Inspection ...|Raw, cooked or pr...|           7.143|
|     2019-04-30|         RUMPUS ROOM|Cycle Inspection ...|Food contact surf...|           4.167|
|     2018-11-09|RADIO CITY MUSIC ...|Cycle Inspection ...|Filth flies or fo...|           5.952|
|     2022-04-07|HEA