## Trilateration: Measurement Preparation

Given a set of Atlas anchors, downloads anchoring measurements and generates a dataframe containing {probe, anchor, time, RTT} samples, joined to latitude and longitude information about the probe and the anchor. Trilateration and correlation experiments then follow from this prepared data.

### Needful things

First, imports, utility functions, etc. required for data preparation below

In [89]:
import pandas as pd
import matplotlib.pyplot as plt
import requests
import json
import os.path

from collections import namedtuple

DATA_CACHE_PATH="data_cache"

### Download static files to data cache

In [105]:
! curl http://ftp.ripe.net/ripe/atlas/probes/archive/meta-latest | bunzip2 > data_cache/all_probes.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  860k  100  860k    0     0  1034k      0 --:--:-- --:--:-- --:--:-- 1033k


### Probe Metadata

Get location of all probes, indexed by probe ID.

In [98]:
AtlasProbe = namedtuple("AtlasProbe",
           ("pid", "version", "nat", "ip4", "ip6", "asn4", "asn6", "cc", "lon", "lat"))

def extract_atlas_probe(pobj):
    
    if "address_v4" in pobj and pobj['address_v4'] is not None:
        ip4 = pobj["address_v4"]
    elif "prefix_v4" in pobj:
        ip4 = pobj["prefix_v4"]
    else:
        ip4 = None

    if "address_v6" in pobj and pobj['address_v6'] is not None:
        ip6 = pobj["address_v6"]
    elif "prefix_v4" in pobj:
        ip6 = pobj["prefix_v6"]
    else:
        ip6 = None

    if "asn_v4" in pobj:
        asn4 = pobj["asn_v4"]
    else:
        asn4 = None

    if "asn_v6" in pobj:
        asn6 = pobj["asn_v6"]
    else:
        asn6 = None

    if "tags" in pobj:
        if len(pobj['tags']) > 0 and isinstance(pobj['tags'][0], dict):
            alltags = [tag['slug'] for tag in pobj['tags']]
        else:
            alltags = pobj['tags']

        if "system-v1" in alltags:
            version = 1
        elif "system-v2" in alltags:
            version = 2
        elif "system-v3" in alltags:
            version = 3
        elif "system-anchor" in alltags:
            version = 4
        else:
            version = 0

        nat = "nat" in alltags
    else:
        version = None
        nat = None
        
    # Short circuit: never connected means don't load
    if "status" in pobj and pobj['status'] == 0:
        version = 0
    
    if "geometry" in pobj and "coordinates" in pobj['geometry']:
        (lon, lat) = pobj['geometry']['coordinates']
    elif "longitude" in pobj and "latitude" in pobj:
        lon = pobj['longitude']
        lat = pobj['latitude']
    else:
        lon = None
        lat = None

    return AtlasProbe(pobj["id"], version, nat, ip4, ip6, asn4, asn6,
                      pobj["country_code"], lon, lat)


# def get_probe(pid, cachedir=None):
#     url = "https://atlas.ripe.net/api/v2/probes/%u" % (pid,)
#     params = {}

#     if cachedir and os.path.isdir(cachedir):
#         filepath = os.path.join(cachedir, "probe", "%u.json" % (pid,))

#         # try to read from cache
#         try: 
#             with open(filepath) as stream:
#                 return extract_atlas_probe(json.loads(stream.read()))
#         except Exception as e:
#             # download if not present
#             if not os.path.isfile(filepath):
#                 with open(filepath, mode="wb") as file:
#                     print("Cache miss, retrieving "+url)
#                     res = requests.get(url, params=params)

#                     if not res.ok:
#                         raise RuntimeError("Atlas probe API request failed: "+repr(res.json()))
                    
#                     file.write(res.content)
#                     return extract_atlas_probe(json.loads(res.content.decode("utf-8")))

#     else:
#         # just read from the net
#         res = requests.get(url, params=params)
#         try:
#             return extract_atlas_probe(json.loads(res.content.decode("utf-8")))
#         except KeyError:
#             return None

