In [None]:
import pandas as pd
import geopandas as gpd
import numpy as np
from shapely.geometry import Point, LineString, shape
from PIL import Image, ImageDraw, ImageFont
from PIL.ExifTags import TAGS
import os
from shapely.ops import nearest_points
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import os
#import tensorflow as tf
import tensorflow.compat.v1 as tf
from glob import glob
import cv2

In [None]:
# Disable row truncation when DataFrames/Series are displayed.
pd.set_option("display.max_rows", None)

# Define functions

In [None]:
def get_dist_to_next_signal(df):
    """Return, per row, the summed step distances up to the end of its signal group.

    ``signal_idx`` is back-filled so that every row carries the id of the next
    row (in frame order) that has one. The distance from each row to the
    following row is computed in EPSG:21781 (Swiss CH1903 / LV03, metre units)
    and summed from each row to the last row of its ``signal_idx`` group.

    Side effects: the ``signal_idx`` and ``increment`` columns of ``df`` are
    (over)written in place.

    :param df: GeoDataFrame with a CRS set and a ``signal_idx`` column; rows are
        assumed to be ordered along the direction of travel -- TODO confirm.
    :return: Series indexed like ``df`` (rows without a following signal are
        absent) holding the cumulative distance.
    """
    # ``fillna(method="bfill")`` is deprecated in pandas 2.x; ``bfill`` is the
    # direct replacement.
    df["signal_idx"] = df["signal_idx"].bfill()

    # Reproject once instead of once per operand.
    projected = df.to_crs("EPSG:21781")
    df["increment"] = projected.distance(projected.shift(-1))

    # NOTE(review): ``increment`` on a row is the step to the *following* row,
    # so the value on the signal row itself is the step to the next image, not
    # 0 -- confirm this offset is intended.
    #
    # ``group_keys=False`` keeps the original row index on the result. Without
    # it pandas >= 2.0 prepends the group key as an extra index level, and the
    # result no longer aligns when assigned back as a column.
    return df.groupby("signal_idx", group_keys=False)["increment"].apply(
        lambda steps: steps.iloc[::-1].cumsum().iloc[::-1]
    )
    
def get_lat_lon(base_path):
    """Build a function that reads the GPS EXIF entries of an image.

    :param base_path: prefix that is concatenated directly with the image name,
        so it must already end with a path separator.
    :return: function mapping an image file name to a ``pd.Series`` with the
        keys ``fn``, ``lat`` and ``lon``.
    """
    def f_inner(image_name):
        # The context manager releases the file handle right away; this runs
        # once per image, so unclosed handles would otherwise pile up.
        with Image.open(base_path + image_name) as image:
            # 34853 is the EXIF GPSInfo tag; entries 2 and 4 are GPSLatitude
            # and GPSLongitude.
            gps_info = image.getexif().get(34853)
            # NOTE(review): the values are passed on exactly as stored in the
            # EXIF block -- confirm they are in the form downstream code expects.
            lat, lon = gps_info[2], gps_info[4]
        return pd.Series({"fn": image_name, "lat": lat, "lon": lon})
    return f_inner

In [None]:
# Pre-trained detection model to fetch, its archive name and the download location.
MODEL_NAME = 'ssd_mobilenet_v1_coco_11_06_2017'
MODEL_FILE = f'{MODEL_NAME}.tar.gz'
DOWNLOAD_BASE = 'http://download.tensorflow.org/models/object_detection/'

# Where the frozen detection graph (the model actually used for object
# detection) is expected on disk.
model_path = "./"
PATH_TO_CKPT = f'{model_path}{MODEL_NAME}/frozen_inference_graph.pb'


def download_model():
    """Download the model archive and unpack its frozen inference graph.

    Fetches ``DOWNLOAD_BASE + MODEL_FILE`` to ``MODEL_FILE`` (relative to the
    current working directory) and extracts every archive member whose base
    name contains ``frozen_inference_graph.pb`` below the current working
    directory.

    :raises ValueError: if a matching archive member would be written outside
        the current working directory.
    """
    import tarfile
    import urllib.request

    # ``URLopener`` is deprecated; ``urlretrieve`` is the standard-library
    # equivalent and removes the need for ``six``.
    urllib.request.urlretrieve(DOWNLOAD_BASE + MODEL_FILE, MODEL_FILE)

    extract_root = os.path.realpath(os.getcwd())
    # The context manager closes the archive even if extraction fails.
    with tarfile.open(MODEL_FILE) as tar_file:
        for member in tar_file.getmembers():
            if 'frozen_inference_graph.pb' not in os.path.basename(member.name):
                continue
            # The archive is fetched over plain HTTP, so do not trust member
            # paths: refuse anything that resolves outside the target directory.
            destination = os.path.realpath(os.path.join(extract_root, member.name))
            if os.path.commonpath([extract_root, destination]) != extract_root:
                raise ValueError(f"Unsafe path in model archive: {member.name!r}")
            tar_file.extract(member, extract_root)

