<a href="https://colab.research.google.com/github/doriantino/Audit-IT-Controles-Acces-PAM-SoD/blob/main/Audit_controle_Access_II.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Importation des bibliothèques et outils

In [2]:
import sqlite3 # pour intéragir les las bases de données SQLites
import pandas as pd # bibliothèque pour effectuer des annalyses de données
from datetime import datetime, timedelta # pour simuler des durée et intervalles de temps d'utilisation des applications
import random # pour générer des données aléatoires car on est dans le cadre d'une simulation
import os # pour faire des opérations sur les fichiers et dossiers

# --- Configuration de la base de données ---
DB_NAME = 'iam_audit.db' # variable qui contient le nom du fichier de notre db et aue nous utiliserons partout au besoin

### Configuration de la base de données

In [3]:
def setup_database():
    """
    Crée une base de données SQLite et y définit les tables nécessaires
    pour simuler un système IAM (Gestion des Identités et des Accès).
    Ces tables représentent les types de données qu'un auditeur pourrait
    s'attendre à extraire d'un système IAM réel.
    """
    # Se connecte à la base de données. Si le fichier n'existe pas, il est créé.
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor() # Un "cursor" (curseur) permet d'envoyer des commandes SQL à la base de données.

    # --- Table Users (Utilisateurs) ---
    # Contient les informations de base sur chaque utilisateur.
    # C'est du VRAI SQL ! Les mêmes commandes fonctionneraient sur d'autres bases de données.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS Users (
            UserID TEXT PRIMARY KEY,           -- Identifiant unique de l'utilisateur (clé primaire)
            Username TEXT NOT NULL UNIQUE,     -- Nom d'utilisateur (doit être unique et ne peut pas être vide)
            Email TEXT UNIQUE,                 -- Adresse email (doit être unique)
            Department TEXT,                   -- Département de l'utilisateur
            Status TEXT,                       -- Statut du compte (Ex: Active, Inactive, Locked)
            CreationDate TEXT,                 -- Date de création du compte
            LastLoginDate TEXT,                -- Date de la dernière connexion
            PasswordLastChanged TEXT           -- Date du dernier changement de mot de passe
        )
    ''')

    # --- Table Roles (Rôles) ---
    # Définit les différents rôles qui existent dans le système (ex: Administrateur, Comptable).
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS Roles (
            RoleID INTEGER PRIMARY KEY AUTOINCREMENT, -- Identifiant unique du rôle (numéro qui s'incrémente automatiquement)
            RoleName TEXT NOT NULL UNIQUE,            -- Nom du rôle (doit être unique et non vide)
            Description TEXT                          -- Description du rôle
        )
    ''')

    # --- Table UserRoles (Rôles des Utilisateurs) ---
    # Fait le lien entre les utilisateurs et les rôles qui leur sont attribués.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS UserRoles (
            UserRoleID INTEGER PRIMARY KEY AUTOINCREMENT, -- Identifiant unique de l'attribution de rôle
            UserID TEXT NOT NULL,                         -- L'ID de l'utilisateur concerné
            RoleID INTEGER NOT NULL,                      -- L'ID du rôle attribué
            AssignmentDate TEXT,                          -- Date d'attribution du rôle
            ExpiryDate TEXT,                              -- Date d'expiration du rôle (pour les accès temporaires)
            FOREIGN KEY (UserID) REFERENCES Users(UserID), -- Assure que UserID existe dans la table Users
            FOREIGN KEY (RoleID) REFERENCES Roles(RoleID)   -- Assure que RoleID existe dans la table Roles
        )
    ''')

    # --- Table Permissions (Permissions) ---
    # Définit les actions granulaires que les rôles peuvent accorder (ex: Lire, Modifier).
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS Permissions (
            PermissionID INTEGER PRIMARY KEY AUTOINCREMENT,   -- Identifiant unique de la permission
            PermissionName TEXT NOT NULL UNIQUE,              -- Nom de la permission
            Description TEXT                                  -- Description de la permission
        )
    ''')

    # --- Table RolePermissions (Permissions des Rôles) ---
    # Fait le lien entre les rôles et les permissions qu'ils octroient.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS RolePermissions (
            RolePermissionID INTEGER PRIMARY KEY AUTOINCREMENT, -- Identifiant unique du lien rôle-permission
            RoleID INTEGER NOT NULL,                            -- L'ID du rôle
            PermissionID INTEGER NOT NULL,                      -- L'ID de la permission
            FOREIGN KEY (RoleID) REFERENCES Roles(RoleID),     -- Assure que RoleID existe dans la table Roles
            FOREIGN KEY (PermissionID) REFERENCES Permissions(PermissionID) -- Assure que PermissionID existe dans la table Permissions
        )
    ''')

    # --- Table AccessLogs (Journaux d'Accès) ---
    # Enregistre toutes les activités importantes des utilisateurs.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS AccessLogs (
            LogID INTEGER PRIMARY KEY AUTOINCREMENT,   -- Identifiant unique du log
            UserID TEXT NOT NULL,                      -- L'ID de l'utilisateur qui a effectué l'action
            EventType TEXT NOT NULL,                   -- Type d'événement (Ex: Login, Logout, AccessDenied, RoleChange)
            Timestamp TEXT NOT NULL,                   -- Date et heure de l'événement
            SourceIP TEXT,                             -- Adresse IP d'où provient l'accès
            Result TEXT,                               -- Résultat de l'action (Ex: Success, Failure)
            Description TEXT,                          -- Description détaillée de l'événement
            FOREIGN KEY (UserID) REFERENCES Users(UserID) -- Assure que UserID existe dans la table Users
        )
    ''')

    conn.commit() # Très important : Valide (enregistre) tous les changements dans la base de données.
    conn.close() # Ferme la connexion à la base de données.
    print("Base de données SQLite et tables configurées avec succès.")

