In [12]:
# Root directory holding the input *.json files.
# NOTE(review): the original literal was unterminated (missing closing quote) — confirm the full path.
data_path = '/data'

In [2]:
# Import all libraries
import json
import numpy as np
import functools

from pyspark.sql import Row
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, desc, asc
from pyspark.sql import functions as F
from pyspark.sql import types as t

from difflib import SequenceMatcher

In [3]:
# Map function: parse the text of one JSON file and return its first record as a Row
def flat_map_first_row(rec):
    """Return the first record of a JSON array as a ``Row``.

    Only the first record is converted; the remaining records are not touched.

    Args:
        rec: Text of one JSON document that decodes to an iterable of
            objects (dicts).

    Returns:
        A ``Row`` whose field names are the stringified keys of the first
        record.

    Raises:
        IndexError: If the decoded JSON array contains no records.
    """
    for first_record in json.loads(rec):
        # Return straight away: only the first record is needed.
        return Row(**{str(k): v for k, v in first_record.items()})
    raise IndexError("JSON document contains no records")

In [4]:
# Map function: turn every record of one JSON file into a Row, filling in missing columns
def flat_map_json_with_fillin_missing_column(rec, column_names):
    """Convert every record of a JSON array into a ``Row`` with a uniform field set.

    Args:
        rec: Text of one JSON document that decodes to an iterable of
            objects (dicts).
        column_names: Iterable of column names every row must have; a name
            absent from a record is added with value ``None``.

    Returns:
        A list with one ``Row`` per record (empty if the array is empty).
    """
    rows = []
    for record in json.loads(rec):
        row_dict = {str(k): v for k, v in record.items()}
        for name in column_names:
            row_dict.setdefault(name, None)
        # Emit fields in sorted order so every Row has the same positional
        # layout no matter how the keys were ordered in the JSON record.
        # Spark < 3.0 sorted Row(**kwargs) fields itself; Spark >= 3.0 keeps
        # insertion order, which would misalign columns in createDataFrame.
        rows.append(Row(**{name: row_dict[name] for name in sorted(row_dict)}))
    return rows

In [13]:
# Build an RDD holding the first record (as a Row) of every non-empty JSON file
rdd_first_row = (
    sc.wholeTextFiles(data_path + '/*.json')
    # Skip files that hold only an empty JSON array. strip() makes the check
    # independent of a trailing newline; an empty array reaching
    # flat_map_first_row would raise IndexError.
    .filter(lambda x: x[1].strip() != "[]")
    .map(lambda x: x[1])
    .map(flat_map_first_row)
)
# rdd_first_row.take(5)

In [14]:
# Collect the set of all column names seen in the first record of every JSON file.
# The union is folded inside each partition and the per-partition sets are then
# merged, instead of streaming every Row to the driver and building a list with
# duplicates. An empty RDD yields an empty set.
column_names = rdd_first_row.aggregate(
    set(),
    lambda names, row: names | set(row.asDict().keys()),
    lambda left, right: left | right,
)

# exec time < 4 min (timing was noted for the earlier driver-side loop)

In [15]:
# Bind the collected column names into the row-mapping function, so that any
# column missing from a record is filled with NULL.
parsing_func = functools.partial(
    flat_map_json_with_fillin_missing_column,
    column_names=column_names,
)

In [16]:
# Parse every non-empty JSON file into Rows, filling in missing columns
rdd = (
    sc.wholeTextFiles(data_path + '/*.json')
    .filter(lambda path_and_text: path_and_text[1] != "[]\n")
    .map(lambda path_and_text: path_and_text[1])
    .flatMap(parsing_func)
)
# rdd.take(5)

In [17]:
# Create a PySpark DataFrame from the RDD of Rows. No schema is passed, so
# Spark infers the column types from the data.
df = spark.createDataFrame(rdd)

# time < 1 min

In [18]:
# Mark df for caching: several DataFrames below are derived from it.
df.cache()

DataFrame[action: string, boro: string, building: string, camis: string, critical_flag: string, cuisine_description: string, dba: string, grade: string, grade_date: string, inspection_date: string, inspection_type: string, phone: string, record_date: string, score: string, street: string, violation_code: string, violation_description: string, zipcode: string]

In [19]:
#########################################################
# 1 Define business dataframe 
#########################################################

