![](../images/dtlogo.png)

This activity walks through the initialization, prediction, and update steps of the histogram filter. It is also intended to serve as a template that we can follow for the others.

In [None]:
from complete_image_pipeline.pipeline import run_pipeline
from typing import Dict, Tuple


import duckietown_code_utils as dtu
from anti_instagram import AntiInstagramInterface
from duckietown_msgs.msg import Segment, SegmentList
from duckietown_segmaps import FRAME_AXLE, FRAME_GLOBAL
from duckietown_segmaps.draw_map_on_images import plot_map, predict_segments
from duckietown_segmaps.maps import plot_map_and_segments
from duckietown_segmaps.transformations import TransformationsInfo
from easy_algo import get_easy_algo_db
from easy_node.utils.timing import FakeContext, ProcessingTimingStats
from ground_projection.ground_projection_interface import find_ground_coordinates
from ground_projection.segment import rectify_segments
from image_processing.ground_projection_geometry import GroundProjectionGeometry
from image_processing.rectification import Rectify
from line_detector2.image_prep import ImagePrep
from line_detector_interface import FAMILY_LINE_DETECTOR
from line_detector_interface.visual_state_fancy_display import normalized_to_image, vs_fancy_display
from localization_templates import FAMILY_LOC_TEMPLATES
import duckietown_code_utils as dtu
from image_processing.more_utils import get_robot_camera_geometry
import matplotlib.pyplot as plt

In [None]:
import cv2
import matplotlib
import numpy as np
from scipy.ndimage.filters import gaussian_filter
from scipy.stats import entropy, multivariate_normal
from math import floor, sqrt

In [None]:
from matplotlib.pyplot import imshow
%matplotlib inline
img = cv2.imread("images/pic1.png")
imshow(img)

In [None]:
# Names of the algorithm instances looked up in the easy_algo database below,
# and of the robot whose camera geometry is requested.
line_detector_name="baseline"
image_prep_name="baseline"
anti_instagram_name="baseline"
robot_name = "lab"
# `rcg` exposes the `.rectifier` and `.gpg` attributes that are used further down.
rcg = get_robot_camera_geometry(robot_name)

# Sanity check that the image loaded in the earlier cell is a numpy array.
dtu.check_isinstance(img, np.ndarray)

algo_db = get_easy_algo_db()
line_detector = algo_db.create_instance(FAMILY_LINE_DETECTOR, line_detector_name)
image_prep = algo_db.create_instance(ImagePrep.FAMILY, image_prep_name)
# NOTE(review): `ai` is instantiated but never used in the visible cells
# (process() below is called with transform=None) -- confirm whether the
# anti-instagram transform was meant to be applied.
ai = algo_db.create_instance(AntiInstagramInterface.FAMILY, anti_instagram_name)
# Run image preparation + line detection on the raw image.
segment_list = image_prep.process(FakeContext(), img, line_detector, transform=None)
# Build a visualisation from the prepared image and the detected segments.
jpg1 = vs_fancy_display(image_prep.image_cv, segment_list)

imshow(jpg1)

# Rectify the detected segments using the robot's rectifier and ground
# projection geometry.
segment_list_rect = rectify_segments(rcg.rectifier, rcg.gpg , segment_list)

# `sg.segments` is what the final update cell feeds to histogram_update.
sg = find_ground_coordinates(rcg.gpg, segment_list_rect)

# Disabled alternative: run the complete pipeline in one call.
# NOTE(review): `lane_filter_name` is not defined anywhere in the visible
# cells, so this would need that name before it could be re-enabled.
#res, _stats = run_pipeline(
#        img,
#        gpg=rcg.gpg,
#        rectifier=rcg.rectifier,
#        line_detector_name=line_detector_name,
#        image_prep_name=image_prep_name,
#        lane_filter_name=lane_filter_name,
#        anti_instagram_name=anti_instagram_name,
#        all_details=True,
#        ground_truth=None,
#)

# Only referenced by the commented-out write_jpgs_to_dir call below.
outd = "output"
#dtu.write_jpgs_to_dir(res, outd)

In [None]:
# load parameters from the configuration file
import yaml
with open("src/lane_filter/config/lane_filter_node/default.yaml", "r") as stream:
    try:
        params = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        # Report the parse error and stop: without `params` every statement
        # below would fail with an unrelated NameError.
        print(exc)
        raise
