# Методические указания к выполнению домашнего задания по курсу "Методы траекторной обработки сигналов"'2024

Для выполнения работы потребуется 
* Python 3 (рекомендуемая сборка для windows https://sourceforge.net/projects/winpython/files/latest/download)
* Редактор VisualStudio Code (https://code.visualstudio.com/)
* Программа просмотра радиолокационных данных РЛС кругового обзора в формате Asterix cat.240 (RadarView240)

Перед выполнением работы обязательно удостоверьтесь в работоспособности модуля `asterix`. Для этого выполните следующие шаги:
1) Установите какой-либо дистрибутив Python из дистрибутива WinPython
2) Запустите VisualStudio Code (VSCode), установите дополнения python и jupyter 
3)  При необходимости установите дополнительные модули в дистрибутив Python через консоль `./WPy64-31230/WinPython Command Prompt.exe` и пакетного менеджера `pip`, либо командой `..\WPy64-31230\scripts\python.bat -m pip install <module name>`


**Цель работы** - сформировать глубокое понимание процесса сигнальной обработки радиолокационных данных и формирования потока измерений для последующей траекторной обработки сигналов

**Изучите**, как строятся имитационные программные модели системы, включающие сигнальную и траекторную обработку радиолокационной информации, а затем **выполните задание по вариантам**.

Модель должна включать независимые подсистемы, работающие в единой системе координат и системе времени. Система времени задает шаг моделирования, который необходимо согласовывать с учётом параметров зондирующих сигналов. Система координат должна однозначно определять истинное взаимное расположение цели и радиолокатора.

Перед началом работы импортируем необходимые модули и создаим вспомогательные переменные.

In [10]:
import numpy as np      # библиотека математических функций
import socket, struct, time
# import dpkt
# import scapy.all as scapy

d2r = np.pi/180         # перевод градусы в радианы
r2d = 180/np.pi         # перевод радианы в градусы

Создадим класс  **Target** , описывающий поведение цели

In [11]:
class Target():
    # класс модели движения цели
    def __init__(self, init):
        self.state = {'x':0.0, 'vx':0.0, 'ax':0.0,
                      'y':0.0, 'vy':0.0, 'ay':0.0}
        for k in init.keys():
            self.state[k] = init[k]
            
        self.pn = {'sax':1.0, 'say': 1.0}
    
    def CV(self, dT):
        # модель движения с примерно постоянной скоростью
        keys = ['x','vx','y','vy']
        X = np.array([self.state[k] for k in keys])
        F = [[1, dT, 0, 0],
             [0,  1, 0, 0],
             [0,  0, 1, dT],
             [0,  0, 0, 1]]
        F = np.array(F)
        G = [[dT**2/2, 0],
             [dT, 0],
             [0, dT**2/2],
             [0, dT]]
        G = np.array(G)
        w = np.array([np.random.normal(0,self.pn['sax']), np.random.normal(0,self.pn['say'])])
        Xnew = F@X + G@w
        
        for i in range(4):
            self.state[keys[i]] = Xnew[i]

Создадим класс **Radar** , описывающий поведение работы радиолокатора с заданными ТТХ