In [None]:
def load_graph():
    """Load the frozen detection graph, downloading the model first if needed.

    :return: a ``tf.Graph`` into which the graph definition stored at
        ``PATH_TO_CKPT`` has been imported (with an empty name prefix).
    """
    model_is_missing = not os.path.exists(PATH_TO_CKPT)
    if model_is_missing:
        download_model()

    graph = tf.Graph()
    with graph.as_default():
        # Read the serialized GraphDef, then import it into the default graph.
        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as model_file:
            raw_graph = model_file.read()
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(raw_graph)
        tf.import_graph_def(graph_def, name='')

    return graph

def select_boxes(boxes, classes, scores, score_threshold=0, target_class=10):
    """
    Keep the boxes of one class whose score exceeds a threshold.

    All three inputs are squeezed (length-1 axes removed) before filtering.

    :param boxes: detection boxes, one entry per detection
    :param classes: class id of each detection
    :param scores: score of each detection
    :param score_threshold: only detections with a strictly greater score are kept
    :param target_class: default traffic light id in COCO dataset is 10
    :return: the squeezed boxes restricted to the selected detections
    """
    flat_scores, flat_classes, flat_boxes = (
        np.squeeze(values) for values in (scores, classes, boxes)
    )

    is_target = flat_classes == target_class
    is_confident = flat_scores > score_threshold
    keep = is_target & is_confident

    return flat_boxes[keep]

class TLClassifier(object):
    """Wrapper around the frozen detection graph that returns boxes of one class.

    Construction loads the graph (via ``load_graph``), opens a TensorFlow
    session on it and runs one inference on a blank image as a warm-up.
    """

    def __init__(self):

        self.detection_graph = load_graph()
        self.extract_graph_components()
        self.sess = tf.Session(graph=self.detection_graph)

        # Run the session once on a dummy image to "warm up".
        dummy_image = np.zeros((100, 100, 3))
        self.detect_multi_object(dummy_image, 0.1)
        self.traffic_light_box = None
        self.classified_index = 0

    def extract_graph_components(self):
        """Look up the input and output tensors of the detection graph by name."""
        # Input tensor that receives the image batch.
        self.image_tensor = self.detection_graph.get_tensor_by_name('image_tensor:0')
        # Each box represents a part of the image where a particular object was detected.
        self.detection_boxes = self.detection_graph.get_tensor_by_name('detection_boxes:0')
        # Each score represents the level of confidence for the corresponding
        # detection; the class tensor holds the matching class ids.
        self.detection_scores = self.detection_graph.get_tensor_by_name('detection_scores:0')
        self.detection_classes = self.detection_graph.get_tensor_by_name('detection_classes:0')
        self.num_detections = self.detection_graph.get_tensor_by_name('num_detections:0')

    def detect_multi_object(self, image_np, score_threshold, target_class=10):
        """
        Return the detection boxes of one class found in an image.

        :param image_np: image array; a batch axis is prepended because the
            model expects images of shape [1, None, None, 3]
        :param score_threshold: only detections scoring strictly above this are kept
        :param target_class: class id to keep; defaults to 10, the traffic
            light id in the COCO dataset
        :return: array of the selected boxes, one row per detection
            (presumably normalized [ymin, xmin, ymax, xmax], which is how
            ``get_rectangle`` reads them -- TODO confirm)
        """

        # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
        image_np_expanded = np.expand_dims(image_np, axis=0)

        # Actual detection.
        (boxes, scores, classes, num) = self.sess.run(
            [self.detection_boxes, self.detection_scores, self.detection_classes, self.num_detections],
            feed_dict={self.image_tensor: image_np_expanded})

        sel_boxes = select_boxes(boxes=boxes, classes=classes, scores=scores,
                                 score_threshold=score_threshold, target_class=target_class)

        return sel_boxes

