**0) Ancienne configuration du notebook**

In [5]:
import pandas as pd
import numpy as np
import boto3
import json
import os
from datetime import datetime
from tqdm import tqdm
import requests  # ← Correction ici (pas resquests)
import zipfile
from IPython.display import Markdown, display, HTML  # ← display pas displa
import warnings
warnings.filterwarnings('ignore')

In [6]:
bedrock_runtime = boto3.client('bedrock-runtime')
bedrock = boto3.client('bedrock')
s3 = boto3.client('s3')
comprehend = boto3.client('comprehend')
textract = boto3.client('textract')

print("✅ Configuration AWS complétée")

✅ Configuration AWS complétée


**1) Imports**

In [7]:
!pip install defeatbeta-api boto3 sec-parser pandas
import sec_parser as sp
import json
import boto3
import re
import os
import ast
from pathlib import Path
import warnings
from typing import Dict, List, Optional, Tuple
import pandas as pd
from pathlib import Path
import sec_parser as sp
from sec_parser.processing_steps import (
    TopSectionManagerFor10Q,
    IndividualSemanticElementExtractor,
    TopSectionTitleCheck,
)



**2) Helpers pour générer des versions markdown synthétiques des 10K**

In [8]:
# ---- Paramètres ----
ITEMS_WANTED = {"1", "3", "7", "8"}  # Items à garder
ITEM_ORDER = ["1", "3", "7", "8"]    # Ordre d'écriture

# Détecte "Item 1", "Item 1A", "ITEM 7 — ..." (tolérant)
ITEM_RE = re.compile(r"item\s+([0-9]+[a-z]?)\b", re.IGNORECASE)

def normalize(s: str) -> str:
    return re.sub(r"\s+", " ", s or "").strip()

def looks_like_toc(text: str) -> bool:
    """Heuristiques simples pour filtrer la table des matières."""
    t = (text or "").lower()
    if "table of contents" in t:
        return True
    if re.search(r"\.{5,}", text or ""):  # lignes avec "........"
        return True
    if len(text or "") < 30 and re.search(r"\s\d{1,3}$", text or ""):  # titre court + n° page
        return True
    return False

def find_item_code(text: str) -> Optional[str]:
    """Retourne '1','3','7','8', etc. si la ligne ressemble à un début d'Item."""
    txt = normalize(text)
    if looks_like_toc(txt):
        return None
    m = ITEM_RE.search(txt)
    return m.group(1).upper() if m else None

# ---------- Helpers robustes d'extraction de table en Markdown ----------
def _coerce_to_html_string(x) -> Optional[str]:
    """Essaye de convertir un objet (HtmlTag, bs4.Tag, etc.) en str HTML."""
    if x is None:
        return None
    try:
        if isinstance(x, str):
            return x
        if hasattr(x, "get_source_code"):
            s = x.get_source_code()
            if isinstance(s, str):
                return s
        return str(x)
    except Exception:
        return None

def table_to_markdown_robust(node: sp.TableElement) -> str:
    """
    1) Essaye node.table_to_markdown() si dispo
    2) Sinon, récupère l'HTML du tableau et:
       - tente TableParser(html).table_to_markdown() si dispo
       - ou pandas.read_html + DataFrame.to_markdown() en dernier recours
    """
    # 1) API directe
    if hasattr(node, "table_to_markdown"):
        try:
            return node.table_to_markdown()
        except Exception:
            pass

    # 2) Récupérer l'HTML
    html_table = None
    if hasattr(node, "get_source_code"):
        html_table = node.get_source_code()
    if not isinstance(html_table, str) and hasattr(node, "html_tag"):
        html_table = _coerce_to_html_string(node.html_tag)
        if not isinstance(html_table, str) and hasattr(node.html_tag, "get_source_code"):
            html_table = node.html_tag.get_source_code()
    if not isinstance(html_table, str):
        html_table = _coerce_to_html_string(html_table) or _coerce_to_html_string(node)

    if isinstance(html_table, str) and "<table" in html_table.lower():
        # a) TableParser -> DataFrame -> Markdown
        try:
            from sec_parser.semantic_elements.table_element.table_parser import TableParser
            df = TableParser(html_table).parse_as_df()
            if isinstance(df, pd.DataFrame) and not df.empty:
                return df.to_markdown(index=False)
        except Exception:
            pass
        # b) pandas.read_html -> Markdown
        try:
            dfs = pd.read_html(html_table)
            for d in dfs:
                if isinstance(d, pd.DataFrame) and not d.empty:
                    return d.to_markdown(index=False)
        except Exception:
            pass

    # 3) Dernier recours
    return "_Table not parsed (no available method for this sec_parser version)._"

