In [3]:
from datetime import datetime, timezone

from ipwhois import IPWhois
from ripe.atlas.cousteau import AtlasRequest, AtlasResultsRequest
from ripe.atlas.sagan import Result, DnsResult
# Value sent as the "status" filter when listing measurements; presumably the
# RIPE Atlas status id for "Stopped" — confirm against the API documentation.
STOPPED = 4


def get_measurements_collection_by_date_and_type(date, measurement_type):
    """Fetch stopped measurements of one type that started on a given UTC day.

    Args:
        date: Day to query, formatted as "YYYY-MM-DD".
        measurement_type: Value for the API "type" filter (e.g. "dns").

    Returns:
        The "results" list of the API response, or None when the request
        was not successful.

    Raises:
        ValueError: If ``date`` does not match the "%Y-%m-%d" format.
    """
    # Anchor the day to UTC. A naive datetime's .timestamp() is interpreted in
    # the machine's local timezone, which would shift the 24h window from host
    # to host and make the notebook non-reproducible.
    day_start = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    timestamp = day_start.timestamp()
    filters = {
        "type": measurement_type,
        "status": STOPPED,
        # Half-open window [day start, day start + 24h).
        "start_time__gte": timestamp,
        "start_time__lt": timestamp + 24 * 60 * 60,
        "fields": "id,start_time,stop_time,af,query_argument,query_type,result",
    }
    # NOTE(review): only the "results" of this single response are returned;
    # if the endpoint paginates, later pages are not followed — confirm.
    is_success, response = AtlasRequest(
        url_path="/api/v2/measurements/").get(**filters)
    if not is_success:
        return None
    return response["results"]


def get_measurement_results(measurement_id):
    """Request the results of a single measurement.

    Args:
        measurement_id: Identifier interpolated into the results URL path.

    Returns:
        The response payload when the request succeeded, otherwise None.
    """
    request = AtlasRequest(
        url_path=f"/api/v2/measurements/{measurement_id}/results/")
    succeeded, payload = request.get()
    return payload if succeeded else None


def get_probes_geolocation_list_by_id(probe_id_list, batch_size=100):
    """Fetch probe records for the given probe ids, in batches.

    Args:
        probe_id_list: Iterable of probe ids. Ids may be ints or strings;
            each one is converted with ``str()`` before being sent.
        batch_size: Maximum number of ids placed in one ``id__in`` filter.
            Defaults to 100, the chunk size this function always used.

    Returns:
        A list concatenating the "results" entries of every successful
        request. Batches whose request fails contribute nothing, so the list
        can be shorter than ``probe_id_list``.
    """
    # Stringify up front so integer ids do not break ",".join below.
    probe_ids = [str(probe_id) for probe_id in probe_id_list]
    result = []
    for start in range(0, len(probe_ids), batch_size):
        batch = probe_ids[start:start + batch_size]
        is_success, response_results = AtlasRequest(
            url_path="/api/v2/probes/").get(id__in=",".join(batch))
        # Best effort: a failed batch is skipped rather than aborting the
        # whole lookup, matching the original behaviour.
        if is_success:
            result.extend(response_results["results"])
    return result


def map_measurement_result_to_probe_info(parsed_result):
    """Reduce a parsed measurement result to its probe id and first RTT.

    Args:
        parsed_result: Object exposing ``probe_id`` and ``responses``; each
            response exposes ``response_time``.

    Returns:
        A dict with "probe_id" and "rtt_results". "rtt_results" holds the
        response time of the first response, or is empty when the result
        carries no responses.
    """
    responses = parsed_result.responses
    # Guard against results without any response instead of raising IndexError.
    rtt_results = [responses[0].response_time] if responses else []
    return {
        "probe_id": parsed_result.probe_id,
        "rtt_results": rtt_results,
    }


def create_measurement_hash(measurement_id):
    """Collect per-probe RTT info plus probe records for one measurement.

    Args:
        measurement_id: Identifier of the measurement whose results are read.

    Returns:
        A list of dicts as produced by ``map_measurement_result_to_probe_info``
        for every non-error result, each extended with a "geolocation" key
        holding the matching probe record (or None when no record with that
        id was returned). An empty list is returned when the results request
        fails or yields nothing.
    """
    measurement_results = get_measurement_results(measurement_id)
    if not measurement_results:
        # get_measurement_results returns None on a failed request; treat that
        # (and an empty result set) as "no probes" instead of crashing.
        return []

    probes_information = []
    for raw_result in measurement_results:
        # Parse each raw result once and reuse it for the filter and the map.
        parsed_result = DnsResult(raw_result)
        if not parsed_result.is_error:
            probes_information.append(
                map_measurement_result_to_probe_info(parsed_result))

    probes_geolocation = get_probes_geolocation_list_by_id(
        [str(probe_info["probe_id"]) for probe_info in probes_information])

    # Index probe records by id for O(1) lookups; setdefault keeps the first
    # record per id, the same one a front-to-back scan would find.
    geolocation_by_id = {}
    for probe_record in probes_geolocation:
        geolocation_by_id.setdefault(probe_record["id"], probe_record)

    for probe_info in probes_information:
        probe_info["geolocation"] = geolocation_by_id.get(probe_info["probe_id"])
    return probes_information

