In [2]:
import requests
import pandas as pd
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed
from ipywidgets import widgets, VBox, HBox, Output
from IPython.display import display
import time
import datetime
from tqdm.notebook import tqdm
from IPython.display import HTML
import base64

############
# API KEYS #
############
# Replace each placeholder with your own key before running a lookup.
# The ARIN RDAP lookup takes no key. Avoid sharing the notebook with real
# keys filled in.
api_key_abuseipdb = "<YOUR-API-KEY-HERE>"  # sent to AbuseIPDB in the `Key` request header
api_key_ipgeo = "<YOUR-API-KEY-HERE>"  # sent to IPGeolocation.io as the `apiKey` query parameter
api_key_ipinfo = "<YOUR-API-KEY-HERE>"  # sent to IPInfo as the `token` query parameter

###################
# Global variable #
###################
# Most recent result table: assigned by the lookup click handlers and read
# by on_click_export. Stays None until a lookup has completed.
exported_df = None

#############
# FUNCTIONS #
#############

#### AbuseIPDB Functions ####
def fetch_abuseipdb_data(ip, api_key):
    """Fetch abuse data for a single IP from AbuseIPDB.

    Args:
        ip (str): The IP address to look up.
        api_key (str): AbuseIPDB API key, sent in the ``Key`` request header.

    Returns:
        dict: On success, a row with the keys ``IP``, ``Abuse Score``,
        ``ISP``, ``Reports`` and ``Last Reported``; fields missing from the
        response are reported as ``"N/A"``. On any failure, a row of the
        form ``{"IP": ip, "Error": message}``.
    """
    try:
        response = requests.get(
            "https://api.abuseipdb.com/api/v2/check",
            # Let requests URL-encode the user-supplied address instead of
            # pasting it into the query string by hand.
            params={"ipAddress": ip},
            headers={'Accept': 'application/json', 'Key': api_key},
            timeout=10,
        )
        response.raise_for_status()
        # A missing or null "data" member degrades to all-"N/A" fields.
        report = response.json().get("data") or {}
        return {
            "IP": ip,
            "Abuse Score": report.get("abuseConfidenceScore", "N/A"),
            "ISP": report.get("isp", "N/A"),
            "Reports": report.get("totalReports", "N/A"),
            "Last Reported": report.get("lastReportedAt", "N/A"),
        }
    except Exception as e:
        # Deliberately broad: this runs as a thread-pool worker and a raised
        # exception would abort the whole bulk lookup. The failure is
        # reported as a row instead. The row is keyed "IP" (not
        # "IP Address") so it lines up with successful rows and with the
        # other services when tables are merged on "IP".
        return {"IP": ip, "Error": str(e)}

def check_abuseipdb_bulk(api_key, ip_list):
    """Look up every address in ``ip_list`` on AbuseIPDB with a thread pool.

    Rows are collected as the lookups finish, so the returned DataFrame is
    in completion order rather than input order.
    """
    rows = []
    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [
            pool.submit(fetch_abuseipdb_data, address, api_key)
            for address in ip_list
        ]

        with tqdm(total=len(ip_list), desc="    Checking AbuseIPDB") as progress:
            for finished in as_completed(pending):
                rows.append(finished.result())
                progress.update(1)
    return pd.DataFrame(rows)

#### IPGeolocation.io Functions ####
def resolve_hostname_to_ip(hostname):
    """Return the IPv4 address for ``hostname``, or None if lookup fails."""
    address = None
    try:
        address = socket.gethostbyname(hostname)
    except Exception:
        # Best-effort lookup: any failure is reported to the caller as None.
        pass
    return address

