In [None]:

'''
Générer un identifiant de session unique (UUID)

Le stocker dans un cookie HTTP

Associer ce cookie à un utilisateur dans une base de données (SQLite, etc.)

À chaque chargement de page, vérifier si un cookie existe, le valider, et récupérer l'utilisateur correspondant.
'''


In [None]:
# Sert à rien

import sqlite3
import bcrypt
import re
import streamlit as st
from datetime import datetime, timedelta
import uuid
import os
from http.cookies import SimpleCookie


class CookieManager:
    def __init__(self):
        pass

    def set_cookie(self, name: str, value: str, days: int = 7):
        """Définit un cookie persistant pour `days` jours."""
        expires = (datetime.utcnow() + timedelta(days=days)).strftime("%a, %d %b %Y %H:%M:%S GMT")
        st.markdown(f"""
            <script>
            document.cookie = "{name}={value}; expires={expires}; path=/";
            </script>
        """, unsafe_allow_html=True)

    def get_cookie(self, name: str) -> str | None:
        """Récupère la valeur d’un cookie si présent."""
        if 'HTTP_COOKIE' in os.environ:
            cookies = SimpleCookie(os.environ['HTTP_COOKIE'])
            if name in cookies:
                return cookies[name].value
        return None

    def delete_cookie(self, name: str):
        """Supprime un cookie existant."""
        st.markdown(f"""
            <script>
            document.cookie = "{name}=; expires=Thu, 01 Jan 1970 00:00:00 GMT; path=/";
            </script>
        """, unsafe_allow_html=True)


