# TD SIEM : Suricata, Filebeat, Elasticsearch et Kibana

## Objectifs du TD

Ce TD vous permettra de :
- Configurer Suricata (syst√®me de d√©tection d'intrusion) pour g√©n√©rer des logs EVE JSON
- Utiliser Filebeat pour collecter et envoyer automatiquement les logs vers Elasticsearch
- Explorer et analyser les donn√©es dans Elasticsearch
- Cr√©er des visualisations dans Kibana
- Configurer des alertes pour la d√©tection de menaces

## Architecture SIEM

```
Suricata ‚Üí Filebeat ‚Üí Elasticsearch ‚Üí Kibana
   (IDS)    (Collecteur)   (Stockage)   (Visualisation/Alerting)
```

## Pr√©requis

### 1. Installation des d√©pendances

Assurez-vous d'avoir install√© les d√©pendances du projet :
```bash
uv sync
```

### 2. Pr√©paration des r√©pertoires

Avant de d√©marrer la stack, cr√©ez les r√©pertoires n√©cessaires pour Suricata :

```bash
mkdir -p suricata/{logs,config,rules}
```

### 3. Configuration de Suricata

Cr√©ez le fichier `suricata/config/suricata.yaml` avec la configuration minimale suivante :

```yaml
# Configuration des logs EVE (JSON)
- eve-log:
    enabled: yes
    filetype: regular
    filename: eve.json
    types:
      - alert
      - http
      - dns
      - tls
      - files
      - ssh
      - stats
      - flow
    json: yes

# Interface r√©seau
af-packet:
  - interface: eth0
    cluster-id: 99
    cluster-type: cluster_flow
    defrag: yes

# R√®gles
default-rule-path: /var/lib/suricata/rules
rule-files:
  - suricata.rules
```

### 4. D√©marrage de la stack compl√®te SIEM

Le fichier `docker-compose-siem.yml` inclut toute la stack n√©cessaire pour ce TD :
- Cluster Elasticsearch (3 n≈ìuds)
- Kibana
- Suricata (IDS/IPS) - g√©n√®re des logs dans `./suricata/logs/eve.json`
- Filebeat (collecteur de logs) - lit depuis `./suricata/logs` et indexe dans Elasticsearch

**D√©marrage de la stack compl√®te :**
```bash
docker-compose -f docker-compose-siem.yml up -d
```

**V√©rification :**
- Elasticsearch : `https://localhost:9200` (ou le port configur√© dans `.env`)
- Kibana : `http://localhost:5601` (ou le port configur√© dans `.env`)

**Note :** Si vous utilisez un cluster avec s√©curit√© activ√©e, vous devrez utiliser les identifiants d√©finis dans le fichier `.env` (ELASTIC_PASSWORD).

**Important :** Filebeat indexe automatiquement les logs Suricata dans des index nomm√©s `suricata-YYYY.MM.DD`. Les donn√©es seront disponibles apr√®s que Suricata ait g√©n√©r√© des logs et que Filebeat les ait collect√©s.

### 4. V√©rification de la stack

La cellule suivante v√©rifie que tous les services sont correctement d√©marr√©s.

In [5]:
# V√©rification de la stack Docker
import subprocess
import os

def check_docker_container(container_name):
    """V√©rifie si un conteneur Docker est en cours d'ex√©cution"""
    try:
        result = subprocess.run(
            ['docker', 'ps', '--filter', f'name={container_name}', '--format', '{{.Names}}'],
            capture_output=True,
            text=True,
            timeout=5
        )
        return container_name in result.stdout
    except Exception:
        return False

def check_docker_compose():
    """V√©rifie si docker-compose est disponible"""
    try:
        subprocess.run(['docker', 'compose', 'version'], capture_output=True, check=True, timeout=5)
        return True
    except:
        try:
            subprocess.run(['docker-compose', 'version'], capture_output=True, check=True, timeout=5)
            return True
        except:
            return False

# V√©rifier Docker
try:
    subprocess.run(['docker', '--version'], capture_output=True, check=True, timeout=5)
    print("‚úÖ Docker est install√©")
except (FileNotFoundError, subprocess.CalledProcessError):
    print("‚ùå Docker n'est pas install√©")
    print("   Installez Docker: https://docs.docker.com/get-docker/")
    exit(1)

# V√©rifier docker-compose
if check_docker_compose():
    print("‚úÖ Docker Compose est disponible")
else:
    print("‚ùå Docker Compose n'est pas disponible")

# V√©rifier les conteneurs
services = ['es01', 'es02', 'es03', 'kibana', 'suricata', 'filebeat']
print("\nüì¶ Statut des services:")
for service in services:
    if check_docker_container(service):
        print(f"   ‚úÖ {service}: en cours d'ex√©cution")
    else:
        print(f"   ‚ö†Ô∏è  {service}: arr√™t√©")

print("\nüìù Pour arr√™ter:")
print("   docker-compose down")

‚úÖ Docker est install√©
‚úÖ Docker Compose est disponible

üì¶ Statut des services:
   ‚úÖ es01: en cours d'ex√©cution
   ‚úÖ es02: en cours d'ex√©cution
   ‚úÖ es03: en cours d'ex√©cution
   ‚úÖ kibana: en cours d'ex√©cution
   ‚úÖ suricata: en cours d'ex√©cution
   ‚úÖ filebeat: en cours d'ex√©cution

üìù Pour arr√™ter:
   docker-compose down


In [6]:
# Import des librairies n√©cessaires
from elasticsearch import Elasticsearch
from pprint import pprint
import json
import time
from datetime import datetime, timedelta

# Configuration de la connexion Elasticsearch
# Adaptez selon votre configuration (avec ou sans s√©curit√©)
ES_HOST = "https://localhost:9200"
ES_USER = "elastic"  # Modifiez si n√©cessaire
ES_PASSWORD = "changeme"  # D√©finissez votre mot de passe si s√©curit√© activ√©e

# Connexion au cluster Elasticsearch
if ES_PASSWORD:
    es = Elasticsearch(
        [ES_HOST],
        basic_auth=(ES_USER, ES_PASSWORD),
        verify_certs=False  # D√©sactiver la v√©rification SSL pour le d√©veloppement
    )
else:
    # Pour un cluster sans s√©curit√© (d√©veloppement uniquement)
    es = Elasticsearch("http://localhost:9200")

# V√©rification de la connexion
try:
    health = es.cluster.health()
    print("‚úÖ Connexion √©tablie avec Elasticsearch")
    print(f"   Statut du cluster: {health['status']}")
    print(f"   Nombre de n≈ìuds: {health['number_of_nodes']}")
except Exception as e:
    print(f"‚ùå Erreur de connexion: {e}")
    print("   V√©rifiez que votre cluster Elasticsearch est d√©marr√©")

‚úÖ Connexion √©tablie avec Elasticsearch
   Statut du cluster: green
   Nombre de n≈ìuds: 3


  _transport = transport_class(


## Partie 1 : Configuration de Suricata

### 1.1 V√©rification de l'installation de Suricata

In [7]:
# Cr√©ation des r√©pertoires n√©cessaires pour Suricata
import os

# Cr√©er les r√©pertoires si n√©cessaire
directories = [
    './suricata/logs',
    './suricata/config',
    './suricata/rules',
    './suricata_logs'
]

for directory in directories:
    os.makedirs(directory, exist_ok=True)
    print(f"‚úÖ R√©pertoire cr√©√©/v√©rifi√©: {directory}")

print("\nüìù Les r√©pertoires sont pr√™ts pour Suricata")

‚úÖ R√©pertoire cr√©√©/v√©rifi√©: ./suricata/logs
‚úÖ R√©pertoire cr√©√©/v√©rifi√©: ./suricata/config
‚úÖ R√©pertoire cr√©√©/v√©rifi√©: ./suricata/rules
‚úÖ R√©pertoire cr√©√©/v√©rifi√©: ./suricata_logs

üìù Les r√©pertoires sont pr√™ts pour Suricata


### 1.2 V√©rification des logs Suricata

Suricata g√©n√®re des logs au format JSON (EVE logs) dans `./suricata/logs/eve.json`. 
Filebeat lit automatiquement ces logs et les indexe dans Elasticsearch.

V√©rifions que les logs sont g√©n√©r√©s et que Filebeat les collecte correctement.

In [28]:
# V√©rification des logs Suricata et de Filebeat
import subprocess
import time

# V√©rifier que le fichier de logs existe
log_file = "./suricata/logs/eve.json"
if os.path.exists(log_file):
    # Compter le nombre de lignes (logs)
    with open(log_file, 'r') as f:
        log_count = sum(1 for line in f if line.strip())
    print(f"‚úÖ Fichier de logs trouv√©: {log_file}")
    print(f"   Nombre de lignes (logs): {log_count}")
    
    # Afficher les derni√®res lignes
    if log_count > 0:
        print("\nüìÑ Derniers logs (3 derni√®res lignes):")
        with open(log_file, 'r') as f:
            lines = f.readlines()
            for line in lines[-3:]:
                try:
                    log_entry = json.loads(line.strip())
                    print(f"   - {log_entry.get('event_type', 'unknown')} √† {log_entry.get('timestamp', 'N/A')}")
                except:
                    print(f"   - (ligne non-JSON)")
else:
    print(f"‚ö†Ô∏è  Fichier de logs non trouv√©: {log_file}")
    print("   Assurez-vous que Suricata est d√©marr√© et g√©n√®re des logs")
    print("   Les logs seront cr√©√©s automatiquement quand Suricata d√©tecte du trafic")

# V√©rifier le statut de Filebeat
print("\nüîç V√©rification de Filebeat:")
try:
    result = subprocess.run(
        ['docker', 'logs', '--tail', '10', 'filebeat'],
        capture_output=True,
        text=True,
        timeout=5
    )
    if result.returncode == 0:
        print("‚úÖ Filebeat est en cours d'ex√©cution")
        # Chercher des erreurs dans les logs
        if 'error' in result.stdout.lower() or 'error' in result.stderr.lower():
            print("   ‚ö†Ô∏è  Des erreurs d√©tect√©es dans les logs Filebeat")
        else:
            print("   ‚úÖ Aucune erreur d√©tect√©e")
    else:
        print("   ‚ö†Ô∏è  Impossible de r√©cup√©rer les logs Filebeat")
except Exception as e:
    print(f"   ‚ö†Ô∏è  Erreur lors de la v√©rification: {e}")

‚úÖ Fichier de logs trouv√©: ./suricata/logs/eve.json
   Nombre de lignes (logs): 64741

üìÑ Derniers logs (3 derni√®res lignes):
   - flow √† 2026-01-17T16:33:56.263903+0000
   - flow √† 2026-01-17T16:33:56.263916+0000
   - stats √† 2026-01-17T16:33:56.656711+0000

üîç V√©rification de Filebeat:
‚úÖ Filebeat est en cours d'ex√©cution
   ‚ö†Ô∏è  Des erreurs d√©tect√©es dans les logs Filebeat


In [12]:
# V√©rification des index cr√©√©s par Filebeat
# Filebeat 9.x cr√©e des data streams avec le pattern: .ds-suricata-YYYY.MM.DD-*
# ou des index normaux avec le pattern: suricata-YYYY.MM.DD

from datetime import datetime

# Obtenir la date actuelle pour construire le nom d'index
today = datetime.now().strftime("%Y.%m.%d")

print(f"üîç Recherche des index Suricata cr√©√©s par Filebeat...")
print(f"   Patterns attendus: suricata-* ou .ds-suricata-*")
print(f"   Date du jour: {today}\n")

# Lister tous les index qui correspondent aux patterns
all_indices = []
try:
    # Chercher les index normaux (utiliser get qui fonctionne mieux)
    try:
        indices_normal = es.indices.get(index="suricata-*")
        if indices_normal:
            all_indices.extend(indices_normal.keys())
    except:
        # Fallback sur get_alias si get √©choue
        try:
            indices_normal = es.indices.get_alias(index="suricata-*")
            if indices_normal:
                all_indices.extend(indices_normal.keys())
        except:
            pass
    
    # Chercher les data streams (commencent par .ds-) - get_alias ne fonctionne pas avec les data streams
    try:
        indices_ds = es.indices.get(index=".ds-suricata-*")
        if indices_ds:
            all_indices.extend(indices_ds.keys())
    except:
        pass
    
    if all_indices:
        print(f"‚úÖ {len(all_indices)} index(s) trouv√©(s):")
        for index_name in sorted(all_indices):
            try:
                count = es.count(index=index_name)
                print(f"   - {index_name}: {count['count']} documents")
            except:
                print(f"   - {index_name}: (erreur lors du comptage)")
    else:
        print("‚ö†Ô∏è  Aucun index trouv√© avec les patterns 'suricata-*' ou '.ds-suricata-*'")
        print("   Cela peut signifier que:")
        print("   1. Filebeat n'a pas encore index√© de donn√©es")
        print("   2. Suricata n'a pas encore g√©n√©r√© de logs")
        print("   3. Il y a un probl√®me de configuration")
        print("   V√©rifiez les logs Filebeat: docker logs filebeat")
except Exception as e:
    print(f"‚ö†Ô∏è  Erreur lors de la recherche des index: {e}")
    print("   V√©rifiez que Elasticsearch est accessible et que Filebeat fonctionne")

üîç Recherche des index Suricata cr√©√©s par Filebeat...
   Patterns attendus: suricata-* ou .ds-suricata-*
   Date du jour: 2026.01.17

‚úÖ 2 index(s) trouv√©(s):
   - .ds-suricata-2026.01.17-2026.01.17-000001: 121272 documents
   - .ds-suricata-2026.01.17-2026.01.17-000001: 121272 documents




## Partie 2 : Utilisation des donn√©es index√©es par Filebeat

Filebeat indexe automatiquement les logs Suricata dans des index nomm√©s `suricata-YYYY.MM.DD`. 
Nous allons maintenant explorer ces donn√©es dans Elasticsearch.

**Note importante :** Les donn√©es doivent √™tre index√©es par Filebeat. Si vous ne voyez pas d'index, 
attendez quelques instants que Suricata g√©n√®re des logs et que Filebeat les collecte.

In [29]:
# Fonction utilitaire pour obtenir le nom de l'index √† utiliser
def get_suricata_index():
    """Retourne le nom de l'index Suricata le plus r√©cent ou un pattern"""
    try:
        all_indices = []
        # Chercher les index normaux (utiliser get qui fonctionne mieux)
        try:
            indices_normal = es.indices.get(index="suricata-*")
            if indices_normal:
                all_indices.extend(indices_normal.keys())
        except:
            # Fallback sur get_alias si get √©choue
            try:
                indices_normal = es.indices.get_alias(index="suricata-*")
                if indices_normal:
                    all_indices.extend(indices_normal.keys())
            except:
                pass
        
        # Chercher les data streams (commencent par .ds-) - get_alias ne fonctionne pas avec les data streams
        try:
            indices_ds = es.indices.get(index=".ds-suricata-*")
            if indices_ds:
                all_indices.extend(indices_ds.keys())
        except:
            pass
        
        if all_indices:
            # Retourner l'index le plus r√©cent
            return sorted(all_indices)[-1]
        else:
            # Si aucun index n'existe, utiliser le pattern
            return "suricata-*"
    except:
        # En cas d'erreur, utiliser le pattern
        return "suricata-*"

# Fonction pour obtenir l'index de recherche (utilise le pattern si l'index sp√©cifique n'existe pas)
def get_search_index():
    """Retourne l'index √† utiliser pour les recherches (inclut les data streams)"""
    index_name = get_suricata_index()
    if index_name == "suricata-*":
        # Utiliser les deux patterns pour couvrir index normaux et data streams
        return "suricata-*,.ds-suricata-*"
    # V√©rifier si l'index existe
    try:
        if es.indices.exists(index=index_name):
            return index_name
        else:
            return "suricata-*,.ds-suricata-*"
    except:
        return "suricata-*,.ds-suricata-*"

# D√©terminer l'index √† utiliser
index_name = get_suricata_index()
print(f"üìä Index d√©tect√©: {index_name}")

# V√©rifier si des index existent (chercher les deux types)
try:
    all_indices_found = []
    
    # Chercher les index normaux
    try:
        indices_normal = es.indices.get(index="suricata-*")
        if indices_normal:
            all_indices_found.extend(indices_normal.keys())
    except:
        pass
    
    # Chercher les data streams
    try:
        indices_ds = es.indices.get(index=".ds-suricata-*")
        if indices_ds:
            all_indices_found.extend(indices_ds.keys())
    except:
        pass
    
    if all_indices_found:
        print(f"‚úÖ {len(all_indices_found)} index(s) trouv√©(s):")
        for idx in sorted(all_indices_found):
            try:
                count = es.count(index=idx)
                print(f"   - {idx}: {count['count']} document(s)")
            except:
                print(f"   - {idx}: (erreur lors du comptage)")
    else:
        print("‚ö†Ô∏è  Aucun index trouv√© avec les patterns 'suricata-*' ou '.ds-suricata-*'")
        print("   Filebeat cr√©era des index automatiquement lors de la premi√®re indexation")
        print("   Assurez-vous que:")
        print("   1. Suricata g√©n√®re des logs dans ./suricata/logs/eve.json")
        print("   2. Filebeat est d√©marr√© et fonctionne correctement")
except Exception as e:
    print(f"‚ö†Ô∏è  Erreur lors de la v√©rification des index: {e}")

üìä Index d√©tect√©: .ds-suricata-2026.01.17-2026.01.17-000001
‚úÖ 2 index(s) trouv√©(s):
   - .ds-suricata-2026.01.17-2026.01.17-000001: 124107 document(s)
   - .ds-suricata-2026.01.17-2026.01.17-000001: 124107 document(s)


In [35]:
# affiche le nombre de documents dans l'index
es.count(index=index_name)

KeyboardInterrupt: 

### 2.1 Exploration des donn√©es index√©es

Les donn√©es sont d√©j√† index√©es par Filebeat. Explorons-les maintenant.

In [14]:
# Recherche de tous les logs index√©s par Filebeat
# Utiliser le pattern suricata-* pour rechercher dans tous les index

try:
    # V√©rifier d'abord si l'index existe
    if not es.indices.exists(index=index_name):
        print(f"‚ö†Ô∏è  L'index {index_name} n'existe pas encore.")
        print("   Attendez que Filebeat indexe les logs de Suricata.")
        print("   Vous pouvez v√©rifier les logs Filebeat avec: docker logs filebeat")
    else:
        # Recherche de base
        query_all = {
            "match_all": {}
        }
        
        result = es.search(index=index_name, query=query_all, size=10)
        total = result['hits']['total']['value']
        
        print(f"üìä Total de documents dans l'index '{index_name}': {total}")
        print(f"   Documents retourn√©s: {len(result['hits']['hits'])}\n")
        
        if total > 0:
            print("üìÑ Exemples de documents index√©s:\n")
            for i, hit in enumerate(result['hits']['hits'][:5], 1):
                source = hit['_source']
                print(f"Document {i}:")
                print(f"   Type d'√©v√©nement: {source.get('event_type', 'N/A')}")
                if 'src_ip' in source:
                    print(f"   Source: {source.get('src_ip', 'N/A')}:{source.get('src_port', 'N/A')}")
                    print(f"   Destination: {source.get('dest_ip', 'N/A')}:{source.get('dest_port', 'N/A')}")
                if 'alert' in source:
                    alert = source['alert']
                    print(f"   Alerte: {alert.get('signature', 'N/A')}")
                    print(f"   S√©v√©rit√©: {alert.get('severity', 'N/A')}")
                if '@timestamp' in source:
                    print(f"   Timestamp: {source.get('@timestamp', 'N/A')}")
                print()
        else:
            print("‚ö†Ô∏è  L'index existe mais ne contient pas encore de documents.")
            print("   V√©rifiez que Suricata g√©n√®re des logs et que Filebeat les collecte.")
            
except Exception as e:
    print(f"‚ùå Erreur lors de la recherche: {e}")
    print("   V√©rifiez que Elasticsearch est accessible et que l'index existe.")

üìä Total de documents dans l'index '.ds-suricata-2026.01.17-2026.01.17-000001': 10000
   Documents retourn√©s: 10

üìÑ Exemples de documents index√©s:

Document 1:
   Type d'√©v√©nement: flow
   Source: 192.168.65.1:41588
   Destination: 192.168.65.7:2376
   Timestamp: 2026-01-17T16:31:15.700Z

Document 2:
   Type d'√©v√©nement: flow
   Source: 192.168.65.1:21106
   Destination: 192.168.65.7:2376
   Timestamp: 2026-01-17T16:31:15.700Z

Document 3:
   Type d'√©v√©nement: flow
   Source: 192.168.65.1:33296
   Destination: 192.168.65.7:2376
   Timestamp: 2026-01-17T16:31:15.700Z

Document 4:
   Type d'√©v√©nement: flow
   Source: 192.168.65.1:63138
   Destination: 192.168.65.7:2376
   Timestamp: 2026-01-17T16:31:15.700Z

Document 5:
   Type d'√©v√©nement: flow
   Source: 192.168.65.1:31983
   Destination: 192.168.65.7:2376
   Timestamp: 2026-01-17T16:31:15.700Z





## Partie 3 : Exploration des donn√©es dans Elasticsearch

### 3.1 Recherche de base dans les logs Suricata

In [17]:
# Recherche de tous les logs
query_all = {
    "match_all": {}
}

result = es.search(index=index_name, query=query_all, size=10)
print(f"üìä Total de documents: {result['hits']['total']['value']}")
print(f"   Documents retourn√©s: {len(result['hits']['hits'])}\n")

for hit in result['hits']['hits']:
    source = hit['_source']
    print(f"üìÑ Document {hit['_id']}:")
    print(f"   Type: {source.get('event_type', 'N/A')}")
    if 'alert' in source:
        print(f"   Alerte: {source['alert'].get('signature', 'N/A')}")
    print(f"   Source IP: {source.get('src_ip', 'N/A')} ‚Üí Dest IP: {source.get('dest_ip', 'N/A')}")
    print()

üìä Total de documents: 10000
   Documents retourn√©s: 10

üìÑ Document VRfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document VhfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document VxfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document WBfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document WRfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document WhfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document WxfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document XBfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ Document XRfMzJsBtz7UXYiikMD4:
   Type: flow
   Source IP: 192.168.65.1 ‚Üí Dest IP: 192.168.65.7

üìÑ



### 3.2 Recherche d'alertes sp√©cifiques

In [18]:
# Rechercher toutes les alertes (event_type = "alert")
query_alerts = {
    "bool": {
        "must": [
            {"term": {"event_type": "alert"}}
        ]
    }
}

result = es.search(index=index_name, query=query_alerts, size=100)
print(f"üö® Nombre d'alertes trouv√©es: {result['hits']['total']['value']}\n")

for hit in result['hits']['hits']:
    source = hit['_source']
    alert = source.get('alert', {})
    print(f"‚ö†Ô∏è  Alerte ID: {alert.get('signature_id', 'N/A')}")
    print(f"   Signature: {alert.get('signature', 'N/A')}")
    print(f"   Cat√©gorie: {alert.get('category', 'N/A')}")
    print(f"   S√©v√©rit√©: {alert.get('severity', 'N/A')}")
    print(f"   Source: {source.get('src_ip', 'N/A')}:{source.get('src_port', 'N/A')}")
    print(f"   Destination: {source.get('dest_ip', 'N/A')}:{source.get('dest_port', 'N/A')}")
    print()

üö® Nombre d'alertes trouv√©es: 0





### 3.3 Recherche d'alertes de haute s√©v√©rit√©

In [19]:
# Rechercher les alertes de haute s√©v√©rit√© (severity <= 1)
query_high_severity = {
    "bool": {
        "must": [
            {"term": {"event_type": "alert"}},
            {"range": {"alert.severity": {"lte": 1}}}
        ]
    }
}

result = es.search(index=index_name, query=query_high_severity, size=100)
print(f"üî¥ Alertes de haute s√©v√©rit√© (‚â§1): {result['hits']['total']['value']}\n")

for hit in result['hits']['hits']:
    source = hit['_source']
    alert = source.get('alert', {})
    print(f"üî¥ CRITIQUE - S√©v√©rit√© {alert.get('severity', 'N/A')}")
    print(f"   {alert.get('signature', 'N/A')}")
    print(f"   Source IP suspecte: {source.get('src_ip', 'N/A')}")
    print()

üî¥ Alertes de haute s√©v√©rit√© (‚â§1): 0





### 3.4 Agr√©gations : Statistiques sur les alertes

In [20]:
# Agr√©gations pour analyser les donn√©es
aggs_query = {
    "size": 0,
    "aggs": {
        "alertes_par_categorie": {
            "terms": {
                "field": "alert.category",
                "size": 10
            }
        },
        "alertes_par_severite": {
            "terms": {
                "field": "alert.severity",
                "size": 5
            }
        },
        "top_source_ips": {
            "terms": {
                "field": "src_ip",
                "size": 10
            }
        },
        "top_dest_ips": {
            "terms": {
                "field": "dest_ip",
                "size": 10
            }
        },
        "top_signatures": {
            "terms": {
                "field": "alert.signature.keyword",
                "size": 10
            }
        }
    }
}

result = es.search(index=index_name, body=aggs_query)

print("üìä Statistiques sur les alertes\n")
print("=" * 50)

# Alertes par cat√©gorie
if 'alertes_par_categorie' in result['aggregations']:
    print("\nüìÅ Alertes par cat√©gorie:")
    for bucket in result['aggregations']['alertes_par_categorie']['buckets']:
        print(f"   {bucket['key']}: {bucket['doc_count']}")

# Alertes par s√©v√©rit√©
if 'alertes_par_severite' in result['aggregations']:
    print("\n‚ö†Ô∏è  Alertes par s√©v√©rit√©:")
    for bucket in result['aggregations']['alertes_par_severite']['buckets']:
        print(f"   S√©v√©rit√© {bucket['key']}: {bucket['doc_count']}")

# Top IPs sources
if 'top_source_ips' in result['aggregations']:
    print("\nüåê Top 10 IPs sources:")
    for bucket in result['aggregations']['top_source_ips']['buckets']:
        print(f"   {bucket['key']}: {bucket['doc_count']} √©v√©nements")

# Top signatures
if 'top_signatures' in result['aggregations']:
    print("\nüîç Top 10 signatures:")
    for bucket in result['aggregations']['top_signatures']['buckets']:
        print(f"   {bucket['key']}: {bucket['doc_count']}")

üìä Statistiques sur les alertes


üìÅ Alertes par cat√©gorie:

‚ö†Ô∏è  Alertes par s√©v√©rit√©:

üåê Top 10 IPs sources:
   192.168.65.1: 80907 √©v√©nements
   192.168.65.7: 40147 √©v√©nements
   192.168.65.3: 427 √©v√©nements

üîç Top 10 signatures:




## Partie 4 : Visualisation dans Kibana

### 4.1 Acc√®s √† Kibana

1. Ouvrez votre navigateur et allez sur `http://localhost:5601`
2. Connectez-vous avec les identifiants d√©finis dans votre fichier `.env` (si s√©curit√© activ√©e)
3. Si c'est la premi√®re fois, vous devrez peut-√™tre accepter les termes de licence

### 4.2 Cr√©ation d'un index pattern

Pour visualiser les donn√©es dans Kibana, vous devez cr√©er un **index pattern** :

1. Allez dans **Management** ‚Üí **Stack Management** ‚Üí **Index Patterns**
2. Cliquez sur **Create index pattern**
3. Entrez le pattern : `suricata-*` ou `.ds-suricata-*` (selon la version de Filebeat)
   - Filebeat 9.x cr√©e des **data streams** avec le pattern `.ds-suricata-*`
   - Les versions ant√©rieures cr√©ent des index normaux avec `suricata-*`
4. S√©lectionnez le champ de temps : `@timestamp` (ajout√© par Filebeat)
5. Cliquez sur **Create index pattern**

**Note :** 
- Filebeat 9.x utilise des data streams (recommand√© par Elastic)
- Les data streams sont automatiquement g√©r√©s et optimis√©s par Elasticsearch
- Utilisez `suricata-*` ou `.ds-suricata-*` selon ce que Filebeat a cr√©√©

### 4.3 Cr√©ation de visualisations

Dans Kibana, cr√©ez les visualisations suivantes :

#### Visualisation 1 : Timeline des alertes
- **Type** : Line chart ou Area chart
- **M√©trique** : Count
- **Buckets** : Date Histogram sur `@timestamp`
- **Filtre** : `event_type: alert`

#### Visualisation 2 : Top 10 signatures d'alertes
- **Type** : Horizontal Bar chart
- **M√©trique** : Count
- **Buckets** : Terms sur `alert.signature.keyword` (Top 10)

#### Visualisation 3 : R√©partition par s√©v√©rit√©
- **Type** : Pie chart
- **M√©trique** : Count
- **Buckets** : Terms sur `alert.severity`

#### Visualisation 4 : Carte des IPs sources
- **Type** : Coordinate Map ou Tile Map
- **M√©trique** : Count
- **Buckets** : Geohash sur `src_ip` (si g√©olocalisation activ√©e)

### 4.4 Cr√©ation d'un Dashboard

1. Allez dans **Dashboard** ‚Üí **Create dashboard**
2. Ajoutez les visualisations cr√©√©es pr√©c√©demment
3. Configurez les filtres temporels
4. Sauvegardez le dashboard

**Note :** Pour ce TD, nous allons √©galement cr√©er des visualisations programmatiquement via l'API.

In [21]:
# V√©rification de l'acc√®s √† Kibana (via l'API REST)
import requests
import urllib3

# D√©sactiver les avertissements SSL pour le d√©veloppement
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

KIBANA_URL = "http://localhost:5601"
KIBANA_USER = "elastic"  # Modifiez si n√©cessaire
KIBANA_PASSWORD = None  # D√©finissez votre mot de passe si s√©curit√© activ√©e

# Test de connexion √† Kibana
try:
    if KIBANA_PASSWORD:
        response = requests.get(
            f"{KIBANA_URL}/api/status",
            auth=(KIBANA_USER, KIBANA_PASSWORD),
            verify=False,
            timeout=5
        )
    else:
        response = requests.get(f"{KIBANA_URL}/api/status", timeout=5)
    
    if response.status_code == 200:
        print("‚úÖ Kibana est accessible")
        status = response.json()
        print(f"   Version: {status.get('version', {}).get('number', 'N/A')}")
    else:
        print(f"‚ö†Ô∏è  Kibana r√©pond mais avec le code: {response.status_code}")
except requests.exceptions.ConnectionError:
    print("‚ùå Impossible de se connecter √† Kibana")
    print("   V√©rifiez que Kibana est d√©marr√© sur http://localhost:5601")
except Exception as e:
    print(f"‚ö†Ô∏è  Erreur: {e}")

‚úÖ Kibana est accessible
   Version: N/A


## Partie 5 : Alerting dans Kibana

### 5.1 Cr√©ation de r√®gles d'alerte

Kibana permet de cr√©er des r√®gles d'alerte bas√©es sur des requ√™tes Elasticsearch. Voici quelques exemples de r√®gles utiles pour un SIEM :

#### R√®gle 1 : Alerte sur haute s√©v√©rit√©
- **Condition** : Nombre d'alertes avec `severity <= 1` > 0
- **Action** : Envoyer une notification (email, Slack, etc.)

#### R√®gle 2 : Alerte sur scan de ports
- **Condition** : Plus de 10 alertes de type "SCAN" en 5 minutes
- **Action** : Notification imm√©diate

#### R√®gle 3 : Alerte sur IP suspecte
- **Condition** : Plus de 5 alertes depuis la m√™me IP source en 10 minutes
- **Action** : Bloquer l'IP (si int√©gration avec firewall)

### 5.2 Configuration des alertes via l'API

Pour cr√©er des alertes programmatiquement, utilisez l'API Kibana Alerting.

In [22]:
# Exemple de cr√©ation d'une r√®gle d'alerte Kibana
# Note: Cette fonctionnalit√© n√©cessite Kibana avec les fonctionnalit√©s d'alerting activ√©es

def create_alert_rule(rule_name, query, threshold, time_window="5m"):
    """
    Cr√©e une r√®gle d'alerte Kibana
    
    Args:
        rule_name: Nom de la r√®gle
        query: Requ√™te Elasticsearch (dict)
        threshold: Seuil d'alerte
        time_window: Fen√™tre temporelle (ex: "5m", "1h")
    """
    alert_rule = {
        "name": rule_name,
        "consumer": "alerts",
        "enabled": True,
        "rule_type_id": ".es-query",
        "schedule": {
            "interval": "1m"
        },
        "actions": [
            {
                "group": "query matched",
                "id": "my-connector-id",  # √Ä remplacer par un ID de connecteur r√©el
                "params": {
                    "message": f"Alerte: {rule_name} - Seuil d√©pass√©!"
                }
            }
        ],
        "params": {
            "index": ["suricata-*"],
            "query": query,
            "size": 100,
            "timeField": "@timestamp",
            "timeWindowSize": time_window,
            "threshold": [
                {
                    "field": "count",
                    "value": threshold,
                    "comparator": ">"
                }
            ]
        }
    }
    
    return alert_rule

# Exemple: R√®gle d'alerte pour haute s√©v√©rit√©
high_severity_query = {
    "bool": {
        "must": [
            {"term": {"event_type": "alert"}},
            {"range": {"alert.severity": {"lte": 1}}}
        ]
    }
}

alert_rule = create_alert_rule(
    rule_name="Alerte haute s√©v√©rit√© Suricata",
    query=high_severity_query,
    threshold=0,
    time_window="5m"
)

print("üìã Exemple de r√®gle d'alerte cr√©√©e:")
print(json.dumps(alert_rule, indent=2))

print("\nüí° Pour cr√©er cette r√®gle dans Kibana:")
print("   1. Allez dans Stack Management ‚Üí Rules and Connectors")
print("   2. Cr√©ez un connecteur (email, Slack, etc.)")
print("   3. Cr√©ez une nouvelle r√®gle avec les param√®tres ci-dessus")

üìã Exemple de r√®gle d'alerte cr√©√©e:
{
  "name": "Alerte haute s\u00e9v\u00e9rit\u00e9 Suricata",
  "consumer": "alerts",
  "enabled": true,
  "rule_type_id": ".es-query",
  "schedule": {
    "interval": "1m"
  },
  "actions": [
    {
      "group": "query matched",
      "id": "my-connector-id",
      "params": {
        "message": "Alerte: Alerte haute s\u00e9v\u00e9rit\u00e9 Suricata - Seuil d\u00e9pass\u00e9!"
      }
    }
  ],
  "params": {
    "index": [
      "suricata-*"
    ],
    "query": {
      "bool": {
        "must": [
          {
            "term": {
              "event_type": "alert"
            }
          },
          {
            "range": {
              "alert.severity": {
                "lte": 1
              }
            }
          }
        ]
      }
    },
    "size": 100,
    "timeField": "@timestamp",
    "timeWindowSize": "5m",
    "threshold": [
      {
        "field": "count",
        "value": 0,
        "comparator": ">"
      }
    ]
  }
}



In [23]:
# D√©tection de patterns suspects

# 1. D√©tection d'IP source avec beaucoup d'alertes (possible scan/attaque)
def detect_suspicious_source_ips(min_alerts=3):
    """D√©tecte les IPs sources avec un nombre √©lev√© d'alertes"""
    query = {
        "size": 0,
        "query": {
            "term": {"event_type": "alert"}
        },
        "aggs": {
            "suspicious_ips": {
                "terms": {
                    "field": "src_ip",
                    "size": 20,
                    "min_doc_count": min_alerts
                },
                "aggs": {
                    "alert_details": {
                        "top_hits": {
                            "size": 5,
                            "_source": {
                                "includes": ["alert.signature", "alert.severity", "dest_ip", "dest_port"]
                            }
                        }
                    }
                }
            }
        }
    }
    
    result = es.search(index=index_name, body=query)
    
    print(f"üîç IPs sources suspectes (‚â•{min_alerts} alertes):\n")
    for bucket in result['aggregations']['suspicious_ips']['buckets']:
        ip = bucket['key']
        count = bucket['doc_count']
        print(f"‚ö†Ô∏è  {ip}: {count} alertes")
        
        # Afficher les d√©tails des alertes
        for hit in bucket['alert_details']['hits']['hits']:
            source = hit['_source']
            alert = source.get('alert', {})
            print(f"   - {alert.get('signature', 'N/A')} (S√©v√©rit√©: {alert.get('severity', 'N/A')})")
        print()
    
    return result

# 2. D√©tection d'alertes r√©centes (derni√®res 30 minutes)
def detect_recent_alerts(minutes=30):
    """D√©tecte les alertes r√©centes"""
    time_threshold = datetime.now() - timedelta(minutes=minutes)
    
    query = {
        "bool": {
            "must": [
                {"term": {"event_type": "alert"}}
            ],
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "gte": time_threshold.isoformat()
                        }
                    }
                }
            ]
        }
    }
    
    result = es.search(index=index_name, query=query, size=100, sort=[{"@timestamp": {"order": "desc"}}])
    
    print(f"üïê Alertes des derni√®res {minutes} minutes: {result['hits']['total']['value']}\n")
    
    for hit in result['hits']['hits']:
        source = hit['_source']
        alert = source.get('alert', {})
        timestamp = source.get('@timestamp', source.get('timestamp', 'N/A'))
        print(f"‚è∞ {timestamp}")
        print(f"   {alert.get('signature', 'N/A')}")
        print(f"   {source.get('src_ip', 'N/A')} ‚Üí {source.get('dest_ip', 'N/A')}")
        print()
    
    return result