def fetch_ipgeo_data(ip, api_key):
    """Fetch data for a single IP from IPGeolocation.io.

    Args:
        ip (str): The IP address to look up.
        api_key (str): IPGeolocation.io API key, sent as the ``apiKey``
            query parameter.

    Returns:
        dict: On success, a row with the keys ``IP``, ``Continent``,
        ``Country``, ``State``, ``District``, ``City``, ``Zip code``,
        ``Latitude`` and ``Longitude``; fields missing from the response
        are reported as ``"N/A"``. On any failure, a row of the form
        ``{"IP": ip, "Error": message}`` with the API key redacted from
        the message.
    """
    try:
        response = requests.get(
            "https://api.ipgeolocation.io/v2/ipgeo",
            # Let requests URL-encode the key and the user-supplied address.
            params={"apiKey": api_key, "ip": ip},
            timeout=10,
        )
        response.raise_for_status()
        # A missing or null "location" member degrades to all-"N/A" fields.
        location = response.json().get("location") or {}
        return {
            "IP": ip,
            "Continent": location.get("continent_name", "N/A"),
            "Country": location.get("country_name", "N/A"),
            "State": location.get("state_prov", "N/A"),
            "District": location.get("district", "N/A"),
            "City": location.get("city", "N/A"),
            "Zip code": location.get("zipcode", "N/A"),
            "Latitude": location.get("latitude", "N/A"),
            "Longitude": location.get("longitude", "N/A"),
        }
    except Exception as e:
        # Deliberately broad: this runs as a thread-pool worker and a raised
        # exception would abort the whole bulk lookup.
        message = str(e)
        if api_key:
            # The key travels in the query string, so error text that quotes
            # the request URL would otherwise put it in the displayed and
            # exported table.
            message = message.replace(str(api_key), "***")
        # Keyed "IP" (not "IP Address") so error rows line up with
        # successful rows and with the other services when merging on "IP".
        return {"IP": ip, "Error": message}

def check_ipgeo_bulk(api_key, ip_list):
    """Look up every address in ``ip_list`` on IPGeolocation.io with a thread pool.

    Rows are collected as the lookups finish, so the returned DataFrame is
    in completion order rather than input order.
    """
    rows = []
    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [
            pool.submit(fetch_ipgeo_data, address, api_key)
            for address in ip_list
        ]

        with tqdm(total=len(ip_list), desc="    Checking IPGeolocation") as progress:
            for finished in as_completed(pending):
                rows.append(finished.result())
                progress.update(1)
    return pd.DataFrame(rows)

