Idea:

Assume only few spoofing attacks

Synchronize clocks assuming all broadcast locations are correct

Estimate positions for messages based on sensor timestamps

Remove messages with high deviations between broadcast locations and estimated locations

Re-synchronize clocks and re-check broadcast locations

In [3]:
import pandas as pd
import numpy as np
import os
import re
import position_estimator
from position_estimator import GeoPoint
from collections import defaultdict
from tqdm import tqdm
import itertools

DATA_LOCATION = "D:/thesis_data/1hour_complete_22_01_10_05/"
#DATA_LOCATION = "../raw_data/1hour/"

sensors = set()
#sensors_rev = dict()
received = defaultdict(lambda: { "sensors": list() })
sensor_received = defaultdict(list)

MSG_CUTOFF = 100000

pbar = tqdm(total=MSG_CUTOFF, mininterval=1)
pbar.update(0)

for file in os.listdir(DATA_LOCATION):
    assert re.match(r"^part-\d{5}$", file)
    pbar.set_description(file)
    pbar.refresh()
    with open(os.path.join(DATA_LOCATION, file), "r") as f:
        line = f.readline()
        if not line:
            continue
        assert line == "sensorType,sensorLatitude,sensorLongitude,sensorAltitude,timeAtServer,timeAtSensor,timestamp,rawMessage,sensorSerialNumber,RSSIPacket,RSSIPreamble,SNR,confidence\n"
        while (line := f.readline().strip()):
            sensorType,sensorLatitude,sensorLongitude,sensorAltitude,timeAtServer,timeAtSensor,timestamp,rawMessage,sensorSerialNumber,RSSIPacket,RSSIPreamble,SNR,confidence = line.split(',')
            
            if not position_estimator.is_relevant(rawMessage):
                continue

            if 'null' in [sensorLatitude, sensorLongitude, sensorAltitude, timeAtSensor]:
                continue
            
            sensorLatitude = float(sensorLatitude)
            sensorLongitude = float(sensorLongitude)
            sensorAltitude = float(sensorAltitude)
            timeAtSensor = float(timeAtSensor)

            timeAtServer = float(timeAtServer)

            # limit to europe for now
            if not 35 < sensorLatitude < 75:
                continue
            elif not -10 < sensorLongitude < 40:
                continue

            if not (sensorLatitude and sensorLongitude):
                continue

            if not "pos" in received[rawMessage]:
                try:
                    received[rawMessage]["pos"] = position_estimator.get_announced_pos(rawMessage, timeAtSensor)
                    assert -90 <= received[rawMessage]["pos"].lat <= 90
                    assert -180 <= received[rawMessage]["pos"].lon <= 180
                except:
                    #print("Couldn't get position :(")
                    del received[rawMessage]
                    continue

            sensor = (GeoPoint(sensorLatitude, sensorLongitude, sensorAltitude), sensorType)
            if sensor not in sensors:
                #sensors_rev[sensor] = len(sensors)
                sensors.add(sensor)

            if (timeAtSensor, sensor) not in received[rawMessage]["sensors"]:
                received[rawMessage]["sensors"].append((timeAtSensor, sensor))

            sensor_received[sensor].append((timeAtSensor, timeAtServer))

    pbar.n = len(received)
    pbar.refresh()

    if len(received) >= MSG_CUTOFF:
        break # memory constraint


print("Sensors:", len(sensors))
print("Received Messages:", len(received))


part-00000:  16%|█▌        | 15925/100000 [00:46<04:03, 345.15it/s] 
part-00008: : 107507it [01:12, 1487.55it/s]                         

Sensors: 810
Received Messages: 107507


In [4]:
def del_sensor(sensor):
    assert type(sensor) is tuple
    assert len(sensor) == 2
    assert type(sensor[0]) is GeoPoint
    assert type(sensor[1]) is str
    del sensor
    

print("Sensors before:", len(sensors))

for sensor, timestamps in sensor_received.items():
    for i in range(len(timestamps) - 1):
        if timestamps[i][0] > timestamps[i+1][0] + 100:
            del_sensor(sensor)
            break
            #print(sensor)
            #print(i, timestamps[i], timestamps[i+1])
            #print("td_sensor:", timestamps[i][0] - timestamps[i+1][0], "td_server:", timestamps[i][1] - timestamps[i+1][1])

print("Sensors after:", len(sensors))

Sensors before: 810
Sensors after: 810


In [5]:
# remove messages that have been sent more than once
print("Messages before:", len(received))
for msg in list(received.keys()):
    if len(set([s for t, s in received[msg]["sensors"]])) < len(received[msg]["sensors"]):
        del received[msg]
print("Messages after: ", len(received))

Messages before: 107507
Messages after:  99582


