In [2]:
import requests
import json
from datetime import datetime
import pandas as pd

# Paramètres d'authentification PISTE
CLIENT_ID = "980d9205-5180-43d4-bf91-b2190855e925"  
CLIENT_SECRET = "18775315-8a1f-48bb-97d0-8a1c708352fc" 


oauth_url = " https://oauth.piste.gouv.fr/api/oauth/token"
api_url = "https://api.piste.gouv.fr/dila/legifrance/lf-engine-app/consult/legiPart"


In [3]:

def get_access_token():
    data = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "openid"
    }

    response = requests.post(oauth_url, data=data)
    response.raise_for_status()
    token = response.json()["access_token"]
    return token

In [4]:
def date_to_timestamp_ms(date_str):
    dt = datetime.strptime(date_str, "%Y-%m-%d")
    timestamp_ms = int(dt.timestamp()) * 1000
    return timestamp_ms

### I) Code général des impôts  (2023-2024)

In [5]:
def get_cgi_version(token, timestamp_ms, year):
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    payload = {
        "date": timestamp_ms,
        "textId": "LEGITEXT000006069577"  # Code général des impôts
    }

    response = requests.post(api_url, headers=headers, json=payload)
    response.raise_for_status()
    
    # Convertir la réponse en DataFrame
    data = response.json()
    df = pd.DataFrame(data['sections'])
    
    # Sauvegarder en CSV
    csv_path = f"""./fiscal_data/cgi_{year}.csv"""
    df.to_csv(csv_path, index=False, encoding='utf-8')
    
    print(f"Version {year} sauvegardée avec succès dans {csv_path}")
    return df

In [37]:
token = get_access_token()

dates = {
    "2023": date_to_timestamp_ms("2023-01-01"),
    "2024": date_to_timestamp_ms("2024-01-01")
}

cgi_2023 = get_cgi_version(token, dates["2023"],"2023")
cgi_2024 = get_cgi_version(token, dates["2024"],"2024")


Version 2023 sauvegardée avec succès dans ./fiscal_data/cgi_2023.csv
Version 2024 sauvegardée avec succès dans ./fiscal_data/cgi_2024.csv


### II) Livre des procédures fiscales (LPF)

In [9]:
def get_lpf_version(token, timestamp_ms, year):
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    payload = {
        "date": timestamp_ms,
        "textId": "LEGITEXT000006069583"  # Livre des procédures fiscales
    }

    response = requests.post(api_url, headers=headers, json=payload)
    response.raise_for_status()
    
    # Convertir la réponse en DataFrame
    data = response.json()
    df = pd.DataFrame(data['sections'])

    # Sauvegarder en CSV
    csv_path = f"""./fiscal_data/lpf_{year}.csv"""
    df.to_csv(csv_path, index=False, encoding='utf-8')

    print(f"LPF - Version {year} sauvegardée avec succès dans {csv_path}")
    return df


In [10]:
token = get_access_token()

dates = {
    "2023": date_to_timestamp_ms("2023-01-01"),
    "2024": date_to_timestamp_ms("2024-01-01")
}

lpf_2023 = get_lpf_version(token, dates["2023"],"2023")
lpf_2024 = get_lpf_version(token, dates["2024"],"2024")


LPF - Version 2023 sauvegardée avec succès dans ./fiscal_data/lpf_2023.csv
LPF - Version 2024 sauvegardée avec succès dans ./fiscal_data/lpf_2024.csv


### III) BOFIP

In [27]:
def fetch_full_bofip() -> pd.DataFrame:
    """
    Récupère l'intégralité du dataset BOFiP-vigueur.
    """
    url = "https://data.economie.gouv.fr/api/records/1.0/search/"
    dataset = "bofip-vigueur"
    rows_per_page = 1000
    start = 0
    all_records = []
    
    while True:
        params = {
            "dataset": dataset,
            "rows": rows_per_page,
            "start": start,
        }
        
        response = requests.get(url, params=params)
        response.raise_for_status()
        data = response.json()
        records = data.get("records", [])
        
        if not records:
            break
        
        all_records.extend([record["fields"] for record in records])
        start += rows_per_page
        print(f"Fetched {len(all_records)} records so far...")
    
    df = pd.DataFrame(all_records)
    return df