# def probe_dataframe_from_cache(pids, cachedir=DATA_CACHE_PATH):
#     data = []
    
#     # make a giant array
#     for pid in pids:
#         data.append(get_probe(pid, cachedir=cachedir))

#     # create a dataframe from it
#     df = pd.DataFrame(data, columns=AtlasProbe._fields)
    
#     # indexed by probe ID
#     df.index = df['pid']
#     del(df['pid'])
    
#     # and return it
#     return df

def probe_dataframe_from_file(filename):
    data = []
    
    # make a giant array
    with open(filename) as stream:
        all_probes = json.loads(stream.read())
        for pobj in all_probes["objects"]:
            data.append(extract_atlas_probe(pobj))

    # create a dataframe from it
    df = pd.DataFrame(data, columns=AtlasProbe._fields)
    
    # indexed by probe ID
    df.index = df['pid']
    del(df['pid'])
    
    # and return it
    return df

Load a probe dataframe from a file, and build a lookup table for getting anchor IDs from IPv4 and IPv6 addresses.

**note** this probably isn't the right way to go about this anymore, use anchor metadata lookup instead

In [73]:
anchor_lut_v4 =  probe_dataframe_from_file("data_cache/all-probes.json").loc[6000:6999,['ip4']].dropna()
anchor_lut_v4['aid'] = anchor_lut_v4.index
anchor_lut_v4.index = anchor_lut_v4['ip4']
del anchor_lut_v4['ip4']

In [76]:
anchor_lut_v6 =  probe_dataframe_from_file("data_cache/all-probes.json").loc[6000:6999,['ip6']].dropna()
anchor_lut_v6['aid'] = anchor_lut_v6.index
anchor_lut_v6.index = anchor_lut_v6['ip6']
del anchor_lut_v6['ip6']

### Anchor Metadata

The v2 API allows us to look up information about anchors by ID. These routines allow cached access to anchor metadata records, and generate a dataframe of all anchor metadata based on them.

In [112]:
AtlasAnchor = namedtuple("AtlasAnchor",
           ("aid", "name", "pid", "ip4", "ip6", "asn4", "asn6", "cc", "lon", "lat"))

def extract_atlas_anchor(aobj):
    
    if "id" in aobj:
        aid = int(aobj["id"])
    else:
        aid = None

    if "fqdn" in aobj:
        name = aobj["fqdn"]
    else:
        name = None
    
        
    if "probe" in aobj:
        pid = int(aobj["probe"])
    else:
        pid = None
    
    if "ip_v4" in aobj:
        ip4 = aobj["ip_v4"]
    else:
        ip4 = None
    
    if "ip_v6" in aobj:
        ip6 = aobj["ip_v6"]
    else:
        ip6 = None

    if "as_v4" in aobj and aobj['as_v4'] is not None:
        asn4 = int(aobj["as_v4"])
    else:
        asn4 = None

    if "as_v6" in aobj and aobj['as_v6'] is not None:
        asn6 = int(aobj["as_v6"])
    else:
        asn6 = None
        
    if "country" in aobj:
        cc = aobj['country']
    else:
        cc = None
    
    if "geometry" in aobj and "coordinates" in aobj['geometry']:
        (lon, lat) = aobj['geometry']['coordinates']
    elif "longitude" in aobj and "latitude" in aobj:
        lon = aobj['longitude']
        lat = aobj['latitude']
    else:
        lon = None
        lat = None

    return AtlasAnchor(aid, name, pid, ip4, ip6, asn4, asn6, cc, lon, lat)


# def get_anchor(aid_or_url, cachedir=None):  
#     if isinstance(aid_or_url, str):
#         aid = int(url.strip("/").split("/")[-1])
#         url = aid_or_url
#     else:
#         aid = int(aid_or_url)
#         url = "https://atlas.ripe.net/api/v2/anchors/%u" % (aid,)

#     params = {}
        
#     if cachedir and os.path.isdir(cachedir):
#         filepath = os.path.join(cachedir, "anchor", "%u.json" % (aid,))

