# CLIP for bike lane detection

I'm using the CLIP model to detect types of bike lanes. So far, performance is bad: With 4 columns to predict, I'm getting the right prediction <50% of the time. Parkways are performing ok (>80%), but especially standard bike lanes have really low prediction accuracy (<20%).

I'm not using this for analysis, but I'm working on making this perform better.

In [114]:
from PIL import Image
import requests
from transformers import CLIPProcessor, CLIPModel
import pandas as pd
import numpy as np

In [115]:
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Outline

1. Get list of images (useful, coded)
- Can I predict whether bike lane is visible?
2. Open coded images
3. Can I predict:
- bike lane type
- Whether it's obstructed?

In [116]:
df = pd.read_csv('data/bike_coding_sheet_download.csv')

with open('data/ids_reviewed.txt', 'r') as f:
    ids_reviewed = f.read().split('\n')
# TUrn into int
ids_reviewed = [int(i) for i in ids_reviewed if i != '']
len(set(ids_reviewed))

2723

In [135]:
df_coded = pd.read_csv('data/bike_coding_sheet_download.csv')
df_coded = df_coded[df_coded.id.notnull()].copy()
df_coded.id = df_coded.id.astype(int)
df_coded.columns = [c.replace(' ', '_').replace('?', '') for c in df_coded.columns]

cols_protected = ['through_parking', 'plastic_stoppers', 'white_separation', 'stone']
obstructions = ['construction', 'cars_standing', 'cars_moving', 'humans']
cols_lanes = ['sharrow', 'bike_lane', 'parkway']
cols = obstructions + cols_protected + obstructions + ['useful'] + cols_lanes

bad_vals = ['?', '´']
# Remove bad_vals for all cols in double loop
for c in cols:
    for bad_val in bad_vals:
        df_coded[c] = df_coded[c].astype(str).str.replace(bad_val, '0')
        
for c in cols:
    df_coded[c] = df_coded[c].fillna('0').astype(float).fillna(0).astype(int).astype(bool)

df_coded['neither'] = ~df_coded[cols_protected].apply(max, axis=1).astype(bool)

lanes_dict = {
    0: 'None',
    1: 'Sharrows',
    2: 'Standard',
    3: 'Protected Path',
    4: 'Greenway'
}

# Invert the dictionary
lanes_dict_inv = {v:k for k,v in lanes_dict.items()}
cols_lanes = ['tf_facilit', 'ft_facilit']

# Fill the lanes with the values from lanes_dict_inv
# Any value not in that dict will be coded as 0
for c in cols_lanes:
    col = df_coded[c].copy()
    col = [lanes_dict_inv.get(v, 0) for v in col]
    df_coded[c + '_int'] = col

    # Any value from the initial column that is not in the lanes_dict_inv will be coded as 0
    
df_coded['lane_max'] = df_coded[[c + '_int' for c in cols_lanes]].apply('max', axis=1)
df_coded['lane_max'] = df_coded['lane_max'].replace(lanes_dict)
# df_coded = df_coded.query('~parkway').copy()

# In column names replace all spaces with underscores
df_coded.columns = [c.replace(' ', '_').replace('?', '') for c in df_coded.columns]

ids_useful = list(df_coded.query('useful').id.unique())

cols_protection = ['through_parking', 'plastic_stoppers', 'white_separation', 'stone']
df_coded['has_protection'] = df_coded[cols_protection].apply('max', axis=1)
df_coded['bike_standard'] = df_coded.bike_lane & ~df_coded.has_protection

df_coded[['bike_standard', 'bike_lane']].mean()

bike_standard    0.123762
bike_lane        0.166117
dtype: float64

In [136]:
# Use standard python import libraries
image_filenames = os.listdir('data/images')

image_filenames_simple = [f.split('/')[-1].split('.')[0] for f in image_filenames]
image_ids = [f.split('____')[1] for f in image_filenames_simple if '____' in f]

len(image_filenames), len(set(image_ids))

(5403, 5402)

In [137]:
ids_all = list(set(list(df_coded['id'].unique()) + ids_reviewed))

# Get the filenames in the `data/images` and split by (sequence____id):
import os

# Use standard python import libraries
image_filenames = os.listdir('data/images')

image_filenames_simple = [f.split('/')[-1].split('.')[0] for f in image_filenames]
filenames_full = {f.split('____')[1]: f for f in image_filenames_simple if '____' in f}
image_ids = list(filenames_full.keys())

