In [5]:
from pyspark.sql.functions import col, concat
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.types import *

# Echo the SparkSession as the cell output.
# NOTE(review): `spark` is not created in the visible cells -- presumably it
# is injected by the PySpark kernel/shell; confirm before a fresh-kernel run.
spark

In [7]:
# Load the raw DOHMH inspection export, taking column names from the first
# row, then list the columns as the cell output.
df = spark.read.option("header", True).csv(
    "datasets/DOHMH_New_York_City_Restaurant_Inspection_Results.csv"
)
df.columns

['CAMIS',
 'DBA',
 'BORO',
 'BUILDING',
 'STREET',
 'ZIPCODE',
 'PHONE',
 'CUISINE DESCRIPTION',
 'INSPECTION DATE',
 'ACTION',
 'VIOLATION CODE',
 'VIOLATION DESCRIPTION',
 'CRITICAL FLAG',
 'SCORE',
 'GRADE',
 'GRADE DATE',
 'RECORD DATE',
 'INSPECTION TYPE']

Make small lookup DataFrames that map categorical features to integer codes.

In [188]:
# Literal ACTION strings.  `closed` is the prediction target (not a feature);
# it is compared verbatim against the ACTION column, so the text -- including
# the double space after "DOHMH." -- must stay exactly as written.
closed = (
    "Establishment Closed by DOHMH.  "
    "Violations were cited in the following area(s) "
    "and those requiring immediate action were addressed."
)
citations = "Violations were cited in the following area(s)."
re_open = "Establishment re-opened by DOHMH"
no_action = "No violations were recorded at the time of this inspection"

# Driver-side dict: violation code -> distinct descriptions recorded for that
# code.  Rows with no violation code are excluded before grouping.
violation_map = (
    df.where(col("VIOLATION CODE").isNotNull())
    .groupBy("VIOLATION CODE")
    .agg(F.collect_set("VIOLATION DESCRIPTION").alias("description"))
    .select(col("VIOLATION CODE").alias("code"), col("description"))
    .rdd.collectAsMap()
)

# Borough name -> integer code (x_boro); the code is the position in the list.
boros = spark.createDataFrame(
    [
        (borough, code)
        for code, borough in enumerate(
            ["MANHATTAN", "QUEENS", "BROOKLYN", "BRONX", "STATEN ISLAND"]
        )
    ],
    schema=["boro", "x_boro"],
)
# Grade label -> ordinal code (x_grade): A=1, B=2, C=3; every ungraded or
# pending label maps to 0.
#
# Fix: the label was misspelled "Note Yet Graded", so that row could never
# match in the join on `grade`.  The resulting x_grade is unchanged (0), but
# the row now matches instead of relying on the downstream fillna(0).
# NOTE(review): confirm the exact spelling with df.select("GRADE").distinct().
#
# The (None, 0) row is kept as documentation only: an equi-join never matches
# NULL keys, so null grades still get their 0 from fillna(0) in `events`.
grades = spark.createDataFrame(
    [(None, 0), ("Not Yet Graded", 0), ("P", 0), ("Z", 0), ("A", 1), ("B", 2), ("C", 3)],
    ["grade", "x_grade"]
)
# Violation code -> integer code (x_violation).  The integer is simply the
# position of the code in this list, so the ORDER below is significant and
# must not be sorted or rearranged.
# NOTE(review): "08C" gets 0, the same value `events` later uses for missing
# x_violation via fillna(0).
violations = spark.createDataFrame(
    [
        (code, index)
        for index, code in enumerate(
            [
                "08C", "03C", "16C", "02C", "03F", "10H",
                "10A", "16A", "16B", "08B", "15I", "03B",
                "03A", "10J", "06I", "03G", "04B", "02E",
                "02J", "05A", "06D", "05D", "04L", "15L",
                "20F", "04G", "06C", "10C", "06A", "07A",
                "09C", "09B", "04A", "04K", "05C", "04C",
                "05E", "10F", "03D", "06B", "02D", "02A",
                "10G", "04F", "06F", "02H", "22F", "02I",
                "05H", "02B", "04N", "04M", "02G", "02F",
                "10D", "10E", "03E", "06E", "05F", "10B",
                "04H", "06H", "09A", "04D", "04J", "04E",
                "05B", "06G", "04O", "22G", "15S", "05I",
                "10I", "08A", "04I",
            ]
        )
    ],
    schema=["violation", "x_violation"],
)