class AuthManager:
    def __init__(self, db_path="user.db"):
        self.db_path = db_path
        self.cookie_manager = CookieManager()
        self.cookie_name = "user_session"
        self.cookie_duration_days = 7
        self.init_db()

    def init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                password TEXT NOT NULL,
                role TEXT DEFAULT 'user',
                registration_date TEXT NOT NULL
                )
            ''')
            c.execute('''
            CREATE TABLE IF NOT EXISTS sessions (
                session_id TEXT PRIMARY KEY,
                user_id INTEGER NOT NULL,
                expires_at TEXT NOT NULL,
                FOREIGN KEY(user_id) REFERENCES users(id)
            )
            ''')
            conn.commit()

    def hash_password(self, password):
        hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode('utf-8')
        return hashed

    def check_password(self, password, hashed_password):
        return bcrypt.checkpw(password.encode(), hashed_password.encode('utf-8'))

    def register(self, username, email, password):
        password_pattern = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*?])[\S\s]{5,}$'
        if not re.match(password_pattern, password):
            return "❌ Mot de passe trop faible. Il doit contenir au moins 5 caractères, 1 majuscule, 1 minuscule, 1 chiffre et 1 caractère spécial."
        email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w{2,}$'
        if not re.match(email_pattern, email):
            return "❌ Email invalide"
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute("SELECT * FROM users WHERE email = ?", (email,))
            if c.fetchone():
                return "❌ Email déjà utilisé"
            hashed = self.hash_password(password)
            c.execute("INSERT INTO users (username, email, password, role, registration_date) VALUES (?, ?, ?, ?, ?)",
                      (username, email, hashed, 'user', datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
            conn.commit()
            return f"✅ Compte '{username}' créé avec succès !"

    def login(self, email, password):
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute("SELECT id, password FROM users WHERE email = ?", (email,))
            result = c.fetchone()
        if not result:
            return "Erreur : Utilisateur non trouvé"
        user_id, hashed_password = result
        if not self.check_password(password, hashed_password):
            return "Erreur : Mot de passe incorrect"
        return None  # connexion OK

    def create_session(self, email):
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute("SELECT id FROM users WHERE email = ?", (email,))
            user = c.fetchone()
            if not user:
                return None
            user_id = user[0]
            session_id = str(uuid.uuid4())
            expires_at = (datetime.utcnow() + timedelta(days=self.cookie_duration_days)).strftime("%Y-%m-%d %H:%M:%S")
            c.execute("INSERT INTO sessions (session_id, user_id, expires_at) VALUES (?, ?, ?)",
                      (session_id, user_id, expires_at))
            conn.commit()
            self.cookie_manager.set_cookie(self.cookie_name, session_id, self.cookie_duration_days)
            return session_id

    def get_current_user(self):
        session_id = self.cookie_manager.get_cookie(self.cookie_name)
        if not session_id:
            return None
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
            c.execute('''
                SELECT users.id, users.username, users.email, users.role FROM users
                JOIN sessions ON users.id = sessions.user_id
                WHERE sessions.session_id = ? AND sessions.expires_at > ?
            ''', (session_id, now_str))
            user = c.fetchone()
            if user:
                keys = ['id', 'username', 'email', 'role']
                return dict(zip(keys, user))
        return None

    def logout(self):
        session_id = self.cookie_manager.get_cookie(self.cookie_name)
        if session_id:
            with sqlite3.connect(self.db_path) as conn:
                c = conn.cursor()
                c.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
                conn.commit()
            self.cookie_manager.delete_cookie(self.cookie_name)

    def login_flow(self, email, password):
        error = self.login(email, password)
        if error:
            return error
        self.create_session(email)
        return None  # succès


In [None]:
#idem
import sqlite3
import bcrypt
import re
import uuid
from datetime import datetime, timedelta

class AuthManager:
    def __init__(self, db_path="users.db"):
        self.db_path = db_path
        self.init_db()

    def init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL,
                    role TEXT DEFAULT 'user',
                    registration_date TEXT NOT NULL
                )
            ''')
            c.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id INTEGER NOT NULL,
                    expires_at TEXT NOT NULL,
                    FOREIGN KEY(user_id) REFERENCES users(id)
                )
            ''')
            conn.commit()

    
    def hash_password(self, password):
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode('utf-8')

    def check_password(self, password, hashed):
        return bcrypt.checkpw(password.encode(), hashed.encode('utf-8'))



    def register(self, username, email, password):
        
        # Validation mdp via regex : 5 caractères, majuscule, minuscule, chiffre, caractère spécial
        password_pattern = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*?])[\S\s]{5,}$'
        if not re.match(password_pattern, password):
            return "❌ Mot de passe trop faible. Il doit contenir au moins 5 caractères, 1 majuscule, 1 minuscule, 1 chiffre et 1 caractère spécial."

        # Validation de l'email via regex : structure mail valide
        email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w{2,}$'
        if not re.match(email_pattern, email):
            return "❌ Email invalide"

        # Ouverture connexion (et fermeture automatique)
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()

            # Vérifie si l'email existe déjà
            c.execute("SELECT * FROM users WHERE email = ?", (email,))
            if c.fetchone():
                return "❌ Email déjà utilisé"

            # Hachage mdp
            hashed = self.hash_password(password)

            # Insertion des infos users
            c.execute("INSERT INTO users (username, email, password, role, registration_date) VALUES (?, ?, ?, ?)",
                      (username, email, hashed, 'user', datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

            # Enregistrement user
            conn.commit()
            return f"✅ Compte '{username}' créé avec succès !"
  

    
    # def login_v1_flow() en mode fuision de def login() et def create_session_v1()
    def login_and_create_session(self, email, password):
        """
        Vérifie l'identité de l'utilisateur et crée une session s'il est authentifié.
        Retourne session_id ou un message d'erreur.
        """
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute("SELECT id, password FROM users WHERE email = ?", (email,))
            result = c.fetchone()
    
            if not result:
                return "Erreur : Utilisateur non trouvé"
    
            user_id, hashed_password = result
            if not self.check_password(password, hashed_password):
                return "Erreur : Mot de passe incorrect"
    
            session_id = str(uuid.uuid4())
            expires_at = (datetime.utcnow() + timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
            c.execute("INSERT INTO sessions (session_id, user_id, expires_at) VALUES (?, ?, ?)",
                      (session_id, user_id, expires_at))
            conn.commit()
            return session_id
    

    def get_user_by_session(self, session_id):
        if not session_id:
            return None

        now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute('''
                SELECT u.id, u.username, u.email, u.role FROM users u
                JOIN sessions s ON u.id = s.user_id
                WHERE s.session_id = ? AND s.expires_at > ?
            ''', (session_id, now))
            row = c.fetchone()

        if row:
            return dict(zip(["id", "username", "email", "role"], row))
        return None


    def logout(self, session_id):
        """
        Supprime la session côté base de données.
        """
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
            conn.commit()
        # Mettre un st.rerun pour refraich et se deconnecter car session_id suprimé de la bdd
        # Si ca marche pas mettre petit script JS pour reload page : components.html("""<script>window.location.reload();</script>""",height=0,width=0,)
    



import streamlit as st

class CookieManager:
    def __init__(self, cookie_name="session_id"):
        self.cookie_name = cookie_name

    def set_cookie(self, value, days=7):
        js_code = f"""
        <script>
        document.cookie = "{self.cookie_name}={value}; path=/; max-age={days * 24 * 60 * 60}; SameSite=Lax";
        </script>
        """
        st.components.v1.html(js_code, height=0, width=0)

    def get_cookie(self):
        cookie_value = st.text_input("session_cookie", value="", label_visibility="collapsed")
        js_code = f"""
        <script>
        const value = document.cookie
            .split('; ')
            .find(row => row.startsWith('{self.cookie_name}='))
            ?.split('=')[1];
        if (value) {{
            const input = window.parent.document.querySelector('input[name="session_cookie"]');
            if (input) {{
                input.value = value;
                input.dispatchEvent(new Event('input', {{ bubbles: true }}));
            }}
        }}
        </script>
        """
        st.components.v1.html(js_code, height=0, width=0)
        return cookie_value or None

    def delete_cookie(self):
        js_code = f"""
        <script>
        document.cookie = "{self.cookie_name}=; path=/; max-age=0";
        </script>
        """
        st.components.v1.html(js_code, height=0, width=0)


In [None]:
# Treès good
import sqlite3
import bcrypt
import re
import uuid
from datetime import datetime, timedelta
from streamlit_cookies_manager import EncryptedCookieManager
import streamlit as st


class AuthManager:
    def __init__(self, db_path="users.db", cookie_name="session_id"):
        self.db_path = db_path
        self.cookie_name = cookie_name
        self.cookies = EncryptedCookieManager(
            prefix="",  # pas de préfixe
            password="SECRET",  # à modifier pour un vrai mot de passe secret
        )
        if not self.cookies.ready():
            st.stop()
        self.init_db()

    def init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL,
                    role TEXT DEFAULT 'user',
                    registration_date TEXT NOT NULL
                )
            ''')
            c.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id INTEGER NOT NULL,
                    expires_at TEXT NOT NULL,
                    FOREIGN KEY(user_id) REFERENCES users(id)
                )
            ''')
            conn.commit()

    def hash_password(self, password):
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode('utf-8')

    def check_password(self, password, hashed):
        return bcrypt.checkpw(password.encode(), hashed.encode('utf-8'))

    def register(self, username, email, password):
        password_pattern = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*?])[\S\s]{5,}$'
        if not re.match(password_pattern, password):
            return "❌ Mot de passe trop faible. Il doit contenir au moins 5 caractères, 1 majuscule, 1 minuscule, 1 chiffre et 1 caractère spécial."

        email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w{2,}$'
        if not re.match(email_pattern, email):
            return "❌ Email invalide"

        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute("SELECT * FROM users WHERE email = ?", (email,))
            if c.fetchone():
                return "❌ Email déjà utilisé"

            hashed = self.hash_password(password)
            c.execute("INSERT INTO users (username, email, password, role, registration_date) VALUES (?, ?, ?, ?, ?)",
                      (username, email, hashed, 'user', datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
            conn.commit()
        return f"✅ Compte '{username}' créé avec succès !"

    def login(self, email, password):
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute("SELECT id, password FROM users WHERE email = ?", (email,))
            result = c.fetchone()

            if not result:
                return "Erreur : Utilisateur non trouvé"
            user_id, hashed_password = result

            if not self.check_password(password, hashed_password):
                return "Erreur : Mot de passe incorrect"

            session_id = str(uuid.uuid4())
            expires_at = (datetime.utcnow() + timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")

            c.execute("INSERT INTO sessions (session_id, user_id, expires_at) VALUES (?, ?, ?)",
                      (session_id, user_id, expires_at))
            conn.commit()

        self.cookies[self.cookie_name] = session_id
        self.cookies.save()
        return "✅ Connexion réussie"

    def logout(self):
        session_id = self.cookies.get(self.cookie_name)
        if session_id:
            with sqlite3.connect(self.db_path) as conn:
                c = conn.cursor()
                c.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
                conn.commit()
        self.cookies[self.cookie_name] = ""
        self.cookies.save()
        st.rerun()

    def get_current_user(self):
        session_id = self.cookies.get(self.cookie_name)
        if not session_id:
            return None

        now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute('''
                SELECT u.id, u.username, u.email, u.role
                FROM users u
                JOIN sessions s ON u.id = s.user_id
                WHERE s.session_id = ? AND s.expires_at > ?
            ''', (session_id, now))
            row = c.fetchone()
        if row:
            return dict(zip(["id", "username", "email", "role"], row))
        return None


In [None]:
# La même que plus haut mais améliroré. Attention, unique sur username. Je suis pas d'accord c'est su le mail

import sqlite3
import bcrypt
import re
import uuid
from datetime import datetime, timedelta
from streamlit_cookies_manager import EncryptedCookieManager
import streamlit as st


############################################# CLASS AUTHMANAGER #############################################
class AuthManager:

#--------------------------- Attribut : chemin et nom bdd et lancement de "init_db()" --------------------------#
    def __init__(self, db_path="users.db", cookie_name="session_id", cookie_secret="Toulouse31"):
        self.db_path = db_path
        self.cookie_name = cookie_name
        # on créé une instance de EncryptedCookieManager et on ne met pas de préfix
        self.cookies = EncryptedCookieManager(prefix="", password=cookie_secret) 
        
        if not self.cookies.ready(): # Si cookie pas dispo (.ready lit via du JS)
            st.stop()
        self.init_db()
        self.clean_expired_sessions() # Nettoie la bdd des session expirées à chaque arrivée sur la page


#--------------------------- Initialisation bdd users et sessions ---------------------------#

    def init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL,
                    role TEXT DEFAULT 'user',
                    registration_date TEXT NOT NULL
                )
            ''')
            c.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    user_id INTEGER NOT NULL,
                    expires_at TEXT NOT NULL,
                    FOREIGN KEY(user_id) REFERENCES users(id)
                )
            ''')
            conn.commit()


#--------------------------- méthode pour effacer les sessions exiprées de la bdd ---------------------------#
    def clean_expired_sessions(self):
        date_now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (date_now,))
            conn.commit()

    
#--------------------------- Hachage mdp ---------------------------#
    def hash_password(self, password):
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode('utf-8')


#--------------------------- Vérification mdp au moment de la connexion ---------------------------#
    def check_password(self, password, hashed):
        return bcrypt.checkpw(password.encode(), hashed.encode('utf-8'))


#--------------------------- Enregistrement ---------------------------#
    def register(self, username, email, password):
        
        # Validation de l'email via regex : structure mail valide
        email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w{2,}$'
        if not re.match(email_pattern, email):
            return "❌ Email invalide"
        
        # Validation mdp via regex : 5 caractères, majuscule, minuscule, chiffre, caractère spécial
        password_pattern = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*?])[\S\s]{5,}$'
        if not re.match(password_pattern, password):
            return "❌ Mot de passe trop faible. Il doit contenir au moins 5 caractères, 1 majuscule, 1 minuscule, 1 chiffre et 1 caractère spécial."

        # Ouverture connexion (et fermeture automatique)
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            
        # Vérifie si l'email existe déjà
        c.execute("SELECT * FROM users WHERE email = ?", (email,))
        if c.fetchone():
            return "❌ Email déjà utilisé"

        # Hachage mdp
        hashed = self.hash_password(password)

        # Insertion des infos users
        date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        c.execute("INSERT INTO users (username, email, password, role, registration_date) VALUES (?, ?, ?, ?, ?)",
                  (username, email, hashed, 'user', date))

        # Enregistrement user ds bdd
        conn.commit()
        return f"✅ Compte '{username}' créé avec succès !"


#--------------------------- login ---------------------------#  
    def login(self, email, password):
        
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            # Cherche le users via le mail en fonction des id et password
            c.execute("SELECT id, password FROM users WHERE email = ?", (email,))
            # récupère via la variable user la ligne unique correspondante
            user = c.fetchone()

            # On vérifie si l'utilisateur existe dans bdd
            if not user:
                return "❌ Utilisateur non trouvé"
            # S'il existe on vérifie le mdp
            user_id, hashed = user # unpacking du tuple pris dans "user = c.fetchone()" ou hashed = user[1]
            if not self.check_password(password, hashed):
                return "❌ Mot de passe incorrect"

            # Si tout est ok, on créé la persistance de session
            # Créé le token unique
            session_id = str(uuid.uuid4())
            # Créé la date d'expiration du token
            expires_at = (datetime.utcnow() + timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S") 
            # Rentre ds la bdd les infos
            c.execute("INSERT INTO sessions (session_id, user_id, expires_at) VALUES (?, ?, ?)",
                      (session_id, user_id, expires_at))
            # Enregistre ds la bdd les infos
            conn.commit()

        # créé le cookie avec la session_id créée par str(uuid.uuid4())
        self.cookies[self.cookie_name] = session_id
        # Enregistre le cookie et envoie au navigateur
        self.cookies.save()
        return "✅ Connexion réussie"


#--------------------------- logout ---------------------------#   
    def logout(self):

        # Cherche si l’utilisateur a un cookie de session
        session_id = self.cookies.get(self.cookie_name)
        
        # Si session_id existe, on efface de la bdd
        if session_id:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
                conn.commit()
                
            #Vide le cookie
            self.cookies[self.cookie_name] = ""
            # Enregistre le cookie vide dans le navigateur
            self.cookies.save()
            # Recharge la page
            st.rerun() # A enlever pour mettre sur la page view

        else:
            st.warning("Aucun utilisateur connecté.") #idem


#--------------------------- Vérifie si cookie de session valide ---------------------------#   
    def get_current_user(self):
        
        # récupère le cookie de session du navigateur
        session_id = self.cookies.get(self.cookie_name)

        # Si session-id vide ( "" ) pas connecté.
        if not session_id:
            return None
        
        date_now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

        # Si "session_id" existe, vérifie si "session_id" non expirée
        with sqlite3.connect(self.db_path) as conn:
            c = conn.cursor()
            c.execute('''SELECT u.id, u.username, u.email, u.role FROM users u JOIN sessions s ON u.id = s.user_id WHERE s.session_id = ? AND s.expires_at > ?''', (session_id, date_now))
            user_session = c.fetchone()

            # Si session_id valide on se connecte
            if user_session:
                # On pourrait return True inuqement mais ici on prend les infos afin de savoir qui est connecté et pourvoir s'en servir si on le veut
                return {"id": user_session[0], "username": user_session[1], "email": user_session[2], "role": user_session[3]}
            else:
                return None





# Exemple utilisation

auth = AuthManager(db_path="users.db", cookie_secret="TON_SECRET")

user = auth.get_current_user()

if user:
    st.success(f"Bonjour {user['username']} 👋")
    if st.button("Se déconnecter"):
        auth.logout()
else:
    st.subheader("Connexion")
    email = st.text_input("Email")
    password = st.text_input("Mot de passe", type="password")
    if st.button("Connexion"):
        st.info(auth.login(email, password))

    st.subheader("Créer un compte")
    username = st.text_input("Nom d'utilisateur")
    email_new = st.text_input("Nouvel email")
    password_new = st.text_input("Nouveau mot de passe", type="password")
    if st.button("Créer un compte"):
        st.info(auth.register(username, email_new, password_new))