In [None]:
def load_image(directory_name, fn):
    """Read an image file and return its pixels as an RGB array.

    :param directory_name: directory containing the image
    :param fn: image file name inside ``directory_name``
    :return: numpy array with three channels on the last axis
    """
    # The context manager closes the file handle once the pixels are read.
    with Image.open(directory_name + "/" + fn) as im:
        # Converting to RGB drops an alpha channel like the former ``[:, :, :3]``
        # slice did, and also handles single-channel images, for which that
        # slice raised an IndexError.
        return np.asarray(im.convert("RGB"))

def save_detected_image(im, image_np, boxes, dir_name, fn, distance, max_box_distance=40):
    """Annotate an image with detections and the signal distance, then save it.

    Detected boxes are outlined in red only when at least one box exists and
    ``distance`` is below ``max_box_distance``. A semi-transparent label with
    the rounded distance is always drawn. The result is written as JPEG to
    ``dir_name/fn`` and ``im`` is closed afterwards.

    :param im: open PIL image to draw on (modified and closed by this call)
    :param image_np: array of the same image; its first two dimensions are
        used as height and width to scale the boxes
    :param boxes: sequence of boxes, each read as fractions of the image size
        in the order (ymin, xmin, ymax, xmax), the same order ``get_rectangle``
        uses
    :param dir_name: output directory (created if missing)
    :param fn: output file name
    :param distance: distance to the next signal; NaN/None is rendered as "n/a"
    :param max_box_distance: boxes are drawn only below this distance
    """
    draw = ImageDraw.Draw(im, 'RGBA')

    if len(boxes) > 0 and distance < max_box_distance:
        im_height, im_width = image_np.shape[:2]
        for box in boxes:
            # ImageDraw.rectangle needs pixel coordinates, not a matplotlib
            # patch, so scale the fractional box to pixels directly.
            left_px, right_px = float(box[1] * im_width), float(box[3] * im_width)
            top_px, bottom_px = float(box[0] * im_height), float(box[2] * im_height)
            draw.rectangle([(left_px, top_px), (right_px, bottom_px)], outline="red")

    left, top = [40, 640]
    width, height = [310, 90]
    right, bottom = [left + width, top + height]
    font = ImageFont.truetype("Helvetica.ttc", 40)
    draw.rectangle(((left, top), (right, bottom)), width=20, fill=(255, 255, 255, 160))

    # round() raises on NaN, which is what images after the last signal carry.
    distance_text = "n/a" if pd.isna(distance) else f"{round(distance)} m"
    draw.text((left + 10, top + 10), f"Distance to next\nsignal: {distance_text}", fill=(0, 0, 0), font=font)

    # Saving fails if the results directory has not been created yet.
    os.makedirs(dir_name, exist_ok=True)
    im.save(dir_name + "/" + fn, "JPEG")
    im.close()
    
def get_rectangle(image_np, sel_box):
    """Convert a fractional box into a matplotlib rectangle in pixel units.

    :param image_np: three-dimensional image array; the first two dimensions
        are taken as height and width
    :param sel_box: box whose entries 0..3 are read as (top, left, bottom,
        right) fractions of the image size
    :return: ``patches.Rectangle`` with a red edge of line width 1 and no fill
    """
    n_rows, n_cols, _ = image_np.shape

    # Scale the horizontal edges by the width and the vertical edges by the height.
    x_min = sel_box[1] * n_cols
    x_max = sel_box[3] * n_cols
    y_min = sel_box[0] * n_rows
    y_max = sel_box[2] * n_rows

    return patches.Rectangle(
        (x_min, y_min),
        x_max - x_min,
        y_max - y_min,
        linewidth=1,
        edgecolor='r',
        facecolor='none',
    )

# Define pathnames

In [None]:
# Root of the project data. The default is the original author's local path;
# set the HACKATHON_PROJECT_DIR environment variable to run on another machine.
project_dir = os.environ.get(
    "HACKATHON_PROJECT_DIR",
    "/Users/Georg/Dropbox/Work/projects/2020_09_05_hackathon_zurich",
)
data_dir = f"{project_dir}/data/Dataset_complete"
# Trailing slash is required: get_lat_lon concatenates this prefix with the file name.
image_dir = f"{data_dir}/Trackpictures/nice_weather/nice_weather_filisur_thusis_20200824_pixelated/"
# Trailing slash is required: the distance CSV path is built by plain concatenation.
results_dir = f"{project_dir}/data/results/"
images_results_dir = f"{results_dir}/nice_weather_images_detected"

# Get signal data