# ---------- Pipeline principal : parse Items + insérer tables brutes ----------
def parse_10k_html_with_tables(html_path: str) -> Dict[str, str]:
    """
    Parse un 10-K via Edgar10QParser et retourne {code_item: markdown}
    en injectant les tableaux aux bonnes positions (Markdown brut).
    """
    html = Path(html_path).read_text(encoding="utf-8", errors="ignore")
    elements = sp.Edgar10QParser().parse(html)  # contournement 10-K

    # 1) Trouver tous les débuts d'Item
    boundaries: List[tuple[int, str, str]] = []
    for idx, el in enumerate(elements):
        txt = getattr(el, "text", "") or ""
        if len(txt) > 2000:
            continue
        code = find_item_code(txt)
        if code:
            boundaries.append((idx, code, normalize(txt)))
    boundaries.sort(key=lambda t: t[0])

    # 2) Construire le markdown par item
    item_to_md: Dict[str, str] = {}
    label_map = {
        "1": "Item 1 — Business",
        "3": "Item 3 — Legal Proceedings",
        "7": "Item 7 — Management’s Discussion and Analysis",
        "8": "Item 8 — Financial Statements and Supplementary Data",
    }

    for b_i, (start_idx, code, title_text) in enumerate(boundaries):
        if code not in ITEMS_WANTED:
            continue

        # Fin de section = prochain Item
        end_idx = len(elements)
        for j in range(b_i + 1, len(boundaries)):
            next_idx, _, _ = boundaries[j]
            if next_idx > start_idx:
                end_idx = next_idx
                break

        parts: List[str] = [f"# {label_map.get(code, title_text)}\n"]

        table_counter = 0
        for k in range(start_idx + 1, end_idx):
            node = elements[k]
            if isinstance(node, sp.TableElement):
                table_counter += 1
                try:
                    md_table = table_to_markdown_robust(node)
                    parts.append(f"\n**Table {table_counter} (Item {code})**\n\n{md_table}\n")
                except Exception as e:
                    parts.append(f"\n**Table {table_counter} (Item {code}) — parsing error**\n\n_Error: {e}_\n")
            else:
                txt = (getattr(node, "text", "") or "").strip()
                if txt and not looks_like_toc(txt):
                    parts.append(txt + "\n")

        md = "\n".join(parts).strip()
        if code not in item_to_md or len(md) > len(item_to_md[code]):
            item_to_md[code] = md

    return item_to_md

def save_items_markdown(item_to_md: Dict[str, str], out_md: str) -> None:
    blocks: List[str] = []
    for code in ITEM_ORDER:
        if code in item_to_md:
            blocks.append(item_to_md[code])
    Path(out_md).write_text("\n\n---\n\n".join(blocks) + "\n", encoding="utf-8")

**3) Génération des versions markdown synthétiques des 10K dans le dossier fillings_markdown**

In [30]:
def generate_10k_descriptions(wanted_tickers=[], max_iter=float('inf')):
    """
    Génère des fichiers markdown simplifiés décrivant les rapports 10K 
    dans le dossier fillings_markdown
    """
    root_in = Path("fillings")
    root_out = Path("fillings_markdown")
    root_out.mkdir(parents=True, exist_ok=True)
    
    num_files = 0
    iter = 0
    for folder in sorted(root_in.iterdir()):
        iter += 1
        if iter > max_iter:
            break
        if not folder.is_dir():
            continue
        if wanted_tickers and folder.name not in wanted_tickers:
            continue
    
        out_dir = root_out
        out_dir.mkdir(parents=True, exist_ok=True)
    
        for path in sorted(folder.glob("*.html")):
            try:
                items_md = parse_10k_html_with_tables(str(path))
                md_full = "\n\n---\n\n".join(
                    items_md[k] for k in ITEM_ORDER if k in items_md
                ).strip()
    
                if not md_full:
                    md_full = "_(no selected items found)_"

                out_path = out_dir / f"{folder.name}.md"
                out_path.write_text(md_full + "\n", encoding="utf-8")
                num_files += 1
            except Exception as e:
                # On écrit un fichier d'erreur à côté pour diagnostiquer
                err_path = out_dir / f"{path.stem}.error.md"
                err_path.write_text(f"_Error parsing {path.name}: {e}_\n", encoding="utf-8")

    print(f"Markdown enregistrés dans: {root_out.resolve()}")
    print(f"Nombre de fichiers écrits: {num_files}")

