# Threat Hunting Lab 

## Fase 1 

In [1]:
import pandas as pd
import json
from tldextract import extract

In [2]:
file_path = 'large_eve.json'
with open(file_path, 'r') as file:
    data = [json.loads(line) for line in file]

print(f'Total de registros: {len(data)}')  # Debe ser 746909

Total de registros: 746909


In [3]:
dns_records = [record for record in data if record.get('event_type') == 'dns']
print(f'Total de registros DNS: {len(dns_records)}')  # Debe ser 21484

# 4. Mostrar 2 registros cualesquiera
print(json.dumps(dns_records[:2], indent=2))

Total de registros DNS: 15749
[
  {
    "timestamp": "2017-07-22T17:33:16.661646-0500",
    "flow_id": 1327836194150542,
    "pcap_cnt": 22269,
    "event_type": "dns",
    "vlan": 110,
    "src_ip": "2001:0dbb:0c18:0011:0260:6eff:fe30:0863",
    "src_port": 59680,
    "dest_ip": "2001:0500:0001:0000:0000:0000:803f:0235",
    "dest_port": 53,
    "proto": "UDP",
    "dns": {
      "type": "query",
      "id": 15529,
      "rrname": "api.wunderground.com",
      "rrtype": "A",
      "tx_id": 0
    }
  },
  {
    "timestamp": "2017-07-22T17:33:24.990320-0500",
    "flow_id": 2022925111925872,
    "pcap_cnt": 54352,
    "event_type": "dns",
    "vlan": 110,
    "src_ip": "2001:0dbb:0c18:0011:0260:6eff:fe30:0863",
    "src_port": 38051,
    "dest_ip": "2001:0500:0003:0000:0000:0000:0000:0042",
    "dest_port": 53,
    "proto": "UDP",
    "dns": {
      "type": "query",
      "id": 58278,
      "rrname": "stork79.dropbox.com",
      "rrtype": "A",
      "tx_id": 0
    }
  }
]


In [4]:
df = pd.json_normalize(dns_records)
print(f'Shape del DataFrame: {df.shape}')

Shape del DataFrame: (15749, 18)


In [6]:
df_a = df[df['dns.rrtype'] == 'A']
print(f'Total de registros DNS tipo A: {len(df_a)}')

Total de registros DNS tipo A: 2849


In [7]:
domain_column = 'dns.rrname'  # Ajustar según los datos
unique_domains = df_a[domain_column].dropna().unique()
print(f'Total de dominios únicos: {len(unique_domains)}')

Total de dominios únicos: 177


In [8]:
def get_tld(domain):
    extracted = extract(domain)
    if extracted.suffix:
        return f"{extracted.domain}.{extracted.suffix}"
    return extracted.domain

In [None]:
df_domains = pd.DataFrame({'domain'ffffff: unique_domains})
df_domains['domain_tld'] = df_domains['domain'].apply(get_tld)

In [10]:
df_domains.to_csv('filtered_domains.csv', index=False)
print(df_domains.head())


                                       domain        domain_tld
0                        api.wunderground.com  wunderground.com
1                         stork79.dropbox.com       dropbox.com
2  hpca-tier2.office.aol.com.ad.aol.aoltw.net         aoltw.net
3        safebrowsing.clients.google.com.home              home
4                         fxfeeds.mozilla.com       mozilla.com


In [24]:
df_domains.describe()

Unnamed: 0,dga_label
count,177.0
mean,-0.960452
std,0.268869
min,-1.0
25%,-1.0
50%,-1.0
75%,-1.0
max,1.0


## Fase 2

In [20]:
import os
import google.generativeai as genai

GOOGLE_API_KEY = "AIzaSyAjgUio7sSx1-oZPm8hVsNGbkp17p8OyMQ"
genai.configure(api_key=GOOGLE_API_KEY)

model = genai.GenerativeModel("models/gemini-1.5-pro")

import re

def classify_domain(domain):
    prompt = f"""
E¿El dominio {domain} es generado por un algoritmo DGA (1) o legítimo (0)?
"""
    try:
        response = model.generate_content(prompt)
        text = response.text.strip()

        # Buscar el primer 0 o 1 en cualquier parte del texto
        match = re.search(r'[01]', text)
        if match:
            return int(match.group(0))
        else:
            print(f"[!] Respuesta inesperada para {domain}: '{text}'")
            return -1
    except Exception as e:
        print(f"[X] Error clasificando {domain}: {e}")
        return -1

In [21]:
response = model.generate_content("Clasifica el dominio api.wunderground.com como DGA (1) o legítimo (0). Solo responde con 1 o 0.")
print(response.text.strip())

0


In [25]:
df_domains["dga_label"] = df_domains["domain"].apply(classify_domain)
df_domains["dga_label"].value_counts()

dga_label
0    136
1     41
Name: count, dtype: int64