### Exécution de la création de la base de données

In [4]:
if __name__ == "__main__":
    # Ceci est la partie du script qui sera exécutée lorsque vous lancerez ce fichier.

    # Vérifie si le fichier de la base de données existe déjà.
    # Si oui, il le supprime pour repartir de zéro à chaque exécution,
    # ce qui est pratique pour le développement.
    if os.path.exists(DB_NAME):
        os.remove(DB_NAME)
        print(f"Ancienne base de données '{DB_NAME}' supprimée pour un nouveau départ.")

    setup_database() # Appel de notre fonction pour créer la base de données et les tables.

    print(f"\nLe fichier de base de données '{DB_NAME}' a été créé dans l'environnement Colab.")
    print("Il ne contient pas encore de données, juste les structures de tables vides.")


Base de données SQLite et tables configurées avec succès.

Le fichier de base de données 'iam_audit.db' a été créé dans l'environnement Colab.
Il ne contient pas encore de données, juste les structures de tables vides.


### Insertion des données

In [5]:
def generate_synthetic_data(num_users=150, num_logs_per_user=70):
    """
    Génère et insère des données synthétiques dans les tables Users, Roles, UserRoles,
    Permissions, RolePermissions et AccessLogs de la base de données SQLite.
    Ces données sont conçues pour simuler des scénarios d'audit IAM (Gestion des Identités et des Accès).
    """
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()

    # Définition de la période pour les dates des événements
    start_date = datetime(2024, 1, 1)
    end_date = datetime(2024, 12, 31)

    # 1. Génération et insertion des rôles (si non déjà insérés)
    roles_data = [
        ('Standard User', 'Accès de base aux applications.'),
        ('Finance Analyst', 'Accès aux données financières et rapports.'),
        ('HR Manager', 'Gestion des données des employés.'),
        ('IT Administrator', 'Administration des systèmes et réseaux.'),
        ('Project Manager', 'Gestion des projets et ressources.'),
        ('External Auditor', 'Accès temporaire pour audit externe.'),
        ('Senior Developer', 'Développement et déploiement de code.'),
        ('Service Desk', 'Support technique de premier niveau.'),
        ('Executive', 'Accès de haut niveau aux rapports stratégiques.'),
        ('Marketing Specialist', 'Gestion des campagnes marketing.')
    ]
    # INSERT OR IGNORE: insère seulement si le RoleName n'existe pas déjà
    cursor.executemany("INSERT OR IGNORE INTO Roles (RoleName, Description) VALUES (?, ?)", roles_data)

    # Récupérer les rôles générés avec leurs IDs pour les liaisons futures
    cursor.execute("SELECT RoleID, RoleName FROM Roles")
    roles_from_db = cursor.fetchall() # Récupère tous les rôles et leurs IDs de la DB
    role_map = {name: id for id, name in roles_from_db} # Crée un dictionnaire {Nom du rôle: ID du rôle}

    # 2. Génération et insertion des permissions (si non déjà insérées)
    permissions_data = [
        ('Read Document', 'Permet de lire les documents.'),
        ('Edit Document', 'Permet de modifier les documents.'),
        ('Approve Transaction', 'Permet d\'approuver des transactions financières.'),
        ('Manage Users', 'Permet de créer, modifier ou supprimer des utilisateurs.'),
        ('Access Admin Panel', 'Permet d\'accéder au panneau d\'administration système.'),
        ('View Reports', 'Permet de visualiser des rapports.'),
        ('Create Invoice', 'Permet de créer une facture.'),
        ('Execute Payment', 'Permet d\'exécuter un paiement.'),
        ('Create Supplier', 'Permet de créer un nouveau fournisseur.')
    ]
    cursor.executemany("INSERT OR IGNORE INTO Permissions (PermissionName, Description) VALUES (?, ?)", permissions_data)

    # Récupérer les permissions générées avec leurs IDs
    cursor.execute("SELECT PermissionID, PermissionName FROM Permissions")
    permissions_from_db = cursor.fetchall()
    perm_map = {name: id for id, name in permissions_from_db} # Crée un dictionnaire {Nom permission: ID permission}

    # 3. Liaison RolePermissions: Définir quelles permissions chaque rôle possède
    # C'est ici que nous configurons les droits de base pour chaque rôle.
    role_permission_links = []

    # IT Administrator: Peut gérer les utilisateurs et accéder au panneau admin
    if 'IT Administrator' in role_map:
        for p_name in ['Manage Users', 'Access Admin Panel', 'View Reports']:
            if p_name in perm_map:
                role_permission_links.append((role_map['IT Administrator'], perm_map[p_name]))
    # Finance Analyst: Peut approuver des transactions et créer des factures
    if 'Finance Analyst' in role_map:
        for p_name in ['View Reports', 'Approve Transaction', 'Create Invoice']:
            if p_name in perm_map:
                role_permission_links.append((role_map['Finance Analyst'], perm_map[p_name]))
    # HR Manager: Peut lire et modifier des documents, et gérer des utilisateurs
    if 'HR Manager' in role_map:
        for p_name in ['Read Document', 'Edit Document', 'Manage Users']:
            if p_name in perm_map:
                role_permission_links.append((role_map['HR Manager'], perm_map[p_name]))
    # Standard User: Accès de base, peut lire des documents
    if 'Standard User' in role_map:
        if 'Read Document' in perm_map:
            role_permission_links.append((role_map['Standard User'], perm_map['Read Document']))

    # !!! IMPORTANT pour l'audit SoD !!!
    # Simuler un conflit : Un Finance Analyst a aussi la permission de créer un fournisseur ET d'exécuter un paiement.
    # Ceci sera un scénario que notre audit SoD devra détecter.
    if 'Finance Analyst' in role_map and 'Create Supplier' in perm_map and 'Execute Payment' in perm_map:
         # Pour l'exemple, nous attribuons ces permissions à ce rôle.
         role_permission_links.append((role_map['Finance Analyst'], perm_map['Execute Payment']))
         role_permission_links.append((role_map['Finance Analyst'], perm_map['Create Supplier']))

    cursor.executemany("INSERT OR IGNORE INTO RolePermissions (RoleID, PermissionID) VALUES (?, ?)", role_permission_links)


    # 4. Génération et insertion des utilisateurs
    departments = ['IT', 'Finance', 'HR', 'Sales', 'Marketing', 'Legal']
    user_ids = [] # Liste pour garder les UserID générés
    for i in range(num_users):
        user_id = f"U{i:05d}" # Ex: U00001
        username = f"user_{random.choice(departments).lower()}_{i}"
        email = f"{username}@example.com"
        department = random.choice(departments)
        status = random.choice(['Active', 'Active', 'Active', 'Inactive']) # Plus de comptes actifs

        # Génération des dates cohérentes
        creation_date = (start_date + timedelta(days=random.randint(0, 364))).strftime('%Y-%m-%d')

        # Si le compte est actif, la dernière connexion est plus récente que la création
        last_login_date = None
        if status == 'Active':
            creation_dt = datetime.strptime(creation_date, '%Y-%m-%d')
            last_login_date = (creation_dt + timedelta(days=random.randint(0, (end_date - creation_dt).days))).strftime('%Y-%m-%d')

        # Le mot de passe a été changé après la création
        password_last_changed = (datetime.strptime(creation_date, '%Y-%m-%d') + timedelta(days=random.randint(0, 90))).strftime('%Y-%m-%d')

        user_ids.append(user_id) # Ajoute l'ID utilisateur à notre liste
        try: # Ajout d'un bloc try-except pour une insertion plus robuste des utilisateurs
            cursor.execute(
                "INSERT INTO Users (UserID, Username, Email, Department, Status, CreationDate, LastLoginDate, PasswordLastChanged) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                (user_id, username, email, department, status, creation_date, last_login_date, password_last_changed)
            )
        except sqlite3.IntegrityError as e:
            print(f"Erreur d'insertion utilisateur {user_id}: {e}. Le compte existe peut-être déjà.")
            continue # Passe à l'utilisateur suivant en cas d'erreur


    # 5. Attribuer des rôles aux utilisateurs
    for user_id in user_ids:
        # Attribuer un rôle principal basé sur le département de l'utilisateur
        cursor.execute("SELECT Department FROM Users WHERE UserID = ?", (user_id,))
        user_dept_data = cursor.fetchone() # Récupère la ligne de données pour le département

        if not user_dept_data: # Si l'utilisateur n'a pas été trouvé (devrait pas arriver si insert a marché)
            print(f"Attention: Utilisateur {user_id} non trouvé pour la recherche du département. Saut de l'attribution de rôle.")
            continue # Passe à l'utilisateur suivant

        dept = user_dept_data[0] # Extrait le département de la ligne récupérée

        primary_role_name = 'Standard User'
        if dept == 'IT': primary_role_name = 'IT Administrator'
        elif dept == 'Finance': primary_role_name = 'Finance Analyst'
        elif dept == 'HR': primary_role_name = 'HR Manager'
        elif dept == 'Marketing': primary_role_name = 'Marketing Specialist'

        primary_role_id = role_map.get(primary_role_name)
        if primary_role_id:
            assignment_date = (start_date + timedelta(days=random.randint(0, 364))).strftime('%Y-%m-%d')
            try: # Ajout d'un bloc try-except pour une insertion plus robuste des UserRoles
                cursor.execute(
                    "INSERT INTO UserRoles (UserID, RoleID, AssignmentDate) VALUES (?, ?, ?)",
                    (user_id, primary_role_id, assignment_date)
                )
            except sqlite3.IntegrityError as e:
                print(f"Erreur d'insertion de rôle ({primary_role_name}) pour {user_id}: {e}. Le rôle est peut-être déjà attribué.")
                # Continue même si l'insertion échoue, car d'autres rôles peuvent être ajoutés

        # Simuler quelques utilisateurs avec des rôles supplémentaires (potentiellement conflictuels)
        # pour que notre audit ait des choses à trouver.
        if random.random() < 0.05: # Environ 5% des utilisateurs auront un rôle supplémentaire
            extra_role_name = random.choice([r for r in role_map.keys() if r != primary_role_name])
            extra_role_id = role_map.get(extra_role_name)
            if extra_role_id:
                 assignment_date = (start_date + timedelta(days=random.randint(0, 364))).strftime('%Y-%m-%d')
                 try: # Ajout d'un bloc try-except ici aussi
                     cursor.execute(
                        "INSERT INTO UserRoles (UserID, RoleID, AssignmentDate) VALUES (?, ?, ?)",
                        (user_id, extra_role_id, assignment_date)
                    )
                 except sqlite3.IntegrityError as e:
                    print(f"Erreur d'insertion de rôle supplémentaire ({extra_role_name}) pour {user_id}: {e}.")
                    # Continue

    # 6. Génération des logs d'accès
    event_types = ['Login', 'Logout', 'AccessDenied', 'RoleChange', 'PasswordChange', 'ResourceAccess']
    results = ['Success', 'Failure']

    for user_id in user_ids:
        cursor.execute("SELECT Status, LastLoginDate FROM Users WHERE UserID = ?", (user_id,))
        user_data = cursor.fetchone() # Récupère la ligne de données pour l'utilisateur

        if not user_data: # Si l'utilisateur n'a pas été trouvé
            print(f"Attention: Utilisateur {user_id} non trouvé dans la table Users. Saut de la génération de logs.")
            continue # Passe à l'utilisateur suivant

        user_status, last_login_str = user_data # Extrait le statut et la dernière date de connexion
        current_user_last_login = datetime.strptime(last_login_str, '%Y-%m-%d') if last_login_str else None

        for _ in range(num_logs_per_user):
            event_time = start_date + timedelta(days=random.randint(0, (end_date - start_date).days),
                                                hours=random.randint(0, 23),
                                                minutes=random.randint(0, 59))

            # Pour les utilisateurs inactifs, réduisons la probabilité d'activités "normales"
            if user_status == 'Inactive' and current_user_last_login and event_time.date() > current_user_last_login.date():
                if random.random() < 0.8: # 80% de chance de ne pas avoir d'activité après sa "désactivation"
                    continue

            event_type = random.choice(event_types)
            source_ip = f"192.168.{random.randint(1, 254)}.{random.randint(1, 254)}"
            result = random.choice(results)
            description = f"Event {event_type} for {user_id}"

            # Simuler des logs d'échec de connexion pour les utilisateurs inactifs (pour l'audit)
            if user_status == 'Inactive' and event_type == 'Login' and random.random() < 0.3:
                result = 'Failure'
                description = f"Login attempt failed for inactive user {user_id}"

            # Simuler un changement de rôle par un admin (pour l'audit)
            if event_type == 'RoleChange':
                cursor.execute("SELECT R.RoleName FROM UserRoles UR JOIN Roles R ON UR.RoleID = R.RoleID WHERE UR.UserID = ?", (user_id,))
                current_roles = [r[0] for r in cursor.fetchall()]
                if 'IT Administrator' in current_roles:
                    description = f"IT Admin {user_id} changed role for another user (simulated)"
                else:
                    description = f"Role change event for user {user_id} (simulated)"

            try: # Ajout d'un bloc try-except pour une insertion plus robuste des AccessLogs
                cursor.execute(
                    "INSERT INTO AccessLogs (UserID, EventType, Timestamp, SourceIP, Result, Description) VALUES (?, ?, ?, ?, ?, ?)",
                    (user_id, event_type, event_time.strftime('%Y-%m-%d %H:%M:%S'), source_ip, result, description)
                )
            except sqlite3.IntegrityError as e:
                print(f"Erreur d'insertion de log pour {user_id}, événement {event_type}: {e}.")
                # Continue même si l'insertion échoue

    conn.commit()
    conn.close()
    print("Données synthétiques générées et insérées avec succès.")