hp = params["lane_filter_histogram_configuration"]
print(hp)

# Discretise the (d, phi) state space. With a real-valued step np.mgrid
# excludes the stop value, so both returned 2-D arrays hold the lower edge of
# each histogram bin.
d, phi = np.mgrid[hp['d_min'] : hp['d_max'] : hp['delta_d'], hp['phi_min'] : hp['phi_max'] : hp['delta_phi']]
# Grid description shared by the prior, predict and update functions.
grid_spec = {
    "d": d,
    "phi": phi,
    "delta_d": hp['delta_d'],
    "delta_phi": hp['delta_phi'],
    "d_min": hp['d_min'],
    "d_max": hp['d_max'],
    "phi_min": hp['phi_min'],
    "phi_max": hp['phi_max'],
}
# Road geometry used when turning a line segment into a (d, phi) vote.
road_spec = {
    "linewidth_white": hp['linewidth_white'],
    "linewidth_yellow": hp['linewidth_yellow'],
    "lanewidth": hp['lanewidth'],
}
# Robot parameters intended for the kinematics in histogram_predict.
robot_spec = {
    "wheel_radius": hp['wheel_radius'],
    "wheel_baseline": hp['wheel_baseline'],
    "encoder_resolution": hp['encoder_resolution'],
}
# The "cov_mask" is effectively the process model covariance
cov_mask = [hp['sigma_d_mask'], hp['sigma_phi_mask']]
# Placeholder with the grid's shape; its (uninitialised) contents are replaced
# when histogram_prior is called.
belief = np.empty(d.shape)
mean_0 = [hp['mean_d_0'], hp['mean_phi_0']]
# NOTE(review): the `sigma_*_0` values are placed directly on the diagonal of
# the covariance matrix, i.e. they are used as variances -- confirm that this
# matches how the configuration file defines them.
cov_0 = [[hp['sigma_d_0'], 0], [0, hp['sigma_phi_0']]]


In [None]:
def histogram_prior(belief, grid_spec, mean_0, cov_0):
    """Evaluate a 2-D Gaussian over the (d, phi) grid to initialise the belief.

    Args:
        belief: array whose shape defines the histogram; its contents are
            not read.
        grid_spec: dict holding the grid coordinate arrays under "d" and "phi".
        mean_0: mean of the Gaussian, ordered as [d, phi].
        cov_0: 2x2 covariance matrix of the Gaussian.

    Returns:
        A new array with the Gaussian PDF evaluated at every grid cell. The
        values are the raw PDF evaluations; no normalisation is applied here.
    """
    grid_shape = belief.shape
    # Pair up the d and phi coordinate of every cell along a trailing axis.
    cell_coords = np.stack(
        [
            np.broadcast_to(grid_spec["d"], grid_shape),
            np.broadcast_to(grid_spec["phi"], grid_shape),
        ],
        axis=-1,
    )
    return multivariate_normal(mean_0, cov_0).pdf(cell_coords)