generate_10k_descriptions(["AAPL", "ACGL"])

Markdown enregistrés dans: /mnt/custom-file-systems/s3/shared/fillings_markdown
Nombre de fichiers écrits: 2


**4) Mise en place des requêtes LLM**

In [10]:
# AWS Bedrock Configuration
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")  # Change to your preferred region
BEDROCK_MODEL_ID = "anthropic.claude-3-5-sonnet-20241022-v2:0"  # Claude 3.5 Sonnet

# Initialize Bedrock client
bedrock_runtime = boto3.client(
    service_name='bedrock-runtime',
    region_name=AWS_REGION
)

In [11]:
def invoke_bedrock_model(prompt: str, max_tokens: int = 2000) -> str:
    """
    Invoke Amazon Bedrock model with a prompt.
    
    Args:
        prompt: The prompt to send to the model
        max_tokens: Maximum tokens in response
    
    Returns:
        Model response text
    """
    # Prepare the request body for Claude 3.5 Sonnet
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": 0.7,
        "top_p": 0.9
    })
    
    try:
        # Invoke the model
        response = bedrock_runtime.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=body
        )
        
        # Parse response
        response_body = json.loads(response['body'].read())
        return response_body['content'][0]['text']
    
    except Exception as e:
        print(f"Error invoking Bedrock: {e}")
        raise

**5) Evaluation par LLM d'un 10K + helpers**

In [12]:
def _extract_json_block(text: str) -> str:
    """
    Extrait le premier bloc JSON-like { ... } d'une réponse LLM.
    Tolère du texte avant/après. Renvoie la chaîne avec accolades incluses.
    Lève ValueError si rien n'est trouvé.
    """
    # Cherche le plus court bloc englobant équilibré à partir du 1er '{'
    start = text.find("{")
    if start == -1:
        raise ValueError("Réponse du LLM sans accolade '{' - JSON introuvable.")
    # Heuristique simple d'équilibrage des accolades
    depth = 0
    for i in range(start, len(text)):
        if text[i] == "{":
            depth += 1
        elif text[i] == "}":
            depth -= 1
            if depth == 0:
                return text[start:i+1]
    raise ValueError("Bloc JSON non équilibré - '}' manquante.")

def _parse_json_lenient(s: str) -> dict:
    """
    Essaie plusieurs parseurs pour supporter:
    - JSON standard (double quotes)
    - dict Python (quotes simples)
    - JSON encapsulé dans texte
    """
    # 1) Essai direct JSON
    try:
        return json.loads(s)
    except Exception:
        pass
    # 2) Remplacement naïf des quotes simples -> doubles (sans toucher aux nombres)
    try:
        # D'abord, si c'est un dict Python valide, literal_eval est le plus sûr
        return ast.literal_eval(s)
    except Exception:
        pass
    # 3) Tentative: convertir les quotes simples de clés/valeurs en doubles via regex prudente
    try:
        s2 = re.sub(r"(?P<q>')(?P<key>[^'\\]*?)\1\s*:", r'"\g<key>":', s)  # clés
        s2 = re.sub(r":\s*(?P<q>')(?P<val>[^'\\]*?)\1", r': "\g<val>"', s2) # valeurs textuelles
        return json.loads(s2)
    except Exception:
        pass
    raise ValueError("Impossible de parser la réponse du LLM en JSON/dict.")