# Distinct business attributes together with the inspection date they were seen on
df_business = (
    df.select(
        "cuisine_description",
        "camis",
        "dba",
        "boro",
        "building",
        "phone",
        "street",
        "zipcode",
        "inspection_date",
    )
    .distinct()
)

In [20]:
# Window function: keep exactly one row per camis, the one with the latest
# inspection_date. row_number() is used instead of rank() because rank() gives
# rank 1 to every row that ties on the latest inspection_date, which would
# leave several rows for the same camis.
# NOTE(review): when rows tie, which one is kept is arbitrary.

window = Window.partitionBy("camis").orderBy(desc("inspection_date"))
df_business_latest = (
    df_business
    .withColumn('rank', F.row_number().over(window))
    .filter(col('rank') == 1)
    .drop('rank')
)
# df_business_latest.take(5)

In [21]:
#########################################################
# 2 Define business grade dataframe 
#########################################################

[Row(camis='50000804', score='12', grade='A', grade_date='2015-02-18T00:00:00.000'),
 Row(camis='41448559', score='9', grade='A', grade_date='2015-04-07T00:00:00.000'),
 Row(camis='41377019', score='4', grade='A', grade_date='2015-04-09T00:00:00.000'),
 Row(camis='50005214', score='9', grade='A', grade_date='2015-04-25T00:00:00.000'),
 Row(camis='41712926', score='10', grade='A', grade_date='2015-06-02T00:00:00.000')]

In [None]:
# Distinct (camis, score, grade, grade_date) combinations for records that carry a grade
df_business_grade_detail = (
    df.filter(col('grade').isNotNull())
    .select('camis', 'score', 'grade', 'grade_date')
    .distinct()
)
df_business_grade_detail.take(5)
# 4 mins

In [22]:
# Define function to return grade number from letter

# Built once rather than on every call, and named so that it does not shadow
# the builtin `dict`.
_GRADE_LETTER_TO_NUMBER = {'A': 5, 'B': 4, 'C': 3}


def grade_letter_dict_to_number(key):
    """Map a grade letter to its numeric value.

    Args:
        key: Grade letter; 'A', 'B' and 'C' are the letters that have a number.

    Returns:
        5 for 'A', 4 for 'B', 3 for 'C'; ``None`` for any other key.
    """
    return _GRADE_LETTER_TO_NUMBER.get(key)

# grade_number_dict_to_letter = {'5': 'A', '4':'B' , '3': 'C'}

In [23]:
# Define a PySpark user-defined function that maps a grade letter to its grade
# number; the declared return type is integer, and letters without a mapping
# yield None.
grade_udf = F.udf(grade_letter_dict_to_number, t.IntegerType())

In [24]:
# Add an integer grade_number column derived from the grade (letter) column via the UDF
df_business_grade_detail = df_business_grade_detail.withColumn(
    'grade_number',
    grade_udf(col('grade')),
)

In [25]:
# Keep only rows whose grade is not NULL and is none of 'Not Yet Graded', 'G', 'P', 'Z'
list_grade_exclude = {'Not Yet Graded', 'G', 'P', 'Z'}
df_business_grade_detail2 = df_business_grade_detail.filter(
    col('grade').isNotNull() & ~col('grade').isin(list_grade_exclude)
)
# df_business_grade.take(5)

In [26]:
# Per-business grade summary: lowest, highest and average grade number,
# plus how many grades were given.
df_business_grade = df_business_grade_detail2.groupBy('camis').agg(
    F.min('grade_number').alias('grade_lowest'),
    F.max('grade_number').alias('grade_highest'),
    F.mean('grade_number').alias('grade_average'),
    F.count('grade_number').alias('grade_count'),
)


In [27]:
# Preview five rows of the per-business grade summary
df_business_grade.take(5)
# < 1 mins

[Row(camis='50006252', grade_lowest=5, grade_highest=5, grade_average=5.0, grade_count=4),
 Row(camis='41405042', grade_lowest=5, grade_highest=5, grade_average=5.0, grade_count=3),
 Row(camis='41135090', grade_lowest=5, grade_highest=5, grade_average=5.0, grade_count=4),
 Row(camis='50003326', grade_lowest=5, grade_highest=5, grade_average=5.0, grade_count=2),
 Row(camis='41655458', grade_lowest=5, grade_highest=5, grade_average=5.0, grade_count=5)]

In [28]:
#########################################################
# 3 Define business restaurant type and similarity measures dataframe 
#########################################################