In [28]:
# Récupération complète
df_all = fetch_full_bofip()

Fetched 1000 records so far...
Fetched 2000 records so far...
Fetched 3000 records so far...
Fetched 4000 records so far...
Fetched 5000 records so far...
Fetched 6000 records so far...
Fetched 7000 records so far...
Fetched 8000 records so far...
Fetched 8787 records so far...


In [30]:
csv_path = f"""./fiscal_data/bofip.csv"""
df_all.to_csv(csv_path, index=False, encoding='utf-8')

### IV) Jurisprudence

In [49]:
juridictions = [
    "Conseil d'État",
    "Cour administrative d'appel",
    "Tribunal administratif",
    "Cour de cassation",
    "Cour d'appel",
    "Tribunal judiciaire"
]

On contourne en les téléchargeant manuellement et en mettant les zip dans le dossier Jurisprudence/

In [10]:
import os
import zipfile
import pandas as pd
from lxml import etree

# Dossier racine contenant tous les fichiers zip
ROOT_FOLDER = './Jurisprudence'
OUTPUT_CSV = './fiscal_data/jurisprudence_global.csv'

# Fonction de parsing d'un fichier XML unique
def parse_xml(file):
    parser = etree.XMLParser(recover=True)
    tree = etree.parse(file, parser)
    root = tree.getroot()

    id_doc = root.findtext('Donnees_Techniques/Identification')
    date_maj = root.findtext('Donnees_Techniques/Date_Mise_Jour')
    code_juridiction = root.findtext('Dossier/Code_Juridiction')
    juridiction = root.findtext('Dossier/Nom_Juridiction')
    numero_dossier = root.findtext('Dossier/Numero_Dossier')
    date_lecture = root.findtext('Dossier/Date_Lecture')
    avocat_requerant = root.findtext('Dossier/Avocat_Requerant')
    type_decision = root.findtext('Dossier/Type_Decision')
    type_recours = root.findtext('Dossier/Type_Recours')
    code_publication = root.findtext('Dossier/Code_Publication')
    solution = root.findtext('Dossier/Solution')
    date_audience = root.findtext('Audience/Date_Audience')
    numero_role = root.findtext('Audience/Numero_Role')
    formation_jugement = root.findtext('Audience/Formation_Jugement')

    # Pour le texte intégral, on va récupérer tout le contenu XML sous forme brute
    texte_integral_elem = root.find('Decision/Texte_Integral')
    texte_integral = ''
    if texte_integral_elem is not None:
        texte_integral = etree.tostring(texte_integral_elem, method="text", encoding="unicode").strip()

    return {
        "id": id_doc,
        "date_maj": date_maj,
        "code_juridiction": code_juridiction,
        "juridiction": juridiction,
        "numero_dossier": numero_dossier,
        "date_decision": date_lecture,
        "avocat_requerant": avocat_requerant,
        "type_decision": type_decision,
        "type_recours": type_recours,
        "code_publication": code_publication,
        "solution": solution,
        "date_audience": date_audience,
        "numero_role": numero_role,
        "formation_jugement": formation_jugement,
        "texte_integral": texte_integral
    }

# Préparer le fichier de sortie (création des colonnes au début)
columns = ["id", "date_maj", "code_juridiction", "juridiction", "numero_dossier", 
          "date_decision", "avocat_requerant", "type_decision", "type_recours",
          "code_publication", "solution", "date_audience", "numero_role",
          "formation_jugement", "texte_integral"]
pd.DataFrame(columns=columns).to_csv(OUTPUT_CSV, index=False, encoding='utf-8')

# Parcours de tous les fichiers zip dans le dossier
for root, dirs, files in os.walk(ROOT_FOLDER):
    for filename in files:
        if filename.endswith('.zip'):
            zip_path = os.path.join(root, filename)
            print(f"Traitement du fichier : {zip_path}")

            with zipfile.ZipFile(zip_path, 'r') as z:
                for file_in_zip in z.namelist():
                    if file_in_zip.endswith('.xml'):
                        try:
                            with z.open(file_in_zip) as f:
                                record = parse_xml(f)
                                pd.DataFrame([record]).to_csv(OUTPUT_CSV, mode='a', header=False, index=False, encoding='utf-8')
                        except Exception as e:
                            print(f"Erreur sur le fichier {file_in_zip}: {e}")

