# Incoherence: Measurement Preparation

This notebook contains code for downloading and preparing public RIPE Atlas DNS measurement data for investigating the extent and type of inconsistency in publicly visible Internet naming information.

## Initialization and Utilities

Run this first to set up the environment and define utility functions/classes used in the analysis

In [16]:
# import things for data analysis
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# import other things for talking to atlas
from collections import namedtuple
import requests
import json
import dns.message
import base64
import os.path

# Named tuple representing a single DNS measurement
NSAddrSample = namedtuple("NSAddrSample", ("msm","time","probe","server","name","address","rtt"))


def _gen_nsaddrsample_from_answers(a_res):
    """
    Given a single measurement result containing a DNS message 
    with answers, yield name/address tuples.
    """
    m = dns.message.from_wire(base64.decodebytes(a_res['result']['abuf'].encode('utf8')))
    for answer in m.answer:
        name = answer.name.to_text()
        for item in answer.items:
            ## FIXME check rrtype
            try:
                yield NSAddrSample(a_res['msm_id'],
                                   a_res['timestamp'],
                                   a_res['prb_id'],
                                   a_res['dst_addr'],
                                   name,
                                   item.address,
                                   a_res['result']['rt']
                                   )
            except KeyError as e:
                print(repr(e))
                print(repr(a_res))

def _gen_nsaddrsample_from_noanswer(a_res):
    """
    Given a single measurement result containing a DNS message 
    with answers, yield a name/address tuple.
    """
    m = dns.message.from_wire(base64.decodebytes(a_res['result']['abuf'].encode('utf8')))   
    name = m.question[0].name.to_text() # this is a little hacky
    yield NSAddrSample(a_res['msm_id'],
                       a_res['timestamp'],
                       a_res['prb_id'],
                       a_res['dst_addr'],
                       name,
                       None,
                       a_res['result']['rt'])
    
    
def gen_nsaddrsample(msm_ary):
    """
    Given an array containing measurements in an MSM retrieved 
    from RIPE Atlas, generate NSAddrSamples
    
    """
    for a_res in msm_ary:
        if a_res['type'] == 'dns':
            if 'result' in a_res and 'ANCOUNT' in a_res['result']:
                if a_res['result']['ANCOUNT'] > 0:
                    yield from _gen_nsaddrsample_from_answers(a_res)
                else:
                    yield from _gen_nsaddrsample_from_noanswer(a_res)

def gen_dict(msm_ary):
    for a_res in msm_ary:
        yield a_res
                
def get_msm(msm, gen=gen_dict, cachedir=None, start=None, stop=None):
    """
    Given an MSM, fetch it from the cache or from the RIPE Atlas API.
    Yield each separate result according to the generation function.
    """
    url = "https://atlas.ripe.net/api/v1/measurement/%u/result/" % (msm,)

    params = {"format": "json"}
    if start is not None and stop is not None:
        params["start"] = str(start)
        params["stop"] = str(stop)
    
    if cachedir and os.path.isdir(cachedir):
        filepath = os.path.join(cachedir, "measurement", "%u.json" % (msm,))

        # download if not present
        if not os.path.isfile(filepath):
            with open(filepath, mode="wb") as file:
                print("Cache miss, retrieving "+url)
                res = requests.get(url, params=params)

                if not res.ok:
                    raise RuntimeError("Atlas measurement API request failed: "+repr(res.json()))
                
                file.write(res.content)

        # then read from cache
        with open(filepath) as stream:
            yield from gen(json.loads(stream.read()))

    else:
        # just read from the net
        res = requests.get(url, params=params)
        yield from gen(json.loads(res.content.decode("utf-8")))
        
def make_dns_dataframe(msm_list=[], cachedir=None):
    df = None
    
    for msm_id in msm_list:
        if df is None: 
            df = pd.DataFrame(get_msm(msm_id, gen_nsaddrsample, cachedir))
        else:
            df.append(pd.DataFrame(get_msm(msm_id, gen_nsaddrsample, cachedir)), ignore_index=True)

    return df

## Configuration

Configure the notebook with the list of MSMs to retrieve for analysis, and the cache directory for saving Atlas measurements.

In [17]:
MSM_LIST = [
    9182135 # whoami.akamai.com -- DNS reflection hack
]

CACHEDIR = "/Users/briant/work/incoherence/atlas_cache"

In [18]:
df = make_dns_dataframe(MSM_LIST, CACHEDIR)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [None]:
m.question[0].name.to_text()

In [None]:
a.items[0].address

In [None]:
a.name.to_text()

In [None]:
with open("/Users/briant/Downloads/RIPE-Atlas-measurement-9193998.json") as jf:
    df = pd.DataFrame(gen_nsaddrsample(json.load(jf)))

In [None]:
df

### Codepile

Atlas code taken from other notebooks, used in creation of parsing and retrieval code for DNS measurements

In [None]:
Alp = collections.namedtuple("Alp", ("time","af","proto","pid","sip","dip","rtt"))

RTT_NONE = 0.0
DATA_CACHE_PATH = 'data_cache'

def gen_sagan(msm_ary):
    for a_res in msm_ary:
        yield sagan.Result.get(a_res)

def gen_dict(msm_ary):
    for a_res in msm_ary:
        yield a_res

