# Laboratorio 5 Threat Hunting

In [3]:
import json
import pandas as pd

# Cargar los datos desde el archivo JSON
file_path = "large_eve.json" 

with open(file_path, "r", encoding="utf-8") as file:
    data = [json.loads(line) for line in file]

# Mostrar la cantidad total de registros
print(f"Cantidad total de registros: {len(data)}")  
# Filtrar solo los registros de tipo DNS
dns_records = [entry for entry in data if entry.get("event_type") == "dns"]
print(f"Cantidad de registros DNS: {len(dns_records)}") 

# Mostrar 2 registros de ejemplo
print("Ejemplo de registros DNS:")
for record in dns_records[:2]:
    print(json.dumps(record, indent=2))

Cantidad total de registros: 746909
Cantidad de registros DNS: 15749
Ejemplo de registros DNS:
{
  "timestamp": "2017-07-22T17:33:16.661646-0500",
  "flow_id": 1327836194150542,
  "pcap_cnt": 22269,
  "event_type": "dns",
  "vlan": 110,
  "src_ip": "2001:0dbb:0c18:0011:0260:6eff:fe30:0863",
  "src_port": 59680,
  "dest_ip": "2001:0500:0001:0000:0000:0000:803f:0235",
  "dest_port": 53,
  "proto": "UDP",
  "dns": {
    "type": "query",
    "id": 15529,
    "rrname": "api.wunderground.com",
    "rrtype": "A",
    "tx_id": 0
  }
}
{
  "timestamp": "2017-07-22T17:33:24.990320-0500",
  "flow_id": 2022925111925872,
  "pcap_cnt": 54352,
  "event_type": "dns",
  "vlan": 110,
  "src_ip": "2001:0dbb:0c18:0011:0260:6eff:fe30:0863",
  "src_port": 38051,
  "dest_ip": "2001:0500:0003:0000:0000:0000:0000:0042",
  "dest_port": 53,
  "proto": "UDP",
  "dns": {
    "type": "query",
    "id": 58278,
    "rrname": "stork79.dropbox.com",
    "rrtype": "A",
    "tx_id": 0
  }
}


In [4]:
# Normalizar los datos con json_normalize
from pandas import json_normalize

df = json_normalize(dns_records)
print(f"Shape del DataFrame: {df.shape}")  

# Filtrar registros de tipo A (los que contienen una dirección IP asociada a un dominio)
df_a_records = df[df["dns.rrtype"] == "A"]
print(f"Cantidad de registros tipo A: {df_a_records.shape[0]}")

# Filtrar dominios únicos
domain_unique = df_a_records["dns.rrname"].dropna().unique()
print(f"Cantidad de dominios únicos: {len(domain_unique)}")  
# Función para obtener el TLD de un dominio
def get_tld(domain):
    """
    source: GTP-4o
    prompt: Generame una funcion para obtener los TDL de un dominio.
    """
    parts = domain.split('.')
    if len(parts) > 1:
        return '.'.join(parts[-2:])
    return domain

# Aplicar la función a los dominios únicos
domain_tld_df = pd.DataFrame(domain_unique, columns=["dns.rrname"])
domain_tld_df["domain_tld"] = domain_tld_df["dns.rrname"].apply(get_tld)

# Eliminar todas las demás columnas dejando solo domain_tld
domain_tld_df = domain_tld_df[["dns.rrname", "domain_tld"]]
print(domain_tld_df.head())

# Guardar en un archivo CSV
domain_tld_df.to_csv("dominios.csv", index=False, encoding="utf-8")
print("Archivo 'dominios.csv' guardado con éxito.")


Shape del DataFrame: (15749, 18)
Cantidad de registros tipo A: 2849
Cantidad de dominios únicos: 177
                                   dns.rrname        domain_tld
0                        api.wunderground.com  wunderground.com
1                         stork79.dropbox.com       dropbox.com
2  hpca-tier2.office.aol.com.ad.aol.aoltw.net         aoltw.net
3        safebrowsing.clients.google.com.home          com.home
4                         fxfeeds.mozilla.com       mozilla.com
Archivo 'dominios.csv' guardado con éxito.


### Parte 2

In [30]:
import google.generativeai as genia
from dotenv import load_dotenv
import os

load_dotenv()

# Intentar cargar la API KEY desde el archivo .env
API_KEY = os.getenv("API_KEY")

# Configurar la API de Gemini
genia.configure(api_key=API_KEY)

model = genia.GenerativeModel(model_name='gemini-2.0-flash')

In [31]:
import time
# Cargar los datos desde el archivo CSV
file_path = "dominios.csv"
domain_tld_df = pd.read_csv(file_path)