Traitement du fichier : ./Jurisprudence/CE_202408.zip
Traitement du fichier : ./Jurisprudence/CE_202409.zip
Traitement du fichier : ./Jurisprudence/CAA_202309.zip
Traitement du fichier : ./Jurisprudence/TA_202408.zip
Traitement du fichier : ./Jurisprudence/TA_202409.zip
Traitement du fichier : ./Jurisprudence/CAA_202308.zip
Traitement du fichier : ./Jurisprudence/CAA_202409.zip
Traitement du fichier : ./Jurisprudence/TA_202308.zip
Traitement du fichier : ./Jurisprudence/TA_202309.zip
Traitement du fichier : ./Jurisprudence/CAA_202408.zip
Traitement du fichier : ./Jurisprudence/CE_202308.zip
Traitement du fichier : ./Jurisprudence/CE_202309.zip
Traitement du fichier : ./Jurisprudence/CE_202304.zip
Traitement du fichier : ./Jurisprudence/CE_202310.zip
Traitement du fichier : ./Jurisprudence/TA_202301.zip
Traitement du fichier : ./Jurisprudence/CE_202311.zip
Traitement du fichier : ./Jurisprudence/CE_202305.zip
Traitement du fichier : ./Jurisprudence/CAA_202401.zip
Traitement du fichier :

In [11]:
df_juri = pd.read_csv('./fiscal_data/jurisprudence_global.csv')

  df_juri = pd.read_csv('./fiscal_data/jurisprudence_global.csv')


In [12]:
df_juri.head()

Unnamed: 0,id,date_maj,code_juridiction,juridiction,numero_dossier,date_decision,avocat_requerant,type_decision,type_recours,code_publication,solution,date_audience,numero_role,formation_jugement,texte_integral
0,DCE_470759_20240827.xml,2024-08-29,CE,Section du Contentieux,470759,2024-08-27,OCCHIPINTI,Décision,Plein contentieux,C,Renvoi après cassation,2024-07-05,24412,4ème chambre jugeant seule,Vu la procédure suivante :\nPar une décision d...
1,DCE_471080_20240827.xml,2024-08-29,CE,Section du Contentieux,471080,2024-08-27,,Décision,Excès de pouvoir,C,Rejet,2024-07-05,24412,4ème chambre jugeant seule,Vu la procédure suivante :\n1° Sous le n° 4710...
2,DCE_475455_20240827.xml,2024-08-29,CE,Section du Contentieux,475455,2024-08-27,SCP RICHARD,Décision,Plein contentieux,C,Renvoi après cassation,2024-07-05,24412,4ème chambre jugeant seule,Vu la procédure suivante :\nMme D C a porté pl...
3,DCE_485331_20240827.xml,2024-08-29,CE,Section du Contentieux,485331,2024-08-27,"SARL MATUCHANSKY, POUPOT, VALDELIEVRE, RAMEIX",Décision,Plein contentieux,C,Renvoi après cassation,2024-07-05,24412,4ème chambre jugeant seule,Vu la procédure suivante :\nMme D C a porté pl...
4,DCE_488162_20240827.xml,2024-08-29,CE,Section du Contentieux,488162,2024-08-27,"SCP ROCHETEAU, UZAN-SARANO & GOULET",Décision,Plein contentieux,C,Renvoi après cassation,2024-07-05,24412,4ème chambre jugeant seule,Vu la procédure suivante :\nLe directeur de l'...


### V) QA assemblée nationale

In [6]:
import requests
import tarfile
import io
import pandas as pd
from lxml import etree
import re
import os
from pathlib import Path
import shutil