# Cuisine description -> integer code (x_description).
# NOTE(review): "CafÃ©/Coffee/Tea" is reproduced exactly as written; it
# presumably mirrors a mis-encoded value in the raw CSV -- confirm before
# "correcting" it, or the join on `description` will stop matching.
descriptions = spark.createDataFrame(
    list(
        {
            "Pancakes/Waffles": 0,
            "Chinese/Japanese": 1,
            "Mexican": 2,
            "Jewish/Kosher": 3,
            "Bakery": 4,
            "Turkish": 5,
            "Scandinavian": 6,
            "Armenian": 7,
            "Hotdogs": 8,
            "Ethiopian": 9,
            "Thai": 10,
            "Indian": 11,
            "Chinese": 12,
            "Indonesian": 13,
            "Soul Food": 14,
            "Continental": 15,
            "Steak": 16,
            "African": 17,
            "Creole": 18,
            "CafÃ©/Coffee/Tea": 19,
            "Donuts": 20,
            "Tapas": 21,
            "Chicken": 22,
            "Chilean": 23,
            "Irish": 24,
            "Ice Cream, Gelato, Yogurt, Ices": 25,
            "Polish": 26,
            "Pizza/Italian": 27,
            "Sandwiches/Salads/Mixed Buffet": 28,
            "Hawaiian": 29,
            "Peruvian": 30,
            "Japanese": 31,
            "Salads": 32,
            "English": 33,
            "Australian": 34,
            "Filipino": 35,
            "Californian": 36,
            "Spanish": 37,
            "Pakistani": 38,
            "Other": 39,
            "Pizza": 40,
            "Portuguese": 41,
            "Italian": 42,
            "Caribbean": 43,
            "Bangladeshi": 44,
            "Fruits/Vegetables": 45,
            "Not Listed/Not Applicable": 46,
            "Sandwiches": 47,
            "Vegetarian": 48,
            "Russian": 49,
            "Korean": 50,
            "French": 51,
            "Southwestern": 52,
            "Hotdogs/Pretzels": 53,
            "Latin (Cuban, Dominican, Puerto Rican, South & Central American)": 54,
            "Egyptian": 55,
            "Middle Eastern": 56,
            "Seafood": 57,
            "Hamburgers": 58,
            "Chinese/Cuban": 59,
            "Nuts/Confectionary": 60,
            "Polynesian": 61,
            "Bottled beverages, including water, sodas, juices, etc.": 62,
            "Eastern European": 63,
            "Brazilian": 64,
            "Cajun": 65,
            "Bagels/Pretzels": 66,
            "Greek": 67,
            "Asian": 68,
            "Creole/Cajun": 69,
            "Iranian": 70,
            "Moroccan": 71,
            "German": 72,
            "Juice, Smoothies, Fruit Salads": 73,
            "Mediterranean": 74,
            "Soups & Sandwiches": 75,
            "Vietnamese/Cambodian/Malaysia": 76,
            "Barbecue": 77,
            "American": 78,
            "Afghan": 79,
            "Czech": 80,
            "Soups": 81,
            "Delicatessen": 82,
            "Tex-Mex": 83,
        }.items()
    ),
    schema=["description", "x_description"],
)


root
 |-- violation: string (nullable = true)
 |-- x_violation: long (nullable = true)



The Health Department inspects about 24,000 restaurants a year to monitor compliance with City and State food safety regulations. Since July 2010, the Health Department has required restaurants to post letter grades showing sanitary inspection results. 

Restaurants with a score between 0 and 13 points earn an A

those with 14 to 27 points receive a B

and those with 28 or more a C

In [107]:
# Build `events`: one row per raw inspection record that has a score, with
# renamed/typed columns, 0/1 flags, parsed dates, the integer codes from the
# lookup tables, and a restaurant id (md5 of name + address).
#
# NOTE(review): fillna(0) runs after the left joins, so an unmatched/missing
# x_violation, x_grade, x_boro or x_description becomes 0 -- the same value
# the lookup tables assign to "08C", "MANHATTAN" and "Pancakes/Waffles".
# NOTE(review): `concat` yields NULL when either input is NULL and uses no
# separator, so `id` can be NULL or ambiguous; changing it would change every
# existing id, so it is left as-is here.
inspected_on = F.to_date(col("INSPECTION DATE"), "MM/dd/yyyy")
graded_on = F.to_date(col("GRADE DATE"), "MM/dd/yyyy")