In [29]:
def similarity_score(a, b):
    """Return the similarity of two sequences as a percentage.

    Args:
        a: First sequence (e.g. a string), or ``None``.
        b: Second sequence (e.g. a string), or ``None``.

    Returns:
        ``SequenceMatcher(None, a, b).ratio() * 100`` as a float in [0, 100],
        or ``None`` when either argument is ``None``.
    """
    # SequenceMatcher raises TypeError on None, so a missing value is
    # propagated as None instead of raising.
    if a is None or b is None:
        return None
    return SequenceMatcher(None, a, b).ratio() * 100

In [30]:
# Wrap similarity_score as a PySpark UDF whose declared return type is double
udf_similarity_score = F.udf(similarity_score, t.DoubleType())

In [31]:
# Distinct (camis, cuisine_description, zipcode) rows taken from the latest business records
df_business_cuisine = (
    df_business_latest
    .select('camis', 'cuisine_description', 'zipcode')
    .distinct()
)
df_business_cuisine.take(5)
# < 1 mins

[Row(camis='40356731', cuisine_description='Ice Cream, Gelato, Yogurt, Ices', zipcode='11226'),
 Row(camis='40379662', cuisine_description='Italian', zipcode='10021'),
 Row(camis='40394329', cuisine_description='Pizza', zipcode='11234'),
 Row(camis='40512746', cuisine_description='American', zipcode='10451'),
 Row(camis='40518828', cuisine_description='CafÃ©/Coffee/Tea', zipcode='10017')]

In [32]:
# First alias of the cuisine DataFrame, so its columns can be referenced as "df_bc1.<column>"
df_bc1 = df_business_cuisine.alias('df_bc1')

In [33]:
# Second alias of the same cuisine DataFrame, so its columns can be referenced as "df_bc2.<column>"
df_bc2 = df_business_cuisine.alias('df_bc2')

In [34]:
# Pair every business with every *other* business in the same zipcode
df_bc_join = df_bc1.join(
    df_bc2,
    (df_bc1['zipcode'] == df_bc2['zipcode'])
    & (df_bc1['camis'] != df_bc2['camis']),
)
# df_bc_join.take(2)

In [35]:
# Keep the left-hand business' camis and cuisine, and expose the right-hand
# business' values under matching_* names.
df_bc_joined = df_bc_join.select(
    col("df_bc1.camis"),
    col("df_bc1.cuisine_description"),
    col("df_bc2.camis").alias("matching_camis"),
    col("df_bc2.cuisine_description").alias("matching_cuisine_description"),
)
# df_bc_joined.take(2)

In [36]:
# Add a similarity_score column comparing each business' cuisine description
# with the matching business' cuisine description.
df_bc_compared_set = df_bc_joined.withColumn("similarity_score", udf_similarity_score(col("cuisine_description"), col("matching_cuisine_description")))
# df_bc_compared_set.take(20)

In [37]:
# df_bc_compared_set.take(20)
# Mark the scored pairs for caching before they are filtered and sorted below
df_bc_compared_set.cache()

DataFrame[camis: string, cuisine_description: string, matching_camis: string, matching_cuisine_description: string, similarity_score: double]

In [38]:
# Keep pairs whose similarity score is at least 80, lowest score first
df_bc_compared_similar_list = (
    df_bc_compared_set
    .where(col("similarity_score") >= 80)
    .sort(asc("similarity_score"))
)
df_bc_compared_similar_list.take(10)

# < 2 mins

[Row(camis='40786919', cuisine_description='American', matching_camis='50034508', matching_cuisine_description='African', similarity_score=80.0),
 Row(camis='41510896', cuisine_description='American', matching_camis='50034508', matching_cuisine_description='African', similarity_score=80.0),
 Row(camis='50002099', cuisine_description='American', matching_camis='50034508', matching_cuisine_description='African', similarity_score=80.0),
 Row(camis='50056425', cuisine_description='American', matching_camis='50034508', matching_cuisine_description='African', similarity_score=80.0),
 Row(camis='50078406', cuisine_description='American', matching_camis='50034508', matching_cuisine_description='African', similarity_score=80.0),
 Row(camis='50034097', cuisine_description='American', matching_camis='50034508', matching_cuisine_description='African', similarity_score=80.0),
 Row(camis='50056428', cuisine_description='American', matching_camis='50034508', matching_cuisine_description='African', si