In [None]:
# Read the track-site table and discard the columns whose name contains "Unnamed".
trackdata_raw = pd.read_csv(f"{project_dir}/data/Dataset_complete/Trackdata/TrackSiteData_2020_clean.csv")
trackdata = trackdata_raw[[col for col in trackdata_raw.columns if "Unnamed" not in col]]

# Keep only the signal elements that have both coordinates.
signals = (
    trackdata
    .loc[lambda frame: frame["Element Type"].isin(['Distant signal', 'Main & distant signal', 'Main signal'])]
    .dropna(subset=["Latitude", "Longitude"])
)

# Build point geometries (x = longitude, y = latitude) and tag them as EPSG:4326.
geometry = [Point(lon, lat) for lon, lat in zip(signals.Longitude, signals.Latitude)]
signals_gdf = (
    gpd.GeoDataFrame(signals, geometry=geometry, crs="EPSG:4326")
    .to_crs("EPSG:4326")
)

# Persist the geo-enabled signal table next to the source file.
signals_gdf.to_csv(
    f"{project_dir}/data/Dataset_complete/Trackdata/TrackSiteData_2020_clean_geo.csv",
    index=False,
)

# Get geotags from images

In [None]:
# List the files whose name contains "image", then read the GPS tags of each one.
image_names = pd.Series(
    list(filter(lambda name: "image" in name, os.listdir(image_dir)))
)
image_geotags = image_names.apply(get_lat_lon(image_dir))

In [None]:
# Turn the geotags into points (x = lon, y = lat) in EPSG:4326 and order the
# rows by file name with a fresh 0..n-1 index.
image_geotags_gdf = (
    gpd.GeoDataFrame(
        image_geotags,
        geometry=gpd.points_from_xy(image_geotags.lon, image_geotags.lat),
        crs="EPSG:4326",
    )
    .to_crs("EPSG:4326")
    .sort_values("fn")
    .reset_index(drop=True)
)

# Save the geotag table.
image_geotags_gdf.to_csv(f"{project_dir}/data/geotags/nice_weather_filisur_thusis_20200824_geotags.csv")

# Compute upcoming signal

In [None]:
# Mark, for every signal, the image whose position is nearest to it; all other
# rows start out empty and are back-filled below.
image_geotags_gdf["signal_idx"] = None

# The union of all image positions does not change inside the loop, so build
# it once instead of once per signal.
image_points = image_geotags_gdf.geometry.unary_union

for i in signals_gdf.index:
    nearest_geom = nearest_points(signals_gdf.loc[i]["geometry"], image_points)[1]
    nearest = image_geotags_gdf["geometry"] == nearest_geom
    # If several images share that position, the first one is used.
    signal_index = image_geotags_gdf[nearest].index[0]
    image_geotags_gdf.loc[signal_index, "signal_idx"] = i

# Propagate each signal id backwards to the images that precede it
# (``fillna(method="bfill")`` is deprecated in pandas 2.x).
image_geotags_gdf["signal_idx"] = image_geotags_gdf["signal_idx"].bfill()
image_geotags_gdf["distance_to_next_signal"] = get_dist_to_next_signal(image_geotags_gdf)

In [None]:
# Write the per-image distances to the results directory (no index column).
image_geotags_gdf.to_csv(f"{results_dir}distance_to_next_signal.csv", index=False)

# Detect railway signals on images and assign distances

In [None]:
# Re-list the image files, keeping only names that contain "image".
image_names = pd.Series(
    list(filter(lambda name: "image" in name, os.listdir(image_dir)))
)

In [None]:
# Create the detector once; its constructor loads the graph and runs a warm-up inference.
tlc = TLClassifier()

In [None]:
# Distance lookup keyed by file name, built once. This replaces a per-image
# string-built ``query`` that broke on file names containing a quote and
# rescanned the whole frame on every iteration. As before, the first row
# wins if a file name occurs more than once.
distance_by_fn = (
    image_geotags_gdf
    .drop_duplicates("fn")
    .set_index("fn")["distance_to_next_signal"]
)

# Process the first 100 images in file-name order.
for fn in image_names.sort_values()[:100]:
    im = Image.open(image_dir + "/" + fn)
    image_np = np.asarray(im)[:, :, :3]
    dist = distance_by_fn.loc[fn]
    boxes = tlc.detect_multi_object(image_np, score_threshold=0.1)
    # save_detected_image writes the annotated copy and closes ``im``.
    save_detected_image(im, image_np, boxes, images_results_dir, fn, dist)

In [None]:
# upload images to s3 bucket with
# aws s3 cp nice_weather_images_detected s3://liftthevail/ModifiedPictures --recursive