def download_taz_files(year: int, output_dir: str = "./QA_AN") -> list:
    """
    Télécharge les fichiers .taz pour une année donnée
    
    Args:
        year (int): Année à télécharger
        output_dir (str): Dossier de sortie pour les fichiers .taz
        
    Returns:
        list: Liste des chemins des fichiers .taz téléchargés
    """
    # Créer le dossier de sortie s'il n'existe pas
    os.makedirs(output_dir, exist_ok=True)
    
    base_url = f"https://echanges.dila.gouv.fr/OPENDATA/Questions-Reponses/AN/{year}/"
    
    # Récupérer la liste des fichiers disponibles
    index_page = requests.get(base_url)
    index_page.raise_for_status()
    
    # Trouver les fichiers .taz
    files = sorted(set(re.findall(r'ANQ\d{8}\.taz', index_page.text)))
    print(f"✅ {len(files)} fichiers trouvés pour l'année {year}")
    
    downloaded_files = []
    
    # Télécharger chaque fichier
    for fichier in files:
        url = base_url + fichier
        taz_path = os.path.join(output_dir, fichier)
        
        # Vérifier si le fichier existe déjà
        if os.path.exists(taz_path):
            print(f"Le fichier {fichier} existe déjà, on passe au suivant")
            downloaded_files.append(taz_path)
            continue
            
        print(f"Téléchargement de {fichier}...")
        try:
            response = requests.get(url)
            response.raise_for_status()
            
            # Sauvegarder le fichier .taz
            with open(taz_path, 'wb') as f:
                f.write(response.content)
            downloaded_files.append(taz_path)
            
        except Exception as e:
            print(f"❌ Erreur lors du téléchargement de {fichier}: {e}")
            continue
    
    return downloaded_files

def extract_and_parse_xml(taz_file: str) -> pd.DataFrame:
    """
    Extrait et parse les fichiers XML d'un fichier .taz
    
    Args:
        taz_file (str): Chemin vers le fichier .taz
        
    Returns:
        pd.DataFrame: DataFrame contenant les données extraites
    """
    all_questions = []
    
    try:
        with tarfile.open(taz_file, 'r') as tar:
            for member in tar.getmembers():
                if member.isfile() and member.name.endswith(".xml"):
                    xml_file = tar.extractfile(member)
                    parser = etree.XMLParser(recover=True)
                    tree = etree.parse(xml_file, parser)
                    root = tree.getroot()
                    
                    # Extraire les métadonnées du JO
                    numero_jo = root.findtext('numero_jo')
                    date_publication = root.findtext('date_publication')
                    
                    # Parcourir les questions/réponses
                    for qr in root.findall('.//questionReponse'):
                        questions = qr.findall('question')
                        for q in questions:
                            question_num = q.findtext('numero')
                            question_page = q.findtext('page_jo')
                            
                            reponse = qr.find('reponse')
                            if reponse is not None:
                                reponse_page = reponse.findtext('page_jo')
                                reponse_texte = reponse.findtext('texteReponse')
                            else:
                                reponse_page = None
                                reponse_texte = None
                            
                            all_questions.append({
                                'numero_jo': numero_jo,
                                'date_publication': date_publication,
                                'question_numero': question_num,
                                'question_page': question_page,
                                'reponse_page': reponse_page,
                                'reponse_texte': reponse_texte
                            })
    except Exception as e:
        print(f"❌ Erreur lors du traitement de {taz_file}: {e}")
    
    return pd.DataFrame(all_questions)

def process_year(year: int, output_dir: str = "./QA_AN", fiscal_data_dir: str = "./fiscal_data"):
    """
    Traite une année complète : télécharge les fichiers .taz, extrait les données
    et crée un fichier CSV final
    
    Args:
        year (int): Année à traiter
        output_dir (str): Dossier pour les fichiers .taz
        fiscal_data_dir (str): Dossier pour le fichier CSV final
    """
    # Créer les dossiers nécessaires
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(fiscal_data_dir, exist_ok=True)
    
    # Télécharger les fichiers .taz
    taz_files = download_taz_files(year, output_dir)
    
    # Traiter chaque fichier .taz
    all_data = []
    for taz_file in taz_files:
        print(f"Traitement de {os.path.basename(taz_file)}...")
        df = extract_and_parse_xml(taz_file)
        if not df.empty:
            all_data.append(df)
    
    # Combiner tous les DataFrames
    if all_data:
        final_df = pd.concat(all_data, ignore_index=True)
        
        # Sauvegarder le fichier CSV final
        output_file = os.path.join(fiscal_data_dir, f"questions_reponses_AN_{year}.csv")
        final_df.to_csv(output_file, index=False, encoding='utf-8')
        print(f"✅ Fichier CSV créé : {output_file}")
        print(f"✅ {len(final_df)} questions traitées")
    else:
        print("❌ Aucune donnée n'a été extraite")