In [None]:
class RadarDefault():
    # модель радиолокатора с известными ТТХ
    def __init__(self, init):
        # параметры программы SPX для отображения
        self.spx = {
                    'naz':2048,     # число отображаемых направлений азимутов
                    'nr':2048       # число отображаемых ячеек дальностей
                    }
        # инициализация параметров РЛС
        self.ttx = {
                    'lmd':0.03,     # м, длина волны
                    'dR':150,       # м, разрешающая способность по дальности
                    'dP':5*d2r,     # гр, ширина ДН антенны
                    'BW':1e6,       # Гц, ширина спектра сигнала
                    'tau':1e-6,     # с, длительность сигнала
                    'f0':0,         # Гц, центральная частота в спектре сигнала (baseband carrier)
                    'fc':0,         # Гц, несущая частота (carrier)
                    'fs':4.096e6,   # Гц, частота дискретизации сигнала РЛС
                    'w':0.2,        # об/с, угловая скорость вращения антенны
                    'Tw':5,         # с, период обзора
                    'PRT':0.5e-3,   # c, период повторения однозначной дальности
                    'PRF':2000     # Гц, частота повторения импульсов
        }
        
        self.state = {
                    'x':0.0,            # координаты РЛС x
                    'y':0.0,            # координаты РЛС y
                    'los':0.0,          # гр, направление на максимума ДН в текущий момент
                    'time':0.0          # с, локальное время
        }
        
        # переопределение параметров ТТХ и состояния РЛС при инициализации
        for k in init.keys():
            try:
                self.ttx[k] = init[k]
            except:
                self.state[k] = init[k]
                
        # отобразить все настройки
        print(self.ttx)
        print("\n")
        print(self.state)
        print("\n")
        print(self.spx)
        
    def turn(self, dt):
        # функция имитации поворота максимуа ДН антенны
        self.state['time'] += dt
        self.state['los'] += 2*np.pi*self.ttx['w']*dt
        
        if self.state['los'] > 2*np.pi:
            self.state['los'] -= 2*np.pi # wrap angle
                        
    def receive(self, target_coords):
        # функция имитации приёма отраженного сигнала от цели
        
        # расстояние между целью и РЛС
        range = np.sqrt((target_coords['x']-self.state['x'])**2 + (target_coords['y']-self.state['y'])**2)
        # запаздывание отраженного сигнала
        tz = 2*range/3e8                    
        # угол поворота линии визирования цели [0, 2pi]
        angle = np.arctan2(target_coords['y']-self.state['y'],target_coords['x']-self.state['x']) + np.pi
        # ракурс цели по отношению к РЛС 
        dangle = angle - self.state['los']
        # амплитуда отраженного сигнала без учета G и расстояния
        A = 128*np.sinc(dangle/np.pi/self.ttx['dP'])
        
        # отсчеты времени на 1 период зондирования
        t = np.arange(0,self.ttx['PRT']-self.ttx['tau'],1/self.ttx['fs'])
        # квадратуры принятого сигнала
        rx = A*np.exp(-1j*2*np.pi*self.ttx['f0']*(t-tz))*(np.heaviside(t-tz,0)-np.heaviside(t-tz-self.ttx['tau'],0)) + np.random.normal(0,10,t.shape)
        # квадратуры опорного сигнала
        tx = np.exp(1j*2*np.pi*self.ttx['f0']*(t))*(np.heaviside(t,0)-np.heaviside(t-self.ttx['tau'],0))
        # выборка отсчетов на длительности импульса
        # tx = tx[:int(self.ttx['tau']*self.ttx['fs'])]   
        # согласованная фильтрация сигнала
        rx_fft = np.fft.fft(rx, int(self.spx['nr']))
        tx_fft = np.fft.fft(tx, int(self.spx['nr']))
        signal = np.fft.ifft(rx_fft*tx_fft)
        # преобразование в uint8
        signal = np.uint8(np.abs(signal))
        # возвращаем угол и развертку по дальности
        return {'time':self.state['time'], 
                'angle':self.state['los'],
                'data':signal,
                'cell_dur':1/self.ttx['fs']}