In [7]:
from position_estimator import GeoPoint
import traceback
import ipyparallel as ipp
import util
C = 299792458 # light speed, meters per second

n_cores = 16
n_messages_to_process = 32

time_delta = defaultdict(lambda: defaultdict(list))
to_process = [(e['pos'], e['sensors']) for e in received.values()]#[:n_messages_to_process]
print(to_process[0])
with ipp.Cluster(n=n_cores) as rc:
    view = rc.load_balanced_view()
    asyncresult = view.map_async(util.calc_timedeltas, *zip(*to_process))
    asyncresult.wait_interactive()
    for td in asyncresult.get():
        for s1 in td:
            for s2 in td[s1]:
                time_delta[s1][s2].extend(td[s1][s2])
'''
for msg in tqdm(received):
    if "pos" not in received[msg] or received[msg]["pos"] is None:
        continue

    if len(received[msg]["sensors"]) < 2:
        continue

    for t, s in list(received[msg]["sensors"]):
        dists = sorted([s[0].dist(e[1][0]) for e in received[msg]["sensors"] if e[1] != s])
        if dists[int(len(dists)/10)] > 1e6: # ge 1000 km
            print("removing:", t, s, dists)
            received[msg]["sensors"].remove((t, s))

    try:
        sensor_dists_to_msg_origin = {x[1]: received[msg]["pos"].dist(x[1][0]) for x in received[msg]["sensors"]}
        time_to_sensor = { s: x / C for s, x in sensor_dists_to_msg_origin.items() }
    except:
        traceback.print_exc()
        print(received[msg]["pos"].lat, received[msg]["pos"].lon, received[msg]["pos"].alt)
        print({x[1]: (x[1][0].lat, x[1][0].lon, x[1][0].alt) for x in received[msg]["sensors"]})

    for (t1, s1), (t2, s2) in itertools.combinations(received[msg]["sensors"], 2):
        assert s1 != s2
        td = (t1 - time_to_sensor[s1]) - (t2 - time_to_sensor[s2])
        time_delta[s1][s2].append(td)
        time_delta[s2][s1].append(-td)
'''


lens = []
for s1 in time_delta:
    for s2 in time_delta[s1]:
        lens.append(len(time_delta[s1][s2]))

np.histogram(lens)

  if V(jupyter_client.__version__) < V('5.0'):


