In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from Point import Point

from datetime import datetime

from Point import Point
from PIL import Image
from Trajectory import Trajectory

In [92]:


class Trajectory(object):
    """
    1. Метод, который кушает одну приходящую точку (x, y, z, v_x=np.nan, v_y=np.nan, v_z=np.nan, a_x=np.nan, a_y=np.nan, a_z=np.nan),
    если тестовый режим работы - зашумляем, если присутствуют только координаты, *считает* скорости, ускорения и сохраняет это в self.x_list, self...
    2. Внутренний методы просчёта, фильтрации и аппроксимации траекторий.
    3. Метод для тестового зашумления данных и тд
    4. 
    """

    def __init__(self, target="Target_0"):
        # Название и характеристики цели (на данный момент - строка):
        self.target = target
        # Координаты, скорости, ускорения и таймстампы точек траектории
        self.trajectory = []
        # Значения точек траектории, полученные после её обработки одним
        # или несколькими последовательно примененными алгоритмами фильтрации
        # и интерполяции
        self.filtered_trajectory = []
        #self.__map__ = None
        #self.__img__ = imread(map_)

    def add_point_by_coordinates(self, timestamp, x, y, z,
                                 v_x=np.nan, v_y=np.nan, v_z=np.nan,
                                 a_x=np.nan, a_y=np.nan, a_z=np.nan):

        self.add_point(Point(timestamp, x, y, z, v_x, v_y, v_z, a_x, a_y, a_z))
        return self

    def add_point(self, point):
        """
        Добавление новой точки:

        Аргументы:
            point: Point: объект класса Point,
        """
        self.trajectory.append(point)
        self._filter_trajectory(filtring_type="l1_filtering")
        # self._recompute_speed()
        # self._recompute_accelerations()
        return self

    def _filter_trajectory(self, filtring_type=None):

        import copy
        """
        Фильтрация и сглаживание траектории одним из способов:
        Нужно минимум window точек
        """


        def savgol(arr, window=15, order=5, deriv=0, rate=1):
            # window >= order + 2

            from math import factorial

            if len(arr) < window:
                window = len(arr)
                if window < order + 2:
                    order = window - 2

            y = np.asarray(arr[:])
            order_range = range(order + 1)
            half_window = (window - 1) // 2
            b = np.mat([[k ** i for i in order_range] for k in range(-half_window, half_window + 1)])
            m = np.linalg.pinv(b).A[deriv] * rate ** deriv * factorial(deriv)
            firstvals = y[0] - np.abs(y[1:half_window + 1][::-1] - y[0])
            lastvals = y[-1] + np.abs(y[-half_window - 1:-1][::-1] - y[-1])
            y = np.concatenate((firstvals, y, lastvals))
            return np.convolve(m[::-1], y, mode='valid')

        self.filtered_trajectory = copy.deepcopy(self.trajectory)
        if len(self.filtered_trajectory) > 2:
            filtered_coordinate = savgol([self.trajectory[i].x for i in range(len(self.trajectory))])
            for i in range(len(self.filtered_trajectory)):
                self.filtered_trajectory[i].x = filtered_coordinate[i]
            filtered_coordinate = savgol([self.trajectory[i].y for i in range(len(self.trajectory))])
            for i in range(len(self.filtered_trajectory)):
                self.filtered_trajectory[i].y = filtered_coordinate[i]
            filtered_coordinate = savgol([self.trajectory[i].z for i in range(len(self.trajectory))])
            for i in range(len(self.filtered_trajectory)):
                self.filtered_trajectory[i].z = filtered_coordinate[i]

        return self.filtered_trajectory

    def get_map(self):
        mplfigure = Figure()
        ax = mplfigure.add_subplot(111)
        ax.plot([self.filtered_trajectory[i].x for i in range(len(self.filtered_trajectory))],
                 [self.filtered_trajectory[i].y for i in range(len(self.filtered_trajectory))])
        ax.legend()
        ax.imshow(self.__img__, extent=[0, self.__img__.shape[1], 0, self.__img__.shape[0]])
        canvas = FigureCanvas(mplfigure)
        canvas.set_size_request(400, 400)
        return canvas
    
    def get_coordinates_list(self):
        """
        Метод, который выдаёт список координат в формате [[t_1, ..., t_n],
        [x_1, ..., x_n], [y_1, ..., y_n], [z_1, ..., z_n], ]
        """
        ts = [coord.timestamp for coord in self.filtered_trajectory]
        xs = [coord.x for coord in self.filtered_trajectory]
        ys = [coord.y for coord in self.filtered_trajectory]
        zs = [coord.z for coord in self.filtered_trajectory]
        
        return ts, xs, ys, zs
        
        
