In [None]:
pip install OTXv2

Collecting OTXv2
  Downloading OTXv2-1.5.12-py3-none-any.whl (16 kB)
Installing collected packages: OTXv2
Successfully installed OTXv2-1.5.12


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from OTXv2 import OTXv2, NotFound, IndicatorTypes

# Credentials and endpoint handed to the OTXv2 client in get_response().
# The value below is a placeholder: replace it with a real OTX API key
# before running the notebook.
API_KEY = 'YOUR API KEY'                    # Paste API key here
# Base URL of the OTX server the client talks to.
SERVER = 'https://otx.alienvault.com/'

def get_response(ioc_type, ioc):
    """Fetch the full OTX details for a single indicator.

    Args:
        ioc_type: Either the name of an attribute of ``IndicatorTypes``
            (for example ``"IPv4"``) or an indicator-type object taken from
            ``IndicatorTypes`` directly. Anything that cannot be resolved
            falls back to ``IndicatorTypes.IPv4``, which is what this
            function always used before the parameter was honoured.
        ioc: The indicator value to look up (for example an IP address).

    Returns:
        The response of ``get_indicator_details_full``, or ``None`` when the
        indicator is unknown to OTX or the request fails for any other
        reason (the error is printed in that case).
    """
    # Resolve the requested indicator type instead of silently ignoring it.
    indicator_type = IndicatorTypes.IPv4
    if isinstance(ioc_type, str):
        candidate = getattr(IndicatorTypes, ioc_type, None)
        # Only accept real indicator-type objects, not arbitrary module
        # attributes that happen to share the requested name.
        if isinstance(candidate, IndicatorTypes.IndicatorType):
            indicator_type = candidate
    elif ioc_type is not None:
        indicator_type = ioc_type

    try:
        otx = OTXv2(API_KEY, SERVER)
        return otx.get_indicator_details_full(indicator_type, ioc)
    except NotFound:
        # The indicator simply does not exist in OTX; not worth reporting.
        return None
    except Exception as e:
        # Best-effort lookup: report the problem and let the caller move on
        # to the next indicator.
        print(e)
        return None

def _unique(values):
    """Return the distinct items of *values* as a list, in first-seen order."""
    return list(dict.fromkeys(values))