(Latitude: 52.56525, Longitude: 4.23868, Altitude: 2331.7200000000003, [(46048.0, (Latitude: 52.356074, Longitude: 4.63139, Altitude: 9.0, 'dump1090')), (50744.0, (Latitude: 52.2108, Longitude: 4.4042, Altitude: 39.0, 'dump1090'))])
Starting 16 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
100%|██████████| 16/16 [00:08<00:00,  1.83engine/s]
calc_timedeltas: 100%|██████████| 99582/99582 [10:35<00:00, 156.82tasks/s]
Stopping engine(s): 1644259108
engine set stopped 1644259108: {'engines': {'0': {'exit_code': 1, 'pid': 15980, 'identifier': '0'}, '1': {'exit_code': 1, 'pid': 13632, 'identifier': '1'}, '2': {'exit_code': 1, 'pid': 12196, 'identifier': '2'}, '3': {'exit_code': 1, 'pid': 18768, 'identifier': '3'}, '4': {'exit_code': 1, 'pid': 15900, 'identifier': '4'}, '5': {'exit_code': 1, 'pid': 384, 'identifier': '5'}, '6': {'exit_code': 1, 'pid': 11996, 'identifier': '6'}, '7': {'exit_code': 1, 'pid': 9600, 'identifier': '7'}, '8': {'exit_code': 1, 'pid': 1825

(array([82522,  8028,  2362,   922,   310,   102,    32,    10,     2,
            2], dtype=int64),
 array([1.0000e+00, 4.7660e+02, 9.5220e+02, 1.4278e+03, 1.9034e+03,
        2.3790e+03, 2.8546e+03, 3.3302e+03, 3.8058e+03, 4.2814e+03,
        4.7570e+03]))

In [6]:
import pickle


with open("temp_storage/received.pickle", "wb") as f:
    pickle.dump(dict(received), f)

with open("temp_storage/sensors.pickle", "wb") as f:
    pickle.dump(sensors, f)

with open("temp_storage/time_delta.pickle", "wb") as f:
    pickle.dump(time_delta, f)

PicklingError: Can't pickle <function <lambda> at 0x000001699C49CD30>: attribute lookup <lambda> on __main__ failed

We now have all time deltas between stations that picked up the same signal.
We model the clock shifts using gaussian distributions, with some mean mu and standard deviation sigma

Then, to get time delta probability distribution between two stations that didn't measure the same message, we convolute the probability density functions of stations with directly known time-delta

Example:
sensors A and B measured the same 100 messages, so we can directly model their time delta distribution function as: td_AB = Gauss(mu_AB, sigma_AB)
Similarly, assume we estimate the density function for sensors B and C as Gauss(mu_BC, sigma_BC)

Now, to get the density function for sensors A and C, we can convolute these two density functions:
density function of time delta A-C = Gauss(mu_AB, sigma_AB) * Gauss(mu_BC, sigma_BC) = Gauss(mu_AB + mu_BC, sqrt(sigma_AB^2 + sigma_BC^2))

We do this for all paths going from A to C, and sum up all PDFs to get the final PDF for the time delta between A and C.
For efficiency, we could just do a "shortest path", where the path lengths are the sum of variances.
This way, we just take the "best" path, with least uncertainty.

In [14]:
import statistics

time_delta_gaussians =  defaultdict(lambda: defaultdict(lambda: None))

# initialize with directly known time deltas
for i in range(len(sensors)):
    for j in range(len(sensors)):
        if len(time_delta[i][j]) > 1:
            mean = statistics.mean(time_delta[i][j])
            var = statistics.variance(time_delta[i][j], xbar=mean)
            time_delta_gaussians[i][j] = (mean, var)

# floyd's algorithm
for k in tqdm(sensors):
    for i in sensors:
        for j in sensors:
            if time_delta_gaussians[i][k] is not None and time_delta_gaussians[k][j] is not None:
                new_variance = time_delta_gaussians[i][k][1] + time_delta_gaussians[k][j][1]
                if time_delta_gaussians[i][j] is None or time_delta_gaussians[i][j][1] > new_variance:
                    new_mean = time_delta_gaussians[i][k][0] + time_delta_gaussians[k][j][0]
                    time_delta_gaussians[i][j] = (new_mean, new_variance)




In [13]:
variances = [ time_delta_gaussians[i][j][1] for i,j in itertools.combinations(time_delta.keys(), 2) if time_delta_gaussians[i][j] is not None ]
num_conns = [ len(time_delta[i][j]) for i,j in itertools.combinations(time_delta.keys(), 2) if len(time_delta[i][j]) > 0]

print("Mean Variance:", statistics.mean(variances))
print("Median variance:", statistics.median(variances))
print("Variance modes:", statistics.multimode(variances))

print("Mean nConnections:", statistics.mean(num_conns))
print("Median nConnections:", statistics.median(num_conns))
print("nConnections modes:", statistics.multimode(num_conns))

TypeError: list indices must be integers or slices, not tuple

Using those time deltas, we can now check the aircraft locations

In [50]:
import scipy.optimize

def estimate_position(sensor_timestamps, sensor_ids, time_delta_gaussians, sensors):
    assert len(sensor_timestamps) >= 4
    assert len(sensor_timestamps) == len(sensor_ids)

    # we want to find a position p, for which we want to minimize some objective function.
    # this objective function is subject to the time deltas,
    # and the distances from the estimated position and the sensor positions
    # also, we should consider the variances in the time delta distributions:
    # if some time delta probability distribution has very big variance,
    # we can't trust that sensor to the same degree as a sensor with a small time delta variance

    # attempt 1:
    def residual_error(p):
        p = position_estimator.GeoPoint(*p)
        #print("p:", p.lat, p.lon, p.alt)
        err = 0
        for i,j in itertools.combinations(range(len(sensor_ids)), 2):
            if time_delta_gaussians[sensor_ids[i]][sensor_ids[j]] is None:
                continue

            dist_i = p.dist(sensors[sensor_ids[i]][0])
            dist_j = p.dist(sensors[sensor_ids[j]][0])
            t_i = sensor_timestamps[i] - dist_i / C
            t_j = sensor_timestamps[j] - dist_j / C
            #print(sensor_ids[i], sensor_ids[j], t_i, t_j, time_delta_gaussians[sensor_ids[i]][sensor_ids[j]])
            e = (t_i - t_j - time_delta_gaussians[sensor_ids[i]][sensor_ids[j]][0]) ** 2 / time_delta_gaussians[sensor_ids[i]][sensor_ids[j]][1]
            err += e
        #print(err)
        return err

    def residual_error2(p):
        p = position_estimator.GeoPoint(*p)
        err = 0
        true_t = sensor_timestamps[0] - p.dist(sensors[sensor_ids[0]][0]) / C
        for i in range(1, len(sensor_ids)):
            t = sensor_timestamps[i] - p.dist(sensors[sensor_ids[i]][0]) / C
            err += true_t - t - time_delta_gaussians[sensor_ids[0]][sensor_ids[i]][0]
        #print(err, len(sensor_ids), err/len(sensor_ids))
        return err

    def residual_error3(p):
        p = position_estimator.GeoPoint(*p)
        err = 0
        for assumed_true in range(len(sensor_ids)):
            true_t = sensor_timestamps[sensor_ids[assumed_true]] - p.dist(sensors[sensor_ids[assumed_true]][0]) / C
            for i in range(len(sensor_ids)):
                if i == assumed_true:
                    continue
                t = sensor_timestamps[i] - p.dist(sensors[sensor_ids[i]][0]) / C
                err += true_t - t - time_delta_gaussians[sensor_ids[assumed_true]][sensor_ids[i]][0]
            #print(err, len(sensor_ids), err/len(sensor_ids))
            return err

    def residual_error4(p):
        p = position_estimator.GeoPoint(*p)
        # choose 4 best time_delta distributions (smallest variances)
        val = (100000, None)
        for a, b, c, d in itertools.combinations(range(len(sensor_ids)), 4):
            s = sum([time_delta_gaussians[sensor_ids[e]][sensor_ids[f]][1] for e,f in itertools.combinations([a,b,c,d],2)])
            val = min(val, (s, (a,b,c,d)))
        err = 0
        for i,j in itertools.combinations(val[1], 2):
            #print(i,j, sensor_ids[i], sensor_ids[j])
            ti = sensor_timestamps[i] - p.dist(sensors[sensor_ids[i]][0]) / C
            tj = sensor_timestamps[j] - p.dist(sensors[sensor_ids[j]][0]) / C
            err += ti - tj - time_delta_gaussians[sensor_ids[i]][sensor_ids[j]][0]
        return err


    bounds = scipy.optimize.Bounds([-90, -180, 0], [90, 180, 100000])
    method = 'L-BFGS-B'
    optimum = scipy.optimize.minimize(residual_error, [sensors[sensor_ids[0]][0].lat, sensors[sensor_ids[0]][0].lon, sensors[sensor_ids[0]][0].alt], method=method, bounds=bounds).x
    print("optimum:", optimum)
    return GeoPoint(*optimum)



for msg in tqdm(received):
    if len(received[msg]["sensors"]) < 4:
        continue
    
    estimated_pos = estimate_position(*zip(*received[msg]["sensors"]), time_delta_gaussians, sensors)
    received[msg]["estimated_pos"] = estimated_pos
    print("Broadcast pos:", received[msg]["pos"].lat, received[msg]["pos"].lon, received[msg]["pos"].alt)
    print("Estimated pos:", received[msg]["estimated_pos"].lat, received[msg]["estimated_pos"].lon, received[msg]["estimated_pos"].alt)
    print("Dist:", received[msg]["pos"].dist(received[msg]["estimated_pos"]))
    

  0%|          | 2/188683 [00:00<6:25:55,  8.15it/s]

optimum: [ 90. 180.   9.]
Broadcast pos: 34.32622 -33.22529 3825.2400000000002
Estimated pos: 90.0 180.0 9.0
Dist: 6202119.397888449


  0%|          | 3/188683 [00:00<9:03:35,  5.78it/s]

optimum: [ 49.54356384  11.02691174 319.56716919]
Broadcast pos: 0.69173 176.19938 10668.0
Estimated pos: 49.54356384277344 11.026911735534668 319.5671691894531
Dist: 14260613.379714187


  0%|          | 5/188683 [00:01<22:18:33,  2.35it/s]

optimum: [51.71452444  1.38446694 20.        ]
Broadcast pos: 3.03355 -12.5867 5791.200000000001
Estimated pos: 51.714524444060494 1.3844669438658046 20.0
Dist: 5550827.9296652535
optimum: [ 89.99999986 180.         180.        ]
Broadcast pos: 81.60719 -153.84705 10972.800000000001
Estimated pos: 89.99999986254414 180.0 180.0
Dist: 937420.9843339204


  0%|          | 7/188683 [00:12<144:56:26,  2.77s/it]

optimum: [ 52.89687823  11.20942151 180.        ]
Broadcast pos: -57.15716 -116.46084 10972.800000000001
Estimated pos: 52.89687822818297 11.209421505255898 180.0
Dist: 16717563.718854435
optimum: [-90.      13.5866 180.    ]
Broadcast pos: 50.17372 -129.49214 10972.800000000001
Estimated pos: -90.0 13.5866 180.0
Dist: 15562139.51727772


  0%|          | 9/188683 [00:12<92:02:44,  1.76s/it] 

optimum: [  90. -180.   20.]
Broadcast pos: 14.75995 164.15857 11887.2
Estimated pos: 90.0 -180.0 20.0
Dist: 8369545.494566892


  0%|          | 9/188683 [00:24<140:24:11,  2.68s/it]


KeyboardInterrupt: 