#         # try to read from cache
#         try: 
#             with open(filepath) as stream:
#                 return extract_atlas_anchor(json.loads(stream.read()))
#         except Exception as e:
#             # download if not present
#             if not os.path.isfile(filepath):
#                 with open(filepath, mode="wb") as file:
#                     print("Cache miss, retrieving "+url)
#                     res = requests.get(url, params=params)

#                     if not res.ok:
#                         raise RuntimeError("Atlas probe API request failed: "+repr(res.json()))
                    
#                     file.write(res.content)
#                     return extract_atlas_probe(json.loads(res.content.decode("utf-8")))

#     else:
#         # just read from the net
#         res = requests.get(url, params=params)
#         try:
#             return extract_atlas_anchor(json.loads(res.content.decode("utf-8")))
#         except KeyError:
#             return None

# def anchor_dataframe_from_file(filename):
#     data = []
    
#     # make a giant array
#     with open(filename) as stream:
#         all_anchors = json.loads(stream.read())
#         for aobj in all_anchors["results"]:
#             data.append(extract_atlas_anchor(aobj))

#     # create a dataframe from it
#     df = pd.DataFrame(data, columns=AtlasAnchor._fields)
    
#     # indexed by probe ID
#     df.index = df['aid']
#     del(df['aid'])
    
#     # and return it
#     return df

def anchor_dataframe_from_v2api():
    data = []
    url = "https://atlas.ripe.net/api/v2/anchors/"

    # iterate over API pagination
    while url is not None:
        res = requests.get(url)
        if not res.ok:
            raise RuntimeError("Atlas probe API request failed: "+repr(res.json()))

        api_content = json.loads(res.content.decode("utf-8"))
        url = api_content['next']
        for aobj in api_content["results"]:
            data.append(extract_atlas_anchor(aobj))
            
    # create a dataframe from it
    df = pd.DataFrame(data, columns=AtlasAnchor._fields)
    
    # indexed by probe ID
    df.index = df['aid']
    del(df['aid'])
    
    # and return it
    return df

In [113]:
anchor_dataframe_from_v2api()

Unnamed: 0_level_0,name,pid,ip4,ip6,asn4,asn6,cc,lon,lat
aid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
669,ar-bue-as4270.anchors.atlas.ripe.net,6170,170.210.5.200,2800:110:5::200,4270,4270.0,AR,-58.381559,-34.603684
393,at-klu-as42473.anchors.atlas.ripe.net,6120,178.255.156.202,2a00:11c0::1:0:0:3e,42473,42473.0,AT,14.318215,46.615846
574,at-szg-as5404.anchors.atlas.ripe.net,6131,217.196.147.89,2a02:16a8:dc:200::1,5404,5404.0,AT,13.011872,47.790410
335,at-vie-as1120.anchors.atlas.ripe.net,6042,193.171.255.2,2001:628:2000:feed::2,1120,1120.0,AT,16.373819,48.208174
11,at-vie-as1853.anchors.atlas.ripe.net,6009,193.170.114.240,2001:628:2005:1::2,1853,1853.0,AT,16.373819,48.208174
729,at-vie-as30971.anchors.atlas.ripe.net,6173,83.136.33.8,2a02:850:2:1::8,30971,30971.0,AT,16.369279,48.200923
998,au-bne-as4608-2.anchors.atlas.ripe.net,6270,203.119.0.195,2001:dd8:8:800::195,4608,4608.0,AU,153.095765,-27.577785
331,au-bne-as4608.anchors.atlas.ripe.net,6055,203.133.248.56,2401:2000:6660::56,4608,4608.0,AU,153.023502,-27.470933
321,au-mel-as38796.anchors.atlas.ripe.net,6044,120.29.255.85,2001:dcd:40:3d::5,38796,38796.0,AU,144.963280,-37.814107
888,au-syd-as135150.anchors.atlas.ripe.net,6226,203.217.162.30,2404:6980:43:1::2,135150,135150.0,AU,151.192883,-33.911743



### Anchoring Measurement Search

Get metadata about available anchoring measurements (just pings for now) from the Atlas API.