def _validate_schema(payload: dict) -> dict:
    """
    Valide les clés, types et bornes. Convertit si besoin (ex: float -> int toléré si entier).
    Lève ValueError si non conforme. Renvoie le dict (potentiellement casté).
    """
    required_types = {
        "ton narcissique": int,
        "répétitions": int,
        "impact risque 1": int,
        "probabilité risque 1": float,
        "impact risque 2": int,
        "probabilité risque 2": float,
        "impact risque 3": int,
        "probabilité risque 3": float,
        "impact risque 4": int,
        "probabilité risque 4": float,
        "impact risque 5": int,
        "probabilité risque 5": float,
        "santé financière": int,
        "perspective évolution": int,
        "résumé": str,
    }

    missing = [k for k in required_types if k not in payload]
    if missing:
        raise ValueError(f"Clés manquantes: {missing}")

    out = {}
    # Bornes: notes sur 5 (0..5), proba en [0..1]
    def as_int_0_5(x, key):
        if isinstance(x, float) and x.is_integer():
            x = int(x)
        if not isinstance(x, int):
            raise ValueError(f"'{key}' doit être un entier (0..5).")
        if not (0 <= x <= 5):
            raise ValueError(f"'{key}' hors bornes (0..5): {x}")
        return x

    def as_float_0_1(x, key):
        # Autoriser string "0.7" -> float
        if isinstance(x, str):
            try:
                x = float(x.strip().replace(",", "."))
            except Exception:
                raise ValueError(f"'{key}' doit être un float (0..1).")
        if not isinstance(x, (int, float)):
            raise ValueError(f"'{key}' doit être un float (0..1).")
        x = float(x)
        if not (0.0 <= x <= 1.0):
            raise ValueError(f"'{key}' hors bornes (0..1): {x}")
        return x

    for key, typ in required_types.items():
        val = payload[key]
        if key == "résumé":
            if not isinstance(val, str) or not val.strip():
                raise ValueError("'résumé' doit être une chaîne non vide.")
            out[key] = val.strip()
        elif "probabilité" in key:
            out[key] = as_float_0_1(val, key)
        else:
            out[key] = as_int_0_5(val, key)

    return out

def assess_1_10K(path: str) -> Dict[str, object]:
    """
    Analyse le fichier markdown d'un 10-K, interroge un LLM et renvoie un JSON validé. Lève une
    erreur en cas de problème.
    
    Args:
        path (str): Chemin vers le fichier markdown du 10-K.

    Returns:
        Dict[str, object]: JSON d'évaluation (conforme au schéma spécifié).
    """
    path_md = Path(path)
    md_content = path_md.read_text(encoding="utf-8", errors="ignore")
    prompt = (
        "Tu analyses et évalues des rapports financiers annuels 10-K d'entreprises cotées "
        "en bourse aux États-Unis. Ces documents sont rédigés par les entreprises elles-mêmes et ne sont "
        "pas toujours complètement honnêtes. ILs peuvent essayer de cacher la vérité. Tu dois donc "
        "être méfiant vis-à-vis de la véracité des propos. Le ton de l'écriture, les répétitions peuvent "
        "donc être utiles pour mieux cerner l'entreprise. Tu vas devoir faire plusieurs analyses mentalement "
        " et uniquement envoyer un JSON de réponse. Tu dois évaluer en imaginant que tu compares le document "
        "fourni avec celui d'autres entreprises."
        "- Analyse le ton narcissique du texte puis donne une note sur 5 où 0 est très narcissique. "
        "- Evalue le taux de répétitions et de mentions inutiles dans le texte puis donne une note sur 5. "
        " La note de 5 signifie qu'il n'y a presque pas de répétitions."
        "- Cite 5 risques pour l'entreprise. Pour chacun, précise l'impact que ça peut avoir sur le "
        "titre boursier de l'entreprise ainsi que la probabilité que le risque ait cet impact. Evalue "
        "l'impact sur 5 où 0 signifie qu'il y a aucun impact et donne la probabilité."
        "- Evalue la santé financière sur 5. 5 correspond à une très bonne santé."
        "- Donne une note sur 5 à la perspective d'évolution de l'action boursière. 5 correspond à la "
        "meilleure perspective possible. "
        "Tu dois uniquement renvoyer une réponse sous le format JSON suivant : "
        "{'ton narcissique': int,"
        "'répétitions': int,"
        "'impact risque 1': int,"
        "'probabilité risque 1': float,"
        "'impact risque 2': int,"
        "'probabilité risque 2': float,"
        "'impact risque 3': int,"
        "'probabilité risque 3': float,"
        "'impact risque 4': int,"
        "'probabilité risque 4': float,"
        "'impact risque 5': int,"
        "'probabilité risque 5': float,"
        "'santé financière': int,"
        "'perspective évolution': int,"
        "'résumé': str"
        "}"
        "La clé 'résumé' correspond à un petit texte en français qui explique ce qui a été analysé "
        "sans parler des notes. Il faut expliquer en quoi il s'agit d'une action fiable ou non en "
        "justifiant tes arguments avec ce que tu as vu. Voici le rapport 10K :\n"
    )

    # Appel LLM (fonction utilitaire fournie ailleurs dans ton projet)
    response_text = invoke_bedrock_model(prompt + md_content, max_tokens=200000)

    # Extraction + parsing
    json_block = _extract_json_block(response_text)
    payload = _parse_json_lenient(json_block)

    # Validation stricte
    validated = _validate_schema(payload)

    return validated


