# Run this section to obtain the data

## Web-scrapping Wheelchair Data

In [None]:
!apt update
!apt install chromium-chromedriver
!pip install selenium

from selenium import webdriver

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Connected to cloud.r-project.org (52.85.151.8)] [Connecting to ppa.launchp[0m                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
                                                                                                    Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
[33m0% [2 InRelease 59.1 kB/119 kB 50%] [Connected to cloud.r-project.org (52.85.151.8)] [Connecting to [0m[33m0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.8[0m                                                                                                    Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
[33m0% [Waiting for headers] [Connected to ppa.lau

In [None]:
!pip install chromedriver-autoinstaller



In [None]:
import sys
import chromedriver_autoinstaller

from selenium.webdriver.chrome.service import Service

sys.path.insert(0,'/usr/lib/chromium-browser/chromedriver')

service = Service(executable_path=r'/usr/bin/chromedriver')

options = webdriver.ChromeOptions()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')

chromedriver_autoinstaller.install()

driver = webdriver.Chrome(options=options)

Web scrapping using selenium and google images

In [None]:
from selenium.webdriver.common.by import By
import time

driver.get("https://images.google.com")

search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys("wheelchair crossing street")
search_box.submit()

time.sleep(5)

last_height = driver.execute_script("return document.body.scrollHeight")

while True:
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

    time.sleep(5)

    new_height = driver.execute_script("return document.body.scrollHeight")
    if new_height == last_height:
        break
    last_height = new_height

images = driver.find_elements(By.TAG_NAME, 'img')

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Display images to check and see web scrapping was successful.

In [None]:
from IPython.display import Image, display

for image in images[3:]:
    url = image.get_attribute('src')
    if url:
        display(Image(url=url))


\# of images should equal 119

In [None]:
len(images)

## Pedestrian/Street Dataset

In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
import dask.dataframe as dd

keras.utils.set_random_seed(42)

In [None]:
!wget -q -P ./ https://www.dropbox.com/scl/fi/txd5rvetipwgmwxbju7l3/pedestrian.zip?rlkey=bxatdudux7m3mhj4s8b6knmor&dl=0
!unzip -qq pedestrian.zip?rlkey=bxatdudux7m3mhj4s8b6knmor&dl=0

Displaying 5 random images from our other dataset to make sure it was loaded properly

In [None]:
import os
import random

image_folder = '/content/images'

image_files = [f for f in os.listdir(image_folder) if os.path.isfile(os.path.join(image_folder, f))]

random.shuffle(image_files)

random_sample = image_files[:len(images)]

for image_file in random_sample[:5]:
    image_path = os.path.join(image_folder, image_file)
    print(image_file)
    display(Image(filename=image_path))


Put the images of no wheel chairs into the nowheelchair folder

In [None]:
import os
import shutil

# new folder name
new_folder_name = 'train'

# new folder path
new_folder_path = os.path.join('/content', new_folder_name)

# Create the folder
os.makedirs(new_folder_path, exist_ok=True)

# Iterate and copy each file to new folder
for image_file in random_sample:
    source_path = os.path.join(image_folder, image_file)
    destination_path = os.path.join(new_folder_path, image_file)
    shutil.copy(source_path, destination_path)

print(f"Success! Copied {len(random_sample)} images to {new_folder_path}")

Success! Copied 826 images to /content/test


## Putting images of wheelchair into file

In [None]:
import requests

folder_name = 'wheelchair'

if not os.path.exists(folder_name):
    os.makedirs(folder_name)

for i, image in enumerate(images):
    url = image.get_attribute('src')

    if url is None:
      print(f"Skipping image {i} as it has no src")
      continue

    if url.startswith('data:image/jpeg;base64,'):
        import base64

        image_data = base64.b64decode(url.split('base64,')[1])

        with open(os.path.join(folder_name, f'image_{i}.jpg'), 'wb') as file:
            file.write(image_data)
    elif url.startswith('http'):
        try:
            response = requests.get(url, stream=True)

            if response.status_code == 200:
                with open(os.path.join(folder_name, f'image_{i}.jpg'), 'wb') as file:
                    for chunk in response.iter_content(1024):
                        file.write(chunk)
        except requests.exceptions.RequestException as e:
            print(f'Error downloading image {i}: {e}')

In [None]:
folder_name = 'wheelchair'

files = os.listdir(folder_name)

num_files = len([file for file in files if os.path.isfile(os.path.join(folder_name, file))])

print(f"There are {num_files} files in the {folder_name} folder.")

In [None]:
import os, shutil, pathlib

base_dir = pathlib.Path("/content")

In [None]:
for category in ('nowheelchair', 'wheelchair'):
    category_dir = base_dir / category
    fnames = [fname for fname in os.listdir(category_dir) if os.path.isfile(category_dir / fname)]

    train_dir = base_dir / 'train' / category
    os.makedirs(train_dir, exist_ok=True)
    for fname in fnames[:150]:
        shutil.copyfile(src=category_dir / fname, dst=train_dir / fname)

    validation_dir = base_dir / 'validation' / category
    os.makedirs(validation_dir, exist_ok=True)
    for fname in fnames[150:200]:
        shutil.copyfile(src=category_dir / fname, dst=validation_dir / fname)

    test_dir = base_dir / 'test' / category
    os.makedirs(test_dir, exist_ok=True)
    for fname in fnames[200:]:
        shutil.copyfile(src=category_dir / fname, dst=test_dir / fname)

In [None]:
!zip -r train.zip train/
!zip -r validation.zip validation/
!zip -r test.zip test/

  adding: test/ (stored 0%)
  adding: test/1478898286321019286.jpg (deflated 0%)
  adding: test/1479504310867549548.jpg (deflated 0%)
  adding: test/1479503822835527366.jpg (deflated 1%)
  adding: test/1478901102208032382.jpg (deflated 1%)
  adding: test/1479501623686090169.jpg (deflated 1%)
  adding: test/1478898246901787801.jpg (deflated 0%)
  adding: test/1479499386033711114.jpg (deflated 0%)
  adding: test/1479502149216449971.jpg (deflated 1%)
  adding: test/1478896601017813080.jpg (deflated 0%)
  adding: test/1478897340836282941.jpg (deflated 0%)
  adding: test/1478899204954962350.jpg (deflated 1%)
  adding: test/1479503861836342840.jpg (deflated 0%)
  adding: test/1479505394941576614.jpg (deflated 0%)
  adding: test/1479502839260453337.jpg (deflated 0%)
  adding: test/1479504424874897131.jpg (deflated 0%)
  adding: test/1478901466691190016.jpg (deflated 0%)
  adding: test/1479498923503466825.jpg (deflated 0%)
  adding: test/1479503087284166574.jpg (deflated 0%)
  adding: test/147

In [None]:
import os

validation_dir = 'train'

image_count = 0

for subdir in os.listdir(validation_dir):
    subdir_path = os.path.join(validation_dir, subdir)

    if os.path.isdir(subdir_path):
        image_count += len([file for file in os.listdir(subdir_path) if os.path.isfile(os.path.join(subdir_path, file))])

print(f'There are {image_count} images in the train directory.')

There are 202 images in the train directory.


In [None]:
import os

validation_dir = 'validation'

image_count = 0

for subdir in os.listdir(validation_dir):
    subdir_path = os.path.join(validation_dir, subdir)

    if os.path.isdir(subdir_path):
        image_count += len([file for file in os.listdir(subdir_path) if os.path.isfile(os.path.join(subdir_path, file))])

print(f'There are {image_count} images in the validation directory.')


There are 70 images in the validation directory.


In [None]:
import os

validation_dir = 'test'

image_count = 0

for subdir in os.listdir(validation_dir):
    subdir_path = os.path.join(validation_dir, subdir)

    if os.path.isdir(subdir_path):
        image_count += len([file for file in os.listdir(subdir_path) if os.path.isfile(os.path.join(subdir_path, file))])

print(f'There are {image_count} images in the test directory.')

There are 0 images in the test directory.


# Previous

##Previous method

In [None]:
train_labels = pd.read_csv("labels_train.csv")
test_labels = pd.read_csv("labels_val.csv")
import pandas as pd
first_118_images = train_labels.iloc[:118]

## Data Preprocessing

In [None]:
def preprocess_img(img_path):
  img_raw = tf.io.read_file("images/" + img_path)
  img_tensor = tf.image.decode_jpeg(img_raw, channels=3)
  img_tensor = tf.image.resize(img_tensor, [224, 224])  # Adjust size as needed
  return (img_tensor / 255.0).numpy()

In [None]:
train_labels["class_id"].value_counts()

1    101314
5     12700
3     10637
2      6313
4      1442
Name: class_id, dtype: int64

In [None]:
train_labels_people = train_labels[train_labels["class_id"] == 3]
ids_people = set(train_labels_people["frame"])
ids_no_people = set(train_labels["frame"]) - ids_people

In [None]:
train_labels_people

Unnamed: 0,frame,xmin,xmax,ymin,ymax,class_id
1,1478019952686311006.jpg,437,454,120,186,3
63,1478019961680640592.jpg,264,271,139,155,3
69,1478019962181150666.jpg,268,275,138,156,3
76,1478019962681840550.jpg,271,280,137,156,3
82,1478019963181283434.jpg,278,287,138,158,3
...,...,...,...,...,...,...
132291,1479506168470775690.jpg,232,243,140,163,3
132302,1479506168990704654.jpg,225,241,133,154,3
132314,1479506169996887187.jpg,263,282,127,161,3
132326,1479506170991708816.jpg,344,375,105,168,3


In [None]:
no_people_df = train_labels[train_labels["frame"].isin(list(ids_no_people))]
no_people_df

Unnamed: 0,frame,xmin,xmax,ymin,ymax,class_id
2,1478019953180167674.jpg,218,231,146,158,1
3,1478019953689774621.jpg,171,182,141,154,2
4,1478019953689774621.jpg,179,191,144,155,1
5,1478019953689774621.jpg,206,220,145,156,1
6,1478019953689774621.jpg,385,420,122,152,1
...,...,...,...,...,...,...
132401,1479506176491553178.jpg,166,186,139,156,1
132402,1479506176491553178.jpg,182,204,142,153,1
132403,1479506176491553178.jpg,239,261,139,155,1
132404,1479506176491553178.jpg,259,280,139,157,1


In [None]:
no_people_df = no_people_df[:11_000]
final_df = pd.concat([no_people_df, train_labels_people])

In [None]:
final_df.shape

(21637, 6)

#### Conversion to Pixels

In [None]:
### X and y splitting
X_train = final_df[["frame"]]
y_train = final_df[["class_id", "xmin", "ymin", "xmax", "ymax"]]

In [None]:
### ONLY RUN THIS IF FILE NOT SAVED
dask_df = dd.from_pandas(X_train, npartitions=10)  # Adjust npartitions based on your dataset and available cores

# Apply the function in parallel
result = dask_df['frame'].map_partitions(lambda df: df.apply(preprocess_img),
                                         meta=pd.Series(dtype=object)).compute(scheduler='processes')

# Convert back to Pandas DataFrame if needed
X_train["frame"] = result

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_train["frame"] = result


In [None]:
#X_train = pd.read_csv("X_train_hodl_preprocessed.csv")

In [None]:
X_train = X_train["frame"].to_numpy()

In [None]:
X_train = np.stack(X_train)

In [None]:
X_train.shape

(21637, 224, 224, 3)

In [None]:
y_train_pedestrians = y_train.copy()
### Make y-binary, detect other objects later
y_train_pedestrians["class_id"] = y_train_pedestrians["class_id"].apply(lambda x: 1 if x == 3 else 0)

In [None]:
y_train_pedestrians["class_id"].value_counts()

0    11000
1    10637
Name: class_id, dtype: int64

In [None]:
y_class = y_train_pedestrians["class_id"]
y_bbox = y_train_pedestrians[y_train_pedestrians.columns[1:]]
y_bbox.columns

Index(['xmin', 'ymin', 'xmax', 'ymax'], dtype='object')

In [None]:
y_train = {
    "class_output": y_class,
    "bbox_output": y_bbox
}

## Baseline Architecture Modelling

In [None]:
input = keras.Input(shape=(224,224,3))

### CNN layer 1
x = keras.layers.Conv2D(64, kernel_size = (2, 2), activation = "relu")(input) #Was 128
x = keras.layers.MaxPool2D()(x)

### CNN layer 2
x = keras.layers.Conv2D(64, kernel_size = (2, 2), activation = "relu")(x)
x = keras.layers.MaxPool2D()(x)

### Output Layers
x = keras.layers.Flatten()(x)
classification_output = keras.layers.Dense(1, activation='sigmoid', name='class_output')(x)
bbox_output = keras.layers.Dense(4, activation='linear', name='bbox_output')(x)

model = keras.models.Model(inputs=input, outputs=[classification_output, bbox_output], name= "baseline_model")

model.compile(optimizer='adam',
              loss={'class_output': 'binary_crossentropy', 'bbox_output': 'mse'},
              metrics={'class_output': 'accuracy', 'bbox_output': 'mse'})

model.summary()


NameError: name 'keras' is not defined

In [None]:
model.compile(optimizer='adam',
              loss={'class_output': 'binary_crossentropy', 'bbox_output': 'mse'},
              metrics={'class_output': 'accuracy', 'bbox_output': 'mse'})

model_history = model.fit(
    X_train, y_train,
    epochs = 5, batch_size = 50, #was 20 epochs
    validation_split = 0.20)

NameError: name 'model' is not defined

## Data Augmentation for Images


In [None]:
def augment_data(image):
  x = keras.layers.RandomFlip("horizontal")(image)
  x = keras.layers.RandomRotation(0.1)(x)
  x = keras.layers.RandomZoom(0.2)(x)
  return x

In [None]:
input = keras.Input(shape=(224,224,3))

# we insert the data augmentation layers here
h = keras.layers.RandomFlip("horizontal")(input)
h = keras.layers.RandomRotation(0.1)(h)
h = keras.layers.RandomZoom(0.2)(h)

# rest of the model is the same as before

h = keras.layers.Rescaling(1./255)(h)


# first convolutional block
h = keras.layers.Conv2D(32,                    # the number of filters
                        kernel_size=(2, 2),    # the shape of each filter
                        activation="relu",
                        name="Conv_1")(h)
h = keras.layers.MaxPool2D()(h)


# second convolutional block
h = keras.layers.Conv2D(32,                    # the number of filters
                        kernel_size=(2, 2),    # the shape of each filter
                        activation="relu",
                        name="Conv_2")(h)
h = keras.layers.MaxPool2D()(h)

# flatten layer
h = keras.layers.Flatten()(h)

# output layer
output = keras.layers.Dense(1, activation="sigmoid")(h)

model = keras.Model(input, output)

In [None]:
model.summary()

In [None]:
model.compile(loss='binary_crossentropy',
             optimizer='adam',
             metrics=['accuracy'])

In [None]:
history = model.fit(train_dataset,
                    epochs=10,
                    validation_data=validation_dataset)

In [None]:
plot_loss_curves(history)

In [None]:
plot_acc_curves(history)

In [None]:
model.evaluate(test_dataset)

##ResNet50

In [None]:
resnet50_base = keras.applications.ResNet50(
  weights='imagenet',
  include_top=False,
  input_shape=(224, 224, 3))

In [None]:
resnet50_base.summary()

In [None]:
keras.utils.plot_model(resnet50_base)

In [None]:
def get_features_and_labels(dataset):
  all_features = []
  all_labels = []
  for images, labels in dataset:
    preprocessed_images = keras.applications.resnet50.preprocess_input(images)
    features = resnet50_base.predict(preprocessed_images)
    all_features.append(features)
    all_labels.append(labels)
  return np.concatenate(all_features), np.concatenate(all_labels)

In [None]:
train_features, train_labels =  get_features_and_labels(train_dataset)
val_features, val_labels =  get_features_and_labels(validation_dataset)
test_features, test_labels =  get_features_and_labels(test_dataset)

In [None]:
input = keras.Input(shape=(7, 7, 2048))

h = keras.layers.Flatten()(input)

h = keras.layers.Dense(256, activation="relu")(h)

h = keras.layers.Dropout(0.5)(h) # first time we are using this!

output = keras.layers.Dense(1, activation="sigmoid")(h)

model = keras.Model(input, output)

In [None]:
keras.utils.plot_model(model, show_shapes=True)

In [None]:
model.summary()

In [None]:
model.compile(loss='binary_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

history = model.fit(train_features,
                    train_labels,
                    epochs=10,
                    validation_data=(val_features, val_labels))

In [None]:
plot_acc_curves(history)

In [None]:
model.evaluate(test_features, test_labels)

##Test using Camera

In [None]:
# Camera Capture code snippet courtesy Google Colab


from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode

def take_photo(filename='photo.jpg', quality=0.8):
  js = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      // Wait for Capture to be clicked.
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(js)
  data = eval_js('takePhoto({})'.format(quality))
  binary = b64decode(data.split(',')[1])
  with open(filename, 'wb') as f:
    f.write(binary)
  return filename


In [None]:
def predict_image(im):
  img = keras.preprocessing.image.load_img(im, target_size=(224,224))
  arr = keras.preprocessing.image.img_to_array(img)
  arr = keras.applications.resnet50.preprocess_input(arr)
  arr = np.expand_dims(arr, axis=0)
  arr = resnet50_base(arr)
  pred = model.predict(arr)
  pred = "NOT PEDESTRIAN" if pred > 0.5 else "PEDESTRIAN"
  print("************************************\n\n")
  print(f"...........it is a {pred}!")
  print("\n\n************************************\n\n")

In [None]:
# Camera Capture code snippet courtesy Google Colab

from IPython.display import Image
try:
  filename = take_photo()
  print('Saved to {}'.format(filename))

  # Show the image which was just taken.
  display(Image(filename))
  predict_image(filename)
except Exception as err:
  # Errors will be thrown if the user does not have a webcam or if they do not
  # grant the page permission to access it.
  print(str(err))