### Définition des Règles de l'Audit IAM

#### Détecter les comptes actifs mais inutilisés depuis longtemps.

In [8]:
def check_dormant_active_accounts(df_users, threshold_days=180):
    """
    Vérifie les comptes utilisateurs actifs qui n'ont pas enregistré de connexion
    depuis un certain nombre de jours (comptes dormants actifs).
    Arguments:
        df_users (pd.DataFrame): DataFrame contenant les données des utilisateurs.
        threshold_days (int): Seuil en jours pour considérer un compte comme dormant.
    Retourne:
        list: Une liste de dictionnaires, chaque dictionnaire représentant un constat.
    """
    findings = []
    # Assurez-vous que LastLoginDate est au format datetime pour les comparaisons
    df_users['LastLoginDate'] = pd.to_datetime(df_users['LastLoginDate'], errors='coerce')

    # Date limite d'activité (aujourd'hui moins le seuil)
    inactive_threshold_date = datetime.now() - timedelta(days=threshold_days)

    # Filtrer les utilisateurs actifs dont la dernière connexion est avant la date seuil
    dormant_accounts = df_users[
        (df_users['Status'] == 'Active') &
        (df_users['LastLoginDate'].notna()) & # S'assure que la date de dernière connexion n'est pas NULL
        (df_users['LastLoginDate'] < inactive_threshold_date)
    ]

    for index, row in dormant_accounts.iterrows():
        findings.append({
            'Type de Constat': 'Compte Dormant Actif',
            'Gravité': 'Moyenne',
            'Description': f"Le compte '{row['Username']}' (UserID: {row['UserID']}) est Actif mais n'a pas été utilisé depuis le {row['LastLoginDate'].strftime('%Y-%m-%d')}. (Plus de {threshold_days} jours)",
            'Utilisateur': row['UserID'],
            'Date du Constat': datetime.now().strftime('%Y-%m-%d')
        })
    return findings