def merge_measurement_results(measurement_hash, previous_results):
    """Merge one measurement's per-probe entries into the accumulated list.

    Entries are matched on "probe_id". When a probe is already present, the
    new "rtt_results" are appended to the existing entry; otherwise the new
    entry itself is appended to ``previous_results``. The same rule applies
    when ``previous_results`` starts empty, so repeated probe ids inside
    ``measurement_hash`` are always collapsed into one entry.

    Args:
        measurement_hash: Iterable of dicts with "probe_id" and "rtt_results".
        previous_results: List of such dicts accumulated so far (may be empty
            or None). It is extended in place.

    Returns:
        ``previous_results`` (or a new list when None was passed) after the
        merge.
    """
    if previous_results is None:
        previous_results = []

    # Index existing entries by probe id; setdefault keeps the first entry per
    # id, which is the one a front-to-back scan would have matched.
    entry_by_probe_id = {}
    for entry in previous_results:
        entry_by_probe_id.setdefault(entry["probe_id"], entry)

    for probe_info in measurement_hash:
        probe_id = probe_info["probe_id"]
        existing = entry_by_probe_id.get(probe_id)
        if existing is None:
            previous_results.append(probe_info)
            # Register it so later duplicates in this batch merge into it.
            entry_by_probe_id[probe_id] = probe_info
        else:
            existing["rtt_results"].extend(probe_info["rtt_results"])
    return previous_results

def check_dns_measurements(date):
    """Aggregate per-probe results over all DNS measurements of one day.

    Args:
        date: Day to query, formatted as "YYYY-MM-DD".

    Returns:
        The list built by folding every measurement's per-probe entries
        through ``merge_measurement_results``. Empty when the measurement
        lookup fails or finds nothing.
    """
    measurements = get_measurements_collection_by_date_and_type(date, "dns")
    results = []
    # The lookup returns None when the request fails; iterate over nothing in
    # that case instead of raising TypeError.
    for measurement in measurements or []:
        measurement_hash = create_measurement_hash(measurement.get("id"))
        results = merge_measurement_results(measurement_hash, results)
    return results


# Aggregate the DNS measurements of a single day; `res` is inspected in the
# following cell.
res=check_dns_measurements("2022-05-01")
# Bare print() only emits an empty line.
print()



In [2]:
# Display the aggregated per-probe results held in `res`.
res

[{'probe_id': 1003227,
  'rtt_results': [49.431, 44.861, 73.56],
  'geolocation': {'address_v4': '189.240.197.50',
   'address_v6': None,
   'asn_v4': 8151,
   'asn_v6': None,
   'country_code': 'MX',
   'description': 'UADY-01',
   'first_connected': 1637085785,
   'geometry': {'type': 'Point', 'coordinates': [-89.6415, 20.9705]},
   'id': 1003227,
   'is_anchor': False,
   'is_public': True,
   'last_connected': 1686384518,
   'prefix_v4': '189.240.192.0/19',
   'prefix_v6': None,
   'status': {'id': 1, 'name': 'Connected', 'since': '2023-06-01T16:39:43Z'},
   'status_since': 1685637583,
   'tags': [{'name': 'Academic', 'slug': 'academic'},
    {'name': 'system: Resolves A Correctly',
     'slug': 'system-resolves-a-correctly'},
    {'name': 'system: Resolves AAAA Correctly',
     'slug': 'system-resolves-aaaa-correctly'},
    {'name': 'system: IPv4 Works', 'slug': 'system-ipv4-works'},
    {'name': 'Native IPv4', 'slug': 'native-ipv4'},
    {'name': 'system: IPv4 Capable', 'slug': '

In [None]:

from pprint import pprint
from ipwhois.experimental import bulk_lookup_rdap,get_bulk_asn_whois
# Sample IPv4 and IPv6 addresses for the bulk lookup; the commented-out
# entries inside the list are currently excluded.
ip_list = ['74.125.225.229', '2001:4860:4860::8888', '62.239.237.1', '2a00:2381:ffff::1', '210.107.73.73',
        #    '2001:240:10c:1::ca20:9d1d', '200.57.141.161', '2801:10:c000::', '196.11.240.215', '2001:43f8:7b0::', '133.1.2.5', '115.1.2.3'
           ]
# obj = IPWhois('["8.8.4.4", "1.1.1.1", "2c0f:fb50:4003::"]')
# Bulk ASN whois lookup for every address in ip_list.
results= get_bulk_asn_whois(addresses=ip_list)
# Disabled alternative: the RDAP bulk lookup, unpacked as (results, stats).
# results, stats = bulk_lookup_rdap(addresses=ip_list)
pprint(results)