**FIXME** make this include measurement metadata information; we'll link against the anchor table for more.

In [116]:
AnchoringMetadata = namedtuple("AnchoringMetadata", ("aid", "type", "msm"))

def anchoring_measurements_from_v2api(how_many = None):
    data = []
    url = "https://atlas.ripe.net/api/v2/anchor-measurements/"

    # iterate over API pagination
    while url is not None:
        res = requests.get(url)
        if not res.ok:
            raise RuntimeError("Atlas probe API request failed: "+repr(res.json()))

        api_content = json.loads(res.content.decode("utf-8"))
        url = api_content['next']
        for mobj in api_content["results"]:
            try:
                aid = int(mobj["target"].strip("/").split("/")[-1])
                typ = mobj["type"]
                msm = int(mobj["measurement"].strip("/").split("/")[-1])
            except Exception:
                pass
            
            if typ != "ping":
                continue
                
            data.append(AnchoringMetadata(aid, typ, msm))
        
        if how_many is not None and len(data) >= how_many:
            break
            
    # create a dataframe from it
    df = pd.DataFrame(data, columns=AnchoringMetadata._fields)
    
    # indexed by MSM ID
    df.index = df['msm']
    del(df['msm'])
    
    # and return it
    return df

### RTT Sample Extraction

Given a set of MSMs, download (or read from cache) and generate a set of RTT tuples:

- time: timestamp of the measurement underlying the sample
- aid: Atlas probe ID of anchor
- pid: Atlas probe ID of probe
- af: address family used (4 or 6)
- proto: string identifier of protocol used
- rtt: RTT sample in microseconds

**FIXME** none of this code has been tested against the v2 api yet...

In [5]:
Alp = namedtuple("Alp", ("time","af","proto","pid","sip","dip","rtt"))

RTT_NONE = 0.0
DATA_CACHE_PATH = 'data_cache'

def gen_dict(msm_ary):
    for a_res in msm_ary:
        yield a_res

def gen_alp(msm_ary):
    for a_res in msm_ary:
        if a_res['type'] == 'ping':
            if "rcvd" in a_res:
                for x in a_res["result"]:
                    rtt = None
                    try: 
                        rtt = float(x)
                    except:
                        try:
                            rtt = float(x['rtt'])
                        except:
                            pass
                    if rtt:
                        yield Alp(int(a_res['timestamp']), a_res['af'], a_res['proto'], 
                                  a_res['prb_id'], a_res['src_addr'], a_res['dst_addr'], 
                                  int(rtt * 1000))
        
        elif a_res['type'] == 'traceroute':
            if ('result' in a_res) and ('result' in a_res['result'][-1]):
                for h_res in a_res['result'][-1]['result']:
                    if ('from' in h_res) and ('rtt' in h_res) and (h_res['from'] == a_res['dst_addr']):
                        yield Alp(int(a_res['timestamp']), a_res['af'], a_res['proto'] + '_TR', 
                                  a_res['prb_id'], a_res['src_addr'], a_res['dst_addr'], h_res['rtt'])

        # For HTTP, return each subresult as a separate RTT sample
        elif a_res['type'] == 'http':
            for r_res in a_res['result']:
                if ('res' in r_res) and (r_res['res'] < 400):
                    yield Alp(a_res['timestamp'], r_res['af'], 'HTTP', 
                              a_res['prb_id'], r_res['src_addr'], r_res['dst_addr'], r_res['rt'])                    

        

def get_msm(msm, gen=gen_dict, cachedir=None, start=None, stop=None):
    """
    Given an MSM, fetch it from the cache or from the RIPE Atlas API.
    Yield each separate result according to the generation function.
    """
    url = "https://atlas.ripe.net/api/v1/measurement/%u/result/" % (msm,)

    params = {"format": "json"}
    if start is not None and stop is not None:
        params["start"] = str(start)
        params["stop"] = str(stop)
    
    if cachedir and os.path.isdir(cachedir):
        filepath = os.path.join(cachedir, "measurement", "%u.json" % (msm,))

        # download if not present
        if not os.path.isfile(filepath):
            with open(filepath, mode="wb") as file:
                print("Cache miss, retrieving "+url)
                res = requests.get(url, params=params)

                if not res.ok:
                    raise "Atlas measurement API request failed: "+repr(res.json())
                
                file.write(res.content)

        # then read from cache
        with open(filepath) as stream:
            yield from gen(json.loads(stream.read()))

    else:
        # just read from the net
        res = requests.get(url, params=params)
        yield from gen(json.loads(res.content.decode("utf-8")))
        