In [27]:
class Radar():
    # модель радиолокатора с известными ТТХ
    def __init__(self, init):
        # параметры программы SPX для отображения
        self.spx = {
                    'naz':2048,     # число отображаемых направлений азимутов
                    'nr':2048       # число отображаемых ячеек дальностей
                    }
        # инициализация параметров РЛС
        self.ttx = {
                    'lmd':0.03,     # м, длина волны
                    'dR':50,       # м, разрешающая способность по дальности
                    'dP':5*d2r,     # гр, ширина ДН антенны
                    'BW':2.048e6,       # Гц, ширина спектра сигнала (я решил, что это будет частота девиации для ЛЧМ...)
                    'tau':1e-6,     # с, длительность сигнала
                    'f0':0,         # Гц, центральная частота в спектре сигнала (baseband carrier)
                    'fc':0,         # Гц, несущая частота (carrier)
                    'fs':4.096e6,   # Гц, частота дискретизации сигнала РЛС
                    'w':0.2,        # об/с, угловая скорость вращения антенны
                    'Tw':5,         # с, период обзора
                    'PRT':0.5e-3,   # c, период повторения однозначной дальности
                    'PRF':2000     # Гц, частота повторения импульсов
        }
        
        self.state = {
                    'x':0.0,            # координаты РЛС x
                    'y':0.0,            # координаты РЛС y
                    'los':0.0,          # гр, направление на максимума ДН в текущий момент
                    'time':0.0          # с, локальное время
        }
        
        # переопределение параметров ТТХ и состояния РЛС при инициализации
        for k in init.keys():
            try:
                self.ttx[k] = init[k]
            except:
                self.state[k] = init[k]
                
        # отобразить все настройки
        print(self.ttx)
        print("\n")
        print(self.state)
        print("\n")
        print(self.spx)
        
    def turn(self, dt):
        # функция имитации поворота максимуа ДН антенны
        self.state['time'] += dt
        self.state['los'] += 2*np.pi*self.ttx['w']*dt
        
        if self.state['los'] > 2*np.pi:
            self.state['los'] -= 2*np.pi # wrap angle
                        
    def receive(self, target_coords):
        # функция имитации приёма отраженного сигнала от цели
        
        # расстояние между целью и РЛС
        range = np.sqrt((target_coords['x']-self.state['x'])**2 + (target_coords['y']-self.state['y'])**2)
        # запаздывание отраженного сигнала
        tz = 2*range/3e8                    
        # угол поворота линии визирования цели [0, 2pi]
        angle = np.arctan2(target_coords['y']-self.state['y'],target_coords['x']-self.state['x']) + np.pi
        # ракурс цели по отношению к РЛС 
        dangle = angle - self.state['los']
        # амплитуда отраженного сигнала без учета G и расстояния
        A = 128*np.sinc(dangle/np.pi/self.ttx['dP'])
        
        # отсчеты времени на 1 период зондирования
        t = np.arange(0,self.ttx['PRT'], 1/self.ttx['fs'])
        
        # формирование модулирующего сигнала
        m_msg = (2 * (t * self.ttx['PRF'] % 1) - 1)  # Треугольный сигнал, изменяющийся от -1 до 1

        # квадратуры опорного сигнала
        tx = np.exp(1j * (2 * np.pi * self.ttx['f0'] * t + (2 * np.pi * self.ttx['BW'] * np.cumsum(m_msg) / self.ttx['fs'])))
        
        # выборка отсчетов на длительности импульса
        # tx = tx[:int(self.ttx['tau']*self.ttx['fs'])] #TODO: Скорее всего это укорачивание было нужно

        # Пересчет циклического сдвига ЛЧМ сигнала из секунд в отсчеты
        shift_amount = int(tz * self.ttx['fs'])  # Количество отсчетов для сдвига

        # квадратуры принятого сигнала
        rx_z = np.roll(tx, shift_amount)  # Циклический сдвиг
        rx = A*rx_z
        

        # согласованная фильтрация сигнала
        rx_fft = np.fft.fft(rx, int(self.spx['nr']))
        tx_fft = np.fft.fft(tx, int(self.spx['nr']))
        signal = np.fft.ifft(rx_fft*tx_fft)
        # преобразование в uint8
        signal = np.uint8(np.abs(signal))
        # возвращаем угол и развертку по дальности
        return {'time':self.state['time'], 
                'angle':self.state['los'],
                'data':signal,
                'cell_dur':1/self.ttx['fs']}

Создадим класс **DataSender**, формирующий пакет данных в формате Asterix cat. 240 для отображения на индикаторе SPX RadarView-240. Описание формата полей доступно по адресу https://www.eurocontrol.int/publication/cat240-eurocontrol-specification-surveillance-data-exchange-asterix . Изучите его самостоятельно.

Программа SPX RadarView-240 имеет ограничения в бесплатной версии: число азимутов 2048, число ячеек дальности 2048, динамический диапазон амплитуд 8 бит.

In [18]:
class DataSender():
    def __init__(self, udp_ip, udp_port):
        # открываем сокет для отправки пакетов (создаем сервер)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)

        # адрес назначения
        self.UDP_IP = udp_ip
        # порт назначения
        self.UDP_PORT = udp_port
        
    def convert(self,datain):
        # возвращает пакет данных для отправки
        dataItem = {}
        r2ast = 2**16/2/np.pi           # дискретность по углам согласно стандарту
        adc2ast = 1e9                   # дискретность в пикосекундах 1e-12 с 
        s2ms = 1e3                      # дискретность по времени в 1мc
        # заполняем поля asterix-240
        
        dataItem['010'] = struct.pack('>BB',73,0)                           # sac/sic data source identifiers
        dataItem['000'] = struct.pack('>B',2)                               # message type
        dataItem['020'] = struct.pack('>I',(int(datain['time']*s2ms)))      # unique message counter (ms in integer)
        dataItem['040'] = struct.pack('>H',int(datain['angle']*r2ast))      # start_az field
        dataItem['040'] += struct.pack('>H',int(datain['angle']*r2ast))     # end_az field
        dataItem['040'] += struct.pack('>I',1)                              # start_rg field
        dataItem['040'] += struct.pack('>I',int(datain['cell_dur']*adc2ast))# cell_dur (adc2ast/radar.fs)
        dataItem['048'] = struct.pack('>BB',0,4)                            # no compression, 8-bit resolution
        dataItem['049'] = struct.pack('>H',2048)                            # number of valid video blocks
        dataItem['049'] += int(2048).to_bytes(3,'big')                      # number of valid cells (non zero)
        dataItem['051'] = struct.pack('>B',32)                              # counter for 4 byte blocks
        dataItem['051'] += bytearray(datain['data'][:2048])                 # array of magnitude
        
        cat = struct.pack('>B',240)
        fspec = struct.pack('>BB',
                            np.uint8(int('11101011',2)),
                            np.uint8(int('10100000',2)))   # mask of fields
        items = ['010','000','020','040','048','049','051']
        # construct msg
        msg = bytearray()
        for i in items:
            msg += dataItem[i]
        length = struct.pack('>H',int(len(msg)+5))
        msg = cat+length+fspec+msg
        return msg
        
    def send(self, msg):
        # отправка по сети
        self.sock.sendto(msg, (self.UDP_IP, self.UDP_PORT))
    
    def dump(self, filename):
        # сохранение сетевого дампа в файл
        pass
        
    def play(self, filename, repeat=False):
        # with open(filename, 'rb') as f:
            
        #     pcapng = dpkt.pcapng.Reader(f)
            
        #     for block in pcapng:
        #         if isinstance(block, dpkt.pcapng.PacketBlock):
        #             print(block)
        #             # print("isinstance")
        #             # Извлекаем данные пакета
        #             print(block.packet)
        #             packet_data,timestamp = block.packet
        #             # timestamp = block.timestamp
        #             # Отправляем пакет через сокет
        #             self.sock.sendto(packet_data, (self.multicast_ip, self.port))
        #             print(f'Sent packet at {timestamp} with size {len(packet_data)} bytes')
        pass 

