# Thread Hunting with Data Science
## Security Data Science
### Laboratorio 5
#### José Daniel Gómez Cabrera 21429

### Parte 1 – Filtrado y preprocesamiento

Para este ejercicio se utilizará el archivo large_eve.json que se encuentra en Canvas, en el módulo de la semana. Este archivo contiene miles de paquetes capturados mediante el IDS Suricata. Pasos:

In [65]:
import json
import pandas as pd
from tldextract import tldextract
import random
import re
import google.generativeai as genai

1. Cargue la información del archivo large_even.json en una lista, muestre la cantidad de
registros total (deben ser 746, 909). Este es nuestro tráfico inicial!

In [66]:
def load_json_file(filename):
    with open(filename, 'r') as file:
        data = [json.loads(line) for line in file]
    return data

network_traffic = load_json_file('large_eve.json')
print(f"Registros totales: {len(network_traffic)}")

Registros totales: 746909


2. Debido a que estamos buscando dominios web, del total de registros, solamente estamos interesados en los registros DNS. Cargue únicamente aquellos registros que sean DNS.

In [67]:
dns_records = [record for record in network_traffic if record.get('event_type') == 'dns']

3. Muestre la nueva cantidad de registros filtrados. Deben ser 21484. Esta es una cantidad mucho más manejable, pero aún se debe seguir depurando la información a buscar.

In [68]:
print(f"Registros DNS: {len(dns_records)}")

Registros DNS: 15749


4. Muestre la información de 2 registros cualesquiera.

In [69]:
print("2 registros aleatorios:")

for record in random.sample(dns_records, 2):
    print(json.dumps(record, indent=2))

2 registros aleatorios:
{
  "timestamp": "2017-07-22T18:42:42.536470-0500",
  "flow_id": 715387015670351,
  "pcap_cnt": 1781357,
  "event_type": "dns",
  "vlan": 110,
  "src_ip": "192.168.207.4",
  "src_port": 53,
  "dest_ip": "192.168.201.57",
  "dest_port": 53550,
  "proto": "UDP",
  "dns": {
    "type": "answer",
    "id": 29675,
    "rcode": "NXDOMAIN",
    "rrname": "<root>",
    "rrtype": "SOA",
    "ttl": 20864
  }
}
{
  "timestamp": "2017-07-22T19:29:19.667525-0500",
  "flow_id": 937192195043205,
  "pcap_cnt": 3356167,
  "event_type": "dns",
  "vlan": 150,
  "src_ip": "192.168.205.188",
  "src_port": 51291,
  "dest_ip": "192.168.207.4",
  "dest_port": 53,
  "proto": "UDP",
  "dns": {
    "type": "query",
    "id": 24971,
    "rrname": "mirror.san.fastserv.com",
    "rrtype": "AAAA",
    "tx_id": 0
  }
}


5. Debido a que la data consiste en estructuras JSON anidadas, utilice la característica json_normalize para normalizar la información y asignarla en un dataframe. Muestre el shape del dataframe, debería obtener (21484, 163).

In [70]:
dns_df = pd.json_normalize(dns_records)
print("Forma del dataframe:", dns_df.shape)

Forma del dataframe: (15749, 18)


6. Como estamos buscando dominios DGA, debemos filtrar los registros DNS para aquellos registros tipo A (son aquellos que mantienen una dirección IP asociada a un dominio). Después de filtrar debería obtener 2849 registros.

In [71]:
a_records_df = dns_df[dns_df['dns.type'] == 'query']
a_records_df = a_records_df[a_records_df['dns.rrtype'] == 'A']
print(f"Registros tipo A: {len(a_records_df)}")

Registros tipo A: 2836


7. Filtre los dominios únicos. Debe obtener 177 registros únicos.

In [72]:
unique_domains = a_records_df['dns.rrname'].unique()
print(f"Dominios únicos: {len(unique_domains)}")
print("Muestra de dominios únicos:", unique_domains[:10])

Dominios únicos: 177
Muestra de dominios únicos: ['api.wunderground.com' 'stork79.dropbox.com'
 'hpca-tier2.office.aol.com.ad.aol.aoltw.net'
 'safebrowsing.clients.google.com.home' 'fxfeeds.mozilla.com'
 'www.metasploit.com' 'aolmtcmxm03.office.aol.com'
 'aolmtcmxm02.office.aol.com.ad.aol.aoltw.net'
 'aolmtcmxm02.office.aol.com' 'hpca-tier2.office.aol.com']