# Ex√©cuter les d√©tections
print("=" * 60)
detect_suspicious_source_ips(min_alerts=1)
print("\n" + "=" * 60)
detect_recent_alerts(minutes=60)

üîç IPs sources suspectes (‚â•1 alertes):


üïê Alertes des derni√®res 60 minutes: 0



ObjectApiResponse({'took': 9, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}})

In [24]:
# Exemple de requ√™te pour analyser les patterns temporels (utile pour ML)
# Cette requ√™te peut √™tre utilis√©e comme base pour un job ML

def analyze_temporal_patterns():
    """Analyse les patterns temporels dans les alertes"""
    
    # Agr√©gation par heure de la journ√©e
    query = {
        "size": 0,
        "query": {
            "term": {"event_type": "alert"}
        },
        "aggs": {
            "alerts_by_hour": {
                "date_histogram": {
                    "field": "@timestamp",
                    "calendar_interval": "hour",
                    "min_doc_count": 1
                },
                "aggs": {
                    "by_severity": {
                        "terms": {
                            "field": "alert.severity",
                            "size": 5
                        }
                    },
                    "unique_ips": {
                        "cardinality": {
                            "field": "src_ip"
                        }
                    }
                }
            },
            "alerts_by_day": {
                "date_histogram": {
                    "field": "@timestamp",
                    "calendar_interval": "day",
                    "min_doc_count": 1
                }
            }
        }
    }
    
    result = es.search(index=index_name, body=query)
    
    print("üìà Analyse des patterns temporels\n")
    
    # Afficher les alertes par heure
    if 'alerts_by_hour' in result['aggregations']:
        print("üïê Distribution des alertes par heure:")
        for bucket in result['aggregations']['alerts_by_hour']['buckets']:
            time_str = bucket['key_as_string']
            count = bucket['doc_count']
            unique_ips = bucket['unique_ips']['value']
            print(f"   {time_str}: {count} alertes ({unique_ips} IPs uniques)")
    
    print(f"\nüìä Total d'alertes analys√©es: {result['hits']['total']['value']}")
    
    return result