def extract_threat_info(json_response):
    """Flatten an OTX "full details" response into a single report dict.

    Args:
        json_response: Mapping that is read through its ``general``, ``geo``,
            ``malware``, ``passive_dns`` and ``url_list`` keys. Missing keys
            are treated as empty.

    Returns:
        ``None`` when *json_response* is empty or ``None``. Otherwise a dict
        with the keys used as CSV columns elsewhere in this notebook.
        ``"Pulse Name"``, ``"Pulse Description"``, ``"Pulse Created"`` and
        ``"Pulse TLP"`` are sets; the other multi-valued fields are
        de-duplicated lists in first-seen order; ``"Malware"`` is a single
        comma-separated string; ``"Status"`` is always ``"complete"``.
    """
    if not json_response:
        return None
    status = "complete"

    indicator_details = json_response.get('general', {})
    ip_address = indicator_details.get('indicator', '')
    reputation = indicator_details.get('reputation', 0)
    asn = indicator_details.get('asn', 0)
    location = json_response.get('geo', {}).get('country_name', '')

    # Walk the pulses that are actually present rather than trusting the
    # reported 'count', which can be larger than the list and would make
    # index-based access raise IndexError.
    pulses = (indicator_details.get('pulse_info') or {}).get('pulses') or []

    pulse_names = set()
    pulse_description = set()
    pulse_created = set()
    pulse_TLPs = set()
    tags = []
    adversaries = []
    industries = []
    targeted_countries = []
    malware_families = []
    attack_ids = []
    attack_names = []

    for pulse in pulses:
        tags.extend(pulse.get('tags') or [])
        industries.extend(pulse.get('industries') or [])
        targeted_countries.extend(pulse.get('targeted_countries') or [])

        # A string adversary must be kept whole: feeding it to set.update()
        # would split it into individual characters.
        adversary = pulse.get('adversary')
        if isinstance(adversary, str):
            if adversary:
                adversaries.append(adversary)
        elif adversary:
            adversaries.extend(adversary)

        # Skip absent scalar fields instead of adding an unhashable default.
        for key, bucket in (('name', pulse_names),
                            ('created', pulse_created),
                            ('TLP', pulse_TLPs)):
            value = pulse.get(key)
            if value is not None:
                bucket.add(value)

        # Strip newlines and tabs so the description stays on one CSV line.
        description = pulse.get('description') or ''
        pulse_description.add(description.replace('\n', '').replace('\t', ''))

        malware_families.extend(
            entry['display_name'] for entry in pulse.get('malware_families') or []
        )
        for entry in pulse.get('attack_ids') or []:
            attack_ids.append(entry['id'])
            attack_names.append(entry['name'])

    # Antivirus detections: one (avast, avg, clamav, msdefender) tuple per
    # malware sample that at least one of those products flagged.
    products = ('avast', 'avg', 'clamav', 'msdefender')
    malware_data = []
    for malware in json_response.get('malware', {}).get('data', []):
        detections = malware.get('detections') or {}
        if any(detections.get(product) for product in products):
            malware_data.append(
                tuple(detections.get(product, '') for product in products)
            )
    unique_malware = _unique(malware_data)

    # Hostnames seen in passive DNS and URLs associated with the indicator.
    passive_dns_data = json_response.get('passive_dns', {}).get('passive_dns', [])
    unique_hostnames = _unique(entry.get('hostname', '') for entry in passive_dns_data)

    url_list_data = json_response.get('url_list', {}).get('url_list', [])
    unique_urls = _unique(entry.get('url', '') for entry in url_list_data)

    is_whitelisted = any(
        validation.get('name') == 'Whitelisted IP'
        for validation in indicator_details.get('validation', [])
    )

    return {
        "IP Address": ip_address,
        "ASN": asn,
        "Location": location,
        "Tags": _unique(tags),
        "Pulse Name": pulse_names,
        "Pulse Description": pulse_description,
        "Pulse Created": pulse_created,
        "Pulse TLP": pulse_TLPs,
        "Pulse Adversary": _unique(adversaries),
        "Pulse Targeted Countries": _unique(targeted_countries),
        "Pulse Industries": _unique(industries),
        "Pulse Malware Families": _unique(malware_families),
        "Pulse Attack ID": _unique(attack_ids),
        "Pulse Attack Name": _unique(attack_names),
        # Empty detection slots are dropped before joining.
        "Malware": ', '.join(', '.join(filter(None, malware)) for malware in unique_malware),
        "Passive DNS_Hostnames": unique_hostnames,
        "Associated URLS": unique_urls,
        "Reputation": reputation,
        "Whitelisted": is_whitelisted,
        "Status": status
    }


def search_and_extract_info(ip_address):
    """Look up an IPv4 address in OTX and return its flattened threat report.

    Returns whatever extract_threat_info() produces for the response, or
    None when get_response() yields nothing for this address.
    """
    otx_response = get_response("IPv4", ip_address)
    if not otx_response:
        return None
    return extract_threat_info(otx_response)



## Read IP address from List

In [None]:
import csv
# IP addresses to check (taken from the CIC-Darknet dataset)
ip_addresses_to_check = ['195.154.107.23', '195.154.82.180', '213.239.216.222',
       '37.97.149.8', '10.0.2.15', '82.161.239.177', '8.0.6.4',
       '217.23.3.253', '173.194.204.188', '131.202.240.150',
       '131.202.243.255', '195.154.126.78', '198.52.200.39',
       '148.251.190.229', '5.9.123.81', '173.194.123.1', '216.58.219.195',
       '74.125.29.95', '216.58.219.206', '74.125.226.174',
       '216.58.219.237', '173.194.208.189', '173.194.123.99',
       '216.58.219.227', '74.125.226.184', '173.194.123.48',
       '74.125.226.166', '173.194.123.118', '173.194.123.94',
       '173.194.123.100', '173.194.123.117', '173.194.123.96',
       '131.202.244.3', '131.202.244.5', '131.202.6.3', '74.125.226.160']

