In [None]:
import pandas as pd
import numpy as np
import snorkel
import matplotlib.pyplot as plt
from snorkel.labeling import labeling_function
from snorkel.labeling import LFAnalysis
from snorkel.labeling.model import MajorityLabelVoter
from snorkel.labeling import PandasLFApplier
from snorkel.labeling.model import LabelModel
import seaborn as sns

: 

In [12]:
# Load the training split; the labeling functions are applied to these rows
# and the label model is fitted on the resulting votes.
df_train = pd.read_csv("data/training_set_seattle.csv")

# Load the test split. Its `verified` column is the reference label used when
# scoring the label model.
df_test = pd.read_csv('data/test_set_seattle.csv')
Y_test = df_test.verified.values

In [13]:
# Distinct values of the `label_type` column, as a plain Python list.
# pandas returns uniques in order of first appearance, so positions in this
# list depend on the row order of the training file.
label_types = (
    df_train["label_type"]
    .unique()
    .tolist()
)
label_types

['CurbRamp',
 'Obstacle',
 'SurfaceProblem',
 'NoSidewalk',
 'NoCurbRamp',
 'Occlusion',
 'Other',
 'Signal',
 'Crosswalk']

Labeling Functions

In [42]:
# Values a labeling function may return.
# NOT_SURE is -1, the value Snorkel treats as "abstain"; WRONG and CORRECT are
# the two classes of the binary task.
NOT_SURE, WRONG, CORRECT = -1, 0, 1

@labeling_function()
def intersection(x):
    """Vote WRONG on residential / living-street rows by intersection distance.

    Args:
        x: one row of the dataframe; reads ``way_type`` and
            ``intersection_distance``.

    Returns:
        WRONG when ``way_type`` is 'residential' or 'living_street' and
        ``intersection_distance`` is either >= 70 or in the range (0, 10];
        NOT_SURE otherwise.
    """
    # Only these two way types are judged; everything else abstains.
    if x["way_type"] not in ('residential', 'living_street'):
        return NOT_SURE

    gap = x["intersection_distance"]
    # A value of exactly 0 (or below) is deliberately excluded from the "near" band.
    too_far = gap >= 70
    too_near = 0 < gap <= 10
    return WRONG if (too_far or too_near) else NOT_SURE

    
# Vote from the row's "count" value, using a separate threshold per label type.
@labeling_function()
def clustered(x):
    """Vote on a row by comparing ``count`` with a label-type-specific threshold.

    NOTE(review): ``count`` is presumably the size of the cluster the label
    belongs to -- confirm against the code that builds the dataset.

    Args:
        x: one row of the dataframe; reads ``label_type`` and ``count``.

    Returns:
        WRONG for 'NoSidewalk' with count <= 2 or 'NoCurbRamp' with count <= 4,
        CORRECT for 'CurbRamp' with count > 5, NOT_SURE in every other case.
    """
    kind = x["label_type"]
    if kind == 'NoSidewalk':
        return WRONG if x["count"] <= 2 else NOT_SURE
    if kind == 'NoCurbRamp':
        return WRONG if x["count"] <= 4 else NOT_SURE
    if kind == 'CurbRamp':
        return CORRECT if x["count"] > 5 else NOT_SURE
    # Remaining label types are not judged by this function.
    return NOT_SURE


# Vote from the row's severity rating, using a separate rule per label type.
@labeling_function()
def severity(x):
    """Vote on a row by comparing ``severity`` with label-type-specific thresholds.

    Args:
        x: one row of the dataframe; reads ``label_type`` and ``severity``.

    Returns:
        WRONG for 'NoSidewalk' with severity < 3, 'NoCurbRamp' with
        severity <= 2, or 'CurbRamp' with severity >= 4;
        CORRECT for 'Obstacle' or 'SurfaceProblem' with severity >= 4;
        NOT_SURE in every other case (including a missing severity, since
        comparisons against NaN are False).
    """
    label_type = x["label_type"]
    if label_type == 'NoSidewalk' and x["severity"] < 3:
        return WRONG
    if label_type == 'NoCurbRamp' and x["severity"] <= 2:
        return WRONG
    # For curb ramps the rule is inverted: a high severity votes WRONG.
    if label_type == 'CurbRamp' and x["severity"] >= 4:
        return WRONG
    if label_type in ('Obstacle', 'SurfaceProblem') and x["severity"] >= 4:
        return CORRECT
    return NOT_SURE


# Vote from the zoom level stored on the row.
@labeling_function()
def zoom(x):
    """Vote on a row from its ``zoom`` value.

    Args:
        x: one row of the dataframe; reads ``zoom``.

    Returns:
        CORRECT when zoom > 2, WRONG when zoom == 1, NOT_SURE otherwise
        (e.g. zoom == 2).
    """
    level = x["zoom"]
    if level == 1:
        return WRONG
    if level > 2:
        return CORRECT
    return NOT_SURE