class Target(Trajectory):
    def __init__(self, target_name, target_id, pic_path="data/pics/airplane.png"):
        """
        Класс цели (объекта наведения)
        
        Аргументы:
            target_name: str: имя цели
            target_id: str: уникальный id цели
            airplane_picture: str: путь к изображению самолётика
        """
        super(Target, self).__init__()
        self.__pic__ = Image.open(pic_path)
        self.target_name = target_name
        self.target_id = target_id
        
    def fake_trajectory(self):
        """
        Генератор случайной траектории полета цели для тестового использования
        """
        x, y, z = [0, 0, 50]
        for i in range(600):
            self.add_point(Point(datetime.now().isoformat(), x + i, y + i, z + i))
            

    def get_coordinates_list(self):
        """
        Метод, который выдаёт список координат в формате [[t_1, ..., t_n],
        [x_1, ..., x_n], [y_1, ..., y_n], [z_1, ..., z_n], ]
        """
        ts = [coord.timestamp for coord in self.filtered_trajectory]
        xs = [coord.x for coord in self.filtered_trajectory]
        ys = [coord.y for coord in self.filtered_trajectory]
        zs = [coord.z for coord in self.filtered_trajectory]

        return ts, xs, ys, zs

    def load_trajectory(self, filename="data/trajectories/ID001.json"):
        """
        Функция загрузки траектории из файла.

        :param filename: str: Строка, указывающая полный путь до файла траектории
        :return:
        """
        df = pd.read_json(filename, orient="records", lines=True)
        df = df.iloc[len(self.trajectory):]
        #ts, xs, ys, zs = df["timestamp"].values, df["x"].values, df["y"].values, df["z"].values
        for point_ in df.iterrows():
            ts, xs, ys, zs = point_[1].values
            self.add_point_by_coordinates(ts, xs, ys, zs)

    def save_trajectory(self, filename="data/trajectories/ID001.json"):
        """
        Функция сохранения траектории в файл.

        :param filename: str: Строка, указывающая полный путь до файла траектории
        :return:
        """

        ts, xs, ys, zs = self.get_coordinates_list()
        df = pd.DataFrame(np.array([ts, xs, ys, zs]).T, columns=["timestamp", "x", "y", "z"])

        df.to_json(filename, orient="records", lines=True)

In [93]:
test = Target(target_name="F-16", target_id="ID001")

In [94]:
#test.fake_trajectory()

In [65]:
%%time
test.save_trajectory("data/trajectories/test.json")

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 11.3 ms


In [96]:
%%time
a = pd.read_json("data/trajectories/test.json", orient="records", lines=True)

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 30.3 ms


In [97]:
%%time
a = test.get_coordinates_list()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 35 µs


In [99]:
df.to_json(date_format="%Y-%M-&d T%h:%m:%s.%f")

'{"x":{"0":0.0,"1":0.0,"2":0.0},"y":{"0":0.0,"1":0.0,"2":0.0},"z":{"0":0.0,"1":0.0,"2":0.0}}'

In [100]:
df

Unnamed: 0,x,y,z
0,0.0,0.0,0.0
1,0.0,0.0,0.0
2,0.0,0.0,0.0


In [98]:
%%time
test.load_trajectory("data/trajectories/test.json")

TypeError: Parser must be a string or character stream, not Timestamp

In [78]:
test = Target(target_id="ID002", target_name="Su-25")

In [79]:
%%time
test.load_trajectory("data/trajectories/test.json")

ValueError: too many values to unpack

In [81]:
df = pd.DataFrame(data=np.zeros([3, 3]), columns=["x", "y", "z"])

In [90]:
a, b, d = list(df.iterrows())[1][1].values

In [91]:
a

0.0