def get_image_from_id(id, prefer_smaller=False, scale_factor=4):
    # Test if this image exists in the ``data/images_small` folder and if so return it (if prefer_smaller):
    filename_small = f'data/images_small/{id}.jpg'
    if prefer_smaller:
        try:
            return Image.open(filename_small)
        except:
            pass
    
    # Get the filename from the id
    try:
        filename = [f for f in image_filenames if str(id) in f][0]
        # Open the image
        image = Image.open(f'data/images/{filename}')
        
        scale_factor = 4
        # Scale down by a factor of 4
        image = image.resize((int(image.width / scale_factor), int(image.height / scale_factor)))
        
        # Store this smaller image in the `data/images_small` folder
        image.save(filename_small)
        return image
    except:
        return None

In [210]:
renamer_match_columns = {
    'bike_standard': 'bike_standard',
    # 'white_separation': 'bike_white_separation',
    'plastic_stoppers' : 'bike_plastic_stoppers',
    'sharrow' : 'bike_sharrow',
    'parkway': 'bike_parkway'
}

In [211]:
df_all = pd.DataFrame(index=ids_all)
df_all['is_useful'] = df_all.index.isin(ids_useful)
df_all.is_useful.mean()


# Sample the data so that it's 50% useful and 50% not useful
df_all = df_all.sample(frac=1, random_state=42)
df_all = df_all.groupby('is_useful').head(df_all.is_useful.sum())

df_all['filename'] = df_all.index.map(lambda id: filenames_full.get(str(id), None))

def get_prompt(what_of): return f"a photo {what_of}"

prompt_templates = {
    "bike_free": "of a bike lane that is free of obstructions",
    "bike_blocked":  "of a bike lane that is obstructed",
    "car_lane": "of a car lane",
    "dark": "in the dark",
    "no_lane": "without car lane",
    "poor": "with poor quality"
}

prompt_templates_within = {
    "bike_standard": "of a standard unprotected bike line",
    "bike_plastic_stoppers": "of a bike line where bikes are separated from cars by plastic bollards",
    "bike_parkway": "of a parkway suitable for bikes (no cars allowed)",
    "bike_sharrow": "of a bike lane that is a sharrow (shared with cars)",
    "bike_none": "of no bike lane",
    "bike_other": "any other type of bike lane"
}

def seed_model(prompt_templates):
    
    prompts = {k: get_prompt(v) for k,v in prompt_templates.items()}
    
    def get_bike_probs(image):
        inputs = processor(text=list(prompts.values()), images=image, return_tensors="pt", padding=True)
        outputs = model(**inputs)
        logits_per_image = outputs.logits_per_image
        return logits_per_image.softmax(dim=1)
    
    return get_bike_probs

In [None]:
ids_prob =  df_all.query('is_useful')[df_all.bike_standard.notnull()].sample(1).index
prompt_image = prompts_used
id = ids_prob[0]
row = df_all.loc[id]
image = get_image_from_id(id)
if image:
    probs = seed_model(prompt_image)(image)
    probs = pd.Series(probs[0].detach().numpy() * 100).astype(int)
    probs.index = [p[0:40] for p in prompt_image]
    print(probs)
image

In [230]:
num_images = 200

sample_useful = True

prompts_used = prompt_templates_within
for prompt in prompts_used.keys():
    if prompt not in df_all.columns:
        df_all[prompt] = np.nan

for i in range(num_images):
    if i % 50 == 0:
        print(i)
    id = df_all[df_all[prompt].isna()]
    if sample_useful:
        id = id.query('is_useful')
    if len(id) == 0:
        break
    id = id.sample(1).index[0]
    image = get_image_from_id(id)
    if not image:
        continue
    probs = seed_model(prompts_used)(image)
    # Print id and probabilities
    # Turn the probs into columns based on the keys
    for i, k in enumerate(prompts_used.keys()):
        df_all.loc[id, k] = probs[0, i].item()

0
50
100
150


# Performance

In [232]:
renamer_clip = {k: f"{k}_clip" for k in prompt_templates_within.keys()}
df_clip_predictions = df_all.rename(columns=renamer_clip)
df_clip_predictions = df_clip_predictions[df_clip_predictions.bike_standard_clip.notnull()]
print(len(df_clip_predictions))
df_coded_matched = df_coded.rename(columns=renamer_match_columns)
df_matched = df_clip_predictions.merge(df_coded_matched, left_index=True, right_on='id')
cols_compare = list(renamer_match_columns.values())
cols_all = list(renamer_clip.values()) + cols_compare

cols_predictions = renamer_clip.values()
df_matched['prediction'] = df_matched[cols_predictions].idxmax(axis=1).str.replace('_clip', '')