# Ex√©cuter l'analyse
try:
    analyze_temporal_patterns()
except Exception as e:
    print(f"‚ö†Ô∏è  Erreur lors de l'analyse: {e}")
    print("   Assurez-vous que les donn√©es contiennent des timestamps valides")

üìà Analyse des patterns temporels

üïê Distribution des alertes par heure:

üìä Total d'alertes analys√©es: 0


### 6.4 D√©tection d'anomalies basique (sans ML)

M√™me sans activer ML, nous pouvons cr√©er des r√®gles de d√©tection d'anomalies basiques.

In [25]:
# D√©tection d'anomalies basique : identifier les comportements inhabituels

def detect_anomalies():
    """D√©tecte des anomalies basiques dans les logs"""
    
    anomalies = []
    
    # 1. IP source avec trop d'alertes diff√©rentes (possible botnet/scan)
    query1 = {
        "size": 0,
        "query": {"term": {"event_type": "alert"}},
        "aggs": {
            "ips_with_many_alerts": {
                "terms": {
                    "field": "src_ip",
                    "size": 10
                },
                "aggs": {
                    "unique_signatures": {
                        "cardinality": {
                            "field": "alert.signature_id"
                        }
                    },
                    "unique_dest_ips": {
                        "cardinality": {
                            "field": "dest_ip"
                        }
                    }
                }
            }
        }
    }
    
    result1 = es.search(index=index_name, body=query1)
    
    print("üîç D√©tection d'anomalies basiques\n")
    print("=" * 60)
    
    # Analyser les r√©sultats
    for bucket in result1['aggregations']['ips_with_many_alerts']['buckets']:
        ip = bucket['key']
        total_alerts = bucket['doc_count']
        unique_signatures = bucket['unique_signatures']['value']
        unique_dest_ips = bucket['unique_dest_ips']['value']
        
        # Crit√®res d'anomalie
        if unique_signatures > 3 or unique_dest_ips > 5:
            anomaly = {
                "type": "Suspicious scanning activity",
                "ip": ip,
                "total_alerts": total_alerts,
                "unique_signatures": unique_signatures,
                "unique_dest_ips": unique_dest_ips
            }
            anomalies.append(anomaly)
            
            print(f"üö® ANOMALIE D√âTECT√âE:")
            print(f"   IP: {ip}")
            print(f"   Type: Activit√© de scan suspecte")
            print(f"   Raison: {unique_signatures} signatures diff√©rentes, {unique_dest_ips} IPs de destination diff√©rentes")
            print(f"   Total alertes: {total_alerts}")
            print()
    
    # 2. Alertes de haute s√©v√©rit√© r√©centes
    query2 = {
        "bool": {
            "must": [
                {"term": {"event_type": "alert"}},
                {"range": {"alert.severity": {"lte": 1}}}
            ],
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "gte": (datetime.now() - timedelta(hours=1)).isoformat()
                        }
                    }
                }
            ]
        }
    }
    
    result2 = es.search(index=index_name, query=query2, size=100)
    
    if result2['hits']['total']['value'] > 0:
        print("üö® ALERTES CRITIQUES R√âCENTES:")
        for hit in result2['hits']['hits']:
            source = hit['_source']
            alert = source.get('alert', {})
            print(f"   - {alert.get('signature', 'N/A')}")
            print(f"     Source: {source.get('src_ip', 'N/A')}")
            print(f"     S√©v√©rit√©: {alert.get('severity', 'N/A')}")
        print()
    
    return anomalies