In [None]:
def histogram_predict(belief, dt, left_encoder_ticks, right_encoder_ticks, grid_spec, robot_spec, cov_mask):
    """Prediction step of the histogram filter.

    Every grid cell is propagated through the motion model, its probability
    mass is moved to the bin it lands in, and the result is smoothed with a
    Gaussian filter and renormalised.

    Args:
        belief: 2-D array of cell probabilities with the same shape as
            grid_spec['d'] and grid_spec['phi'].
        dt: time step used to integrate the velocities.
        left_encoder_ticks: left wheel encoder ticks (not used until the
            kinematics TODO below is implemented).
        right_encoder_ticks: right wheel encoder ticks (not used until the
            kinematics TODO below is implemented).
        grid_spec: dict with the grid arrays "d" and "phi" and the scalars
            "d_min", "d_max", "delta_d", "phi_min", "phi_max", "delta_phi".
        robot_spec: robot parameters for the kinematics (currently unused).
        cov_mask: per-axis sigmas passed to the Gaussian smoothing filter.

    Returns:
        The predicted, normalised belief, or the unchanged input belief when
        no probability mass is left after propagation and smoothing.
    """
    belief_in = belief
    delta_t = dt
    # TODO calculate v and w from ticks using kinematics. You will need `robot_spec`
    v = 0.0
    w = 0.0
    d_t = grid_spec['d'] + v * delta_t * np.sin(grid_spec['phi'])
    phi_t = grid_spec['phi'] + w * delta_t

    d_min, d_max = grid_spec['d_min'], grid_spec['d_max']
    phi_min, phi_max = grid_spec['phi_min'], grid_spec['phi_max']
    n_d, n_phi = belief.shape[0], belief.shape[1]

    # histogram propagation step: only cells that carry mass and whose
    # propagated state stays inside the grid limits are moved.
    movable = (
        (belief > 0)
        & (d_t >= d_min)
        & (d_t <= d_max)
        & (phi_t >= phi_min)
        & (phi_t <= phi_max)
    )
    i_new = np.floor((d_t[movable] - d_min) / grid_spec['delta_d']).astype(int)
    j_new = np.floor((phi_t[movable] - phi_min) / grid_spec['delta_phi']).astype(int)

    # A state lying exactly on d_max / phi_max passes the range test above but
    # floors to an index one past the last bin; drop it instead of letting the
    # accumulation raise IndexError.
    in_grid = (i_new >= 0) & (i_new < n_d) & (j_new >= 0) & (j_new < n_phi)

    p_belief = np.zeros(belief.shape)
    # np.add.at accumulates correctly when several source cells land in the
    # same destination bin (plain fancy-index += would keep only one of them).
    np.add.at(p_belief, (i_new[in_grid], j_new[in_grid]), belief[movable][in_grid])

    s_belief = np.zeros(belief.shape)
    gaussian_filter(p_belief, cov_mask, output=s_belief, mode="constant")

    total = np.sum(s_belief)
    if total == 0:
        # Everything left the grid: keep the previous belief.
        return belief_in
    return s_belief / total


In [None]:
def prepare_segments(segments):
    """Select the segments that are usable for the measurement update.

    A segment is kept when its colour is WHITE or YELLOW (RED ones are
    ignored for now) and neither of its two end points has a negative x
    coordinate, i.e. it is not behind us.

    Args:
        segments: iterable of segment objects exposing `color`, the colour
            constants `WHITE` / `YELLOW` and two `points` with an `x` field.

    Returns:
        A new list with the retained segments, in their original order.
    """
    return [
        seg
        for seg in segments
        if (seg.color == seg.WHITE or seg.color == seg.YELLOW)
        and not (seg.points[0].x < 0 or seg.points[1].x < 0)
    ]

In [None]:
def generate_vote(segment, road_spec):
    """Turn one line segment into a single (d, phi) vote.

    Args:
        segment: object exposing `color`, the colour constants `WHITE` and
            `YELLOW`, and two `points` with `x` / `y` fields.
        road_spec: dict with "linewidth_white", "linewidth_yellow" and
            "lanewidth".

    Returns:
        Tuple (d_i, phi_i). For a segment that is neither WHITE nor YELLOW
        the values are returned without any lane-dependent correction. A
        zero-length segment (both points equal) produces NaN values because
        its direction is undefined; callers should discard such votes.
    """
    p1 = np.array([segment.points[0].x, segment.points[0].y])
    p2 = np.array([segment.points[1].x, segment.points[1].y])
    # Unit vector along the segment and the normal obtained by rotating it
    # 90 degrees counter-clockwise.
    t_hat = (p2 - p1) / np.linalg.norm(p2 - p1)
    n_hat = np.array([-t_hat[1], t_hat[0]])

    # Mean of the two end points' projections onto the segment normal.
    d1 = np.inner(n_hat, p1)
    d2 = np.inner(n_hat, p2)
    d_i = (d1 + d2) / 2
    phi_i = np.arcsin(t_hat[1])

    if segment.color == segment.WHITE:  # right lane is white
        if p1[0] > p2[0]:  # right edge of white lane
            d_i -= road_spec['linewidth_white']
        else:  # left edge of white lane
            d_i = -d_i
            phi_i = -phi_i
        d_i -= road_spec['lanewidth'] / 2

    elif segment.color == segment.YELLOW:  # left lane is yellow
        if p2[0] > p1[0]:  # left edge of yellow lane
            d_i -= road_spec['linewidth_yellow']
            phi_i = -phi_i
        else:  # right edge of yellow lane
            d_i = -d_i
        d_i = road_spec['lanewidth'] / 2 - d_i

    return d_i, phi_i