# Query every address in order and keep only the lookups that returned a report.
results = [
    threat_info
    for threat_info in map(search_and_extract_info, ip_addresses_to_check)
    if threat_info
]

# Destination of the report on the mounted Drive
csv_file_path = "/content/drive/MyDrive/xai/Darknet/AV/DST_Tor.csv"

# Column order of the report; the names match the keys of each result dict.
fieldnames = [
    "IP Address", "ASN", "Location", "Tags", "Pulse Name",
    "Pulse Description", "Pulse Created", "Pulse TLP", "Pulse Adversary",
    "Pulse Targeted Countries", "Pulse Industries", "Pulse Malware Families",
    "Pulse Attack ID", "Pulse Attack Name", "Malware",
    "Passive DNS_Hostnames", "Associated URLS", "Reputation",
    "Whitelisted", "Status",
]

# Write a header line followed by one row per successful lookup.
with open(csv_file_path, mode='w', newline='') as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(results)

print(f"Results saved to: {csv_file_path}")


{'detail': 'IP is private.'}
Results saved to: /content/drive/MyDrive/xai/Darknet/AV/DST_Tor.csv


## Read IP address from csv

In [None]:
import pandas as pd

# Column names assigned to the dataset when it is loaded.
fields_name = ["Src IP", "ASN", "Location", "Tags", "Pulse Name","Pulse Description", "Pulse Created", "Pulse TLP", "Pulse Adversary","Pulse Targeted Countries", "Pulse Industries", "Pulse Malware Families",
    "Pulse Attack ID", "Pulse Attack Name", "Malware","Passive DNS_Hostnames", "Associated URLS", "Reputation","Whitelisted", "Status"]

# The same file is read, updated in place and written back.
non_tor_csv_path = '/content/drive/MyDrive/xai/Darknet/AV/DST_Non_Tor.csv'

# Passing names= makes pandas treat every line of the file as data.
df = pd.read_csv(non_tor_csv_path, names=fields_name, engine="python")

# Row window to process in this run; adjust between runs to work in batches.
start_index = 6000
end_index = 6500  # Adjust the end index as needed

# Columns whose report value is a collection and is stored as a
# comma-separated string.
joined_columns = [
    'Tags', 'Pulse Name', 'Pulse Description', 'Pulse Created', 'Pulse TLP',
    'Pulse Adversary', 'Pulse Targeted Countries', 'Pulse Industries',
    'Pulse Malware Families', 'Pulse Attack ID', 'Pulse Attack Name',
    'Passive DNS_Hostnames', 'Associated URLS',
]
# Columns whose report value is stored unchanged.
scalar_columns = ['ASN', 'Location', 'Malware', 'Reputation', 'Whitelisted', 'Status']

for index, row in df.iloc[start_index:end_index].iterrows():
    # Rows already marked complete are not queried again.
    if row['Status'] == 'complete':
        continue

    ip_address = row['Src IP']
    threat_info = search_and_extract_info(ip_address)
    if not threat_info:
        continue

    # str() every element so a non-string entry cannot break the join.
    for column in joined_columns:
        df.loc[index, column] = ', '.join(map(str, threat_info.get(column, [])))
    for column in scalar_columns:
        df.loc[index, column] = threat_info.get(column, "")

# Write the file back exactly as it was read: without an extra header line.
# Emitting a header here would be re-read as a data row on the next run
# (because of names= above), adding one more header row per run and shifting
# the start_index/end_index window.
df.to_csv(non_tor_csv_path, index=False, header=False)