In [26]:
 #Filtrar solo dominios clasificados como DGA
dga_domains = df_domains[df_domains["dga_label"] == 1]
dga_domains_unique = dga_domains.drop_duplicates(subset=["domain"])

print("Cantidad de dominios clasificados como DGA (con posibles duplicados):", dga_domains.shape[0])
print("Cantidad de dominios DGA únicos:", dga_domains_unique.shape[0])
dga_domains_unique.head()

Cantidad de dominios clasificados como DGA (con posibles duplicados): 41
Cantidad de dominios DGA únicos: 41


Unnamed: 0,domain,domain_tld,dga_label
2,hpca-tier2.office.aol.com.ad.aol.aoltw.net,aoltw.net,1
3,safebrowsing.clients.google.com.home,home,1
14,safebrowsing.clients.google.com.stayonline.net,stayonline.net,1
18,192.168.22.110phpmyadmin,110phpmyadmin,1
20,secure.informaction.com.localdomain,localdomain,1


## Fase 3 

In [27]:
import pandas as pd
from urllib.parse import urlparse

# Cargar el archivo top-1m.csv
tld_df = pd.read_csv("top-1m.csv", header=None, usecols=[1], names=["domain"])

# Función para extraer el TLD del dominio
def extract_tld(domain):
    parts = domain.split(".")
    return ".".join(parts[-2:]) if len(parts) > 1 else domain  # Obtiene el TLD correcto

# Crear la lista de TLDs únicos
tld_df["tld"] = tld_df["domain"].apply(extract_tld)
tld_set = set(tld_df["tld"])  # Convertir a conjunto para búsqueda rápida

In [28]:
tld_df.head()

Unnamed: 0,domain,tld
0,google.com,google.com
1,www.google.com,google.com
2,microsoft.com,microsoft.com
3,data.microsoft.com,microsoft.com
4,events.data.microsoft.com,microsoft.com


In [29]:
def is_tld_suspicious(tld):
    return 0 if tld in tld_set else 1

In [30]:
dga_domains_unique["suspicious_tld"] = dga_domains_unique["domain_tld"].apply(is_tld_suspicious)

In [31]:
filtered_domains = dga_domains_unique[dga_domains_unique["suspicious_tld"] == 1].drop_duplicates(subset=["domain"])
print("Cantidad de dominios sospechosos después del filtrado:", filtered_domains.shape[0])


Cantidad de dominios sospechosos después del filtrado: 32


In [32]:
def get_tld_creation_date(tld):
    
    prompt = f"""
    ¿En qué año fue creado el TLD '{tld}'? Responde solo con el año de creación.
    """
    try:
        response = model.generate_content(prompt)
        text = response.text.strip()
        year_match = re.search(r'\d{4}', text)
        if year_match:
            return int(year_match.group(0))
        else:
            print(f"[!] No se encontró fecha para {tld}, respuesta: '{text}'")
            return None
    except Exception as e:
        print(f"[X] Error obteniendo fecha de {tld}: {e}")
        return None


In [33]:
filtered_domains["tld_creation_year"] = filtered_domains["domain_tld"].apply(get_tld_creation_date)


[!] No se encontró fecha para FL, respuesta: 'Nunca se creó. El TLD .fl fue propuesto para Florida, pero nunca se implementó.'
[!] No se encontró fecha para 192.168.22.201, respuesta: 'Un TLD (dominio de nivel superior) es la parte final de un nombre de dominio, como .com, .org, .net, etc.  192.168.22.201 es una dirección IP, no un nombre de dominio y, por lo tanto, no tiene un TLD. Las direcciones IP no se "crean" en el mismo sentido que los TLD. Son parte del esquema de direccionamiento IP definido en los estándares de red.

Por lo tanto, no hay un año de "creación" para 192.168.22.201.'


In [34]:
import re

def looks_like_dga(domain):
    
    domain_name = domain.split('.')[0]  # Tomamos solo el subdominio principal
    return int(bool(re.search(r'[^aeiou]{5,}', domain_name)))  # 1 = DGA, 0 = No DGA

filtered_domains["confirmed_dga"] = filtered_domains["domain"].apply(looks_like_dga)


In [35]:
confirmed_dga_domains = filtered_domains[filtered_domains["confirmed_dga"] == 1]
print("Dominios confirmados como DGA:", confirmed_dga_domains.shape[0])
confirmed_dga_domains.head()


Dominios confirmados como DGA: 2


Unnamed: 0,domain,domain_tld,dga_label,suspicious_tld,tld_creation_year,confirmed_dga
160,vtlfccmfxlkgifuf.com,vtlfccmfxlkgifuf.com,1,1,2023.0,1
167,ejfodfmfxlkgifuf.xyz,ejfodfmfxlkgifuf.xyz,1,1,2022.0,1