events = (
    df.select(
        # restaurant
        col("DBA").alias("name"),
        col("PHONE").alias("phone"),
        col("CUISINE DESCRIPTION").alias("description"),
        # location
        F.concat_ws(" ", col("BUILDING"), col("STREET")).alias("address"),
        col("STREET").alias("street"),
        col("ZIPCODE").cast("int").alias("zip"),
        col("BORO").alias("boro"),
        # score
        col("SCORE").cast("int").alias("score"),
        col("GRADE").alias("grade"),
        col("VIOLATION CODE").alias("violation"),
        col("ACTION").alias("action"),
        F.when(col("CRITICAL FLAG") == "Critical", 1).otherwise(0).alias("critical"),
        F.when(col("ACTION") == closed, 1).otherwise(0).alias("closed"),
        # dates
        inspected_on.alias("inspection_date"),
        graded_on.alias("grade_date"),
        F.month(inspected_on).alias("month"),
    )
    # rows whose SCORE is missing or not castable to int are dropped
    .where(col("score").isNotNull())
    # join order determines the leading column order of the result
    .join(F.broadcast(violations), on="violation", how="left")
    .join(F.broadcast(grades), on="grade", how="left")
    .join(F.broadcast(boros), on="boro", how="left")
    .join(F.broadcast(descriptions), on="description", how="left")
    .fillna(0)
    .withColumn("id", F.md5(concat(col("name"), col("address"))))
    .orderBy("grade_date")
)
events.cache()
events.printSchema()

root
 |-- description: string (nullable = true)
 |-- boro: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- violation: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = false)
 |-- street: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- critical: integer (nullable = false)
 |-- closed: integer (nullable = false)
 |-- inspection_date: date (nullable = true)
 |-- grade_date: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- x_violation: long (nullable = true)
 |-- x_grade: long (nullable = true)
 |-- x_boro: long (nullable = true)
 |-- x_description: long (nullable = true)
 |-- id: string (nullable = true)



In [169]:
# One row per restaurant (id / name / address / borough): a representative
# phone and zip plus every cuisine description (and its code) seen for it.
# The trailing count() is the cell output.
profiles = (
    events
    .groupBy("id", "name", "address", "boro", "x_boro")
    .agg(
        F.first(col("phone")).alias("phone"),
        # the 2 queens airports changed zip codes at one point...
        F.first("zip").alias("zip"),
        F.collect_set(col("x_description")).alias("x_descriptions"),
        F.collect_set(col("description")).alias("descriptions"),
    )
)
profiles.cache()
profiles.count()

25251

In [192]:


# One row per (restaurant id, inspection date) summarising that day's events.
#
# Fix: the violation aggregates used to be computed on `x_violation`, but
# `events` applies fillna(0) after its left joins, so a row with NO violation
# carries x_violation = 0 (the code assigned to "08C").  That made every
# violation-free inspection report day_violation_count = 1 and
# day_violations = [0].  Both aggregates now key off the raw `violation`
# string column, which fillna(0) leaves NULL when no violation was recorded.
# count() skips NULLs and collect_set() drops the NULLs produced by `when`
# without an `otherwise`.
# NOTE(review): a violation code that is present but absent from the
# `violations` lookup is still counted and still appears as 0 in day_violations.
daily_profiles = events.groupby("id", "inspection_date").agg(
    F.count(F.col("violation")).cast("integer").alias("day_violation_count"),
    F.collect_set(
        F.when(
            F.col("violation").isNotNull(),
            F.col("x_violation").cast("integer"),
        )
    ).alias("day_violations"),
    F.max(F.col("x_grade")).cast("integer").alias("day_grade"),  # 23 events have different grades

    F.avg(F.col("score")).cast("int").alias("score"), # all days have the same score, except `null` dates, which will be averaged
    # 1 when any event that day carries the flag, else 0
    F.when(F.sum(F.col("closed")) > 0, 1).otherwise(0).alias("closed"),
    F.when(F.sum(F.col("critical")) > 0, 1).otherwise(0).alias("critical"),
)

daily_profiles.cache()
daily_profiles.count()
daily_profiles.printSchema()

root
 |-- id: string (nullable = true)
 |-- inspection_date: date (nullable = true)
 |-- day_violation_count: integer (nullable = false)
 |-- day_violations: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- day_grade: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- closed: integer (nullable = false)
 |-- critical: integer (nullable = false)



In [185]:
# Number of (restaurant, day) rows whose grade code is 0.
daily_profiles.where(F.col("day_grade") == 0).count()

52080

In [58]:
# For each grade_date, how many distinct restaurant ids have an event with
# that grade date (first 300 dates, ascending).
(
    events.select("id", "grade_date")
    .distinct()
    .groupBy("grade_date")
    .count()
    .orderBy("grade_date")
    .show(300)
)