def gen_alp(msm_ary):
    for a_res in msm_ary:
        if a_res['type'] == 'ping':
            if "rcvd" in a_res:
                for x in a_res["result"]:
                    rtt = None
                    try: 
                        rtt = float(x)
                    except:
                        try:
                            rtt = float(x['rtt'])
                        except:
                            pass
                    if rtt:
                        yield Alp(int(a_res['timestamp']), a_res['af'], a_res['proto'], 
                                  a_res['prb_id'], a_res['src_addr'], a_res['dst_addr'], rtt)
        
        elif a_res['type'] == 'traceroute':
            if ('result' in a_res) and ('result' in a_res['result'][-1]):
                for h_res in a_res['result'][-1]['result']:
                    if ('from' in h_res) and ('rtt' in h_res) and (h_res['from'] == a_res['dst_addr']):
                        yield Alp(int(a_res['timestamp']), a_res['af'], a_res['proto'] + '_TR', 
                                  a_res['prb_id'], a_res['src_addr'], a_res['dst_addr'], h_res['rtt'])

        # For HTTP, return each subresult as a separate RTT sample
        elif a_res['type'] == 'http':
            for r_res in a_res['result']:
                if ('res' in r_res) and (r_res['res'] < 400):
                    yield Alp(a_res['timestamp'], r_res['af'], 'HTTP', 
                              a_res['prb_id'], r_res['src_addr'], r_res['dst_addr'], r_res['rt'])                    

        

def get_msm(msm, gen=gen_dict, cachedir=None, start=None, stop=None):
    """
    Given an MSM, fetch it from the cache or from the RIPE Atlas API.
    Yield each separate result according to the generation function.
    """
    url = "https://atlas.ripe.net/api/v1/measurement/%u/result/" % (msm,)

    params = {"format": "json"}
    if start is not None and stop is not None:
        params["start"] = str(start)
        params["stop"] = str(stop)
    
    if cachedir and os.path.isdir(cachedir):
        filepath = os.path.join(cachedir, "measurement", "%u.json" % (msm,))

        # download if not present
        if not os.path.isfile(filepath):
            with open(filepath, mode="wb") as file:
                print("Cache miss, retrieving "+url)
                res = requests.get(url, params=params)

                if not res.ok:
                    raise "Atlas measurement API request failed: "+repr(res.json())
                
                file.write(res.content)

        # then read from cache
        with open(filepath) as stream:
            yield from gen(json.loads(stream.read()))

    else:
        # just read from the net
        res = requests.get(url, params=params)
        yield from gen(json.loads(res.content.decode("utf-8")))
        
def latency_dataframe(msms, start=None, stop=None, cachedir=None):
    """
    Given an iterable of MSMs, create a dataframe for doing latency measurements.
    """
    data = []
    
    # make a giant array
    for msm in msms:
        for alp in get_msm(msm, gen=gen_alp, 
                           start=start, stop=stop, 
                           cachedir=cachedir):
            data.append(alp)

    # create a dataframe from it
    return pd.DataFrame(data, columns=Alp._fields)


AtlasProbe = collections.namedtuple("AtlasProbe",
           ("pid", "version", "nat", "ip4", "ip6", "asn4", "asn6", "cc", "lat", "lon"))

def extract_atlas_probe(pobj):
    if "address_v4" in pobj:
        ip4 = pobj["address_v4"]
    else:
        ip4 = None

    if "address_v6" in pobj:
        ip6 = pobj["address_v6"]
    else:
        ip6 = None

    if "asn_v4" in pobj:
        asn4 = pobj["asn_v4"]
    else:
        asn4 = None

    if "asn_v6" in pobj:
        asn6 = pobj["asn_v6"]
    else:
        asn6 = None

    if "tags" in pobj:
        if "system-v1" in pobj["tags"]:
            version = 1
        elif "system-v2" in pobj["tags"]:
            version = 2
        elif "system-v3" in pobj["tags"]:
            version = 3
        elif "system-anchor" in pobj["tags"]:
            version = 4
        else:
            version = 0

        nat = "nat" in pobj["tags"]


    return AtlasProbe(pobj["id"], version, nat, ip4, ip6, asn4, asn6,
                      pobj["country_code"], pobj["latitude"], pobj["longitude"])

def get_probe(pid, cachedir=None):
    url = "https://atlas.ripe.net/api/v1/probe/%u" % (pid,)
    params = {"format": "json"}

    if cachedir and os.path.isdir(cachedir):
        filepath = os.path.join(cachedir, "probe", "%u.json" % (pid,))

        # try to read from cache
        try: 
            with open(filepath) as stream:
                return extract_atlas_probe(json.loads(stream.read()))
        except Exception as e:
            # download if not present
            if not os.path.isfile(filepath):
                with open(filepath, mode="wb") as file:
                    print("Cache miss, retrieving "+url)
                    res = requests.get(url, params=params)

                    if not res.ok:
                        raise "Atlas probe API request failed: "+repr(res.json())
                    
                    file.write(res.content)
                    return extract_atlas_probe(json.loads(res.content.decode("utf-8")))

    else:
        # just read from the net
        res = requests.get(url, params=params)
        return extract_atlas_probe(json.loads(res.content.decode("utf-8")))

def probe_dataframe_from_cache(pids):
    data = []
    
    # make a giant array
    for pid in pids:
        data.append(get_probe(pid, cachedir=DATA_CACHE_PATH))

    # create a dataframe from it
    df = pd.DataFrame(data, columns=AtlasProbe._fields)
    
    # indexed by probe ID
    df.index = df['pid']
    del(df['pid'])
    
    # and return it
    return df

def probe_dataframe_from_file(filename):
    data = []
    
    # make a giant array
    with open(filename) as stream:
        all_probes = json.loads(stream.read())
        for pobj in all_probes["objects"]:
            data.append(extract_atlas_probe(pobj))

    # create a dataframe from it
    df = pd.DataFrame(data, columns=AtlasProbe._fields)
    
    # indexed by probe ID
    df.index = df['pid']
    del(df['pid'])
    
    # and return it
    return df