# Data preparation for experiment

In [1]:
# external imports
import pandas as pd
import ipinfo

In [2]:
# internal imports
from src.utils.common_functions import (
    json_file_to_dict,
    dict_to_json_file,
    check_ip,
)
from src.utils.constants import (
    KEYS_FILEPATH,
    TRAFFIC_LOGS_FILEPATH,
    TRAFFIC_LOGS_IP_CLASSIFIED_FILEPATH,
    ANYCAST_IP_CLASSIFICATION_FILEPATH,
    ANYCAST_PII_TRAFFIC_LOGS_FILEPATH
)

Extract the raw traffic logs and keep only the data needed for the experiment.

In [3]:
# Load the raw traffic logs from CSV into a DataFrame
traffic_logs_df = pd.read_csv(filepath_or_buffer=TRAFFIC_LOGS_FILEPATH)

Restructure the data so that it is easier to handle in the experiment.

In [4]:
# Add the classification column, defaulting every row to "not anycast"
traffic_logs_df = traffic_logs_df.assign(ip_anycast=False)

Query ipinfo to find out which destination IPs are anycast, so that the following steps can work only with anycast IPs.

In [5]:
def classify_ip_directions(ip_to_check_list: list, file_to_save: str, use_cache: bool) -> None:
    """Classify IPs as anycast / not anycast through ipinfo and store the result as JSON.

    Every entry of ``ip_to_check_list`` is converted to ``str``. Entries that
    already have a result, or that ``check_ip`` rejects, are skipped. For the
    rest, ipinfo is queried and the ``anycast`` field of the response (``False``
    when the field is absent) is stored under the IP. The whole result dict is
    written to ``file_to_save`` after each new lookup, so an interrupted run
    keeps the progress made so far.

    Args:
        ip_to_check_list: IPs to classify (any values; each is passed through ``str``).
        file_to_save: Path of the JSON file holding the ``{ip: anycast}`` results.
        use_cache: When true, start from the results already stored in
            ``file_to_save`` instead of from an empty dict.

    Returns:
        None. Results are only written to ``file_to_save``.
    """
    ip_results_dict = {}
    if use_cache:
        try:
            ip_results_dict = dict(json_file_to_dict(file_to_save))
        except Exception:
            # Best effort: an unreadable or missing cache just means starting
            # from scratch. ``Exception`` (not a bare ``except``) so that
            # KeyboardInterrupt / SystemExit still propagate.
            ip_results_dict = {}

    # Built on first use only: a fully cached run never reads the key file,
    # and the token/handler are not re-created for every single IP.
    handler = None

    for ips_checked, ip_to_check in enumerate(ip_to_check_list):
        if ips_checked % 100 == 0:
            print(f"Already check {ips_checked} number of IPs")

        ip_to_check = str(ip_to_check)
        # Checking the live result dict also skips duplicates inside the list.
        if ip_to_check in ip_results_dict:
            print("IP already checked")
            continue
        if not check_ip(ip_to_check):
            print("IP not valid")
            continue

        if handler is None:
            access_token = json_file_to_dict(KEYS_FILEPATH)["ipinfo_token"]
            handler = ipinfo.getHandler(access_token)

        details = handler.getDetails(ip_to_check)
        # The response only carries "anycast" for anycast IPs; a missing
        # attribute therefore means "not anycast".
        ip_results_dict[ip_to_check] = getattr(details, "anycast", False)

        # Checkpoint after every lookup so progress survives an interruption.
        dict_to_json_file(
            dict=ip_results_dict,
            file_path=file_to_save
        )

In [6]:
# Distinct destination IPs found in the traffic logs
unique_dest_ips = traffic_logs_df["ip_dest"].unique()
ips_list = unique_dest_ips.tolist()
print(f"Number of IPs to check anycast: {len(ips_list)}")

Number of IPs to check anycast: 5647


In [7]:
# Classify every destination IP, reusing results already stored in the JSON file
classify_ip_directions(
    use_cache=True,
    file_to_save=ANYCAST_IP_CLASSIFICATION_FILEPATH,
    ip_to_check_list=ips_list,
)

Already check 0 number of IPs
IP not valid
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP already checked
IP alre

Include the anycast classification in the dataset.

In [8]:
# Load the stored {ip: anycast} classification
ips_classified = json_file_to_dict(ANYCAST_IP_CLASSIFICATION_FILEPATH)

# Vectorised update: build one mask for all classified IPs and map their
# results in a single pass, instead of scanning the whole frame once per IP.
# Rows whose destination IP has no classification keep their current value.
is_classified = traffic_logs_df["ip_dest"].isin(list(ips_classified))
traffic_logs_df.loc[is_classified, "ip_anycast"] = (
    traffic_logs_df.loc[is_classified, "ip_dest"].map(ips_classified).astype(bool)
)

The experiment is only going to be carried out on anycast IPs that handle personal data (PII), so we filter out the unicast IPs and the traffic that does not carry personal data.

In [9]:
# Persist the full traffic log, now including the anycast classification
traffic_logs_df.to_csv(TRAFFIC_LOGS_IP_CLASSIFIED_FILEPATH, sep=",", index=False)

# Keep only the rows whose destination is anycast and that carry some PII
# (PII present and different from the "No-PII" marker)
is_anycast = traffic_logs_df["ip_anycast"].eq(True)
pii_column = traffic_logs_df["PII"]
carries_pii = pii_column.ne("No-PII") & pii_column.notna()
anycast_pii_traffic_logs_df = traffic_logs_df.loc[is_anycast & carries_pii]
anycast_pii_traffic_logs_df.to_csv(ANYCAST_PII_TRAFFIC_LOGS_FILEPATH, sep=",", index=False)

# Distinct anycast destinations that remain after filtering
ips_anycast_to_hunt = anycast_pii_traffic_logs_df["ip_dest"].unique().tolist()
print(f"Number of IPs anycast to Hunt: {len(ips_anycast_to_hunt)}")

Number of IPs anycast to Hunt: 200