# The prediction is correct if the column corresponding to the predicted
# value is true
df_matched['prediction_correct'] = df_matched.apply(lambda row: row.get(row.prediction, False), axis=1)

means = {}
for col in cols_compare:
    col_predict = renamer_clip[col]
    means[col] = df_matched.groupby(col)[col_predict].mean()
    
min_max = min(df_matched[cols_compare].sum())
print(min_max)

df_matched_sampled = []
for col in cols_compare:
    df_matched_sampled.append(df_matched.query(f'{col}').sample(min_max))
    df_matched_sampled.append(df_matched.query(f'~{col}').sample(min_max))
df_matched_sampled = pd.concat(df_matched_sampled)

df_matched_sampled[cols_compare].mean()

df_matched_sampled[cols_compare + ['prediction', 'prediction_correct']].sample(10)

pd.set_option('display.precision', 2)
perf = (pd.DataFrame({col: df_matched_sampled.groupby(col)['prediction_correct'].mean() for col in cols_compare}).T * 100).astype(int)
perf
perf.columns = ['percent_correct_if_false', 'percent_correct_if_true']
perf['number_of_true_obs'] = df_matched_sampled[cols_compare].sum()
perf

478
23


Unnamed: 0,percent_correct_if_false,percent_correct_if_true,number_of_true_obs
bike_standard,30,0,46
bike_plastic_stoppers,26,6,32
bike_sharrow,15,55,34
bike_parkway,12,69,33


# Appendix (scratch work)

In [356]:
# import statsmodels formula
import statsmodels.formula.api as smf

# create a fitted model

#df_merged['car_obstruction_int']

model_reg = smf.ols(formula='I(1*car_obstruction) ~ bike_lane + sharrow + through_parking + white_separation + bike_blocked', data=df_merged).fit()
model_reg.summary()

0,1,2,3
Dep. Variable:,I(1 * car_obstruction),R-squared:,0.058
Model:,OLS,Adj. R-squared:,0.05
Method:,Least Squares,F-statistic:,7.47
Date:,"Sun, 19 Nov 2023",Prob (F-statistic):,8.13e-07
Time:,01:23:01,Log-Likelihood:,-209.04
No. Observations:,614,AIC:,430.1
Df Residuals:,608,BIC:,456.6
Df Model:,5,,
Covariance Type:,nonrobust,,

0,1,2,3,4,5,6
,coef,std err,t,P>|t|,[0.025,0.975]
Intercept,0.0019,0.056,0.034,0.973,-0.109,0.112
bike_lane[T.True],0.0443,0.032,1.372,0.171,-0.019,0.108
sharrow[T.True],0.1526,0.040,3.772,0.000,0.073,0.232
through_parking[T.True],-0.1048,0.067,-1.558,0.120,-0.237,0.027
white_separation[T.True],-0.0932,0.039,-2.399,0.017,-0.169,-0.017
bike_blocked,0.1729,0.078,2.224,0.026,0.020,0.325

0,1,2,3
Omnibus:,201.479,Durbin-Watson:,1.905
Prob(Omnibus):,0.0,Jarque-Bera (JB):,440.086
Skew:,1.852,Prob(JB):,2.73e-96
Kurtosis:,4.868,Cond. No.,9.04


In [305]:
# Collect the images with a useful_probability
ids_prob = list(df_all[df_all.bike.notnull()].index)

# Load the images into an array called images
images = []
for id in ids_prob:
    image = get_image_from_id(id, prefer_smaller=True)
    # Scale the image down by a factor of 4
    # Only keep the image if width-heigh ratio is between 1 and 2
    if image.width / image.height < 2 and image.width / image.height > 1:
        image = image.resize((image.width // 4, image.height // 4))
        images.append(image)
    
# Create labels for the images that include the columns is_useful and the probability
# of being useful
labels = []
for id in ids_prob:
    label = df_all.loc[id, ['is_useful', 'bike_blocked', 'bike_free']]
    # Turn it into a dict, without storing the index values
    label = label.to_dict()
    # The probability should be in percent, without digits
    label['bike_blocked'] = int(label['bike_blocked'] * 100)
    label['bike_free'] = int(label['bike_free'] * 100)
    label['id'] = id
    # Turn the label into a string.
    # This will be displayed under the image
    label = str(label)
    labels.append(label)
    
img_width = 300
max_images = 20
ipyplot.plot_images(images, labels=labels, img_width=img_width, max_images=max_images)

  return np.asarray(seq, dtype=type(seq[0]))


In [66]:
import ipyplot