### Extract and write data to HDF5



### Scratch space

In [78]:
get_probe(6003)

AtlasProbe(pid=6003, version=4, nat=False, ip4='195.140.195.61', ip6='2001:67c:2b0:3d::c38c:c33d', asn4=29432, asn6=29432, cc='FI', lon=23.7575, lat=61.4985)

In [46]:
pdf = probe_dataframe_from_file("data_cache/all-probes.json")

In [47]:
pdf

Unnamed: 0_level_0,version,nat,ip4,ip6,asn4,asn6,cc,lon,lat
pid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1,1,True,212.238.0.0/16,2001:980::/29,3265.0,3265.0,NL,4.8875,52.3875
2,1,True,145.130.128.0/17,,1136.0,,NL,4.9575,52.3085
3,1,True,62.251.17.184,2001:985:71fc:1:220:4aff:fec8:2532,3265.0,3265.0,NL,4.9375,52.3685
4,1,True,83.163.50.165,2001:980:57a4:1:220:4aff:fec8:244a,3265.0,3265.0,NL,4.6475,52.3995
5,1,True,83.163.239.181,2001:981:602b:1:220:4aff:fec8:2355,3265.0,3265.0,NL,4.9175,52.0595
6,1,True,24.132.0.0/17,,6830.0,,NL,4.9175,52.3505
7,1,True,94.214.87.41,,9143.0,,NL,6.0375,51.2305
8,1,True,83.68.21.139,2001:984:aee9:6:220:4aff:fec8:2464,3265.0,3265.0,NL,6.0375,51.2315
9,1,False,193.0.0.78,2001:67c:2e8:ffe2:220:4aff:fec6:cc9d,3333.0,3333.0,NL,4.8975,52.3815
10,1,True,,,,,NL,4.9175,52.3475


In [44]:
with open("data_cache/all-probes.json") as stream:
    all_probes = json.loads(stream.read())
    for pobj in all_probes["objects"]:
        if pobj['id'] == 33889:
            print(json.dumps(pobj, indent=2))

{
  "total_uptime": 6046784,
  "status": 1,
  "prefix_v4": "78.48.0.0/13",
  "address_v6": null,
  "probe": "https://atlas.ripe.net/api/v2/probes/33889/",
  "status_name": "Connected",
  "tags": [],
  "latitude": 52.4515,
  "asn_v6": null,
  "prefix_v6": null,
  "country_code": "DE",
  "address_v4": "78.55.49.86",
  "day": "20170806",
  "id": 33889,
  "first_connected": 1495216973,
  "longitude": 13.3615,
  "status_since": 1502064714,
  "asn_v4": 6805,
  "is_public": true,
  "is_anchor": false
}


Cache miss, retrieving https://atlas.ripe.net/api/v2/anchors/33


RuntimeError: Atlas probe API request failed: {'error': {'title': 'Not Found', 'status': 404, 'detail': 'Not found.', 'code': 104}}

In [88]:
int("https://atlas.ripe.net/api/v2/anchors/11/".strip("/").split("/")[-1])

11

In [118]:
dfa = anchoring_measurements_from_v2api(100)

In [119]:
dfa

Unnamed: 0_level_0,aid,type
msm,Unnamed: 1_level_1,Unnamed: 2_level_1
1026356,11,ping
1026358,11,ping
1026360,3,ping
1026362,3,ping
1026364,15,ping
1026366,15,ping
1026368,5,ping
1026370,5,ping
1026372,17,ping
1026374,17,ping