Создадим объекты рассмотренных классов и смоделируем работу системы в течение 30 секунд. Результат формирования радиолокационной информации будем преобразовывать в формат Asterix cat.240 и отправлять по UDP в программу RadarView240.

In [32]:
radar = Radar({'x':1000,'y':1000})
target = Target({'x':5000,'vx':-50,'y':1500,'vy':-50})
network = DataSender("239.192.43.78", 4378)

T = 30
t = 0
dt = radar.ttx['PRT']

while t < T:
    target.CV(dt)
    data = radar.receive(target.state)
    asterix_packet = network.convert(data)
    network.send(asterix_packet)
    radar.turn(dt)
    t = t + dt

# network.play("shitMTOS.pcapng", False)

{'lmd': 0.03, 'dR': 50, 'dP': 0.08726646259971647, 'BW': 2048000.0, 'tau': 1e-06, 'f0': 0, 'fc': 0, 'fs': 4096000.0, 'w': 0.2, 'Tw': 5, 'PRT': 0.0005, 'PRF': 2000, 'x': 1000, 'y': 1000}


{'x': 0.0, 'y': 0.0, 'los': 0.0, 'time': 0.0}


{'naz': 2048, 'nr': 2048}


# Задание

Cформировать модель движения цели с заданными параметрами: CV модель, $\sigma_a^2 = 3$. В соответствии с вариантом реализовать приём отраженного сигнала из заданной позиции в соответствии с ТТХ РЛС перечисленными в таблице. Дописать функции сохранения и воспроизведения сетевого дампа в классе **DataSender** . Сделать запись в файл отраженного сигнала в течение 10 оборотов в формате Asterix cat.240 для повторного воспроизведения.

№ варианта | ТТХ
--| ---
1 | Зондирующий сигнал ЛЧМ. Разрешающая способность по дальности 50м. Разрешающая способность по азимуту 5 гр. Длина волны 3см. Угловая скорость вращения 1 оборт за 5 секунд. Координаты РЛС (x,y): 1000, 1000м
2 | Зондирующий сигнал простой радиоимпульс длительностью. Разрешающая способность по дальности 100м. Разрешающая способность по азимуту 2 гр. Длина волны 3см. Угловая скорость вращения 1 оборт за 12 секунд. Координаты РЛС (x,y): 2000, 3000м
3 | Зондирующий сигнал ЛЧМ. Разрешающая способность по дальности 50м. Разрешающая способность по азимуту 5 гр. Длина волны 3см. Угловая скорость вращения 1 оборт за 5 секунд. Координаты РЛС (x,y): -5000, 1000м
4 | Зондирующий сигнал простой радиоимпульс длительностью. Разрешающая способность по дальности 100м. Разрешающая способность по азимуту 2 гр. Длина волны 3см. Угловая скорость вращения 1 оборт за 12 секунд. Координаты РЛС (x,y): -2000, -3000м
5 | Зондирующий сигнал ЛЧМ. Разрешающая способность по дальности 50м. Разрешающая способность по азимуту 5 гр. Длина волны 3см. Угловая скорость вращения 1 оборт за 5 секунд. Координаты РЛС (x,y): 4000, -3000м
6 | Зондирующий сигнал простой радиоимпульс длительностью. Разрешающая способность по дальности 100м. Разрешающая способность по азимуту 2 гр. Длина волны 3см. Угловая скорость вращения 1 оборт за 12 секунд. Координаты РЛС (x,y): -1500, 4500м

In [16]:
# напишите ваш код в этой части