# Asegurar que la columna de dominios se llama correctamente
if "dns.rrname" not in domain_tld_df.columns:
    raise ValueError("El archivo CSV no contiene la columna esperada 'dns.rrname'")

# Convertir a string para evitar errores
domain_tld_df["dns.rrname"] = domain_tld_df["dns.rrname"].astype(str)

# Aplicar la función a los dominios únicos
domain_tld_df["domain_tld"] = domain_tld_df["dns.rrname"].apply(get_tld)

# Clasificación con Gemini en lotes para evitar límites de la API
def classify_domains_batch(domains, batch_size=10):
    classifications = []
    for i in range(0, len(domains), batch_size):
        batch = domains[i:i+batch_size]
        prompt = "Para cada dominio en la siguiente lista, responde con 1 si es un dominio generado por algoritmo (DGA) o 0 si es legítimo. Responde solo con una lista de números, en el mismo orden de los dominios, separados por espacios.\n"
        prompt += "\n".join(batch)
        response = model.generate_content(prompt)
        try:
            results = [int(x.strip()) if x.strip().isdigit() else 0 for x in response.text.split()] 
            if len(results) == len(batch):
                classifications.extend(results)
            else:
                classifications.extend([0] * len(batch))  # En caso de error, asumir legítimos
        except Exception:
            classifications.extend([0] * len(batch))  # Manejo de errores
        time.sleep(1)  # Evitar sobrecarga en la API
    return classifications

In [37]:
# Aplicar clasificación por lotes
domain_list = domain_tld_df["dns.rrname"].tolist()
domain_tld_df["dga_classification"] = classify_domains_batch(domain_list, batch_size=10)

# Guardar los resultados en el archivo CSV
domain_tld_df.to_csv("dominios_clasificados.csv", index=False)

# Filtrar los dominios considerados como DGA
dga_domains = domain_tld_df[domain_tld_df["dga_classification"] == 1]
dga_domains = dga_domains.drop_duplicates()
print(f"Cantidad de registros DGA únicos: {len(dga_domains)}")
print(dga_domains.head())

Cantidad de registros DGA únicos: 34
                              dns.rrname                 domain_tld  \
18              192.168.22.110phpmyadmin           22.110phpmyadmin   
24  192.168.22.110phpmyadmin.localdomain  110phpmyadmin.localdomain   
27                  proxim.ntkrnlpa.info              ntkrnlpa.info   
32           AOLDTCMA04.ad.aol.aoltw.net                  aoltw.net   
33                       gg.arrancar.org               arrancar.org   

    dga_classification  
18                   1  
24                   1  
27                   1  
32                   1  
33                   1  


### Parte 3

In [43]:
TLD_LIST_PATH = "top-1m.csv"

In [44]:
def load_tld_list():
    """Carga la lista de TLDs en memoria para evitar recargarla en cada consulta."""
    if not os.path.exists(TLD_LIST_PATH):
        raise FileNotFoundError(f"No se encontró el archivo {TLD_LIST_PATH}")

    # Leer CSV y extraer solo la columna de dominios
    df = pd.read_csv(TLD_LIST_PATH, header=None, names=["rank", "domain"])
    
    # Extraer solo los TLDs (última parte del dominio)
    df["tld"] = df["domain"].apply(lambda x: x.split(".")[-1])  # Extraer la última parte

    return set(df["tld"])  # Convertir a conjunto para búsqueda rápida

In [45]:
TLD_SET = load_tld_list()  # Cargarlo solo una vez en memoria

In [46]:
def is_suspicious_tld(tld):
    """Devuelve 1 si el TLD no está en la lista (sospechoso), 0 si está en la lista."""
    return 1 if tld.lower() not in TLD_SET else 0

In [56]:
# Ruta del archivo CSV con los dominios sospechosos
SUSPICIOUS_CSV_PATH = "dominios_clasificados.csv"

def filter_suspicious_domains():
    """Filtra los dominios sospechosos con TLDs fuera del top-1m."""
    if not os.path.exists(SUSPICIOUS_CSV_PATH):
        raise FileNotFoundError(f"No se encontró el archivo {SUSPICIOUS_CSV_PATH}")

    # Leer CSV
    df = pd.read_csv(SUSPICIOUS_CSV_PATH)

    # Suponiendo que la columna de dominios se llama 'domain'
    df["tld"] = df["dns.rrname"].apply(lambda x: x.split(".")[-1] if isinstance(x, str) else "")  # Extraer TLD

    # Filtrar dominios con TLDs fuera del top-1m
    df["is_suspicious"] = df["tld"].apply(is_suspicious_tld)
    filtered_df = df[df["is_suspicious"] == 1].drop_duplicates()  # Eliminar duplicados
    filtered_df.to_csv("dominios_sospechosos.csv", index=False)  # Guardar en CSV
    return filtered_df

