In [2]:
import logging
import coloredlogs
import requests
from time import sleep

In [3]:
logging.basicConfig()
logger = logging.getLogger("poll-ooni")
# logger.setLevel(logging.DEBUG)
coloredlogs.install()
coloredlogs.install(level='DEBUG')
# coloredlogs.install(level='INFO')

In [4]:
config = {
    "sleep-times": {
        "ooni-poll": 60*5,
        "ooni-paginate": 2,
    }
}


In [5]:
# TODO make dry!
def is_nonempty_str(my_str: str) -> bool:
    return (type(my_str) == str) & (len(my_str) > 0)

class Alpha2 ():
    '''
    Represents an ISO alpha-2 country code.
    '''
    def __init__(self,
                 country_code: str):
        assert(is_nonempty_str(country_code))
        assert(len(country_code)==2)
        self.country_code = country_code
        
    def __str__(self):
        return f'{self.country_code}'
    
    def __repr__(self):
        return self.__str__()


# Getting the data we need

## Measuremnets from OONI

In [98]:
def api_query (query: str, results=[], queries=1, max_queries=None) -> list:
    '''Recursively query the API, up to `max_queries`. (If `max_queries=None`, we
    will paginate through the results as long as they run).
    '''
    base_url = 'https://api.ooni.io/api/v1/'
    query = '{!s}{!s}'.format(base_url, query)
    try:
        resp =  requests.get(query).json()
        results = results + resp['results']
        next_url = resp['metadata']['next_url']
        if max_queries is not None and queries > max_queries:
            return results
        if next_url:
            # sleep so as to not overwhelm the endpoint
            sleep(config['sleep-times']['ooni-paginate'])
            # remove base url to perfrom the query
            next_url = next_url.split('api/v1/')[1]
            print("next URL", next_url)
            return api_query(next_url, results, queries+1, max_queries)
        return results
    except Exception as inst:
        # if we have an error,
        logger.warning("Error querying API: {!s}".format(inst))
        # just return what we've collected
        # (at worst, `results` will be `[]`)
        return results


In [100]:
base_query = 'measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000'

def query_recent_measurements (max_queries=5) -> list:
    '''Queries all recent measurements, up to specified maximum number of queries.'''
    return api_query(base_query, max_queries=max_queries)

def get_blocking_type (measurement) -> str:
    '''Get blocking type, if available.'''
    try:
        return measurement['scores']['analysis']['blocking_type']
    except:
        return None

In [101]:
measurements = query_recent_measurements()

2021-05-25 13:19:23 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443
2021-05-25 13:19:24 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000 HTTP/1.1" 200 824369
2021-05-25 13:19:27 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443


next URL measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=1000


2021-05-25 13:19:28 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=1000 HTTP/1.1" 200 827717
2021-05-25 13:19:31 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443


next URL measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=2000


2021-05-25 13:19:32 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=2000 HTTP/1.1" 200 830892
2021-05-25 13:19:35 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443


next URL measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=3000


2021-05-25 13:19:36 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=3000 HTTP/1.1" 200 827264
2021-05-25 13:19:39 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443


next URL measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=4000


2021-05-25 13:19:40 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=4000 HTTP/1.1" 200 830291
2021-05-25 13:19:43 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443


next URL measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=5000


2021-05-25 13:19:44 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&offset=5000 HTTP/1.1" 200 825740


In [102]:
len(measurements)

5995

In [12]:
m_detail = requests.get(m['measurement_url']).json()

m_detail['test_keys']['queries']
m_detail

2021-05-25 12:59:39 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): ams-pg.ooni.org:443
2021-05-25 12:59:39 congratulations urllib3.connectionpool[4205] DEBUG https://ams-pg.ooni.org:443 "GET /api/v1/raw_measurement?report_id=20210525T181651Z_webconnectivity_IN_55836_n1_v7tNboMf0kqxJ5gp&input=https%3A%2F%2Fwww.sendspace.com%2F HTTP/1.1" 200 10333