#### Identification des activités sensibles effectuées en dehors des heures de travail par des utilisateurs privilégiés.

In [9]:
def check_privileged_activity_off_hours(df_access_logs, privileged_users_ids, start_hour=8, end_hour=18):
    """
    Vérifie les activités de comptes privilégiés en dehors des heures de travail.
    Arguments:
        df_access_logs (pd.DataFrame): DataFrame contenant les journaux d'accès.
        privileged_users_ids (list): Liste des UserID considérés comme privilégiés.
        start_hour (int): Heure de début des heures ouvrées (ex: 8 pour 8h du matin).
        end_hour (int): Heure de fin des heures ouvrées (ex: 18 pour 18h).
    Retourne:
        list: Une liste de dictionnaires, chaque dictionnaire représentant un constat.
    """
    findings = []
    df_access_logs['Timestamp'] = pd.to_datetime(df_access_logs['Timestamp'])

    # Filtrer les logs des utilisateurs privilégiés
    privileged_activity = df_access_logs[df_access_logs['UserID'].isin(privileged_users_ids)].copy()

    # Identifier les activités en dehors des heures ouvrées
    off_hours_activity = privileged_activity[
        (privileged_activity['Timestamp'].dt.hour < start_hour) |
        (privileged_activity['Timestamp'].dt.hour >= end_hour)
    ]

    for index, row in off_hours_activity.iterrows():
        findings.append({
            'Type de Constat': 'Activité Privilégiée Hors Heures',
            'Gravité': 'Élevée',
            'Description': f"L'utilisateur privilégié '{row['UserID']}' a effectué une activité ('{row['EventType']}') à {row['Timestamp'].strftime('%Y-%m-%d %H:%M:%S')} en dehors des heures ouvrées ({start_hour}h-{end_hour}h).",
            'Utilisateur': row['UserID'],
            'Date de l\'Activité': row['Timestamp'].strftime('%Y-%m-%d'),
            'Heure de l\'Activité': row['Timestamp'].hour
        })
    return findings

