### [A01] Initialization

In [2]:
### Import libraries
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import pyspark.sql.window as sw
#
import numpy as np
import pandas as pd
import geopandas as gpd
#
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
#
import time
import datetime
from dateutil.relativedelta import relativedelta
import pickle

ModuleNotFoundError: No module named 'geopandas'

In [None]:
### Initialize Spark
# Enable Apache Arrow for pandas -> Spark conversion: read_data_raw converts sizeable pandas
# frames with spark.createDataFrame, which is much faster with Arrow, and the Arrow path turns
# pandas NaN into Spark NULL. If pyarrow is unavailable Spark falls back to the non-Arrow
# conversion (spark.sql.execution.arrow.pyspark.fallback.enabled defaults to true).
spark = (SparkSession.builder
    .appName("cse6242_restaurant_survival")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate())

### [B01] Functions
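This code requires the following datasets to be downloaded and stored locally (all paths are relative to the notebook's working directory):<br>
- https://data.cityofnewyork.us/Health/DOHMH-New-York-City-Restaurant-Inspection-Results/43nn-pn8j/about_data<br>
- Place the downloaded data in ./../data/raw as `DOHMH_New_York_City_Restaurant_Inspection_Results.csv` <br>
- The pipeline also reads `./../data/raw/bus_stop/Bus_Stop_Shelter.csv` and `./../data/raw/mta_station/MTA_Subway_Stations.csv`, plus the pre-processed files `./../data/processed/building/building_240405_1230.csv`, `./../data/processed/traffic/ridership_by_bin.csv`, `./../data/processed/traffic/idw_aadt_by_bin.csv` and `./../data/processed/traffic/idw_atvc_by_bin.csv`. <br>
- The folder `./../models/` must exist; the trained scaler and model are pickled there. <br>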

In [3]:
import pandas as pd

In [None]:
def read_data_raw(
        spark:pyspark.sql.SparkSession,
        options:dict={}
        ):
    """
    Read every input dataset used by the feature pipeline.

    Parameters
    ----------
    spark : pyspark.sql.SparkSession
        Session used to create / read the Spark DataFrames.
    options : dict, optional
        ``"data_dir"`` overrides the root data folder (default ``"./../data"``); the ``raw/`` and
        ``processed/`` sub-folders are resolved beneath it. The dict is not mutated.

    Returns
    -------
    tuple of pyspark.sql.DataFrame
        (Manhattan restaurant inspections, buildings, Manhattan bus stops, MTA stations,
        ridership by BIN, IDW AADT by BIN, IDW ATVC by BIN), in that order.
    """
    #
    data_dir = (options or {}).get("data_dir", "./../data")
    #
    ### Read NYC Restaurant Data: DOHMH New York City Restaurant Inspection Results
    pdf01_restaurant = pd.read_csv("{}/raw/DOHMH_New_York_City_Restaurant_Inspection_Results.csv".format(data_dir))
    # Normalize headers, e.g. "INSPECTION DATE" -> "inspection_date".
    pdf01_restaurant.columns = [str(i).lower().replace(" ","_") for i in pdf01_restaurant.columns]
    # Keep only the columns used below and filter to Manhattan in pandas, so Spark neither ships
    # nor has to infer types for the many unused columns of the raw file.
    cols_restaurant = ["bin","camis","boro","zipcode","inspection_date","inspection_type","latitude","longitude"]
    is_manhattan = pdf01_restaurant["boro"].astype(str).str.lower() == "manhattan"
    pdf01_restaurant = pdf01_restaurant.loc[is_manhattan, cols_restaurant].copy()
    pdf01_restaurant["inspection_date"] = pd.to_datetime(pdf01_restaurant["inspection_date"])
    # Text columns mix str values with float NaN for missing entries; hand Spark None instead so
    # the values become NULLs (the non-Arrow schema inference can otherwise fail to merge
    # StringType with DoubleType).
    text_cols = list(pdf01_restaurant.select_dtypes(include="object").columns)
    if text_cols:
        pdf01_restaurant[text_cols] = pdf01_restaurant[text_cols].where(pdf01_restaurant[text_cols].notna(), None)
    df01_nyc_restaurant_inspection_manhattan = (spark.createDataFrame(pdf01_restaurant)
        .selectExpr("bin","camis","boro","zipcode","date(inspection_date) as inspection_date_Ymd",
            "YEAR(date(inspection_date)) as inspection_year","inspection_type","latitude","longitude")
        .distinct())
    del pdf01_restaurant
    #
    df02_buildings = (spark.read.option("header",True).csv("{}/processed/building/building_240405_1230.csv".format(data_dir))
        .withColumnRenamed(r"distance_from_station(ft)","distance_from_station_ft"))
    df03_bus_stop_mn = (spark.read.option("header", True).csv("{}/raw/bus_stop/Bus_Stop_Shelter.csv".format(data_dir))
        .where("(LOWER(BoroName) = 'manhattan')")
        .selectExpr("Shelter_ID as shelter_id","BoroName as boro","Latitude as latitude","Longitude as longitude"))
    #
    pdf04_mta = (pd.read_csv("{}/raw/mta_station/MTA_Subway_Stations.csv".format(data_dir))
        .loc[:, ["GTFS Stop ID","GTFS Latitude","GTFS Longitude"]]
        .drop_duplicates())
    # "GTFS Stop ID" -> "gtfs_stop_id", etc.
    pdf04_mta.columns = [str(i).lower().replace(" ","_") for i in pdf04_mta.columns]
    df04_mta = spark.createDataFrame(pdf04_mta)
    del pdf04_mta
    #
    df05_ridership = (spark.read.option("header",True).csv("{}/processed/traffic/ridership_by_bin.csv".format(data_dir))
        .withColumnRenamed("distance_from_station(ft)","dist_station"))
    df06_aadt = spark.read.option("header",True).csv("{}/processed/traffic/idw_aadt_by_bin.csv".format(data_dir))
    df07_atvc = spark.read.option("header",True).csv("{}/processed/traffic/idw_atvc_by_bin.csv".format(data_dir))
    #
    return (df01_nyc_restaurant_inspection_manhattan, df02_buildings, df03_bus_stop_mn, df04_mta,
        df05_ridership, df06_aadt, df07_atvc)
#

In [None]:
def get_bankrupt_restaurants(
        df01_nyc_restaurant_inspection_manhattan:pyspark.sql.DataFrame
        ) -> pyspark.sql.DataFrame:
    """
    Label restaurants by the span of years in which they were inspected.

    Restaurants have to be inspected annually, and the dataset only shows yearly inspection results
        for three consecutive years. Thus, keep restaurants first inspected in 2015 or later whose
        first and last inspection years are at most 2 years apart, and flag whether that span
        exceeds one year.

    Parameters
    ----------
    df01_nyc_restaurant_inspection_manhattan : pyspark.sql.DataFrame
        Inspection rows with ``bin``, ``camis``, ``latitude``, ``longitude`` and ``inspection_year``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (bin, camis, latitude, longitude) with ``inspection_year_min``,
        ``inspection_year_max``, ``years_open`` (max - min) and ``flag_restaurant_one_year``
        (0 when years_open <= 1, otherwise 1). Rows without usable coordinates are dropped.
    """
    #
    case_flag_restaurant_one_year = """
        CASE
            WHEN years_open <= 1 THEN 0
            ELSE 1
        END AS flag_restaurant_one_year
        """
    #
    cond_array = [
        "(latitude IS NOT NULL)",
        "(longitude IS NOT NULL)",
        # Missing coordinates can arrive as NaN rather than NULL when the frame was built from
        # pandas (depends on whether Arrow was used); NaN passes both IS NOT NULL and != 0.0.
        "(NOT isnan(latitude))",
        "(NOT isnan(longitude))",
        # 0.0 is treated as a placeholder for unknown coordinates.
        "(latitude != 0.0)",
        "(longitude != 0.0)",
    ]
    filter_cond = " AND ".join(cond_array)
    df08_bankrupt_restaurants = (df01_nyc_restaurant_inspection_manhattan
        .where(filter_cond)
        .groupBy("bin","camis","latitude","longitude")
        .agg(sf.min("inspection_year").alias("inspection_year_min"),
            sf.max("inspection_year").alias("inspection_year_max"))
        .selectExpr("*","(inspection_year_max - inspection_year_min) as years_open")
        .selectExpr("*",case_flag_restaurant_one_year)
        .where("(inspection_year_min >= 2015) AND (years_open <= 2)")
        .distinct())
    #
    return df08_bankrupt_restaurants
#

In [None]:
def get_buildings_info(
        df02_buildings:pyspark.sql.DataFrame,
        df05_ridership:pyspark.sql.DataFrame,
        df06_aadt:pyspark.sql.DataFrame,
        df07_atvc:pyspark.sql.DataFrame
        ) -> pyspark.sql.DataFrame:
    """
    Build building-level features keyed by BIN.

    This function relies on data that are already stored locally (see read_data_raw). It selects
    and renames the building attributes, averages ridership per time band, AADT and ATVC per
    ``bin``, and joins everything on ``bin``.

    Parameters
    ----------
    df02_buildings : pyspark.sql.DataFrame
        Building attributes (``bin``, floor areas, street widths, distances, ...).
    df05_ridership : pyspark.sql.DataFrame
        ``bin`` plus ``ridership_morning`` / ``_midday`` / ``_evening`` / ``_night`` / ``_late_night``.
    df06_aadt : pyspark.sql.DataFrame
        ``bin`` plus ``idw_aadt``.
    df07_atvc : pyspark.sql.DataFrame
        ``bin`` plus ``idw_atvc``.

    Returns
    -------
    pyspark.sql.DataFrame
        Distinct rows with the building columns and the per-BIN means. The joins are inner
        joins, so a BIN missing from any of the four inputs is dropped.
    """
    #
    tdf01_buildings = (df02_buildings
        .selectExpr("bin","office_area","retail_area","residential_area","street_width_min","street_width_max",
            "posted_speed","betweeness","distance_from_station_ft as dist_station","office_within_450ft as office_450",
            "retail_within_450ft as retail_450","residential_within_450ft as residential_450",
            "distance_to_park as dist_park","distance_to_school as dist_school"))
    #
    # Collapse each traffic source to one row per BIN by averaging.
    ridership_bands = ["morning","midday","evening","night","late_night"]
    tdf02_ridership = (df05_ridership
        .groupBy("bin")
        .agg(*[sf.mean("ridership_{}".format(band)).alias("ridership_{}_mean".format(band))
            for band in ridership_bands]))
    tdf03_aadt = (df06_aadt
        .groupBy("bin")
        .agg(sf.mean("idw_aadt").alias("idw_aadt_mean")))
    tdf04_atvc = (df07_atvc
        .groupBy("bin")
        .agg(sf.mean("idw_atvc").alias("idw_atvc_mean")))
    #
    df09_buildings_info = (tdf01_buildings
        .join(tdf02_ridership, how="inner", on=["bin"])
        .join(tdf03_aadt, how="inner", on=["bin"])
        .join(tdf04_atvc, how="inner", on=["bin"])
        .distinct())
    #
    return df09_buildings_info
#

In [None]:
def get_proximity_restaurant(
        df01_nyc_restaurant_inspection_manhattan:pyspark.sql.DataFrame,
        df08_bankrupt_restaurants:pyspark.sql.DataFrame
        ) -> pyspark.sql.DataFrame:
    """
    Find the number of other restaurants within set distances of each labeled restaurant.

    Every labeled restaurant is cross-joined with every distinct (camis, latitude, longitude) of
    the inspection data; the great-circle distance is computed with the spherical law of cosines
    (Earth radius 6371 km) and distinct neighbouring ``camis`` values are counted per radius.

    Parameters
    ----------
    df01_nyc_restaurant_inspection_manhattan : pyspark.sql.DataFrame
        Inspection rows; ``camis``, ``latitude`` and ``longitude`` are used.
    df08_bankrupt_restaurants : pyspark.sql.DataFrame
        Labeled restaurants with ``bin``, ``camis``, ``latitude`` and ``longitude``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (bin, camis, latitude, longitude) with ``food_100``, ``food_400``, ``food_800``
        and ``food_1000``: distinct other restaurants within 0.10, 0.40, 0.80 and 1.00 km.
    """
    #
    tdf01_unique_lat_lng = (df01_nyc_restaurant_inspection_manhattan
        .selectExpr("camis as camis_diff","latitude as lat_diff","longitude as lng_diff")
        .distinct())
    #
    tdf02_open_restaurants = (df08_bankrupt_restaurants
        .selectExpr("bin","camis","latitude","longitude")
        .distinct())
    #
    # Cosine of the central angle between the restaurant and the candidate neighbour.
    cos_angle_raw = ((sf.sin(sf.radians(sf.col("latitude"))) * sf.sin(sf.radians(sf.col("lat_diff"))))
        + (sf.cos(sf.radians(sf.col("latitude"))) * sf.cos(sf.radians(sf.col("lat_diff"))))
        * sf.cos(sf.radians("longitude") - sf.radians("lng_diff")))
    # Rounding can push the cosine just outside [-1, 1] (typically for restaurants sharing the same
    # coordinates, e.g. in one building); acos then yields NaN and the pair is never counted.
    # Clamp it while leaving NULL/NaN (missing coordinates) untouched; the explicit isnan test is
    # needed because Spark treats NaN as larger than any other double, so NaN > 1.0 is true.
    cos_angle = (sf.when((cos_angle_raw > 1.0) & ~sf.isnan(cos_angle_raw), sf.lit(1.0))
        .when(cos_angle_raw < -1.0, sf.lit(-1.0))
        .otherwise(cos_angle_raw))
    distance_in_kms = sf.round(sf.acos(cos_angle) * sf.lit(6371.0), 4)
    #
    df10_proximity_restaurant = (tdf02_open_restaurants
        .join(tdf01_unique_lat_lng, how="cross")
        .where("(camis != camis_diff)")
        .withColumn("distance_in_kms", distance_in_kms)
        .groupBy("bin","camis","latitude","longitude")
        .agg(sf.countDistinct(sf.when((sf.col("distance_in_kms") <= 0.10),
                sf.col("camis_diff")).otherwise(None)).alias("food_100"),
            sf.countDistinct(sf.when((sf.col("distance_in_kms") <= 0.40),
                sf.col("camis_diff")).otherwise(None)).alias("food_400"),
            sf.countDistinct(sf.when((sf.col("distance_in_kms") <= 0.80),
                sf.col("camis_diff")).otherwise(None)).alias("food_800"),
            sf.countDistinct(sf.when((sf.col("distance_in_kms") <= 1.00),
                sf.col("camis_diff")).otherwise(None)).alias("food_1000")))
    #
    return df10_proximity_restaurant
#

In [None]:
def get_proximity_bus(
        df03_bus_stop_mn:pyspark.sql.DataFrame,
        df08_bankrupt_restaurants:pyspark.sql.DataFrame
        ) -> pyspark.sql.DataFrame:
    """
    Get proximity of restaurants to bus stops.

    Each labeled restaurant is cross-joined with every bus shelter, the great-circle distance is
    computed with the spherical law of cosines (Earth radius 6371 km) and distinct
    ``shelter_id`` values are counted per radius.

    Parameters
    ----------
    df03_bus_stop_mn : pyspark.sql.DataFrame
        Bus shelters with ``shelter_id``, ``latitude`` and ``longitude``.
    df08_bankrupt_restaurants : pyspark.sql.DataFrame
        Labeled restaurants with ``bin``, ``camis``, ``latitude`` and ``longitude``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (bin, camis, latitude, longitude) with ``bus_100``, ``bus_400`` and
        ``bus_1000``: distinct shelters within 0.10, 0.40 and 1.00 km.
    """
    #
    tdf01_buses = (df03_bus_stop_mn
        .selectExpr("shelter_id","latitude as lat_diff","longitude as lng_diff"))
    #
    # Cosine of the central angle between the restaurant and the shelter.
    cos_angle_raw = ((sf.sin(sf.radians(sf.col("latitude"))) * sf.sin(sf.radians(sf.col("lat_diff"))))
        + (sf.cos(sf.radians(sf.col("latitude"))) * sf.cos(sf.radians(sf.col("lat_diff"))))
        * sf.cos(sf.radians("longitude") - sf.radians("lng_diff")))
    # Rounding can push the cosine just outside [-1, 1] for (near-)identical points, where acos
    # yields NaN and the pair is never counted. Clamp it while leaving NULL/NaN untouched; the
    # isnan test is needed because Spark treats NaN as larger than any other double.
    cos_angle = (sf.when((cos_angle_raw > 1.0) & ~sf.isnan(cos_angle_raw), sf.lit(1.0))
        .when(cos_angle_raw < -1.0, sf.lit(-1.0))
        .otherwise(cos_angle_raw))
    distance_in_kms = sf.round(sf.acos(cos_angle) * sf.lit(6371.0), 4)
    #
    df11_proximity_bus = (df08_bankrupt_restaurants
        .selectExpr("bin","camis","latitude","longitude")
        .join(tdf01_buses, how="cross")
        .withColumn("distance_in_kms", distance_in_kms)
        .groupBy("bin","camis","latitude","longitude")
        .agg(sf.countDistinct(sf.when(sf.col("distance_in_kms") <= 0.10,
                sf.col("shelter_id")).otherwise(None)).alias("bus_100"),
            sf.countDistinct(sf.when(sf.col("distance_in_kms") <= 0.40,
                sf.col("shelter_id")).otherwise(None)).alias("bus_400"),
            sf.countDistinct(sf.when(sf.col("distance_in_kms") <= 1.00,
                sf.col("shelter_id")).otherwise(None)).alias("bus_1000")))
    #
    return df11_proximity_bus
#

In [None]:
def get_proximity_mta(
        df04_mta:pyspark.sql.DataFrame,
        df08_bankrupt_restaurants:pyspark.sql.DataFrame
        ) -> pyspark.sql.DataFrame:
    """
    Get proximity of restaurants to MTA subway stations.

    Each labeled restaurant is cross-joined with every station, the great-circle distance is
    computed with the spherical law of cosines (Earth radius 6371 km) and distinct
    ``gtfs_stop_id`` values are counted per radius.

    Parameters
    ----------
    df04_mta : pyspark.sql.DataFrame
        Stations with ``gtfs_stop_id``, ``gtfs_latitude`` and ``gtfs_longitude``.
    df08_bankrupt_restaurants : pyspark.sql.DataFrame
        Labeled restaurants with ``bin``, ``camis``, ``latitude`` and ``longitude``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (bin, camis, latitude, longitude) with ``train_100``, ``train_400`` and
        ``train_1000``: distinct stations within 0.10, 0.40 and 1.00 km.
    """
    #
    tdf01_mta = (df04_mta
        .selectExpr("gtfs_stop_id","gtfs_latitude as lat_diff","gtfs_longitude as lng_diff"))
    #
    # Cosine of the central angle between the restaurant and the station.
    cos_angle_raw = ((sf.sin(sf.radians(sf.col("latitude"))) * sf.sin(sf.radians(sf.col("lat_diff"))))
        + (sf.cos(sf.radians(sf.col("latitude"))) * sf.cos(sf.radians(sf.col("lat_diff"))))
        * sf.cos(sf.radians("longitude") - sf.radians("lng_diff")))
    # Rounding can push the cosine just outside [-1, 1] for (near-)identical points, where acos
    # yields NaN and the pair is never counted. Clamp it while leaving NULL/NaN untouched; the
    # isnan test is needed because Spark treats NaN as larger than any other double.
    cos_angle = (sf.when((cos_angle_raw > 1.0) & ~sf.isnan(cos_angle_raw), sf.lit(1.0))
        .when(cos_angle_raw < -1.0, sf.lit(-1.0))
        .otherwise(cos_angle_raw))
    distance_in_kms = sf.round(sf.acos(cos_angle) * sf.lit(6371.0), 4)
    #
    df12_proximity_mta = (df08_bankrupt_restaurants
        .selectExpr("bin","camis","latitude","longitude")
        .join(tdf01_mta, how="cross")
        .withColumn("distance_in_kms", distance_in_kms)
        .groupBy("bin","camis","latitude","longitude")
        .agg(sf.countDistinct(sf.when(sf.col("distance_in_kms") <= 0.10,
                sf.col("gtfs_stop_id")).otherwise(None)).alias("train_100"),
            sf.countDistinct(sf.when(sf.col("distance_in_kms") <= 0.40,
                sf.col("gtfs_stop_id")).otherwise(None)).alias("train_400"),
            sf.countDistinct(sf.when(sf.col("distance_in_kms") <= 1.00,
                sf.col("gtfs_stop_id")).otherwise(None)).alias("train_1000")))
    #
    return df12_proximity_mta
#

In [None]:
def get_all_features(
        spark:pyspark.sql.SparkSession,
        options:dict={}
        ) -> pd.DataFrame:
    """
    Process the raw datasets into one labeled feature table and save it as a CSV.

    The labeled restaurants (get_bankrupt_restaurants) are left-joined with building features on
    ``bin`` and with the restaurant / bus / subway proximity counts on
    (bin, camis, latitude, longitude). The result is collected to pandas, rows with any missing
    value are dropped, and the frame is written (with its index) to
    ``./../data/processed/{date_today}_Survival_Features_Labeled.csv``.

    Parameters
    ----------
    spark : pyspark.sql.SparkSession
        Session passed to read_data_raw.
    options : dict, optional
        Forwarded to read_data_raw; ``"date_today"`` (YYYYMMDD, default today) names the output file.

    Returns
    -------
    pd.DataFrame
        The saved table: label first, then identifiers, year columns and features.
    """
    #
    t_begin = time.time()
    #
    (df_inspections, df_buildings_raw, df_bus_stops, df_mta_stations,
        df_ridership, df_aadt, df_atvc) = read_data_raw(spark, options)
    df_labels = get_bankrupt_restaurants(df_inspections)
    df_building_features = get_buildings_info(df_buildings_raw, df_ridership, df_aadt, df_atvc)
    proximity_frames = [
        get_proximity_restaurant(df_inspections, df_labels),
        get_proximity_bus(df_bus_stops, df_labels),
        get_proximity_mta(df_mta_stations, df_labels),
    ]
    #
    # Building features are per BIN; proximity counts are per restaurant location.
    restaurant_keys = ["bin","camis","latitude","longitude"]
    df_joined = df_labels.join(df_building_features, how="left", on=["bin"])
    for df_proximity in proximity_frames:
        df_joined = df_joined.join(df_proximity, how="left", on=restaurant_keys)
    #
    select_exprs = [
        "bin","camis",
        "inspection_year_min as open_year","inspection_year_max as close_year","years_open",
        "flag_restaurant_one_year","latitude","longitude",
        "food_100","food_400","food_800","food_1000",
        "bus_100","bus_400","bus_1000",
        "train_100","train_400","train_1000",
        "office_area","retail_area","residential_area","street_width_min","street_width_max","posted_speed",
        "dist_station","dist_park","dist_school","office_450","retail_450","residential_450","ridership_morning_mean",
        "ridership_midday_mean","ridership_evening_mean","ridership_night_mean","ridership_late_night_mean",
        "idw_aadt_mean","idw_atvc_mean",
    ]
    df_selected = df_joined.selectExpr(*select_exprs)
    #
    # Final column order: the label first, then identifiers/years, then model features.
    ordered_cols = [
        "flag_restaurant_one_year","bin","camis","latitude","longitude",
        "open_year","close_year","years_open",
        "food_100","food_400","food_800","food_1000",
        "bus_100","bus_400","bus_1000","train_100","train_400","train_1000",
        "office_area","retail_area","residential_area","street_width_min","street_width_max","posted_speed",
        "dist_station","dist_park","dist_school","office_450","retail_450","residential_450",
        "ridership_morning_mean","ridership_midday_mean","ridership_evening_mean","ridership_night_mean","ridership_late_night_mean",
        "idw_aadt_mean","idw_atvc_mean",
    ]
    #
    pdf_features = df_selected.toPandas()
    pdf_features = pdf_features.loc[:, ordered_cols]
    pdf_features = pdf_features.dropna()
    pdf_features = pdf_features.reset_index(drop=True)
    #
    date_tag = options.get("date_today", datetime.date.today().strftime("%Y%m%d"))
    output_path = "./../data/processed/{}_Survival_Features_Labeled.csv".format(date_tag)
    pdf_features.to_csv(output_path)
    #
    print("get_all_features | Done in {:.2f} s".format(time.time()-t_begin))
    #
    return pdf_features
#

In [None]:
def read_data_processed(
        options:dict={}
        ):
    """
    Read the labeled feature CSV written by get_all_features and prepare it for modeling.

    The file ``{processed_dir}/{date_today}_Survival_Features_Labeled.csv`` is loaded and its
    saved index column dropped. Only the label (first column) and the model features (columns
    from position 8 onward) are kept. Rows with missing values or a non-numeric
    ``posted_speed`` are removed, and the column names are normalized.

    Parameters
    ----------
    options : dict, optional
        ``"date_today"`` (YYYYMMDD, default today) selects the file; ``"processed_dir"``
        (default ``"./../data/processed"``) selects its folder. The dict is not mutated.

    Returns
    -------
    pd.DataFrame
        Label in the first column followed by the feature columns, with a fresh RangeIndex and
        ``posted_speed`` as a numeric column.
    """
    #
    options = options or {}
    str_date_today = options.get("date_today", datetime.date.today().strftime("%Y%m%d"))
    processed_dir = options.get("processed_dir", "./../data/processed")
    # The CSV was written together with its pandas index, which comes back as the first column.
    pdf14_labeled = pd.read_csv("{}/{}_Survival_Features_Labeled.csv".format(processed_dir, str_date_today)).iloc[:, 1:]
    print("Columns Before Processing: ", list(pdf14_labeled.columns))
    # Assumes the column layout produced by get_all_features: position 0 is the label and
    # positions 1-7 are bin, camis, latitude, longitude, open_year, close_year, years_open
    # (years_open determines the label), so they are excluded from the features.
    tpdf01_label = pdf14_labeled.iloc[:, 0]
    tpdf02_features = pdf14_labeled.iloc[:, 8:]
    pdf14_labeled = pd.concat([tpdf01_label, tpdf02_features], axis=1)
    pdf14_labeled = pdf14_labeled.dropna().reset_index(drop=True)
    # read_csv yields text for posted_speed only when some value is non-numeric; in that case keep
    # rows made of digits and convert them. A column already parsed as numeric needs no filtering
    # (calling the .str accessor on it would raise).
    if not pd.api.types.is_numeric_dtype(pdf14_labeled["posted_speed"]):
        mask_numeric_speed = pdf14_labeled["posted_speed"].astype(str).str.isnumeric()
        pdf14_labeled = pdf14_labeled.loc[mask_numeric_speed, :].copy()
        pdf14_labeled["posted_speed"] = pd.to_numeric(pdf14_labeled["posted_speed"])
    pdf14_labeled = pdf14_labeled.reset_index(drop=True)
    #
    ### Clean column names
    pdf01_cols = list(pdf14_labeled.columns)
    pdf01_cols = [str(i).lower().replace(" ","_").replace(r"/","_").replace(",","").replace(".","_").replace("-","_") for i in pdf01_cols]
    pdf14_labeled.columns = pdf01_cols
    #
    print("")
    print("\nColumns After Processing: ", list(pdf14_labeled.columns))
    #
    return pdf14_labeled
#

In [None]:
def get_classification_model(
        pdf14_labeled:pd.DataFrame
        ) -> pd.DataFrame:
    """
    Train and evaluate a random-forest classifier for ``flag_restaurant_one_year``.

    Steps:
        1. Split 75/25 into train/test (random_state=0) and standardize with a scaler fit on train.
        2. Tune ``n_estimators`` and ``max_depth`` with GridSearchCV on the training split.
        3. Rank features by the tuned forest's importances and keep the top-k prefix that gives
           the best test accuracy (k = 1 .. number of features).
        4. Refit a scaler and the tuned forest on the selected features, report test accuracy and
           the confusion matrix, and pickle both to ./../models/.

    Parameters
    ----------
    pdf14_labeled : pd.DataFrame
        Label in the first column, numeric features in the remaining columns.

    Returns
    -------
    pd.DataFrame
        The selected feature values for every row, plus the true label
        (``flag_restaurant_one_year``) and the model prediction
        (``flag_restaurant_one_year_predict``).
    """
    #
    start_time = time.time()
    #
    X = pdf14_labeled.iloc[:, 1:].copy()
    y = pdf14_labeled.iloc[:, 0].copy()
    print("Features To Scale and Use: ")
    print(list(X.columns))
    print("")
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)
    #
    ### This section is built to select which features to use in the final model. ###
    scaler = StandardScaler()
    scaler.fit(X_train)
    X_train_scaled = scaler.transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    #
    ### Tune Hyperparameter
    print("Use Grid Search to Tune Hyperparameter.")
    model_forest = RandomForestClassifier(random_state = 25)
    gscv_rfc = GridSearchCV(model_forest, param_grid={"n_estimators":[4,16,256], "max_depth":[2,8,16]})
    gscv_rfc.fit(X_train_scaled, y_train)
    print("The Best Params Are: {}".format(gscv_rfc.best_params_))
    best_max_depth = gscv_rfc.best_params_["max_depth"]
    best_n_estimators = gscv_rfc.best_params_["n_estimators"]
    #
    # Rank the features by the importances of a forest refit with the tuned hyperparameters.
    model_forest = RandomForestClassifier(random_state = 25,
        max_depth = best_max_depth,
        n_estimators = best_n_estimators)
    model_forest.fit(X_train_scaled, y_train)
    tpdf01_features = pd.DataFrame({"feature":list(X_train.columns), "score":model_forest.feature_importances_})
    tpdf01_features = tpdf01_features.sort_values(by=["score","feature"], ascending=[False,True], ignore_index=True)
    X_cols_sorted = list(tpdf01_features["feature"])
    # Wrap the scaled arrays once (loop-invariant) so candidate columns can be selected by name.
    pdf_X_train_scaled = pd.DataFrame(X_train_scaled, columns=X.columns)
    pdf_X_test_scaled = pd.DataFrame(X_test_scaled, columns=X.columns)
    # Start below any achievable accuracy so the first candidate is always recorded and
    # best_features is always a list of column names.
    best_score = -1.0
    best_features = []
    print("")
    print("Find the Best Combination of Features")
    # NOTE(review): candidates are scored on the test split, which is also used for the final
    # accuracy below, so that figure is optimistically biased. Consider scoring on a validation
    # split carved out of the training data (or cross-validation) instead.
    for idx in range(1,len(X_cols_sorted)+1):
        print("Processing Feature Number {} From {} Features".format(idx, len(X_cols_sorted)))
        i_cols = X_cols_sorted[:idx]
        # Candidate forests use default hyperparameters rather than the tuned ones.
        i_model_forest = RandomForestClassifier(random_state=25)
        i_model_forest.fit(pdf_X_train_scaled.loc[:, i_cols], y_train)
        i_predict_forest = i_model_forest.predict(pdf_X_test_scaled.loc[:, i_cols])
        i_score = accuracy_score(y_test, i_predict_forest)
        if i_score > best_score:
            best_score = i_score
            best_features = i_cols
        #
    print("")
    print("The Best Score Is: {:.2f}%".format(best_score*100))
    print("The Best Features Are:")
    print(best_features)
    print("")
    #################################################################################
    #
    ### This section is to take the learnings from the previous section to train the model. ###
    X_train_selected = X_train.loc[:, best_features].copy()
    X_test_selected = X_test.loc[:, best_features].copy()
    #
    scaler = StandardScaler()
    scaler.fit(X_train_selected)
    X_train_scaled_selected = scaler.transform(X_train_selected)
    X_test_scaled_selected = scaler.transform(X_test_selected)
    #
    model_forest = RandomForestClassifier(random_state = 25,
        max_depth = best_max_depth,
        n_estimators = best_n_estimators)
    model_forest.fit(X_train_scaled_selected, y_train)
    predict_forest = model_forest.predict(X_test_scaled_selected)
    print("Final Accuracy Score Is: {:.2f}%".format(accuracy_score(y_test, predict_forest)*100))
    print("Confusion Matrix:")
    print(confusion_matrix(y_test, predict_forest))
    print("")
    #
    ### Create DataFrame to Show Original Values
    # Reuse the training-fitted scaler (the one pickled below): a scaler refit on the full
    # dataset would feed the model inputs scaled differently from those it was trained on.
    X_selected_scaled = scaler.transform(X.loc[:, best_features])
    y_predict = model_forest.predict(X_selected_scaled)
    pdf15_classification_model = X.loc[:, best_features].copy()
    pdf15_classification_model["flag_restaurant_one_year"] = y
    pdf15_classification_model["flag_restaurant_one_year_predict"] = y_predict
    # Includes the training rows, so this is not an out-of-sample estimate.
    print("Accuracy Score On Original Dataset Is: {:.2f}%".format(accuracy_score(y,y_predict)*100))
    print("")
    #
    with open("./../models/scaler.pkl","wb") as f:
        pickle.dump(scaler, f)
    print("Done with exporting scaler as a pickle file.")
    with open("./../models/model_forest.pkl","wb") as f:
        pickle.dump(model_forest, f)
    print("Done with exporting model pickle file.")
    print("")
    ###########################################################################################
    #
    print("Done in {:.2f} s".format(time.time()-start_time))
    #
    return pdf15_classification_model
#

In [None]:
def run_restaurant_survival_prediction(
        ):
    """
    Run the restaurant-survival pipeline end to end.

    Builds and saves the labeled feature CSV (get_all_features), reloads and cleans it
    (read_data_processed), trains and evaluates the classifier (get_classification_model), and
    writes its output to ./../data/processed/survival_predicted.csv. Uses the module-level
    ``spark`` session.

    Returns
    -------
    pd.DataFrame
        The frame returned by get_classification_model.
    """
    #
    start_time = time.time()
    #
    # Compute the date tag once and hand the same options to both steps: read_data_processed
    # would otherwise recompute "today", and if the (slow) feature build runs past midnight it
    # would look for a file stamped with the next day's date.
    options = {}
    options["date_today"] = datetime.date.today().strftime("%Y%m%d")
    get_all_features(spark, options)  # writes the labeled CSV that is read back below
    pdf14_labeled = read_data_processed(options)
    pdf15_classification_model = get_classification_model(pdf14_labeled)
    #
    pdf15_classification_model.to_csv("./../data/processed/survival_predicted.csv")
    #
    print("run_restaurant_survival_prediction | Done in {:.2f} s".format(time.time()-start_time))
    #
    return pdf15_classification_model
#

In [None]:
pdf15_classification_model = run_restaurant_survival_prediction()
#