# Vote CORRECT for selected label types when the row's tag flag is set.
@labeling_function()
def tags(x):
    """Vote CORRECT on 'NoSidewalk' / 'NoCurbRamp' rows whose ``tag_list`` is 1.

    NOTE(review): ``tag_list == 1`` presumably means "at least one tag was
    attached" -- confirm against the dataset preparation. The previous version
    tested 'NoCurbRamp' twice; the repeated branch could never fire and has
    been dropped. If it was meant to cover a different label type, add that
    type to the tuple below.

    Args:
        x: one row of the dataframe; reads ``label_type`` and ``tag_list``.

    Returns:
        CORRECT when ``label_type`` is 'NoSidewalk' or 'NoCurbRamp' and
        ``tag_list`` equals 1; NOT_SURE otherwise.
    """
    if x["label_type"] in ('NoSidewalk', 'NoCurbRamp') and x["tag_list"] == 1:
        return CORRECT
    return NOT_SURE


# Vote CORRECT when the row's description flag is set.
@labeling_function()
def description(x):
    """Vote CORRECT when ``description`` equals 1, otherwise abstain.

    NOTE(review): the flag presumably marks rows where the user wrote a
    comment -- confirm against the dataset preparation.

    Args:
        x: one row of the dataframe; reads ``description``.

    Returns:
        CORRECT if ``x['description'] == 1`` else NOT_SURE.
    """
    if x['description'] == 1:
        return CORRECT
    return NOT_SURE

@labeling_function()
def distance(x):
    """Vote WRONG on 'NoSidewalk' / 'Obstacle' rows whose ``distance`` is below 10.

    Args:
        x: one row of the dataframe; reads ``label_type`` and ``distance``.

    Returns:
        WRONG when ``label_type`` is 'NoSidewalk' or 'Obstacle' and
        ``distance`` < 10; NOT_SURE otherwise.
    """
    # Other label types are not judged by this function.
    if x["label_type"] not in ('NoSidewalk', 'Obstacle'):
        return NOT_SURE
    return WRONG if x["distance"] < 10 else NOT_SURE

# All labeling functions defined above, in the order they are handed to the applier.
lfs = [intersection, clustered, severity, zoom, tags, description, distance]

In [43]:
applier = PandasLFApplier(lfs=lfs)

# Only the first N entries of `label_types` are evaluated. The list order comes
# from the training data, so check `label_types` before changing this number.
N_LABEL_TYPES_TO_EVALUATE = 5

# Fit one LabelModel per label type on that type's training rows, then score
# it on the matching test rows. Results are collected as plain records and
# turned into a DataFrame once at the end: `DataFrame.append` no longer exists
# in pandas >= 2.0, and growing a frame row by row is quadratic.
summary_rows = []
for label_type in label_types[:N_LABEL_TYPES_TO_EVALUATE]:
    train_subset = df_train[df_train['label_type'] == label_type]
    test_subset = df_test[df_test['label_type'] == label_type]

    L_train = applier.apply(df=train_subset)
    L_test = applier.apply(df=test_subset)
    # NOTE(review): this rebinds the notebook-level Y_test to the current
    # label type's subset; after the loop it holds the last subset only.
    Y_test = test_subset.verified.values

    label_model = LabelModel(cardinality=2, verbose=True)
    label_model.fit(L_train=L_train, n_epochs=500, log_freq=100, seed=123)

    # Despite the variable name, the metric requested here is precision.
    label_model_acc = label_model.score(
        L=L_test,
        Y=Y_test,
        metrics=["precision"],
        tie_break_policy="random",
    )["precision"]

    summary_rows.append(
        {'label_type': label_type, 'precision': round(label_model_acc, 4)}
    )

# Passing `columns` keeps the expected schema even when no rows were collected.
summary = pd.DataFrame(summary_rows, columns=['label_type', 'precision'])

100%|██████████| 70690/70690 [00:05<00:00, 12239.60it/s]
100%|██████████| 5333/5333 [00:00<00:00, 12493.62it/s]
INFO:root:Computing O...
INFO:root:Estimating \mu...
  0%|          | 0/500 [00:00<?, ?epoch/s]INFO:root:[0 epochs]: TRAIN:[loss=0.247]
INFO:root:[100 epochs]: TRAIN:[loss=0.000]
 36%|███▌      | 178/500 [00:00<00:00, 1767.07epoch/s]INFO:root:[200 epochs]: TRAIN:[loss=0.000]
INFO:root:[300 epochs]: TRAIN:[loss=0.000]
 71%|███████   | 355/500 [00:00<00:00, 1740.96epoch/s]INFO:root:[400 epochs]: TRAIN:[loss=0.000]
100%|██████████| 500/500 [00:00<00:00, 1752.92epoch/s]
INFO:root:Finished Training
100%|██████████| 10103/10103 [00:00<00:00, 14368.82it/s]
100%|██████████| 2909/2909 [00:00<00:00, 14090.81it/s]
INFO:root:Computing O...
INFO:root:Estimating \mu...
  0%|          | 0/500 [00:00<?, ?epoch/s]INFO:root:[0 epochs]: TRAIN:[loss=0.252]
INFO:root:[100 epochs]: TRAIN:[loss=0.000]
 36%|███▌      | 179/500 [00:00<00:00, 1777.01epoch/s]INFO:root:[200 epochs]: TRAIN:[loss=0.000]
I

In [44]:
# Display the table of precision per label type.
summary

Unnamed: 0,label_type,precision
0,CurbRamp,0.9515
1,Obstacle,0.6491
2,SurfaceProblem,0.9003
3,NoSidewalk,0.9545
4,NoCurbRamp,0.9154