+----------+-----+
|grade_date|count|
+----------+-----+
|      null|20111|
|2012-05-01|    1|
|2012-06-23|    1|
|2013-03-11|    1|
|2013-03-12|    1|
|2013-03-14|    1|
|2013-04-04|    2|
|2013-04-05|    1|
|2013-04-11|    1|
|2013-04-15|    1|
|2013-04-17|    1|
|2013-04-18|    1|
|2013-04-19|    1|
|2013-04-23|    1|
|2013-04-24|    1|
|2013-04-26|    1|
|2013-04-30|    2|
|2013-05-03|    1|
|2013-05-07|    1|
|2013-05-09|    2|
|2013-05-10|    1|
|2013-05-13|    2|
|2013-05-16|    1|
|2013-05-17|    1|
|2013-05-20|    1|
|2013-05-23|    1|
|2013-05-28|    1|
|2013-05-29|    1|
|2013-05-30|    1|
|2013-05-31|    4|
|2013-06-03|    1|
|2013-06-05|    2|
|2013-06-07|    2|
|2013-06-08|    2|
|2013-06-10|    1|
|2013-06-11|    1|
|2013-06-12|    2|
|2013-06-13|    1|
|2013-06-14|    3|
|2013-06-15|    2|
|2013-06-17|    1|
|2013-06-20|    2|
|2013-06-22|    1|
|2013-06-25|    1|
|2013-06-26|    1|
|2013-06-27|    1|
|2013-06-28|    3|
|2013-07-02|    1|
|2013-07-05|    1|
|2013-07-09|

## Preliminary questions

In [78]:
# Data-quality counts over `inspections`: nulls, distinct values, and how
# many groups change an attribute over time.
# NOTE(review): `inspections` is not defined in the visible cells -- it
# presumably comes from an earlier or deleted cell; confirm before a
# fresh-kernel run.


def _rows_matching(predicate):
    """Count rows of `inspections` that satisfy a SQL predicate string."""
    return inspections.filter(predicate).count()


def _distinct_count(*columns):
    """Count distinct combinations of the given `inspections` columns."""
    return inspections.select(*columns).distinct().count()


def _changer_count(keys, column, alias="change"):
    """Count groups (by `keys`) that show more than one distinct `column` value."""
    return (
        inspections.groupby(*keys)
        .agg(F.collect_set(inspections[column]).alias(alias))
        .filter(f"size({alias}) > 1")
        .count()
    )


inspections.cache()
total = inspections.count()

# null counts
null_score = _rows_matching("score is null")
null_grade = _rows_matching("grade is null")
null_grade_date = _rows_matching("grade_date is null")
null_inspect_date = _rows_matching("inspection_date is null")
null_score_and_grade = _rows_matching("score is null and grade is null")
null_grade_and_grade_date = _rows_matching("grade_date is null and grade is null")
null_phones = _rows_matching("phone is null")

# distinct counts
unique_street = _distinct_count("street")
unique_phones = _distinct_count("phone")
unique_address = _distinct_count("address")
unique_address_phone = _distinct_count("address", "phone")
unique_names = _distinct_count("name")
unique_address_name = _distinct_count("address", "name")
uniques = _distinct_count("address", "phone", "description")
uniques_plus_name = _distinct_count("address", "phone", "description", "name")

# groups whose attribute takes more than one value
desc_changers = _changer_count(["address", "phone"], "description", alias="desc")
name_changers = _changer_count(["address", "phone"], "name")
phone_changers = _changer_count(["address"], "phone")
addr_changers = _changer_count(["phone"], "address")

inspections.unpersist()

DataFrame[name: string, phone: string, description: string, address: string, street: string, zip: string, boro: string, score: string, grade: string, violation: string, action: string, critical: int, closed: int, inspection_date: date, grade_date: date, month: int]

In [77]:
# Print the preliminary data-quality summary.
# Fixes: the last two values used to print with no label, the "Changers"
# counts lacked the thousands separator used everywhere else, and three
# labels were misspelled ("Grad", "Uniqe").
print(f"""
Total:             {total:,}
Null Score:        {null_score:,}
Null Grade:        {null_grade:,}
Null Grade Date:   {null_grade_date:,}
Null Inspect Date: {null_inspect_date:,}

Unique Phones:     {unique_phones:,}
Null phones:       {null_phones:,}
Unique Street:     {unique_street:,}
Unique Address:    {unique_address:,}
Unique addr phone: {unique_address_phone:,}
Unique Name:       {unique_names:,}

Uniques by name:   {uniques_plus_name:,}
Uniques x3:        {uniques:,}
Unique addr name:  {unique_address_name:,}

Changers
 desc:             {desc_changers:,}
 name:             {name_changers:,}
 phone:            {phone_changers:,}
 addr:             {addr_changers:,}

Null score & grade:      {null_score_and_grade:,}
Null grade & grade date: {null_grade_and_grade_date:,}
""")


Total:             376,704
Null Score:        0
Null Grad:         181,295
Null Grad Date:    183,886
Null Inspect Date: 0

Unique Phones:     24,298
Null phones:       0
Unique Street:     3,260
Unique Address:    23,056
Unique addr phone: 24,928
Unique Name:       20,304

Uniques by name:   25,317
Uniques x3:        25,056
Uniqe addr name:   25,251

Changers
 desc:             86
 name:             166
 phone:            1102
 addr:             429                   
                   
                   0
                   181,295