In [None]:
def generate_measurement_likelihood(segments, road_spec, grid_spec):
    """Build a normalised vote histogram over the (d, phi) grid.

    Each segment casts one vote (see generate_vote) into the bin that
    contains its (d, phi) estimate.

    Args:
        segments: iterable of segments accepted by generate_vote.
        road_spec: road geometry dict forwarded to generate_vote.
        grid_spec: dict with the grid array "d" (only its shape is used) and
            the scalars "d_min", "d_max", "delta_d", "phi_min", "phi_max",
            "delta_phi".

    Returns:
        Array of the grid's shape whose entries sum to 1, or None when no
        vote landed inside the histogram.
    """
    # initialize measurement likelihood to all zeros
    measurement_likelihood = np.zeros(grid_spec['d'].shape)
    n_d, n_phi = measurement_likelihood.shape[0], measurement_likelihood.shape[1]

    for segment in segments:
        d_i, phi_i = generate_vote(segment, road_spec)

        # A NaN vote is False for every range comparison below, so it would
        # otherwise reach int(floor(nan)) and raise; discard it explicitly.
        if not (np.isfinite(d_i) and np.isfinite(phi_i)):
            continue

        # if the vote lands outside of the histogram discard it
        if d_i > grid_spec['d_max'] or d_i < grid_spec['d_min'] or phi_i < grid_spec['phi_min'] or phi_i > grid_spec['phi_max']:
            continue

        i = int(floor((d_i - grid_spec['d_min']) / grid_spec['delta_d']))
        j = int(floor((phi_i - grid_spec['phi_min']) / grid_spec['delta_phi']))
        # A vote exactly on the upper bound passes the range test but can map
        # to an index one past the last bin; treat it as outside the histogram.
        if i >= n_d or j >= n_phi:
            continue
        measurement_likelihood[i, j] += 1

    if np.linalg.norm(measurement_likelihood) == 0:
        return None
    measurement_likelihood /= np.sum(measurement_likelihood)
    return measurement_likelihood


In [None]:
def histogram_update(belief, segments, road_spec, grid_spec):
    """Measurement update of the histogram filter.

    Args:
        belief: current belief over the (d, phi) grid.
        segments: detected segments; they are filtered with prepare_segments
            before voting.
        road_spec: road geometry dict forwarded to the vote generation.
        grid_spec: grid description dict forwarded to the vote generation.

    Returns:
        Tuple (measurement_likelihood, belief). When no measurement
        likelihood could be built the likelihood is None and the belief is
        returned unchanged. When the product of belief and likelihood has
        zero total mass the likelihood itself is returned as the new belief.
    """
    usable_segments = prepare_segments(segments)
    likelihood = generate_measurement_likelihood(usable_segments, road_spec, grid_spec)

    # No usable measurement: leave the belief untouched.
    if likelihood is None:
        return (likelihood, belief)

    posterior = np.multiply(belief, likelihood)
    # Prior and measurement do not overlap anywhere: fall back to the measurement.
    if np.sum(posterior) == 0:
        return (likelihood, likelihood)

    posterior /= np.sum(posterior)
    return (likelihood, posterior)


In [None]:
# Example inputs for the prediction step that is run in the next cell.
dt = 0.1 #s
left = 10 # left ticks
right = 20 # right ticks

# Initialise the belief with the Gaussian prior defined by mean_0 / cov_0 and
# display it as an image.
belief = histogram_prior(belief, grid_spec, mean_0, cov_0)
imshow(belief)

In [None]:
# Prediction step: propagate and smooth the belief using the example time step
# and encoder ticks defined above, then display the result.
belief = histogram_predict(belief, dt, left, right, grid_spec, robot_spec, cov_mask)
imshow(belief)

In [None]:
# Measurement update: combine the predicted belief with the votes cast by the
# ground-projected segments (`sg.segments`), then display the new belief.
(measurement_likelihood, belief) = histogram_update(belief,sg.segments, road_spec, grid_spec)
imshow(belief)