{'annotations': {'engine_name': 'ooniprobe-engine',
  'engine_version': '3.9.0',
  'flavor': 'stableFull',
  'network_type': 'wifi',
  'platform': 'android'},
 'data_format_version': '0.2.0',
 'input': 'https://www.sendspace.com/',
 'measurement_start_time': '2021-05-25 19:01:09',
 'probe_asn': 'AS55836',
 'probe_cc': 'IN',
 'probe_ip': '127.0.0.1',
 'probe_network_name': 'Reliance Jio Infocomm Limited',
 'report_id': '20210525T181651Z_webconnectivity_IN_55836_n1_v7tNboMf0kqxJ5gp',
 'resolver_asn': 'AS55836',
 'resolver_ip': '49.45.31.116',
 'resolver_network_name': 'Reliance Jio Infocomm Limited',
 'software_name': 'ooniprobe-android',
 'software_version': '2.11.0',
 'test_helpers': {'backend': {'address': 'https://wcth.ooni.io',
   'type': 'https'}},
 'test_keys': {'agent': 'redirect',
  'client_resolver': '49.45.31.116',
  'retries': None,
  'socksproxy': None,
  'network_events': [{'address': '172.64.100.30:443',
    'failure': None,
    'operation': 'connect',
    'proto': 'tcp',


## Get IP from URL

In [13]:
inputs = [m['input'] for m in measurements]

In [22]:
import socket
import urllib.parse
from typing import Optional

def get_hostname (url):
    return urllib.parse.urlparse(url).netloc

def get_ip (hostname: str) ->  Optional[str]:
    try:
        hostname = socket.gethostbyname(hostname)
        if hostname == '127.0.0.1' or hostname == '0.0.0.0':
            return None
        return hostname
    except Exception as inst:
            logger.warning(f"Error looking up IP of hostname {hostname}: {inst}")
            return None


In [16]:
my_ip = get_ip(get_hostname(inputs[505]))
my_ip

'104.21.28.97'

## Get geolocation from IP

In [18]:
import geoip2.database
from typing import Optional

def ip_to_alpha2 (ip: str) -> Optional[Alpha2]:
    with geoip2.database.Reader('dbip-country-lite-2021-05.mmdb') as reader:
        try:
            response = reader.country(ip)
            return Alpha2(response.country.iso_code)
        except Exception as inst:
            # if we have an error,
            logger.warning(f"Error looking up country code of IP {ip}: {inst}")
            return None
    
ip_to_alpha2(my_ip)

US

## Putting it all together

In [116]:
def url_to_alpha2 (url: str) -> Optional[Alpha2]:
    hostname = get_hostname(url)
    maybe_ip = get_ip(hostname)
    if maybe_ip is None:
        return None
    return ip_to_alpha2(maybe_ip)

url_to_alpha2('https://wikipedia.org/')

US

## Get TLD jurisdiction

In [117]:
from imp import reload
from src.w3techs import utils as w3techs_utils
reload(w3techs_utils)
from tldextract import extract

In [26]:
def get_tld_jurisdiction (url: str) -> Optional[Alpha2]:
    '''
    Takes a URL and gets an Alpha 2
    representing the jurisdiction of the URL's top-level domain.
    '''
    tld = extract(url)
    # get last item in url
    # e.g., '.com.br' should be '.br'
    tld = tld.suffix
    tld = tld.split('.')[-1]
    # put it
    tld_str = f'.{tld}'
#     return tld_str
    # TODO put htis logic in get_country
    cc =  w3techs_utils.get_country(tld_str)
    if cc is not None:
        return Alpha2(cc)
    return None

get_tld_jurisdiction('mycool.com.br')

BR

# Model the datatype

In [30]:
import pandas as pd

def now () -> pd.Timestamp:
    return pd.Timestamp.utcnow()

def is_in_future (timestamp: pd.Timestamp) -> bool:
    return timestamp > now()

is_in_future(  pd.Timestamp('2021-06-24T20:36:06Z'))

True

In [31]:
from psycopg2.extensions import cursor
from psycopg2.extensions import connection
import pandas as pd

class OONIWebConnectivityTest():
    '''
    Class to capture results of an OONI web connectivity test.
      - https://ooni.org/nettest/web-connectivity/
    See README for more details on these fields.
    
    This is where validation happens.
    TODO - Check for SQL injection attacks.
    '''
    def __init__(self,
                  blocking_type: str,
                  probe_alpha2: Alpha2,
                  input_url: str,
                  anomaly: bool,
                  confirmed: bool,
                  report_id: str,
                  input_ip_alpha2: Alpha2,
                  tld_jurisdiction_alpha2: Alpha2,
                  measurement_start_time: pd.Timestamp):
            # we only want stuff where blocking actually happened 
            assert(blocking_type != False)
            self.blocking_type = blocking_type
            
            assert(type(probe_alpha2) == Alpha2)
            self.probe_alpha2 = str(probe_alpha2)
            
            assert(is_nonempty_str(input_url))
            self.input_url = input_url
            
            assert(type(anomaly) == bool)
            self.anomaly = anomaly
            
            assert(type(confirmed) == bool)
            self.confirmed = confirmed
            
            assert(is_nonempty_str(report_id))
            self.report_id = report_id
            
            # type is optional
            assert((type(input_ip_alpha2) == Alpha2) or 
                   (input_ip_alpha2 == None))
            if input_ip_alpha2 == None:
                self.input_ip_alpha2 = None
            else:
                self.input_ip_alpha2 = str(input_ip_alpha2)
            
            # type is optional
            assert((type(tld_jurisdiction_alpha2) == Alpha2) or
                   (tld_jurisdiction_alpha2 == None))
            if tld_jurisdiction_alpha2 == None:
                self.tld_jurisdiction_alpha2 = None
            else:
                self.tld_jurisdiction_alpha2 = str(tld_jurisdiction_alpha2)
            
            assert(type(measurement_start_time) == pd.Timestamp)
            # if the timestamp is in the future...
            if is_in_future(measurement_start_time):
                # set the time to now.
                self.measurement_start_time = now()
            # otherwise
            else:
                # set it to whenever it was reported
                self.measurement_start_time = measurement_start_time
            
    def create_table(
            self,
            cur: cursor,
            conn: connection):
        cmd = """
          CREATE TABLE ooni_web_connectivity_test (
             blocking_type             VARCHAR,
             probe_alpha2              CHAR(2) NOT NULL,
             input_url                 VARCHAR NOT NULL,
             anomaly                   BOOLEAN NOT NULL,
             confirmed                 BOOLEAN NOT NULL,
             report_id                 VARCHAR NOT NULL,
             input_ip_alpha2           CHAR(2),
             tld_jurisdiction_alpha2   CHAR(2),
             measurement_start_time    TIMESTAMPTZ NOT NULL
          )
        """
        cur.execute(cmd)
        conn.commit()

    def write_to_db(
            self,
            cur: cursor,
            conn: connection,
            commit=True,
    ):
        cur.execute(
            """
            INSERT INTO ooni_web_connectivity_test
            (blocking_type, probe_alpha2, input_url, anomaly, confirmed, report_id,  
            input_ip_alpha2, tld_jurisdiction_alpha2, measurement_start_time)   
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s)   
            """, (self.blocking_type,
                  self.probe_alpha2,
                  self.input_url,
                  self.anomaly,
                  self.confirmed,
                  self.report_id,
                  self.input_ip_alpha2,
                  self.tld_jurisdiction_alpha2,
                  self.measurement_start_time))
        if commit:
            return conn.commit()
        return

    def __str__(self):
        # TODO make DRY with write_to_db?
        # TODO do this in general?
        return f'{self.measurement_start_time} - {self.probe_alpha2} -> {self.input_ip_alpha2}, {self.tld_jurisdiction_alpha2} ({self.blocking_type} {self.input_url})'

    def __repr__(self):
        return self.__str__()


# Marshall from datatype

In [32]:
def ingest_api_measurement (measurement: dict) -> OONIWebConnectivityTest:
    blocking_type = get_blocking_type(measurement)
    probe_alpha2 = Alpha2(measurement['probe_cc'])
    input_url = measurement['input']
    anomaly = measurement['anomaly']
    confirmed = measurement['confirmed']
    report_id = measurement['report_id']
    input_ip_alpha2 = url_to_alpha2(input_url) 
    tld_jurisdiction_alpha2 = get_tld_jurisdiction(input_url)
    measurement_start_time = pd.Timestamp(measurement['measurement_start_time'])
    return OONIWebConnectivityTest(
        blocking_type,
        probe_alpha2,
        input_url,
        anomaly,
        confirmed,
        report_id,
        input_ip_alpha2,
        tld_jurisdiction_alpha2,
        measurement_start_time
    )

ingest_api_measurement(measurements[0])

2021-05-25 19:59:29+00:00 - BR -> US, US (dns http://www.bajstock.com/)

In [105]:
from multiprocessing import Pool
from typing import List

def ingest_api_measurements (measurement: List[dict]) -> List[OONIWebConnectivityTest]:
    with Pool() as p:
        return p.map(ingest_api_measurement, measurements)

# Test it with the DB

In [33]:
from config import config

In [34]:
import psycopg2

# connect to the db
my_connection = psycopg2.connect(**config['postgres'])
my_cursor = my_connection.cursor()
logging.info('Connected to database.')

2021-05-25 13:02:13 congratulations root[4205] INFO Connected to database.


In [35]:
# dummy data
OONIWebConnectivityTest(
    'example',
    Alpha2('US'),
    'example',
    False,
    False,
    'example',
    Alpha2('US'),
    Alpha2('US'),
    pd.Timestamp('2000-01-01 21:41:37+00:00')
).create_table(my_cursor, my_connection)

ProgrammingError: relation "ooni_web_connectivity_test" already exists


In [36]:
def rollback():
    my_cursor.execute("ROLLBACK")
    my_connection.commit()

In [42]:
rollback()
# test = None
# for mt in my_types:
#     try:
#         mt.write_to_db(my_cursor, my_connection, commit=False) 
#     except Exception as e:
#         test = mt
#         print(mt)
#         print(e)
#         break
        

In [38]:
# my_connection.commit()

# Set up a flow

In [146]:
from datetime import datetime

def get_latest_reading_time (cur: cursor) -> datetime:
    '''Get time of most recent measurement'''
    cur.execute('SELECT measurement_start_time from ooni_web_connectivity_test ORDER BY measurement_start_time DESC')
    return cur.fetchone()[0]
    
most_recent_reading = get_latest_reading_time(my_cursor)
most_recent_reading

datetime.datetime(2021, 5, 25, 13, 58, 5, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-420, name=None))

In [193]:
import pytz

def query_measurements_after (time: datetime) -> list:
    '''Queries all measurements after time.'''
    def to_utc (t: datetime) -> datetime:
        return most_recent_reading.astimezone(pytz.utc)
    def fmt_dt (t: datetime):
        return t.strftime("%Y-%m-%dT%H:%M:%S")
    # format timezone-aware date into UTC fo querying
    utc_dt = to_utc(time)
    # format it into the query url
    dt_str = fmt_dt(utc_dt)
    query_str = base_query + f'&since={dt_str}'
    # issue the query
    return api_query(query_str)

ms  = query_measurements_after(most_recent_reading)

2021-05-25 14:07:44 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443
2021-05-25 14:07:45 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&since=2021-05-25T20:58:05 HTTP/1.1" 200 83224


In [194]:
# first is most recent reading - that looks good
ms[0]

{'anomaly': True,
 'confirmed': False,
 'failure': False,
 'input': 'https://boerse.to/',
 'measurement_start_time': '2021-05-25T21:07:40Z',
 'measurement_url': 'https://ams-pg.ooni.org/api/v1/raw_measurement?report_id=20210525T210332Z_webconnectivity_DE_6830_n1_BKA9by9RMEHHooXq&input=https%3A%2F%2Fboerse.to%2F',
 'probe_asn': 'AS6830',
 'probe_cc': 'DE',
 'report_id': '20210525T210332Z_webconnectivity_DE_6830_n1_BKA9by9RMEHHooXq',
 'scores': {'analysis': {'blocking_type': 'dns'},
  'blocking_country': 0.0,
  'blocking_general': 1.0,
  'blocking_global': 0.0,
  'blocking_isp': 0.0,
  'blocking_local': 0.0},
 'test_name': 'web_connectivity'}

In [188]:
rollback()
my_cursor.execute('SELECT * from ooni_web_connectivity_test order by measurement_start_time desc')
my_cursor.fetchone()

('dns',
 'IT',
 'http://livesport.sx/',
 True,
 False,
 '20210525T205634Z_webconnectivity_IT_30722_n1_k5cVnE4zr72obnGi',
 'DE',
 'SX',
 datetime.datetime(2021, 5, 25, 13, 58, 5, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-420, name=None)))

In [169]:
ingested = ingest_api_measurement(ms[0])
ingested.write_to_db(my_cursor, my_connection, commit=True)

In [118]:
def write_to_db (cur: cursor, conn: connection, connectivity_tests: List[OONIWebConnectivityTest]) -> None:
    for t in connectivity_tests:
        t.write_to_db(cur, conn, commit=False) 
    conn.commit()


In [None]:
write_to_db(my_cursor, my_connection, ooni_web_connectivity_tests)

# Tying it all together

In [120]:
def scrape (cur: cursor, conn: connection) -> None:
    t = get_latest_reading_time(cur)
    ms = query_measurements_after(t)
    ingested = ingest_api_measurements(ms)
    return write_to_db(cur, conn, ingested)

In [121]:
scrape(my_cursor, my_connection)

2021-05-25 13:56:50 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443
2021-05-25 13:56:51 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&since=2021-05-25T13:19:18 HTTP/1.1" 200 823646
2021-05-25 13:56:55 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443


next URL measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&since=2021-05-25T13%3A19%3A18&offset=1000


2021-05-25 13:56:56 congratulations urllib3.connectionpool[4205] DEBUG https://api.ooni.io:443 "GET /api/v1/measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&since=2021-05-25T13%3A19%3A18&offset=1000 HTTP/1.1" 200 828557
2021-05-25 13:57:00 congratulations urllib3.connectionpool[4205] DEBUG Starting new HTTPS connection (1): api.ooni.io:443


next URL measurements?test_name=web_connectivity&anomaly=true&order_by=test_start_time&limit=1000&since=2021-05-25T13%3A19%3A18&offset=2000


KeyboardInterrupt: 