8. Escriba una función que obtenga el TLD para un dominio. Por ejemplo, para api.wunderground.com el TLD es wunderground.com, para safebrowsing.clients.google.com.home, el TLD es home. Utilice un LLM para ayudarle a escribir esta función, verifique que obtiene correctamente el TLD, incluya el prompt utilizado en su notebook.

In [73]:
def extract_tld(domain):
    """
    Extract the top-level domain for a given domain name.
    
    Examples:
    - api.wunderground.com → wunderground.com
    - safebrowsing.clients.google.com.home → home
    """
    extracted = tldextract.extract(domain)
    
    if extracted.suffix:
        return f"{extracted.domain}.{extracted.suffix}"
    
    parts = domain.split('.')
    return '.'.join(parts[-2:]) if len(parts) > 1 else domain

9. Del dataframe de dominios únicos de tipo A, obtenga el TLD (top level domain) utilizando la función anterior para crear una columna nueva llamada domain_tld, y elimine todas las demás columnas.

In [74]:
unique_domains_df = a_records_df[['dns.rrname']].drop_duplicates()
unique_domains_df['domain_tld'] = unique_domains_df['dns.rrname'].apply(extract_tld)

print("Dominios únicos con TLD:")
print(unique_domains_df.head())
print(f"Total de dominios únicos con TLD: {unique_domains_df.head()}")

Dominios únicos con TLD:
                                   dns.rrname        domain_tld
0                        api.wunderground.com  wunderground.com
1                         stork79.dropbox.com       dropbox.com
2  hpca-tier2.office.aol.com.ad.aol.aoltw.net         aoltw.net
6        safebrowsing.clients.google.com.home          com.home
9                         fxfeeds.mozilla.com       mozilla.com
Total de dominios únicos con TLD:                                    dns.rrname        domain_tld
0                        api.wunderground.com  wunderground.com
1                         stork79.dropbox.com       dropbox.com
2  hpca-tier2.office.aol.com.ad.aol.aoltw.net         aoltw.net
6        safebrowsing.clients.google.com.home          com.home
9                         fxfeeds.mozilla.com       mozilla.com


### Parte 2 – Data Science

10. Utilice Gemini para clasificar los dominios como DGA (1) o legítimos (0).

In [75]:
def classify_domains_with_gemini(domains, api_key):
    """
    Classify domains as DGA (1) or legitimate (0) using Gemini
    """
    genai.configure(api_key=api_key)
    
    # Use the correct model name
    model = genai.GenerativeModel('gemini-1.0-pro')
    
    classified_domains = []
    
    for domain in domains:
        # Prompt for classification
        prompt = f"""Classify the following domain as DGA (Domain Generation Algorithm) or legitimate.
        Provide a binary classification:
        0 - Legitimate domain
        1 - DGA domain
        
        Reasons to classify as DGA:
        - Randomly generated looking
        - No meaningful words
        - Unusually long or complex
        - Lacks common TLD patterns
        
        Domain: {domain}
        
        Respond with ONLY the number 0 or 1:"""
        
        try:
            response = model.generate_content(prompt)
            classification = response.text.strip()
            
            # Ensure response is 0 or 1
            classification = '1' if classification == '1' else '0'
            classified_domains.append({
                'domain': domain, 
                'is_dga': int(classification)
            })
        except Exception as e:
            print(f"Error classifying {domain}: {e}")
            # Default to 0 if classification fails
            classified_domains.append({
                'domain': domain, 
                'is_dga': 0
            })
    
    return pd.DataFrame(classified_domains)

### Parte 3 – Dominio experto

12. Ahora ya tenemos un listado de dominios reducido y considerado como sospechoso, por lo que debemos aplicar dominio experto para encontrar los verdaderos registros maliciosos. Escriba una función que utilice la lista de un millón de TLD proporcionada en Canvas, y devuelva 0 si el TLD se encuentra en la lista y 1 si no está. Utilice un LLM para crear dicha función, verifique que no se carga la lista cada vez que se busca un TLD. Incluya el prompt en su notebook.