#### Identification des conflits de SoD spécifiques, comme un rôle ayant deux permissions incompatibles (ici, créer un fournisseur et exécuter un paiement).

In [10]:
def check_sod_violations_finance(df_user_roles_perms, role_name='Finance Analyst', perm1='Create Supplier', perm2='Execute Payment'):
    """
    Vérifie les violations de Séparation des Tâches (SoD) pour le rôle de Finance Analyst
    ayant à la fois les permissions de créer un fournisseur et d'exécuter un paiement.
    Arguments:
        df_user_roles_perms (pd.DataFrame): DataFrame combiné des utilisateurs, rôles et permissions.
        role_name (str): Nom du rôle à auditer (ex: 'Finance Analyst').
        perm1 (str): Première permission du conflit (ex: 'Create Supplier').
        perm2 (str): Deuxième permission du conflit (ex: 'Execute Payment').
    Retourne:
        list: Une liste de dictionnaires, chaque dictionnaire représentant un constat.
    """
    findings = []

    # Filtrer les utilisateurs ayant le rôle spécifique
    finance_analysts = df_user_roles_perms[df_user_roles_perms['RoleName'] == role_name]

    # Identifier ceux qui ont la première permission critique
    has_perm1 = finance_analysts[finance_analysts['PermissionName'] == perm1]
    # Identifier ceux qui ont la deuxième permission critique
    has_perm2 = finance_analysts[finance_analysts['PermissionName'] == perm2]

    # Trouver les UserID qui apparaissent dans les deux listes (ayant les deux permissions)
    users_with_sod_conflict = pd.merge(
        has_perm1['UserID'].drop_duplicates(),
        has_perm2['UserID'].drop_duplicates(),
        on='UserID'
    )

    for index, row in users_with_sod_conflict.iterrows():
        findings.append({
            'Type de Constat': 'SoD - Conflit de Permissions (Finance)',
            'Gravité': 'Critique',
            'Description': f"L'utilisateur '{row['UserID']}' avec le rôle '{role_name}' détient à la fois les permissions '{perm1}' et '{perm2}', ce qui constitue un conflit de Séparation des Tâches.",
            'Utilisateur': row['UserID'],
            'Rôle': role_name,
            'Permissions en Conflit': f"{perm1} et {perm2}",
            'Date du Constat': datetime.now().strftime('%Y-%m-%d')
        })
    return findings