# Ex√©cuter la d√©tection
anomalies = detect_anomalies()

if not anomalies:
    print("‚úÖ Aucune anomalie majeure d√©tect√©e avec les crit√®res actuels")

üîç D√©tection d'anomalies basiques

‚úÖ Aucune anomalie majeure d√©tect√©e avec les crit√®res actuels


## Partie 7 : Exercices pratiques

### Exercice 1 : Recherche d'IPs malveillantes

Cr√©ez une requ√™te pour trouver toutes les alertes provenant d'une IP sp√©cifique et analysez les signatures d√©clench√©es.

### Exercice 2 : Analyse de trafic HTTP

Recherchez tous les √©v√©nements HTTP et identifiez les requ√™tes suspectes (ex: tentatives d'acc√®s √† `/admin`, `/wp-admin`, etc.).

### Exercice 3 : Dashboard personnalis√©

Cr√©ez un dashboard Kibana avec :
- Timeline des alertes
- Top 10 des signatures
- Carte des IPs sources
- Graphique de s√©v√©rit√©

### Exercice 4 : R√®gle d'alerte personnalis√©e

Cr√©ez une r√®gle d'alerte qui se d√©clenche lorsqu'une IP g√©n√®re plus de 5 alertes en 10 minutes.

### Exercice 5 : D√©tection de patterns

√âcrivez une requ√™te pour d√©tecter les scans de ports (plusieurs tentatives de connexion sur diff√©rents ports depuis la m√™me IP).

In [26]:
# EXERCICE 1 : Recherche d'IPs malveillantes
# Compl√©tez cette fonction pour rechercher toutes les alertes d'une IP sp√©cifique

def search_alerts_by_ip(ip_address):
    """
    Recherche toutes les alertes pour une IP source donn√©e
    
    Args:
        ip_address: Adresse IP √† rechercher (ex: "192.168.1.100")
    
    Returns:
        R√©sultats de la recherche
    """
    # TODO: Cr√©er la requ√™te Elasticsearch
    query = {
        "bool": {
            "must": [
                {"term": {"event_type": "alert"}},
                {"term": {"src_ip": ip_address}}
            ]
        }
    }
    
    result = es.search(index=index_name, query=query, size=100)
    
    print(f"üîç Alertes pour l'IP {ip_address}: {result['hits']['total']['value']}\n")
    
    signatures = {}
    for hit in result['hits']['hits']:
        source = hit['_source']
        alert = source.get('alert', {})
        sig_id = alert.get('signature_id', 'N/A')
        sig_name = alert.get('signature', 'N/A')
        
        if sig_id not in signatures:
            signatures[sig_id] = {
                "name": sig_name,
                "count": 0,
                "severity": alert.get('severity', 'N/A')
            }
        signatures[sig_id]["count"] += 1
    
    print("üìã Signatures d√©clench√©es:")
    for sig_id, info in signatures.items():
        print(f"   [{sig_id}] {info['name']}: {info['count']} fois (S√©v√©rit√©: {info['severity']})")
    
    return result

# Test avec une IP de test
# R√©cup√©rer d'abord quelques alertes pour obtenir une IP de test
test_query = {
    "bool": {
        "must": [
            {"term": {"event_type": "alert"}}
        ]
    }
}
test_result = es.search(index=index_name, query=test_query, size=1)

if test_result['hits']['total']['value'] > 0:
    test_ip = test_result['hits']['hits'][0]['_source'].get('src_ip')
    if test_ip:
        print(f"\nüß™ Test avec l'IP: {test_ip}\n")
        search_alerts_by_ip(test_ip)
    else:
        print("‚ö†Ô∏è  Aucune IP source trouv√©e dans les alertes")
else:
    print("‚ö†Ô∏è  Aucune alerte trouv√©e pour le test")

‚ö†Ô∏è  Aucune alerte trouv√©e pour le test


In [27]:
# EXERCICE 2 : Analyse de trafic HTTP suspect
# Recherchez les requ√™tes HTTP suspectes

def find_suspicious_http_requests():
    """Recherche les requ√™tes HTTP suspectes"""
    
    # URLs suspectes communes
    suspicious_paths = ["/admin", "/wp-admin", "/phpmyadmin", "/.env", "/config.php"]
    
    query = {
        "bool": {
            "must": [
                {"term": {"event_type": "http"}}
            ],
            "should": [
                {"wildcard": {"http.url": f"*{path}*"}} for path in suspicious_paths
            ],
            "minimum_should_match": 1
        }
    }
    
    result = es.search(index=index_name, query=query, size=100)
    
    print(f"üö® Requ√™tes HTTP suspectes trouv√©es: {result['hits']['total']['value']}\n")
    
    for hit in result['hits']['hits']:
        source = hit['_source']
        http = source.get('http', {})
        print(f"‚ö†Ô∏è  {source.get('src_ip', 'N/A')} ‚Üí {http.get('hostname', 'N/A')}{http.get('url', 'N/A')}")
        print(f"   M√©thode: {http.get('http_method', 'N/A')}, Status: {http.get('status', 'N/A')}")
        print()
    
    return result

# Ex√©cuter la recherche
try:
    find_suspicious_http_requests()
except Exception as e:
    print(f"‚ö†Ô∏è  Erreur: {e}")
    print("   V√©rifiez que vous avez des √©v√©nements HTTP dans vos logs")

üö® Requ√™tes HTTP suspectes trouv√©es: 0



## Partie 8 : R√©sum√© et bonnes pratiques

### Bonnes pratiques pour un SIEM

1. **Indexation optimale**
   - Utilisez des index avec rotation temporelle (ex: `suricata-YYYY.MM.DD`)
   - Configurez des index templates pour automatiser la cr√©ation
   - D√©finissez des politiques de r√©tention (ILM - Index Lifecycle Management)

2. **Mapping des champs**
   - D√©finissez correctement les types de champs (IP, date, keyword, text)
   - Utilisez des champs `keyword` pour les agr√©gations
   - Utilisez des champs `text` pour la recherche full-text

3. **S√©curit√©**
   - Activez la s√©curit√© Elasticsearch en production
   - Utilisez des certificats SSL/TLS
   - Configurez des r√¥les et permissions appropri√©s

4. **Performance**
   - Monitorer la sant√© du cluster
   - Ajuster le nombre de shards selon le volume de donn√©es
   - Utiliser des alias pour les index

5. **Alerting**
   - Cr√©ez des r√®gles d'alerte pertinentes (pas trop, pas trop peu)
   - Testez r√©guli√®rement les alertes
   - Documentez les proc√©dures de r√©ponse aux incidents

6. **Maintenance**
   - Surveillez les logs Elasticsearch et Kibana
   - Effectuez des sauvegardes r√©guli√®res
   - Mettez √† jour r√©guli√®rement les r√®gles Suricata

### Ressources suppl√©mentaires

- **Documentation Suricata** : https://suricata.readthedocs.io/
- **Documentation Filebeat** : https://www.elastic.co/guide/en/beats/filebeat/current/index.html
- **Documentation Elasticsearch** : https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
- **Documentation Kibana** : https://www.elastic.co/guide/en/kibana/current/index.html
- **Elastic Security** : https://www.elastic.co/security

### Prochaines √©tapes

1. Int√©grer d'autres sources de logs (Apache, Nginx, Windows Event Logs, etc.)
2. Configurer des enrichissements (g√©olocalisation IP, threat intelligence)
3. Cr√©er des playbooks de r√©ponse aux incidents
4. Mettre en place une stack compl√®te avec Logstash si n√©cessaire
5. Explorer Elastic Security (anciennement SIEM) pour des fonctionnalit√©s avanc√©es

In [19]:
# Fonction utilitaire : Nettoyage des ressources
def cleanup():
    """Supprime l'index de test (optionnel)"""
    response = input("Voulez-vous supprimer l'index 'suricata-logs'? (oui/non): ")
    if response.lower() in ['oui', 'o', 'yes', 'y']:
        if es.indices.exists(index=index_name):
            es.indices.delete(index=index_name)
            print(f"‚úÖ Index '{index_name}' supprim√©")
        else:
            print(f"‚ÑπÔ∏è  L'index '{index_name}' n'existe pas")
    else:
        print("‚ÑπÔ∏è  Index conserv√©")

# D√©commenter pour nettoyer
# cleanup()

print("\n" + "=" * 60)
print("‚úÖ TD termin√©!")
print("=" * 60)
print("\nüìö N'oubliez pas de:")
print("   1. Explorer Kibana pour cr√©er vos visualisations")
print("   2. Configurer des alertes dans Kibana")
print("   3. Tester avec de vrais logs Suricata")
print("   4. Documenter vos r√®gles et proc√©dures")
print("\nüéì Bonne continuation avec votre SIEM!")


‚úÖ TD termin√©!

üìö N'oubliez pas de:
   1. Explorer Kibana pour cr√©er vos visualisations
   2. Configurer des alertes dans Kibana
   3. Tester avec de vrais logs Suricata
   4. Documenter vos r√®gles et proc√©dures

üéì Bonne continuation avec votre SIEM!