In [76]:
def load_top_million_tlds(filepath='top-1m.csv'):
    """
    Load top million TLDs from a CSV file
    """
    try:
        top_tlds = pd.read_csv(filepath, header=None, names=['rank', 'domain'])
        return set(top_tlds['domain'])
    except Exception as e:
        print(f"Error loading TLD list: {e}")
        return set()

13. Utilice la función para determinar si los TLD se encuentran en dicha lista. Filtre aquellos que si se encuentran. Después de eliminar duplicados, debería obtener 13 dominios.

In [77]:
def check_tld_in_list(tld, top_tlds):
    """
    Check if TLD is in the top million list
    Returns 0 if in list, 1 if not
    """
    return 0 if tld in top_tlds else 1

14. Finalmente, para confirmar los dominios maliciosos podemos buscar la fecha de creación del TLD. Cree una función qué en base al TLD, devuelva la fecha de creación de este. Utilice un LLM para escribir dicha función, incluya el prompt utilizado en su notebook.

In [78]:
def get_tld_creation_date(tld):
    """
    Placeholder function to get TLD creation date
    In a real scenario, you'd use a lookup service or API
    """
    import random
    from datetime import datetime, timedelta
    
    # Simulate creation dates
    base_date = datetime(2000, 1, 1)
    random_days = random.randint(0, 365 * 20)
    return (base_date + timedelta(days=random_days)).strftime('%Y-%m-%d')

15. Muestre la fecha de creación para cada uno de los 13 dominios finales ¿Cuáles son los dominios que podemos confirmar como sospechosos?

In [79]:
def is_random_domain(domain):
    """
    Check if domain looks like a randomly generated sequence
    """
    # Remove TLD and check remaining part
    domain_name = domain.split('.')[0]
    
    # Criteria for random domain:
    # 1. Long domain (typically > 10 characters)
    # 2. Lacks vowels
    # 3. High character repetition
    
    # Check domain length
    if len(domain_name) > 10:
        # Check for lack of vowels
        if not re.search(r'[aeiou]', domain_name):
            return True
        
        # Check character distribution
        char_counts = {}
        for char in domain_name:
            char_counts[char] = char_counts.get(char, 0) + 1
        
        # High uniformity suggests randomness
        unique_count_values = len(set(char_counts.values()))
        if unique_count_values <= 2:
            return True
    
    return False

16. Recuerde que los dominios DGA son conocidos por formarse de forma aleatoria: secuencias aleatorias de caracteres, no palabras. Indique que dominios sospechosos tienen este patrón y que pueden confirmarse como dominios DGA.

In [80]:
# Replace with your actual Gemini API key
GEMINI_API_KEY = 'AIzaSyAwlmwaAabMxXp11VjFT105RNeccA5g3_c'
# Part 2: Classify domains
classified_domains = classify_domains_with_gemini(unique_domains, GEMINI_API_KEY)

# Filter DGA domains
dga_domains = classified_domains[classified_domains['is_dga'] == 1]['domain']
print(f"DGA Domains: {len(dga_domains)}")

# Part 3: Expert Domain Analysis
top_tlds = load_top_million_tlds()

suspicious_domains = []
for domain in dga_domains:
    tld = extract_tld(domain)
    
    # Check TLD against top million
    tld_status = check_tld_in_list(tld, top_tlds)
    
    if tld_status == 1:  # Not in top million
        creation_date = get_tld_creation_date(tld)
        
        suspicious_domains.append({
            'domain': domain,
            'tld': tld,
            'creation_date': creation_date,
            'randomness_score': is_random_domain(domain)
        })

suspicious_df = pd.DataFrame(suspicious_domains)
print("\nSuspicious Domains:")
print(suspicious_df)

# Identify definitely suspicious DGA domains
dga_confirmed = suspicious_df[suspicious_df['randomness_score'] == True]
print("\nConfirmed DGA Domains:")
print(dga_confirmed)

Error classifying api.wunderground.com: 404 models/gemini-1.0-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.
Error classifying stork79.dropbox.com: 404 models/gemini-1.0-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.
Error classifying hpca-tier2.office.aol.com.ad.aol.aoltw.net: 404 models/gemini-1.0-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.
Error classifying safebrowsing.clients.google.com.home: 404 models/gemini-1.0-pro is not found for API version v1beta, or is not supported for generateContent. Call ListModels to see the list of available models and their supported methods.
Error classifying fxfeeds.mozilla.com: 404 models/gemini-1.0-pr

KeyError: 'randomness_score'