In [None]:
#Importing required libraries
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.animation import FuncAnimation
import numpy as np
from celluloid import Camera
from scipy.interpolate import make_interp_spline
import ffmpeg

In [None]:
# Max time in seconds
MAX_TIME = 3
# Move count (frame count)
MOVE_COUNT = 30

In [None]:
def read_input_ouput_times_from_file(file_name):
    times_file = open(file_name, "r")
    list_of_customers_times = times_file.read().split("\n")
    list_of_customers_times.pop()
    customer_count = len(list_of_customers_times)
    customers_times = np.zeros((customer_count, 2), dtype = np.float64)

    cur_customer = 0
    for cur_times in list_of_customers_times:
        input_time, output_time = cur_times.split(" ")
        customers_times[cur_customer][0] = float(input_time)
        customers_times[cur_customer][1] = float(output_time)
        cur_customer += 1
    times_file.close()
    
    return customers_times

In [None]:
# Calculate PI 1 customer's moving in time
def calculate_moving_for_pi_1(time_in, time_out):
    start_pos = 0
    end_pos = 10
    time_for_move = 1
    time_for_move_before_intersection = 1
    time_for_move_after_intersection = 1

    moving = np.full(MOVE_COUNT, -10, dtype = np.float64)
    one_move_size = MAX_TIME / MOVE_COUNT
    for cur_move in range(MOVE_COUNT):
        cur_time = one_move_size * cur_move
        if cur_time < time_in - time_for_move_before_intersection:
            continue
        elif cur_time < time_in:
            moving[cur_move] = -10 + (time_for_move_before_intersection - time_in + cur_time) * (start_pos - (-10)) / time_for_move_before_intersection
        elif cur_time < time_out - time_for_move:
            moving[cur_move] = start_pos
        elif cur_time < time_out:
            moving[cur_move] = start_pos + (time_for_move - time_out + cur_time) * (end_pos - start_pos) / time_for_move
        elif cur_time < time_out + time_for_move_after_intersection:
            moving[cur_move] = end_pos + (time_for_move_after_intersection - (time_out + time_for_move_after_intersection) + cur_time) * (20 - end_pos) / time_for_move_after_intersection
        else:
            break
    return moving