#### IPInfo Functions ####
def fetch_ipinfo_data(ip, api_key):
    """Fetch data for a single IP from IPInfo.io.

    Args:
        ip (str): The IP address to look up.
        api_key (str): IPInfo token, sent as the ``token`` query parameter.

    Returns:
        dict: On success, a row with the keys ``IP``, ``ASN``, ``Name`` and
        ``Domain``; fields missing from the response are reported as
        ``"N/A"``. On any failure, a row of the form
        ``{"IP": ip, "Error": message}`` with the token redacted from the
        message.
    """
    try:
        response = requests.get(
            f"https://api.ipinfo.io/lite/{ip}",
            params={"token": api_key},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        return {
            "IP": ip,
            "ASN": data.get("asn", "N/A"),
            "Name": data.get("as_name", "N/A"),
            "Domain": data.get("as_domain", "N/A"),
        }
    except Exception as e:
        # Deliberately broad: this runs as a thread-pool worker and a raised
        # exception would abort the whole bulk lookup.
        message = str(e)
        if api_key:
            # The token travels in the query string, so error text that
            # quotes the request URL would otherwise put it in the displayed
            # and exported table.
            message = message.replace(str(api_key), "***")
        # Keyed "IP" (not "IP Address") so error rows line up with
        # successful rows and with the other services when merging on "IP".
        return {"IP": ip, "Error": message}

def check_ipinfo_bulk(api_key, ip_list):
    """Look up every address in ``ip_list`` on IPInfo.io with a thread pool.

    Rows are collected as the lookups finish, so the returned DataFrame is
    in completion order rather than input order.
    """
    rows = []
    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [
            pool.submit(fetch_ipinfo_data, address, api_key)
            for address in ip_list
        ]

        with tqdm(total=len(ip_list), desc="    Checking IPInfo") as progress:
            for finished in as_completed(pending):
                rows.append(finished.result())
                progress.update(1)
    return pd.DataFrame(rows)

#### ARIN Functions ####
def _format_arin_cidrs(rdap_data):
    """Return the CIDR block(s) listed in the RDAP ``cidr0_cidrs`` member."""
    cidrs = rdap_data.get("cidr0_cidrs")
    if not isinstance(cidrs, list):
        return "Not Provided"

    blocks = []
    for entry in cidrs:
        if not isinstance(entry, dict):
            continue
        # Each entry carries either an IPv4 or an IPv6 prefix.
        prefix = entry.get("v4prefix") or entry.get("v6prefix")
        if not prefix:
            continue
        length = entry.get("length")
        # CIDR notation needs the prefix length; fall back to the bare
        # prefix only when the response omits it.
        blocks.append(f"{prefix}/{length}" if length is not None else str(prefix))
    return ", ".join(blocks) if blocks else "Not Provided"


def _format_arin_address(field):
    """Return a one-line address from a jCard ``adr`` property (4+ items)."""
    value = field[3]
    components = value if isinstance(value, (list, tuple)) else [value]

    parts = []
    for component in components:
        # A component may itself be a list of strings.
        if isinstance(component, (list, tuple)):
            parts.extend(str(item) for item in component if item)
        elif component:
            parts.append(str(component))
    if parts:
        return ", ".join(parts)

    # The structured components can all be empty while the formatted
    # address sits in the property's "label" parameter -- presumably the
    # usual shape of ARIN responses; confirm against live data.
    params = field[1] if isinstance(field[1], dict) else {}
    label = params.get("label")
    if isinstance(label, str):
        lines = [line.strip() for line in label.splitlines() if line.strip()]
        if lines:
            return ", ".join(lines)
    return "Not Provided"


def _parse_arin_entity(entity):
    """Summarise one RDAP entity: its roles plus selected jCard contact fields."""
    entity_info = {
        "Full Name": "Not Provided",
        "Roles": ", ".join(str(role) for role in entity.get("roles") or []),
    }

    vcard_array = entity.get("vcardArray")
    if (
        not isinstance(vcard_array, list)
        or len(vcard_array) < 2
        or not isinstance(vcard_array[1], list)
    ):
        return entity_info

    emails = []
    telephones = []
    for field in vcard_array[1]:
        # jCard properties are [name, params, type, value]; skip malformed ones.
        if not isinstance(field, (list, tuple)) or not field:
            continue
        has_value = len(field) > 3
        if field[0] == "fn":  # Full Name
            entity_info["Full Name"] = field[3] if has_value else "Not Provided"
        elif field[0] == "adr":  # Address
            entity_info["Address"] = (
                _format_arin_address(field) if has_value else "Not Provided"
            )
        elif field[0] == "email" and has_value:  # Email
            emails.append(str(field[3]))
        elif field[0] == "tel" and has_value:  # Telephone
            telephones.append(str(field[3]))

    # Keep every address/number instead of only the last one seen.
    if emails:
        entity_info["Emails"] = ", ".join(emails)
    if telephones:
        entity_info["Telephones"] = ", ".join(telephones)
    return entity_info


def fetch_arin_data(ip):
    """
    Fetch ARIN RDAP data for a single IP address.
    Args:
        ip (str): The IP address to query.
    Returns:
        dict: On success, a row with the keys ``IP``, ``Source Registry``,
        ``Net Range``, ``CIDR``, ``Net Name``, ``Net Type``,
        ``Registration Date``, ``Last Changed`` and ``Related Entities``
        (a list of per-entity dicts); missing fields are reported as
        ``"Not Provided"``. On any failure, ``{"IP": ip, "Error": message}``.
    """
    # Query the ARIN RDAP API
    rdap_url = f"https://rdap.arin.net/registry/ip/{ip}"
    try:
        response = requests.get(rdap_url, timeout=5)
        response.raise_for_status()  # Raise HTTP errors, if any
        rdap_data = response.json()  # Parse the JSON response
    except Exception as e:
        # Deliberately broad, matching the other fetch_* helpers: this runs
        # as a thread-pool worker and a raised exception would abort the
        # whole bulk lookup. The failure is reported as a row instead.
        return {"IP": ip, "Error": f"Failed to query ARIN RDAP: {e}"}

    if not isinstance(rdap_data, dict):
        return {"IP": ip, "Error": "Failed to query ARIN RDAP: unexpected response format"}

    # Extract relevant fields from the RDAP response
    start_address = rdap_data.get('startAddress', 'Not Provided')
    end_address = rdap_data.get('endAddress', 'Not Provided')
    extracted_info = {
        "IP": ip,
        "Source Registry": rdap_data.get("handle", "Not Provided"),
        "Net Range": f"{start_address} - {end_address}",
        "CIDR": _format_arin_cidrs(rdap_data),
        "Net Name": rdap_data.get("name", "Not Provided"),
        "Net Type": rdap_data.get("type", "Not Provided"),
        "Registration Date": rdap_data.get("registrationDate", "Not Provided"),
        "Last Changed": rdap_data.get("lastChanged", "Not Provided"),
    }

    # Handle related entities
    extracted_info["Related Entities"] = [
        _parse_arin_entity(entity)
        for entity in rdap_data.get("entities") or []
        if isinstance(entity, dict)
    ]

    return extracted_info

def check_arin_bulk(ip_list):
    """
    Query ARIN RDAP for every address in ``ip_list`` with a thread pool.
    Args:
        ip_list (list): A list of IP addresses to query.
    Returns:
        pandas.DataFrame: One row per lookup, in completion order rather
        than input order.
    """
    rows = []
    with ThreadPoolExecutor(max_workers=10) as pool:
        pending = [pool.submit(fetch_arin_data, address) for address in ip_list]

        with tqdm(total=len(ip_list), desc="    Checking ARIN RDAP") as progress:
            for finished in as_completed(pending):
                rows.append(finished.result())
                progress.update(1)
    return pd.DataFrame(rows)
    
###################################
# CLICK EVENT HANDLERS FOR BUTTONS #
###################################
def on_click_export(b):
    """Offer the most recent result table as a CSV download link.

    The table in ``exported_df`` is serialised to CSV, base64-encoded into a
    ``data:`` URI and shown as a link in the output area. If no table has
    been generated yet, a warning is printed instead.

    Args:
        b: The clicked button widget (unused).
    """
    with output:
        output.clear_output()  # Start from an empty output area

        if exported_df is None:
            print("⚠️ No table available to export. Please generate a table first.")
            return

        try:
            # Serialise the table and embed it in a data URI
            encoded = base64.b64encode(
                exported_df.to_csv(index=False).encode()
            ).decode()

            timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"exported_table_{timestamp}.csv"
            payload = f"data:text/csv;base64,{encoded}"
            html = f"""
                    <a download="{filename}" href="{payload}" target="_blank" style="font-size: 16px; color: green;">
                        ✅ File '{filename}' is ready. Click here to download.
                    </a>
                """
            # The user starts the download by clicking the rendered link
            display(HTML(html))
        except Exception as e:
            print(f"⚠️ An error occurred while exporting: {e}")
            
def on_click_abuseipdb(b):
    """Run the AbuseIPDB lookup for the entered IPs and show the result.

    Prints start/finish timing lines, stores the result table in
    ``exported_df`` and displays it with left-aligned cells.

    Args:
        b: The clicked button widget (unused).
    """
    global exported_df
    stamp_format = '%b %d %H:%M:%S'
    # Same left alignment for header cells (th) and data cells (td)
    left_aligned = [
        {'selector': tag, 'props': [('text-align', 'left')]}
        for tag in ('th', 'td')
    ]

    with output:
        output.clear_output()

        # Split the comma-separated input, dropping blanks
        addresses = []
        for chunk in ip_input.value.split(','):
            cleaned = chunk.strip()
            if cleaned:
                addresses.append(cleaned)
        if not addresses:
            print("⚠️ Please enter at least one IP.")
            return

        total = len(addresses)
        print(f"🔍 AbuseIPDb | Started check of {total} IPs at {time.strftime(stamp_format, time.localtime())}")
        began = time.time()
        report = check_abuseipdb_bulk(api_key_abuseipdb, addresses)
        finished = time.time()

        # Timing summary
        duration = finished - began
        minutes, seconds = divmod(duration, 60)
        per_ip = round(duration / total, 1)
        print(f"    ✅ Completed check at {time.strftime(stamp_format, time.localtime(finished))}")
        print(f"    ⏱ Time elapsed: {int(minutes)} minutes and {seconds:.1f} seconds")
        print(f"    ⏱ Average time per IP checked: {per_ip} seconds")

        # Keep the table available for the export button
        exported_df = report

        display(report.style.set_table_styles(left_aligned))

def on_click_ipgeo(b):
    """Run the IPGeolocation.io lookup for the entered IPs and show the result.

    Prints start/finish timing lines, stores the result table in
    ``exported_df`` and displays it with left-aligned cells.

    Args:
        b: The clicked button widget (unused).
    """
    global exported_df
    stamp_format = '%b %d %H:%M:%S'
    # Same left alignment for header cells (th) and data cells (td)
    left_aligned = [
        {'selector': tag, 'props': [('text-align', 'left')]}
        for tag in ('th', 'td')
    ]

    with output:
        output.clear_output()

        # Split the comma-separated input, dropping blanks
        addresses = []
        for chunk in ip_input.value.split(','):
            cleaned = chunk.strip()
            if cleaned:
                addresses.append(cleaned)
        if not addresses:
            print("⚠️ Please enter at least one IP.")
            return

        total = len(addresses)
        print(f"🔍 IPGeo | Started check of {total} IPs at {time.strftime(stamp_format, time.localtime())}")
        began = time.time()
        report = check_ipgeo_bulk(api_key_ipgeo, addresses)
        finished = time.time()

        # Timing summary
        duration = finished - began
        minutes, seconds = divmod(duration, 60)
        per_ip = round(duration / total, 1)
        print(f"    ✅ Completed check at {time.strftime(stamp_format, time.localtime(finished))}")
        print(f"    ⏱ Time elapsed: {int(minutes)} minutes and {seconds:.1f} seconds")
        print(f"    ⏱ Average time per IP checked: {per_ip} seconds")

        # Keep the table available for the export button
        exported_df = report

        display(report.style.set_table_styles(left_aligned))

def on_click_ipinfo(b):
    """Run the IPInfo lookup for the entered IPs and show the result.

    Prints start/finish timing lines, stores the result table in
    ``exported_df`` and displays it with left-aligned cells.

    Args:
        b: The clicked button widget (unused).
    """
    global exported_df
    stamp_format = '%b %d %H:%M:%S'
    # Same left alignment for header cells (th) and data cells (td)
    left_aligned = [
        {'selector': tag, 'props': [('text-align', 'left')]}
        for tag in ('th', 'td')
    ]

    with output:
        output.clear_output()

        # Split the comma-separated input, dropping blanks
        addresses = []
        for chunk in ip_input.value.split(','):
            cleaned = chunk.strip()
            if cleaned:
                addresses.append(cleaned)
        if not addresses:
            print("⚠️ Please enter at least one IP.")
            return

        total = len(addresses)
        print(f"🔍 IPInfo | Started check of {total} IPs at {time.strftime(stamp_format, time.localtime())}")
        began = time.time()
        report = check_ipinfo_bulk(api_key_ipinfo, addresses)
        finished = time.time()

        # Timing summary
        duration = finished - began
        minutes, seconds = divmod(duration, 60)
        per_ip = round(duration / total, 1)
        print(f"    ✅ Completed check at {time.strftime(stamp_format, time.localtime(finished))}")
        print(f"    ⏱ Time elapsed: {int(minutes)} minutes and {seconds:.1f} seconds")
        print(f"    ⏱ Average time per IP checked: {per_ip} seconds")

        # Keep the table available for the export button
        exported_df = report

        display(report.style.set_table_styles(left_aligned))

def on_click_arin(b):
    """Run the ARIN RDAP lookup for the entered IPs and show the result.

    Prints start/finish timing lines, stores the result table in
    ``exported_df`` and displays it with left-aligned cells.

    Args:
        b: The clicked button widget (unused).
    """
    global exported_df
    stamp_format = '%b %d %H:%M:%S'
    # Same left alignment for header cells (th) and data cells (td)
    left_aligned = [
        {'selector': tag, 'props': [('text-align', 'left')]}
        for tag in ('th', 'td')
    ]

    with output:
        output.clear_output()  # Drop whatever was shown before

        # Split the comma-separated input, dropping blanks
        addresses = []
        for chunk in ip_input.value.split(','):
            cleaned = chunk.strip()
            if cleaned:
                addresses.append(cleaned)
        if not addresses:
            print("⚠️ Please enter at least one IP.")
            return

        total = len(addresses)
        print(f"🔍 ARIN | Started checking {total} IPs at {time.strftime(stamp_format, time.localtime())}")
        began = time.time()
        report = check_arin_bulk(addresses)
        finished = time.time()

        # Timing summary
        duration = finished - began
        minutes, seconds = divmod(duration, 60)
        per_ip = round(duration / total, 1)
        print(f"    ✅ Completed check at {time.strftime(stamp_format, time.localtime(finished))}")
        print(f"    ⏱ Time elapsed: {int(minutes)} minutes and {seconds:.1f} seconds")
        print(f"    ⏱ Average time per IP checked: {per_ip} seconds")

        # Keep the table available for the export button
        exported_df = report

        display(report.style.set_table_styles(left_aligned))
    
def _run_timed_lookup(label, lookup, ip_list):
    """Run one bulk lookup and print start/finish timing lines around it.

    Args:
        label (str): Text printed between the magnifier icon and the IP
            count, e.g. ``"IPGeo | Started check of"``.
        lookup (callable): Called with ``ip_list``; returns the result table.
        ip_list (list): Non-empty list of IP strings.

    Returns:
        pandas.DataFrame: Whatever ``lookup`` returned.
    """
    stamp_format = '%b %d %H:%M:%S'
    print(f"\n🔍 {label} {len(ip_list)} IPs at {time.strftime(stamp_format, time.localtime())}")
    start_time = time.time()
    frame = lookup(ip_list)
    end_time = time.time()

    # Timing and performance metrics
    elapsed_time = end_time - start_time
    elapsed_minutes, elapsed_seconds = divmod(elapsed_time, 60)
    avg_time_per_ip = round(elapsed_time / len(ip_list), 1)
    print(f"    ✅ Completed check at {time.strftime(stamp_format, time.localtime(end_time))}")
    print(f"    ⏱ Time elapsed: {int(elapsed_minutes)} minutes and {elapsed_seconds:.1f} seconds")
    print(f"    ⏱ Average time per IP checked: {avg_time_per_ip} seconds")
    return frame


def _key_frame_by_ip(frame):
    """Return ``frame`` with every row keyed by a leading "IP" column."""
    # Tolerate error rows keyed "IP Address" instead of "IP": fold them into
    # "IP" so the merge key exists and is filled for every row.
    if "IP Address" in frame.columns:
        frame = frame.copy()
        if "IP" in frame.columns:
            frame["IP"] = frame["IP"].fillna(frame["IP Address"])
        else:
            frame["IP"] = frame["IP Address"]
        frame = frame.drop(columns="IP Address")
    return frame[["IP"] + [col for col in frame.columns if col != "IP"]]


def _merge_lookup_frames(frames):
    """Outer-merge per-service tables on "IP" under a (source, column) header.

    Args:
        frames (list): ``(source_label, DataFrame)`` pairs in display order.

    Returns:
        pandas.DataFrame: One table with two-level columns. The shared "IP"
        column is filed under the first source.
    """
    merged = None
    header = {"IP": (frames[0][0], "IP")}
    for source, frame in frames:
        frame = _key_frame_by_ip(frame)
        # Tag each column with its source while merging. Same-named columns
        # from different services (e.g. "Error") would otherwise be given
        # _x/_y suffixes, lose their source, and can clash on later merges.
        tags = {col: f"{source}|{col}" for col in frame.columns if col != "IP"}
        header.update({tag: (source, col) for col, tag in tags.items()})
        tagged = frame.rename(columns=tags)
        if merged is None:
            merged = tagged
        else:
            merged = pd.merge(merged, tagged, on="IP", how="outer")

    merged.columns = pd.MultiIndex.from_tuples([header[col] for col in merged.columns])
    return merged


def on_click_all(b):
    """Run every lookup service for the entered IPs and show one merged table.

    Each service is queried in turn (AbuseIPDB, IPGeolocation.io, IPInfo,
    ARIN RDAP) with timing lines printed for each. The results are merged
    on the IP address under a two-level header naming the source of each
    column, stored in ``exported_df`` and displayed with per-source colours.

    Args:
        b: The clicked button widget (unused).
    """
    global exported_df
    with output:
        output.clear_output()
        # De-duplicate while keeping input order: a repeated IP would be
        # looked up twice per service and multiply rows in the merges.
        ip_list = list(dict.fromkeys(
            ip.strip() for ip in ip_input.value.split(',') if ip.strip()
        ))
        if not ip_list:
            print("⚠️ Please enter at least one IP.")
            return

        abuse_df = _run_timed_lookup(
            "IPAbuseDb | Started check of",
            lambda ips: check_abuseipdb_bulk(api_key_abuseipdb, ips),
            ip_list,
        )
        ipgeo_df = _run_timed_lookup(
            "IPGeo | Started check of",
            lambda ips: check_ipgeo_bulk(api_key_ipgeo, ips),
            ip_list,
        )
        ipinfo_df = _run_timed_lookup(
            "IPInfo | Started check of",
            lambda ips: check_ipinfo_bulk(api_key_ipinfo, ips),
            ip_list,
        )
        arin_df = _run_timed_lookup(
            "ARIN RDAP | Started checking",
            check_arin_bulk,
            ip_list,
        )

        print("\n🔗 Merging results into a single table...")
        merged_df = _merge_lookup_frames([
            ("IPAbuseDB", abuse_df),
            ("IPGeo", ipgeo_df),
            ("IPInfo", ipinfo_df),
            ("ARIN", arin_df),
        ])

        # Save the merged DataFrame to the global variable. The exported
        # table carries the two-level (source, column) header.
        exported_df = merged_df

        def header_style():
            return [
                {
                    'selector': 'thead th.level0',  # Top-level header row (MultiIndex first level)
                    'props': [('background-color', '#f2f2f2'),  # Light gray
                              ('color', 'black'),                 # Black text
                              ('text-align', 'center'),           # Center-align
                              ('font-weight', 'bold')             # Bold font
                    ]
                },
                {
                    'selector': 'thead th.level1',  # Second-level header row (column names)
                    'props': [('background-color', '#e3e3e3'),  # Slightly darker gray
                              ('color', 'black'),               # Black text
                              ('text-align', 'center')]         # Center-align
                }
            ]

        # Background colour per source (first header level)
        source_colours = {
            "IPAbuseDB": '#ffcccc',  # Light red
            "IPGeo": '#ccffcc',      # Light green
            "IPInfo": '#ccccff',     # Light blue
            "ARIN": '#ffffcc',       # Light yellow
        }

        # Define a styler function to style the cell backgrounds
        def style_table(data):
            styles = pd.DataFrame('', index=data.index, columns=data.columns)
            for source in data.columns.get_level_values(0).unique():
                colour = source_colours.get(source, '#f0f0f0')  # Light gray for others
                styles.loc[:, source] = f'background-color: {colour}; text-align: center;'
            return styles

        # Apply styles to the DataFrame
        styled_df = merged_df.style.set_table_styles(header_style()).apply(
            style_table, axis=None)

        display(styled_df)

def on_click_clear(b):
    """Clear everything shown in the shared output area.

    Only the display is cleared; ``exported_df`` is left untouched.

    Args:
        b: The clicked button widget (unused).
    """
    output.clear_output()

######################
# GUI IMPLEMENTATION #
######################
# Widgets
# Free-text field; the click handlers split its value on commas.
ip_input = widgets.Text(
    placeholder='Enter IP(s), separated by commas',
    description='IP(s):',
    layout=widgets.Layout(width='600px')
)
# Shared output area: every handler prints and displays inside it.
output = Output()

# Buttons for services
run_button_abuseipdb = widgets.Button(description='Check AbuseIPDB', button_style='success')
run_button_ipgeo = widgets.Button(description='Check IPGeo', button_style='success')
run_button_ipinfo = widgets.Button(description='Check IPInfo', button_style='success')
run_button_arin = widgets.Button(description='Check ARIN', button_style='success')
run_button_all = widgets.Button(description='Check All', button_style='primary')
clear_button = widgets.Button(description='Clear Output', button_style='danger')
export_button = widgets.Button(description='Export to CSV', button_style='info')

# Button event bindings (each handler receives the clicked button)
run_button_abuseipdb.on_click(on_click_abuseipdb)
run_button_ipgeo.on_click(on_click_ipgeo)
run_button_ipinfo.on_click(on_click_ipinfo)
run_button_arin.on_click(on_click_arin)
run_button_all.on_click(on_click_all)
clear_button.on_click(on_click_clear)
export_button.on_click(on_click_export)

##############
# APP LAYOUT #
##############
# Top to bottom: title, IP input, one row of buttons, output area.
app_layout = VBox([
    widgets.HTML("<h2>IP Lookup Tool</h2>"),
    ip_input,
    HBox([run_button_abuseipdb, run_button_ipgeo, run_button_ipinfo, run_button_arin, run_button_all, clear_button, export_button]),
    output
])

# Display the App
display(app_layout)

VBox(children=(HTML(value='<h2>IP Lookup Tool</h2>'), Text(value='', description='IP(s):', layout=Layout(width…