**6) Evaluation de tous les 10K simplifiés présents dans fillings_markdown**

In [None]:
def assess_all_10K(max_iter=float('inf')) -> List[object]:
    """
    Analyse tous les fichiers markdown 10-K dans 'fillings_markdown' et renvoie
    une liste des json d'évaluation. 
    Args:
        max_iter (int): Nombre maximum de fichiers à traiter (par défaut tous).
    """
    results = []
    iter = 0
    for md_file in Path("fillings_markdown").glob("*.md"):
        iter += 1
        if iter > max_iter:
            break
        try:
            result = assess_1_10K(str(md_file))
            result["action"] = md_file.stem
            results.append(result)
        except Exception as e:
            results.append({"action": md_file.stem, "error": str(e)})
    return results

#a = assess_all_10K()

[{'ton narcissique': 4,
  'répétitions': 4,
  'impact risque 1': 4,
  'probabilité risque 1': 0.7,
  'impact risque 2': 4,
  'probabilité risque 2': 0.6,
  'impact risque 3': 3,
  'probabilité risque 3': 0.5,
  'impact risque 4': 3,
  'probabilité risque 4': 0.4,
  'impact risque 5': 3,
  'probabilité risque 5': 0.4,
  'santé financière': 4,
  'perspective évolution': 4,
  'résumé': "Agilent Technologies est un leader mondial dans les sciences de la vie, le diagnostic et les marchés de la chimie appliquée. L'entreprise montre une bonne santé financière avec une trésorerie solide ($1.3B), une bonne diversification géographique et une croissance stable. Les principaux risques incluent: dépendance aux dépenses d'investissement des clients, exposition aux fluctuations économiques mondiales, risques liés à la chaîne d'approvisionnement, concurrence accrue et risques réglementaires. Malgré une baisse des revenus de 5% en 2024, l'entreprise maintient de bonnes marges et continue d'investir da

In [None]:
def compute_scores(list_jsons: list) -> list[float]:
    """
    Donne le score final à partir d'un json avec toutes les informations mais pas encore formatté
    """
    w_ton = 0.05
    w_risk = 0.45
    w_repetitions = 0.1
    w_sante_financiere = 0.2
    w_perspective = 0.2

    result = []
    for json in list_jsons:
        score_risk = 0
        for risk in range(1, 6):
            score_risk += float(json[f'impact risque {risk}'])*float(json[f'probabilité risque {risk}'])
        score = w_ton*float(json['ton narcissique']) + w_repetitions*float(json['répétitions']) + w_sante_financiere*float(json['santé financière']) + w_perspective*float(json['perspective évolution'])+ w_risk*score_risk/5
        result.append(score)
    return result

#print(compute_scores(a))

def group_jsons(list_10K: list, list_yahoo: list) -> list:
    """
    Regroupe les données des évaluations 10-K avec les données Yahoo Finance.
    
    Args:
        list_10K (list): Liste des évaluations 10-K.
        list_yahoo (list): Liste des données Yahoo Finance.

    Returns:
        list: Liste des dictionnaires regroupés par action.
    """
    grouped = []
    yahoo_dict = {item["action"]: item for item in list_yahoo}
    seen_actions = set()
    for item_10K in list_10K:
        action = item_10K.get("action")
        seen_actions.add(action)
        yahoo_data = yahoo_dict.get(action, {})
        combined = {**item_10K, **yahoo_data}
        grouped.append(combined)

    for action, yahoo_data in yahoo_dict.items():
        if action not in seen_actions:
            grouped.append(yahoo_data)

    return grouped


[3.019, 3.142, 2.942, 3.1189999999999998]


**Méthode alternative pour les logos des entreprises : crée le dossier sp500_logos**

In [None]:
"""import requests, pandas as pd, time, pathlib, urllib.parse

WIKI_LIST_URL = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
LOGO_TOKEN = "VOTRE_TOKEN_LOGO_DEV"           # https://www.logo.dev/ (inscription gratuite)
OUT_DIR = pathlib.Path("sp500_logos"); OUT_DIR.mkdir(exist_ok=True)
HEADERS = {"User-Agent": "sp500-logos/1.0"}

# 1) Constituants S&P500 depuis Wikipédia (éviter 403)
resp = requests.get(WIKI_LIST_URL, headers=HEADERS, timeout=30)
resp.raise_for_status()  # lève une erreur si 4xx/5xx
# Passer le HTML brut à pandas (plus fiable que lui laisser faire la requête)
tables = pd.read_html(resp.text, flavor="lxml")  # ou bs4 si tu préfères
# Trouver la table qui contient bien 'Symbol' et 'Security'
df = next(t for t in tables if {"Symbol","Security"}.issubset(set(t.columns)))
df = df[["Symbol", "Security"]].rename(columns={"Security": "Company"})

# 2) Domaine officiel via Wikidata (P856)
def wikidata_official_site(qid):
    url = f"https://www.wikidata.org/wiki/Special:EntityData/{qid}.json"
    j = requests.get(url, headers=HEADERS, timeout=20).json()
    claims = j["entities"][qid]["claims"]
    if "P856" in claims:
        return claims["P856"][0]["mainsnak"]["datavalue"]["value"]
    return None

def wikipedia_title_to_qid(title):
    api = "https://en.wikipedia.org/w/api.php"
    params = {"action":"query","prop":"pageprops","titles":title,"format":"json"}
    r = requests.get(api, params=params, headers=HEADERS, timeout=20).json()
    page = next(iter(r["query"]["pages"].values()))
    return page.get("pageprops", {}).get("wikibase_item")

def guess_domain(url):
    if not url: return None
    u = urllib.parse.urlparse(url if url.startswith("http") else "https://" + url)
    return u.netloc or u.path

domains = []
for title in df["Company"].tolist():
    qid = wikipedia_title_to_qid(title)
    site = wikidata_official_site(qid) if qid else None
    domains.append(guess_domain(site))
    time.sleep(0.1)  # être gentil avec les APIs

df["domain"] = domains

# 3) Logos via Logo.dev (image simple)
def logo_url(domain, size=256, fmt="png"):
    return f"https://img.logo.dev/{domain}?token={LOGO_TOKEN}&size={size}&format={fmt}"

saved = []
for sym, comp, dom in df[["Symbol","Company","domain"]].itertuples(index=False):
    if not dom: 
        saved.append(None); 
        continue
    url = logo_url(dom)
    try:
        img = requests.get(url, headers=HEADERS, timeout=20)
        if img.ok and img.headers.get("Content-Type","").startswith(("image/",)):
            ext = img.headers["Content-Type"].split("/")[-1].split(";")[0]
            path = OUT_DIR / f"{sym}_{dom}.{ext}"
            path.write_bytes(img.content)
            saved.append(str(path))
        else:
            saved.append(None)
    except Exception:
        saved.append(None)
    time.sleep(0.05)

df["logo_file"] = saved
df.to_csv("sp500_logos_index.csv", index=False)
print("OK → fichiers dans", OUT_DIR.resolve())
"""

SyntaxError: invalid character '²' (U+00B2) (3935029318.py, line 6)