### Définition de la Fonction de Génération des Recommandations

In [17]:
# --- Fonction de Génération des Recommandations d'Audit (MODIFIÉE) ---

def generate_recommendations(all_findings):
    """
    Génère des recommandations d'audit regroupées par type de constat.
    Les recommandations sont génériques pour chaque type et listent les UserID affectés.
    Arguments:
        all_findings (list): Une liste de dictionnaires, chaque dictionnaire représentant un constat.
    Retourne:
        dict: Un dictionnaire où les clés sont les types de constat et les valeurs sont
              des dictionnaires contenant le texte de recommandation générique et la liste
              des utilisateurs affectés.
    """
    grouped_recommendations_info = {}

    for finding in all_findings:
        finding_type = finding['Type de Constat']
        user_id = finding.get('Utilisateur', 'N/A')

        # Initialise l'entrée pour ce type de constat s'il n'existe pas encore
        if finding_type not in grouped_recommendations_info:
            grouped_recommendations_info[finding_type] = {
                'recommandation_text': '',
                'affected_users': set() # Utilise un set pour éviter les UserID en double
            }

        # Ajoute l'UserID à l'ensemble des utilisateurs affectés pour ce type de constat
        if user_id != 'N/A':
            grouped_recommendations_info[finding_type]['affected_users'].add(user_id)

        # Définit le texte de recommandation générique une seule fois par type
        # On s'assure de ne pas répéter le UserID dans la recommandation générique
        if not grouped_recommendations_info[finding_type]['recommandation_text']:
            if finding_type == 'Compte Dormant Actif':
                grouped_recommendations_info[finding_type]['recommandation_text'] = (
                    f"Il est recommandé de : \n"
                    f"  - Procéder à une revue trimestrielle des comptes actifs n'ayant pas enregistré de connexion depuis plus de 180 jours.\n"
                    f"  - Mettre en place une politique de désactivation automatique après 90 ou 180 jours d'inactivité, après approbation managériale.\n"
                    f"  - Archiver ou supprimer les comptes dont l'inactivité est confirmée et non justifiée, conformément à l'ISO 27001 (A.9.2.1)."
                )
            elif finding_type == 'Activité Privilégiée Hors Heures':
                grouped_recommendations_info[finding_type]['recommandation_text'] = (
                    f"Il est recommandé de : \n"
                    f"  - Mettre en place des plages horaires d'accès strictes pour les comptes à privilèges et configurer des alertes en cas de violation.\n"
                    f"  - Implémenter une solution de Privileged Access Management (PAM) pour la gestion des sessions et la traçabilité en temps réel.\n"
                    f"  - Exiger une Multi-Factor Authentication (MFA) pour tout accès privilégié, conforme à l'ISO 27001 (A.9.2.4) et RGPD (Article 32)."
                )
            elif finding_type == 'SoD - Conflit de Permissions (Finance)':
                grouped_recommendations_info[finding_type]['recommandation_text'] = (
                    f"Il est recommandé de : \n"
                    f"  - Réviser les attributions de rôles et permissions afin d'éliminer le cumul de fonctions conflictuelles.\n"
                    f"  - Mettre en place une matrice de SoD claire et l'intégrer dans le processus d'attribution et de revue des rôles applicatifs.\n"
                    f"  - En cas d'impossibilité de séparation stricte, implémenter des contrôles compensatoires efficaces (ex: revue et approbation par un tiers indépendant) et documentés, en ligne avec SOX (Section 404) et ISO 27001 (A.6.1.2).\n"
                    f"  - Utiliser des outils d'analyse de SoD pour scanner régulièrement les droits et les activités."
                )
            # Ajouter d'autres types de constats et leurs recommandations génériques ici si nécessaire

    return grouped_recommendations_info

### Compilation des recommandations sur un fichier excel prêt pour visualisations

In [18]:
# --- Fonctions utilitaires pour le traitement des constats ---