# Ejecutar filtro
filtered_domains = filter_suspicious_domains()
print(filtered_domains)
print(f"Cantidad de dominios sospechosos: {len(filtered_domains)}")


                                         dns.rrname  \
3              safebrowsing.clients.google.com.home   
13                                        wpad.home   
18                         192.168.22.110phpmyadmin   
20              secure.informaction.com.localdomain   
21      safebrowsing.clients.google.com.localdomain   
24             192.168.22.110phpmyadmin.localdomain   
39                                www.theanime.cn.    
43   safebrowsing.clients.google.com.hackerlabs.vpn   
52                     secure.informaction.com.home   
63           secure.informaction.com.hackerlabs.vpn   
66                                             wpad   
67              safebrowsing.clients.google.com.lan   
71                                 "192.168.206.56"   
88                                  192.168.26-27.0   
126                                              FL   
131                                         saruman   
135                                 192.168.21.1201   
136       

In [None]:
import whois

def get_tld_creation_date(tld):
    """Obtiene la fecha de creación de un TLD desde WHOIS."""
    try:
       domain_info = whois.whois(tld) 
       if domain_info.creation_date:
            if isinstance(domain_info.creation_date, list):
                return domain_info.creation_date[0]
            return domain_info.creation_date
       return None
    except Exception as e:
        return f"Error: {str(e)}"

# Obtener la fecha de creación para los TLDs sospechosos
filtered_domains['creation_date'] = filtered_domains['domain_tld'].apply(get_tld_creation_date)

tld_creation_dates = {tld: get_tld_creation_date(tld) for tld in filtered_domains["dns.rrname"].unique()}

print("\nFechas de creación de los TLDs sospechosos:")
for tld, date in tld_creation_dates.items():
    print(f"{tld}")

print(f"Cantidad de creacion de dominios sospechosos: {len(tld_creation_dates)}")


Fechas de creación de los TLDs sospechosos:
safebrowsing.clients.google.com.home
wpad.home
192.168.22.110phpmyadmin
secure.informaction.com.localdomain
safebrowsing.clients.google.com.localdomain
192.168.22.110phpmyadmin.localdomain
www.theanime.cn. 
safebrowsing.clients.google.com.hackerlabs.vpn
secure.informaction.com.home
secure.informaction.com.hackerlabs.vpn
wpad
safebrowsing.clients.google.com.lan
"192.168.206.56"
192.168.26-27.0
FL
saruman
192.168.21.1201
nan
whitecell.localdomain
1922.168.22.254
1922.168.22.254.home
192.168.21-28.0
192.168.21-28.0.home
192.168.22.201:
ntp.ubuntu.com.localdomain
Cantidad de creacion de dominios sospechosos: 25


In [70]:
import re

def is_dga_domain(domain):
    """Determina si un dominio parece generado aleatoriamente (DGA)."""
    if isinstance(domain, str):  # Verifica que sea una cadena
        return bool(re.match(r'[a-zA-Z0-9]{6,}', domain))  # Solo letras y más de 8 caracteres
    return False  # Si no es una cadena, no es un dominio DGA

# Aplicar la detección de DGA a los dominios sospechosos
filtered_domains["is_dga"] = filtered_domains["dns.rrname"].apply(is_dga_domain)

# Mostrar los dominios confirmados como DGA
dga_domains = filtered_domains[filtered_domains["is_dga"]]
print("\nDominios que parecen DGA:")
print(dga_domains["dns.rrname"]) 
print(f"Cantidad de dominios DGA: {len(dga_domains)}")


Dominios que parecen DGA:
3                safebrowsing.clients.google.com.home
20                secure.informaction.com.localdomain
21        safebrowsing.clients.google.com.localdomain
43     safebrowsing.clients.google.com.hackerlabs.vpn
52                       secure.informaction.com.home
63             secure.informaction.com.hackerlabs.vpn
67                safebrowsing.clients.google.com.lan
131                                           saruman
151                             whitecell.localdomain
Name: dns.rrname, dtype: object
Cantidad de dominios DGA: 9


### Conclusion


No se llegaron a detectar dominios sospechosos los cuales posean caracteres aleatorios. Esto puede ser por varios factores pero en este caso el factor principal es por el uso de Gemini, especificamente por el prompt utilizado y por la temperatura del modelo. Gemini no siempre clasifica de manera correcta los dominios que son generados por algoritmos y con un prompt simple se identificaron pocos dominios DGA. Al tener un prompt mejor se incrementaron los dominios generados y la presición mejoro.  Se logra evidenciar que el prompt es vital en el rendimiento del modelo y si se busca llevar a cabo clasificaciones mas complejas se debe hacer más robusta la petición.