# Semantic segmentation of the trip video

In [1]:
# Import libraries

# Generic library
import io
import os

import cv2
import numpy as np
import pandas as pd
from PIL import Image
from natsort import natsorted

# Semantic segmentation model
import torch
from torchvision.models.segmentation import fcn_resnet50, FCN_ResNet50_Weights
from torchvision.transforms.functional import to_pil_image, resize, pil_to_tensor
from torchvision.utils import draw_segmentation_masks

# Creation of the video
from tqdm import tqdm

from pytrack.analytics import plugins
from pytrack.analytics import video
from pytrack.graph import distance

# Creation of matched path
from pytrack.graph import graph
from pytrack.graph import utils
from pytrack.matching import candidate, mpmatching_utils, mpmatching

In [2]:
def image_to_byte_array(image, format="PNG"):
    """Encode an image object into raw bytes.

    Parameters
    ----------
    image : object
        Any object exposing ``save(file, format=...)`` (e.g. a PIL image).
    format : str, optional
        Encoding forwarded to ``image.save``. Defaults to ``"PNG"``.

    Returns
    -------
    bytes
        Everything ``image.save`` wrote, as a single bytes object.
    """
    # image.save wants a file-like target, so hand it an in-memory buffer
    with io.BytesIO() as buffer:
        image.save(buffer, format=format)
        # Copy the buffer content out before the buffer is closed
        return buffer.getvalue()

class Segmenter(plugins.Segmenter):
    """
    Segmentation plugin that overlays a green mask on the pixels classified as 'car'.

    For more information see: https://pytorch.org/vision/stable/models.html

    Parameters
    ----------
    model : callable
        Model builder invoked as ``model(weights=...)`` (e.g. ``fcn_resnet50``).
    weights : weights enum
        Enum exposing a ``DEFAULT`` member that provides ``transforms()`` and
        ``meta["categories"]`` (e.g. ``FCN_ResNet50_Weights``).
    """
    def __init__(self, model, weights):
        # Initialize model with the weights and switch it to inference mode
        self.weights = weights.DEFAULT
        self.model = model(weights=self.weights)
        self.model.eval()

        # Built once here instead of on every frame: neither depends on the input picture
        self._preprocess = self.weights.transforms()
        self._class_to_idx = {cls: idx for (idx, cls) in enumerate(self.weights.meta["categories"])}

    def processing(self, img):
        """Apply the inference preprocessing transforms of the weights to ``img``.

        Parameters
        ----------
        img : tensor
            Image tensor as produced by ``pil_to_tensor``.

        Returns
        -------
        tensor
            The preprocessed image (no batch dimension added).
        """
        return self._preprocess(img)

    def run(self, img):
        """Segment an encoded picture and return the annotated picture as PNG bytes.

        Parameters
        ----------
        img : bytes
            Encoded image (any format readable by ``PIL.Image.open``).

        Returns
        -------
        bytes
            PNG-encoded picture with the 'car' mask drawn in green.
        """
        # Force three RGB channels: grayscale, palette or RGBA inputs would
        # otherwise not match the 3-channel normalisation of the preprocessing
        img = pil_to_tensor(Image.open(io.BytesIO(img)).convert("RGB"))
        batch = self.processing(img).unsqueeze(0)

        # Pure inference: disable autograd so that no graph is recorded per frame
        with torch.no_grad():
            prediction = self.model(batch)["out"]
        normalized_masks = prediction.softmax(dim=1)
        car_mask = normalized_masks.argmax(1) == self._class_to_idx['car']

        # The picture is resized to 520 so that it matches the mask size
        # NOTE(review): 520 is assumed to equal the resize applied by the weights' transforms - confirm if the weights change
        pred = draw_segmentation_masks(resize(img, 520), masks=car_mask, alpha=0.6, colors="green")

        return image_to_byte_array(to_pil_image(pred))