In [None]:
# Calculate PI 2 trajectory
def pi_2_trajectory():
    all_count = MOVE_COUNT / MAX_TIME
    vert_point_count = int(all_count // 3)
    circle_point_count = int(all_count // 3 + all_count % 3)
    hor_point_count = int(all_count // 3)
    center_x = 6
    center_y = 5
    radius = 1

    x_vert = np.full(vert_point_count, 5, dtype = np.float64)
    y_vert = np.linspace(10, 5, vert_point_count, dtype = np.float64, endpoint = False)

    x_circle = np.linspace(5, 6, circle_point_count, dtype = np.float64, endpoint = False)
    y_circle = np.zeros(circle_point_count, dtype = np.float64)
    y_circle = np.multiply((x_circle - center_x), (x_circle - center_x))
    y_circle = radius - y_circle
    y_circle = np.sqrt(y_circle)
    y_circle = center_y - y_circle

    x_hor = np.linspace(6, 10, hor_point_count, dtype = np.float64)
    y_hor = np.full(hor_point_count, 4, dtype = np.float64)
    x = np.concatenate((x_vert, x_circle, x_hor), axis = 0)
    y = np.concatenate((y_vert, y_circle, y_hor), axis = 0)
    
    return x, y

In [None]:
# Calculate PI 2 customer's moving in time
def calculate_x_moving_for_pi_2(time_in, time_out):
    start_pos = 5
    end_pos = 10
    end_pos_path_2 = 6
    time_for_move = 1
    time_for_move_before_intersection = 1
    time_for_move_after_intersection = 1
    path_1 = 5
    path_2 = np.pi / 2
    path_3 = 4
    all_path = path_1 + path_2 + path_3
    speed = all_path / time_for_move
    path_1_time = path_1 / speed
    path_2_time = path_2 / speed
    path_3_time = path_3 / speed
    x, y = pi_2_trajectory()

    x_moving = np.full(MOVE_COUNT, -10, dtype = np.float64)
    one_move_size = MAX_TIME / MOVE_COUNT
    i = 0
    for cur_move in range(MOVE_COUNT):
        cur_time = one_move_size * cur_move
        if cur_time < time_in - time_for_move_before_intersection:
            continue
        elif cur_time < time_out - time_for_move:
            x_moving[cur_move] = start_pos
        elif cur_time < time_out:
            x_moving[cur_move] = x[i]
            i += 1
        elif cur_time < time_out + time_for_move_after_intersection:
            x_moving[cur_move] = end_pos + (time_for_move_after_intersection - (time_out + time_for_move_after_intersection) + cur_time) * (20 - end_pos) / time_for_move_after_intersection
        else:
            break
    return x_moving

In [None]:
# # Calculate PI 2 customer's moving in time
# def calculate_x_moving_for_pi_2(time_in, time_out):
#     start_pos = 5
#     end_pos = 10
#     end_pos_path_2 = 6
#     time_for_move = 1
#     time_for_move_before_intersection = 1
#     time_for_move_after_intersection = 1
#     path_1 = 5
#     path_2 = np.pi / 2
#     path_3 = 4
#     all_path = path_1 + path_2 + path_3
#     speed = all_path / time_for_move
#     path_1_time = path_1 / speed
#     path_2_time = path_2 / speed
#     path_3_time = path_3 / speed

#     x_moving = np.full(MOVE_COUNT, -10, dtype = np.float64)
#     one_move_size = MAX_TIME / MOVE_COUNT
#     for cur_move in range(MOVE_COUNT):
#         cur_time = one_move_size * cur_move
#         if cur_time < time_in - time_for_move_before_intersection:
#             continue
#         elif cur_time < time_out - time_for_move:
#             x_moving[cur_move] = start_pos
#         elif cur_time < time_out - path_2_time - path_3_time:
#             x_moving[cur_move] = start_pos
#         elif cur_time < time_out - path_3_time:
#             x_moving[cur_move] = start_pos + (path_2_time - (time_out - path_3_time) + cur_time) * path_2 / path_2_time
#         elif cur_time < time_out:
#             x_moving[cur_move] = end_pos_path_2 + (path_3_time - time_out + cur_time) * path_3 / path_3_time
#         elif cur_time < time_out + time_for_move_after_intersection:
#             x_moving[cur_move] = end_pos + (time_for_move_after_intersection - (time_out + time_for_move_after_intersection) + cur_time) * (20 - end_pos) / time_for_move_after_intersection
#         else:
#             break
#     return x_moving

In [None]:
# Calculate PI 2 customer's moving in time
def calculate_y_moving_for_pi_2(time_in, time_out):
    start_pos = 10
    end_pos = 4
    end_pos_path_1 = 5
    time_for_move = 1
    time_for_move_before_intersection = 1
    time_for_move_after_intersection = 1
    path_1 = 5
    path_2 = np.pi / 2
    path_3 = 4
    all_path = path_1 + path_2 + path_3
    speed = all_path / time_for_move
    path_1_time = path_1 / speed
    path_2_time = path_2 / speed
    path_3_time = path_3 / speed
    x, y = pi_2_trajectory()

    y_moving = np.full(MOVE_COUNT, -10, dtype = np.float64)
    one_move_size = MAX_TIME / MOVE_COUNT
    i = 0
    for cur_move in range(MOVE_COUNT):
        cur_time = one_move_size * cur_move
        if cur_time < time_in - time_for_move_before_intersection:
            continue
        elif cur_time < time_in:
            y_moving[cur_move] = 20 + (time_for_move_before_intersection - time_in + cur_time) * (start_pos - 20) / time_for_move_before_intersection
        elif cur_time < time_out - time_for_move:
            y_moving[cur_move] = start_pos
        elif cur_time < time_out:
            y_moving[cur_move] = y[i]
            i += 1
        elif cur_time < time_out + time_for_move_after_intersection:
            y_moving[cur_move] = end_pos
        else:
            break
    return y_moving

In [None]:
# # Calculate PI 2 customer's moving in time
# def calculate_y_moving_for_pi_2(time_in, time_out):
#     start_pos = 10
#     end_pos = 4
#     end_pos_path_1 = 5
#     time_for_move = 1
#     time_for_move_before_intersection = 1
#     time_for_move_after_intersection = 1
#     path_1 = 5
#     path_2 = np.pi / 2
#     path_3 = 4
#     all_path = path_1 + path_2 + path_3
#     speed = all_path / time_for_move
#     path_1_time = path_1 / speed
#     path_2_time = path_2 / speed
#     path_3_time = path_3 / speed

#     y_moving = np.full(MOVE_COUNT, -10, dtype = np.float64)
#     one_move_size = MAX_TIME / MOVE_COUNT
#     for cur_move in range(MOVE_COUNT):
#         cur_time = one_move_size * cur_move
#         if cur_time < time_in - time_for_move_before_intersection:
#             continue
#         elif cur_time < time_in:
#             y_moving[cur_move] = 20 + (time_for_move_before_intersection - time_in + cur_time) * (start_pos - 20) / time_for_move_before_intersection
#         elif cur_time < time_out - time_for_move:
#             y_moving[cur_move] = start_pos
#         elif cur_time < time_out - path_2_time - path_3_time:
#             y_moving[cur_move] = start_pos - (path_1_time - (time_out - path_2_time - path_3_time) + cur_time) * path_1 / path_1_time
#         elif cur_time < time_out - path_3_time:
#             y_moving[cur_move] = end_pos_path_1 - (path_2_time - (time_out - path_3_time) + cur_time) * path_2 / path_2_time
#         elif cur_time < time_out + time_for_move_after_intersection:
#             y_moving[cur_move] = end_pos
#         else:
#             break
#     return y_moving

In [None]:
# Calculate PI 3 customer's moving in time
def calculate_moving_for_pi_3(time_in, time_out):
    start_pos = 2
    end_pos = 6
    time_for_move = 10

    moving = np.full(MOVE_COUNT, -10, dtype = np.float64)
    one_move_size = MAX_TIME / MOVE_COUNT
    for cur_move in range(MOVE_COUNT):
        cur_time = one_move_size * cur_move
        if cur_time < time_in:
            continue
        elif cur_time < time_out - time_for_move:
            moving[cur_move] = start_pos
        elif cur_time < time_out:
            moving[cur_move] = start_pos + (time_for_move - time_out + cur_time) * (end_pos - start_pos) / time_for_move
        else:
            continue
    return moving

In [None]:
# Calculate customer's moving in time
def calculate_queue_in_time(customers_times):
    queue_length_in_time = np.zeros(MOVE_COUNT, dtype = np.int64)
    scale = MOVE_COUNT / MAX_TIME
    for input_time, output_time in customers_times:
        input_move = int(input_time * scale)
        output_move = int(output_time * scale)
        queue_length_in_time[input_move:output_move] = np.add(queue_length_in_time[input_move:output_move], 1)

    return queue_length_in_time

In [None]:
# Read data
pi_1_customers_times = read_input_ouput_times_from_file(file_name = "pi_1_times_io.txt")
pi_2_customers_times = read_input_ouput_times_from_file(file_name = "pi_2_times_io.txt")
pi_3_customers_times = read_input_ouput_times_from_file(file_name = "pi_3_times_io.txt")

# Calculate moving for all customers
all_pi_1_customers_moving = []
for input_time, output_time in pi_1_customers_times:
    all_pi_1_customers_moving.append(calculate_moving_for_pi_1(input_time, output_time))

all_pi_2_customers_x_moving = []
for input_time, output_time in pi_2_customers_times:
    all_pi_2_customers_x_moving.append(calculate_x_moving_for_pi_2(input_time, output_time))
all_pi_2_customers_y_moving = []
for input_time, output_time in pi_2_customers_times:
    all_pi_2_customers_y_moving.append(calculate_y_moving_for_pi_2(input_time, output_time))

all_pi_3_customers_moving = []
for input_time, output_time in pi_3_customers_times:
    all_pi_3_customers_moving.append(calculate_moving_for_pi_3(input_time, output_time))

# Calculate queue length
pi_1_queue_length_in_time = calculate_queue_in_time(pi_1_customers_times)
pi_2_queue_length_in_time = calculate_queue_in_time(pi_2_customers_times)
pi_3_queue_length_in_time = calculate_queue_in_time(pi_3_customers_times)

In [None]:
def read_greem_light_time_from_file(file_name):
    times_file = open(file_name, "r")
    list_of_greenlight_times = times_file.read().split("\n")
    list_of_greenlight_times.pop()
    str_count = len(list_of_greenlight_times)
    greenlights_times = np.zeros((str_count, 2), dtype = np.float64)

    cur_str = 0
    for cur_times in list_of_greenlight_times:
        start_time, end_time = cur_times.split(" ")
        greenlights_times[cur_str][0] = float(start_time)
        greenlights_times[cur_str][1] = float(end_time)
        cur_str += 1
    times_file.close()
    
    return greenlights_times

In [None]:
# Read green light time data
pi_1_greem_light_time = read_greem_light_time_from_file(file_name = "pi_1_times_greenlight.txt")
pi_2_greem_light_time = read_greem_light_time_from_file(file_name = "pi_2_times_greenlight.txt")
pi_3_greem_light_time = read_greem_light_time_from_file(file_name = "pi_3_times_greenlight.txt")

In [None]:
#%%timeit
fig, ax = plt.subplots()
ax.set_xlim([-5, 15])
ax.set_ylim([0, 15])
plt.style.use('seaborn-v0_8-white')
ax.axes.xaxis.set_visible(False)
ax.axes.yaxis.set_visible(False)
# Add intersection diagram
plt.hlines(2, -5, 15)
plt.hlines(6, -5, 3.01)
plt.hlines(6, 6.99, 15)
plt.vlines(3, 6, 15)
plt.vlines(7, 6, 15)

plt.hlines(10, 3, 7, color = 'black')
plt.vlines(0, 2, 6, color = 'black')
plt.vlines(10, 2, 6, color = 'black')
# Add a plot of PI_1 moving trajectory
plt.hlines(4.01, 0, 10, lw = 0.8)
# Add a plot of PI_2 moving trajectory
x, y = pi_2_trajectory()
plt.plot(x, y, lw = 0.8)
# Add a plot of PI_3 moving trajectory
plt.vlines(7.3, 2, 6, lw = 0.8)
plt.vlines(8.7, 2, 6, lw = 0.8)
# Add a plot of 3 semathores
ax.add_patch(
     patches.Rectangle(
        (0.5, 4.5),
        1,
        1,
        edgecolor = 'gray',
        facecolor = 'gray',
        fill=True
     ) )
ax.add_patch(
    patches.Rectangle(
        (3.5, 8.5),
        1,
        1,
        edgecolor = 'gray',
        facecolor = 'gray',
        fill=True
    ) )
ax.add_patch(
    patches.Rectangle(
        (6, 2.5),
        1,
        1,
        edgecolor = 'gray',
        facecolor = 'gray',
        fill=True
    ) )

scat_pi_1 = ax.scatter(0, 0, color = 'red' , s = 200)
scat_pi_2 = ax.scatter(0, 0, color = 'red' , s = 200)
scat_pi_3 = ax.scatter(0, 0, color = 'red' , s = 10)
time_template = '%d'
tex_pi_1 = ax.text(0.2, 0.31, '', transform=ax.transAxes, fontsize=15)
tex_pi_2 = ax.text(0.44, 0.68, '', transform=ax.transAxes, fontsize=15)
tex_pi_3 = ax.text(0.565, 0.08, '', transform=ax.transAxes, fontsize=15)
scat_greenlight_pi_1 = ax.scatter(1, 5, color = 'red' , s = 100)
scat_greenlight_pi_2 = ax.scatter(4, 9, color = 'red' , s = 100)
scat_greenlight_pi_3 = ax.scatter(6.5, 3, color = 'red' , s = 100)
greenlight_pi_1_str_id = 0
greenlight_pi_2_str_id = 0
greenlight_pi_3_str_id = 0
greenlight_pi_1_str_max_id = len(pi_1_greem_light_time)
greenlight_pi_2_str_max_id = len(pi_2_greem_light_time)
greenlight_pi_3_str_max_id = len(pi_3_greem_light_time)

one_move_size = MAX_TIME / MOVE_COUNT
def animate(i):
    global greenlight_pi_1_str_id
    global greenlight_pi_2_str_id
    global greenlight_pi_3_str_id

    pi_1_points = np.array([ [all_pi_1_customers_moving[j][i], 4] for j in range(len(all_pi_1_customers_moving)) ] )
    pi_2_points = np.array([ [all_pi_2_customers_x_moving[j][i], all_pi_2_customers_y_moving[j][i]] for j in range(len(all_pi_2_customers_x_moving)) ] )
    pi_3_points = np.array([ [7.5 + (j % 5) / 4, all_pi_3_customers_moving[j][i]] for j in range(len(all_pi_3_customers_moving)) ] )

    scat_pi_1.set_offsets(pi_1_points)
    scat_pi_2.set_offsets(pi_2_points)
    scat_pi_3.set_offsets(pi_3_points)
    
    tex_pi_1.set_text(time_template % (pi_1_queue_length_in_time[i]))
    tex_pi_2.set_text(time_template % (pi_2_queue_length_in_time[i]))
    tex_pi_3.set_text(time_template % (pi_3_queue_length_in_time[i]))

    cur_time = i * one_move_size
    if (greenlight_pi_1_str_id < greenlight_pi_1_str_max_id):
        if (cur_time < pi_1_greem_light_time[greenlight_pi_1_str_id][0] - 3.0):
            pass
        elif (cur_time < pi_1_greem_light_time[greenlight_pi_1_str_id][0]):
            scat_greenlight_pi_1.set_color( "yellow" )
        elif (cur_time < pi_1_greem_light_time[greenlight_pi_1_str_id][1]):
            scat_greenlight_pi_1.set_color( "green" )
        elif (cur_time < pi_1_greem_light_time[greenlight_pi_1_str_id][1] + 3):
            scat_greenlight_pi_1.set_color( "yellow" )
        else:
            scat_greenlight_pi_1.set_color( "red" )
            greenlight_pi_1_str_id += 1

    if (greenlight_pi_2_str_id < greenlight_pi_2_str_max_id):
        if (cur_time < pi_2_greem_light_time[greenlight_pi_2_str_id][0] - 3.0):
            pass
        elif (cur_time < pi_2_greem_light_time[greenlight_pi_2_str_id][0]):
            scat_greenlight_pi_2.set_color( "yellow" )
        elif (cur_time < pi_2_greem_light_time[greenlight_pi_2_str_id][1]):
            scat_greenlight_pi_2.set_color( "green" )
        elif (cur_time < pi_2_greem_light_time[greenlight_pi_2_str_id][1] + 3):
            scat_greenlight_pi_2.set_color( "yellow" )
        else:
            scat_greenlight_pi_2.set_color( "red" )
            greenlight_pi_2_str_id += 1

    if (greenlight_pi_3_str_id < greenlight_pi_3_str_max_id):
        if (cur_time < pi_3_greem_light_time[greenlight_pi_3_str_id][0] - 3.0):
            pass
        elif (cur_time < pi_3_greem_light_time[greenlight_pi_3_str_id][0]):
            scat_greenlight_pi_3.set_color( "yellow" )
        elif (cur_time < pi_3_greem_light_time[greenlight_pi_3_str_id][1]):
            scat_greenlight_pi_3.set_color( "green" )
        elif (cur_time < pi_3_greem_light_time[greenlight_pi_3_str_id][1] + 3):
            scat_greenlight_pi_3.set_color( "yellow" )
        else:
            scat_greenlight_pi_3.set_color( "red" )
            greenlight_pi_3_str_id += 1

    return scat_pi_1, scat_pi_2, scat_pi_3, tex_pi_1, tex_pi_2, tex_pi_3, scat_greenlight_pi_1, scat_greenlight_pi_2, scat_greenlight_pi_3

ani = FuncAnimation(fig, animate, repeat = True, frames = MOVE_COUNT, interval = 100)
ani.save('mymovie.gif', writer='pillow')
greenlight_pi_1_str_id = 0
greenlight_pi_2_str_id = 0
greenlight_pi_3_str_id = 0
ani.save("mymovie.mp4")

In [None]:
before_x = np.full(10, 5)
before_y = np.linspace(20, 10, 10, endpoint=False)
x, y = pi_2_trajectory()
after_x = np.linspace(11, 20, 9)
after_y = np.full(9, 4)
new_x = np.concatenate((before_x, x, after_x), axis = 0)
new_y = np.concatenate((before_y, y, after_y), axis = 0)
plt.scatter(new_x, new_y, lw = 0.8)

In [None]:
plt.style.available

In [None]:
# # Creating matplotlib figure and camera object
# #fig = plt.figure()
# fig, ax = plt.subplots()
# plt.xlim(0,10)
# plt.ylim(0,10)
# #plt.axis('off')
# camera = Camera(fig)

# # Looping the data and capturing frame at each iteration
# for cur_move in range(MOVE_COUNT):
#     plt.text(0.2, 3.3, str(pi_1_queue_length_in_time[cur_move]), fontsize=15)
#     for cur_customer_moving in all_pi_1_customers_moving:
#         plt.scatter(cur_customer_moving[cur_move], 3, color = 'red' , s = 200)

#     plt.text(5.6, 9.3, str(pi_2_queue_length_in_time[cur_move]), fontsize=15)
#     cur_customer_id = 0
#     for cur_customer_x_moving in all_pi_2_customers_x_moving:
#         plt.scatter(cur_customer_x_moving[cur_move], all_pi_2_customers_y_moving[cur_customer_id][cur_move], color = 'red' , s = 200)
#         cur_customer_id += 1

#     plt.text(7.8, 1.4, str(pi_3_queue_length_in_time[cur_move]), fontsize=15)
#     cur_customer_id = 0
#     for cur_customer_moving in all_pi_3_customers_moving:
#         plt.scatter(7.5 + (cur_customer_id % 5) / 4, cur_customer_moving[cur_move], color = 'red' , s = 20)
#         cur_customer_id += 1
#     plt.title('The intersection traffic')
#     camera.snap()

# # Add intersection diagram
# plt.hlines(2, 0, 10)
# plt.hlines(6, 0, 3)
# plt.hlines(6, 7, 10)
# plt.vlines(3, 6, 10)
# plt.vlines(7, 6, 10)
# # Add a plot of PI_1 moving trajectory
# plt.hlines(3.01, 0, 10, color = 'green', lw = 0.8)
# # Add a plot of PI_2 moving trajectory
# plt.vlines(5.99, 4, 10, color = 'green', lw = 0.8)
# x = np.arange(6, 7.0001, 0.0001, dtype = np.float64)
# y = pi_2_trajectory(x)
# X_Y_Spline = make_interp_spline(x, y)
# X_ = np.linspace(x.min(), x.max(), 10000)
# Y_ = X_Y_Spline(X_)
# plt.plot(X_, Y_, color = 'green' , lw = 0.6)
# # Add a plot of PI_3 moving trajectory
# plt.vlines(7.4, 2, 6, color = 'green', lw = 0.8)
# plt.vlines(8.6, 2, 6, color = 'green', lw = 0.8)

# # Creating the animation from captured frames
# animation = camera.animate(interval = 10, repeat = True, repeat_delay = 500)
# animation.save("mymovie.mp4")
# animation.save("mymovie.gif")