def parse_findings_to_dataframe(findings):
    """
    Convertit une liste de dictionnaires de constats en un DataFrame Pandas.
    Arguments:
        findings (list): Une liste de dictionnaires, chaque dictionnaire représentant un constat.
    Retourne:
        pd.DataFrame: Un DataFrame Pandas structuré avec les détails des constats.
    """
    # Liste de colonnes que nous attendons de chaque constat.
    # On peut les définir ici pour s'assurer d'une structure uniforme.
    standard_columns = [
        'Type de Constat', 'Gravité', 'Utilisateur', 'Description',
        'Date du Constat', 'Date de l\'Activité', 'Heure de l\'Activité',
        'Rôle', 'Permissions en Conflit'
    ]

    # Créer un DataFrame à partir des constats.
    # Pandas gère bien les dictionnaires avec des clés manquantes en remplissant avec NaN.
    df_findings = pd.DataFrame(findings)

    # Assurez-vous que toutes les colonnes standard sont présentes, remplissez avec pd.NA si manquantes
    for col in standard_columns:
        if col not in df_findings.columns:
            df_findings[col] = pd.NA

    # Réordonner les colonnes pour une meilleure lisibilité
    df_findings = df_findings[standard_columns]

    return df_findings

### Exécution

In [19]:
if __name__ == "__main__":
    # Ceci est le bloc d'exécution principal qui orchestre l'ensemble du processus d'audit.

    print("--- Démarrage de l'Audit IAM ---")

    # 1. Nettoyage et Configuration de la base de données
    if os.path.exists(DB_NAME):
        os.remove(DB_NAME)
        print(f"Ancienne base de données '{DB_NAME}' supprimée pour un nouveau départ.")
    print("\n--- Configuration de la base de données IAM ---")
    setup_database() # Appel de la fonction pour créer la structure des tables.

    # 2. Génération et insertion des données synthétiques
    print("\n--- Génération et insertion des données synthétiques ---")
    generate_synthetic_data(num_users=150, num_logs_per_user=70) # Appel de la fonction pour remplir la DB.
    print(f"\nLa base de données IAM '{DB_NAME}' est prête et remplie pour l'audit !")

    # --- Fonction d'extraction et chargement SQL vers Pandas ---
    # Définie ici pour être accessible à toutes les étapes d'extraction.
    def extract_data_to_pandas(query):
        """
        Exécute une requête SQL sur la base de données SQLite et renvoie le résultat
        dans un DataFrame Pandas.
        """
        conn = sqlite3.connect(DB_NAME)
        df = pd.read_sql_query(query, conn)
        conn.close()
        return df

    # 3. Extraction des données nécessaires via SQL pour les fonctions d'audit
    print("\n--- Extraction des données via SQL pour l'audit ---")

    # Données des utilisateurs (pour l'audit des comptes dormants)
    df_users = extract_data_to_pandas("SELECT UserID, Username, Status, LastLoginDate FROM Users;")
    print("DataFrame 'Users' extrait pour l'audit des comptes dormants.")

    # Données des logs d'accès (pour activités privilégiées hors heures)
    # On filtre les types d'événements pertinents pour cette analyse.
    df_access_logs = extract_data_to_pandas("SELECT UserID, EventType, Timestamp FROM AccessLogs WHERE EventType IN ('Login', 'RoleChange', 'AccessDenied', 'ResourceAccess');")
    print("DataFrame 'AccessLogs' extrait pour l'audit des activités hors heures.")

    # Données combinées utilisateurs, rôles et permissions (pour l'audit SoD)
    # Cette requête JOIN plusieurs tables pour obtenir une vue complète des droits.
    sql_query_user_roles_perms = """
    SELECT
        U.UserID,
        U.Username,
        R.RoleName,
        P.PermissionName
    FROM Users U
    JOIN UserRoles UR ON U.UserID = UR.UserID
    JOIN Roles R ON UR.RoleID = R.RoleID
    JOIN RolePermissions RP ON R.RoleID = RP.RoleID
    JOIN Permissions P ON RP.PermissionID = P.PermissionID;
    """
    df_user_roles_perms = extract_data_to_pandas(sql_query_user_roles_perms)
    print("DataFrame 'UserRolesPerms' extrait pour l'audit SoD.")

    # 4. Identification des utilisateurs privilégiés
    # On identifie les IT Administrators pour l'audit des activités privilégiées.
    sql_query_privileged_users = """
    SELECT U.UserID
    FROM Users U
    JOIN UserRoles UR ON U.UserID = UR.UserID
    JOIN Roles R ON UR.RoleID = R.RoleID
    WHERE R.RoleName = 'IT Administrator';
    """
    df_privileged_users = extract_data_to_pandas(sql_query_privileged_users)
    privileged_users_ids = df_privileged_users['UserID'].tolist()
    print(f"Identifiants des utilisateurs privilégiés (IT Administrator) : {privileged_users_ids[:5]}...")


    # 5. Application des règles d'audit
    print("\n--- Application des règles d'audit IAM ---")
    all_findings = [] # Cette liste va collecter tous les constats d'audit.

    # 5.1. Audit des comptes dormants actifs
    # On utilise .copy() pour s'assurer que la fonction travaille sur une copie indépendante du DataFrame.
    findings_dormant = check_dormant_active_accounts(df_users.copy())
    all_findings.extend(findings_dormant) # Ajoute les constats à la liste globale.
    print(f"Constats 'Comptes Dormants Actifs' : {len(findings_dormant)} trouvés.")

    # 5.2. Audit des activités privilégiées hors heures
    findings_off_hours = check_privileged_activity_off_hours(df_access_logs.copy(), privileged_users_ids)
    all_findings.extend(findings_off_hours)
    print(f"Constats 'Activité Privilégiée Hors Heures' : {len(findings_off_hours)} trouvés.")

    # 5.3. Audit des violations SoD pour la finance
    findings_sod_finance = check_sod_violations_finance(df_user_roles_perms.copy())
    all_findings.extend(findings_sod_finance)
    print(f"Constats 'SoD - Conflit de Permissions (Finance)' : {len(findings_sod_finance)} trouvés.")


    # 6. Affichage des constats consolidés
    print("\n--- Synthèse des Constats d'Audit IAM ---")
    if all_findings:
        # Group findings by 'Type de Constat'
        findings_by_type = {}
        for finding in all_findings:
            finding_type = finding['Type de Constat']
            if finding_type not in findings_by_type:
                findings_by_type[finding_type] = 0
            findings_by_type[finding_type] += 1

        print("\nCatégories de Constats :")
        for finding_type, count in findings_by_type.items():
            print(f"  - {finding_type}: {count} constats")

        # Sauvegarder les constats détaillés dans un fichier texte
        detailed_findings_folder = "audit_details"
        os.makedirs(detailed_findings_folder, exist_ok=True)
        detailed_findings_file = os.path.join(detailed_findings_folder, "detailed_findings.txt")

        with open(detailed_findings_file, "w", encoding="utf-8") as f_details:
            f_details.write("--- Rapport Détaillé des Constats d'Audit IAM ---\n\n")
            if all_findings:
                for i, finding in enumerate(all_findings):
                    f_details.write(f"\nConstat {i+1}:\n")
                    for key, value in finding.items():
                        f_details.write(f"  {key}: {value}\n")
            else:
                f_details.write("Aucun constat d'audit IAM détaillé détecté sur la période.\n")
        print(f"\nLes détails complets des constats sont disponibles dans le fichier : {detailed_findings_file}")

        # --- Traitement et Exportation vers Excel ---
        print("\n--- Exportation des constats vers Excel ---")
        # Appel de la fonction utilitaire parse_findings_to_dataframe
        df_all_findings_structured = parse_findings_to_dataframe(all_findings)
        excel_output_file = os.path.join(detailed_findings_folder, "audit_findings.xlsx")

        try:
            df_all_findings_structured.to_excel(excel_output_file, index=False)
            print(f"Les constats structurés ont été exportés vers : {excel_output_file}")
            print("\nVous pouvez maintenant utiliser ce fichier Excel pour créer un mini-dashboard !")
        except Exception as e:
            print(f"Erreur lors de l'exportation vers Excel : {e}")

    else:
        print("Aucun constat d'audit IAM détecté sur la période.")

    # --- 7. Génération et Affichage des Recommandations ---
    print("\n\n--- Recommandations d'Audit ---")
    # Appel à la fonction generate_recommendations modifiée
    grouped_recommendations_info = generate_recommendations(all_findings)

    if grouped_recommendations_info:
        for finding_type, info in grouped_recommendations_info.items():
            print(f"\n### Type de Constat : {finding_type}")
            print(info['recommandation_text'])
            if info['affected_users']:
                print(f"  Utilisateurs affectés : {', '.join(sorted(list(info['affected_users'])))}\n")
            else:
                print("  Aucun utilisateur affecté spécifiquement mentionné.")
    else:
        print("Aucune recommandation générée car aucun constat n'a été détecté.")

    print(f"\n\n--- Audit IAM terminé. {len(all_findings)} constats et {len(grouped_recommendations_info)} types de recommandations ont été identifiés. ---")

--- Démarrage de l'Audit IAM ---
Ancienne base de données 'iam_audit.db' supprimée pour un nouveau départ.

--- Configuration de la base de données IAM ---
Base de données SQLite et tables configurées avec succès.

--- Génération et insertion des données synthétiques ---
Données synthétiques générées et insérées avec succès.

La base de données IAM 'iam_audit.db' est prête et remplie pour l'audit !

--- Extraction des données via SQL pour l'audit ---
DataFrame 'Users' extrait pour l'audit des comptes dormants.
DataFrame 'AccessLogs' extrait pour l'audit des activités hors heures.
DataFrame 'UserRolesPerms' extrait pour l'audit SoD.
Identifiants des utilisateurs privilégiés (IT Administrator) : ['U00000', 'U00009', 'U00020', 'U00025', 'U00026']...

--- Application des règles d'audit IAM ---
Constats 'Comptes Dormants Actifs' : 114 trouvés.
Constats 'Activité Privilégiée Hors Heures' : 717 trouvés.
Constats 'SoD - Conflit de Permissions (Finance)' : 21 trouvés.

--- Synthèse des Constats