In [3]:
# Build the segmentation plugin from the torchvision FCN-ResNet50 builder and its weights enum
model, weights = fcn_resnet50, FCN_ResNet50_Weights
segmenter = Segmenter(model=model, weights=weights)

In [4]:
# Load the recorded positions
df = pd.read_excel("dataset.xlsx")

latitude = df["latitude"].to_list()
longitude = df["longitude"].to_list()

# Keep only the first 30 positions, as (lat, lon) tuples
points = list(zip(latitude[:30], longitude[:30]))

# Create BBOX: column-wise extremes of the (lat, lon) pairs
coords = np.array(points)
north, east = coords.max(axis=0)
south, west = coords.min(axis=0)

# Extract road graph on the bounding box enlarged by 500
# NOTE(review): 500 is presumably metres - confirm against distance.enlarge_bbox
bbox = distance.enlarge_bbox(north, south, west, east, 500)
G = graph.graph_from_bbox(*bbox, simplify=True, network_type='drive')

# Adds to G a geometry attribute describing the geometry of both nodes and edges.
# TODO: create an autonomous method.
nodes, edges = utils.graph_to_gdfs(G)

# Extract candidates
G_interp, candidates = candidate.get_candidates(
    G, points, interp_dist=5, closest=True, radius=30
)

# Extract trellis DAG graph
trellis = mpmatching_utils.create_trellis(candidates)

# Perform the map-matching process
path_prob, predecessor = mpmatching.viterbi_search(G_interp, trellis, "start", "target")

# Path expressed through a list of nodes (lat, lng)
_, path = mpmatching_utils.create_matched_path(G_interp, trellis, predecessor)

Downloaded 448.33kB


In [5]:
root_dir = "SV_panoramas"  # Directory where the Google Street View panoramas are saved
# Prefer a key supplied through the GOOGLE_API_KEY environment variable so that the
# secret never has to be written in the notebook; otherwise keep the placeholder.
api_key = os.environ.get("GOOGLE_API_KEY", 'Insert your private API key for Google services')

# exist_ok avoids the separate existence check (and the race between check and creation)
os.makedirs(root_dir, exist_ok=True)

for i in tqdm(range(len(path))):
    pic_dir = os.path.join(root_dir, str(i))
    # An existing directory for this index means the point was handled in a previous run
    if os.path.isdir(pic_dir):
        continue

    point = path[i]
    if i != 0:
        # Heading: direction of travel from the previous point to the current one
        prec_point = path[i - 1]
        head = distance.get_bearing(prec_point[0], prec_point[1], point[0], point[1])
    elif len(path) > 1:
        # First point: no predecessor, so look towards the next point instead
        succ_point = path[i + 1]
        head = distance.get_bearing(point[0], point[1], succ_point[0], succ_point[1])
    else:
        # Single-point path: no direction can be computed (path[i + 1] would raise
        # IndexError), so fall back to a heading of 0
        head = 0

    pic, meta = video.extract_streetview_pic(point, api_key, size="520x520", heading=head, pitch=-10)

    # Nothing is saved when no picture is returned for this point
    if pic is not None:
        video.save_streetview(pic, meta, pic_dir, model=segmenter)

100%|██████████| 347/347 [03:19<00:00,  1.74it/s]


In [6]:
# Assemble the segmented pictures into a video of the path
root_dir = "SV_panoramas"

# Gather every segmented picture found below root_dir, in natural (numeric-aware) order
images = natsorted(
    os.path.join(root, file)
    for root, dirs, files in os.walk(root_dir)
    for file in files
    if file.endswith("pic_seg.png")
)

# Codec and destination of the output file
fourcc = cv2.VideoWriter_fourcc(*"avc1")
video_path = "video_seg.mp4"

video.make_video(images, video_path, fourcc, fps=16, size=(520, 520), is_color=True)

In [7]:
from IPython.display import Video

# Show the generated clip inline; embed=True stores the video data in the cell output
Video(
    "video_seg.mp4",
    embed=True,
    width=520,
    height=520,
)