In [8]:
process_year(2024)

✅ 38 fichiers trouvés pour l'année 2024
Le fichier ANQ20240001.taz existe déjà, on passe au suivant
Le fichier ANQ20240002.taz existe déjà, on passe au suivant
Le fichier ANQ20240003.taz existe déjà, on passe au suivant
Le fichier ANQ20240004.taz existe déjà, on passe au suivant
Le fichier ANQ20240005.taz existe déjà, on passe au suivant
Le fichier ANQ20240006.taz existe déjà, on passe au suivant
Le fichier ANQ20240007.taz existe déjà, on passe au suivant
Le fichier ANQ20240008.taz existe déjà, on passe au suivant
Le fichier ANQ20240009.taz existe déjà, on passe au suivant
Le fichier ANQ20240010.taz existe déjà, on passe au suivant
Le fichier ANQ20240011.taz existe déjà, on passe au suivant
Le fichier ANQ20240012.taz existe déjà, on passe au suivant
Le fichier ANQ20240013.taz existe déjà, on passe au suivant
Le fichier ANQ20240014.taz existe déjà, on passe au suivant
Le fichier ANQ20240015.taz existe déjà, on passe au suivant
Le fichier ANQ20240016.taz existe déjà, on passe au suivant


### VI) Senat

In [1]:
import requests
import zipfile
import io
import pandas as pd
import re
import ast

def extract_questions_reponses_senat_sql():
    # Télécharger le fichier zip depuis data.gouv
    url = "https://www.data.gouv.fr/en/datasets/r/407fcc2b-9c31-4f3d-b22b-c9b30cb69108"
    response = requests.get(url)
    response.raise_for_status()

    # Lire le zip en mémoire
    with zipfile.ZipFile(io.BytesIO(response.content)) as z:
        sql_filename = [name for name in z.namelist() if name.endswith('.sql')][0]
        print(f"Lecture du fichier SQL : {sql_filename}")

        with z.open(sql_filename) as f:
            sql_content = f.read().decode("ISO-8859-15")

    # Extraire les lignes INSERT INTO
    insert_lines = re.findall(r"INSERT INTO .*? VALUES\s*(.*);", sql_content, flags=re.DOTALL)

    # On récupère toutes les valeurs
    rows = []
    for block in insert_lines:
        # On sépare les tuples
        tuples = re.findall(r"\((.*?)\)", block)
        for tup in tuples:
            # Séparer les champs individuellement
            fields = []
            splitted = re.split(r",(?![^']*'\s*,)", tup)  # gère les virgules dans les valeurs texte
            for field in splitted:
                # Nettoyage des quotes SQL
                field = field.strip()
                if field == 'NULL':
                    fields.append(None)
                else:
                    fields.append(ast.literal_eval(field))
            rows.append(fields)

    # Colonnes présentes dans le dump Sénat
    columns = [
        'id',
        'numero_question',
        'date_question',
        'intitule_ministere',
        'auteur',
        'groupe',
        'titre',
        'url_question',
        'texte_question',
        'url_reponse',
        'texte_reponse',
        'date_reponse'
    ]

    # Création du DataFrame
    df = pd.DataFrame(rows, columns=columns)

    # Export CSV
    df.to_csv("questions_reponses_senat.csv", index=False, encoding="utf-8")
    print(f"✅ Extraction SQL Sénat terminée avec {len(df)} questions")
    return df

In [27]:
extract_questions_reponses_senat()

ParserError: Error tokenizing data. C error: Expected 2 fields in line 3, saw 3
