In [None]:
import requests
import json
from tqdm import tqdm
from datetime import datetime
from typing import Generator
import os
import io
from dotenv import load_dotenv

# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

CACHE_DIR = "./cache"
# Article payloads are cached under <CACHE_DIR>/articles/<cid>.json, so the
# folder must exist before the first fetch. Derived from CACHE_DIR (instead of
# a shell `mkdir` with a duplicated literal) so the two cannot drift apart.
os.makedirs(os.path.join(CACHE_DIR, "articles"), exist_ok=True)

# OAuth client credentials used for the token request; fail early and say
# which one is missing.
CLIENT_ID = os.environ.get("CLIENT_ID", None)
assert CLIENT_ID, "CLIENT_ID is not set (environment or .env file)"

CLIENT_SECRET = os.environ.get("CLIENT_SECRET", None)
assert CLIENT_SECRET, "CLIENT_SECRET is not set (environment or .env file)"

# Exchange the client credentials for an OAuth access token.
res = requests.post(
    "https://oauth.piste.gouv.fr/api/oauth/token",
    data={
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "openid",
    },
)
# Surface rejected credentials as an HTTP error here rather than as a
# confusing KeyError on "access_token" below.
res.raise_for_status()
token = res.json()["access_token"]

In [None]:
# Headers sent with every API call below: bearer token, JSON in and JSON out.
headers = {
    "Authorization": "Bearer {}".format(token),
    "accept": "application/json",
    "Content-Type": "application/json",
}

# Directory of the git repository that receives the generated markdown file.
OUTPUT_REPO_PATH = "../legifrance"

In [None]:
URL_BASE = "https://api.piste.gouv.fr/dila/legifrance/lf-engine-app"

# Ask for the first page (at most 100 entries) of the codes in state VIGUEUR.
res = requests.post(
    URL_BASE + "/list/code",
    data=json.dumps(
        {
            "pageSize": 100,
            "pageNumber": 1,
            "states": ["VIGUEUR"],  # "VIGUEUR", "ABROGE", "VIGUEUR_DIFF"
        }
    ),
    headers=headers,
)
# Stop on an error response instead of failing later on a missing "results" key.
res.raise_for_status()

# NOTE: the short name `l` is kept because the listing cell reads it.
l = res.json()

In [None]:
# Print "<index>: <title> - <cid>" for every listed code whose state is VIGUEUR.
for idx, entry in enumerate(l["results"]):
    if entry["etat"] != "VIGUEUR":
        continue
    print(f"{idx}: {entry['titre']} - {entry['cid']}")

In [None]:
# Identifier of the legal text to export; fetch_tm() reads this global as "textId".
code = "LEGITEXT000044595989"

In [None]:
def fetch_tm():
    """Fetch the table of contents of the selected code as of today.

    Reads the module-level ``code``, ``URL_BASE`` and ``headers``. The decoded
    response is also written to ``code.json`` in the working directory as a
    debugging aid.

    Returns:
        The decoded JSON payload of the ``tableMatieres`` endpoint.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    date_str = datetime.now().strftime("%Y-%m-%d")

    res = requests.post(
        URL_BASE + "/consult/legi/tableMatieres",
        data=json.dumps({"textId": code, "nature": "CODE", "date": date_str}),
        headers=headers,
    )
    # Without this check an error payload would be returned (and dumped) as if
    # it were the table of contents, and only blow up later on a missing key.
    res.raise_for_status()

    tm = res.json()

    # for debugging
    with open("code.json", "w") as f:
        json.dump(tm, f, indent=4)

    return tm


def _yield_article_ids(tm):
    """Walk a table-of-contents node and yield ``(cid, id)`` for every article.

    The node's own articles are produced first, then those of each entry of
    ``tm["sections"]`` in order, recursively.
    """
    yield from ((entry["cid"], entry["id"]) for entry in tm["articles"])

    for child in tm["sections"]:
        yield from _yield_article_ids(child)


def _fetch_and_cache_article_with_history(path: str, cid: str):
    """Download an article by ``cid`` and store the decoded response at ``path``.

    Args:
        path: file the JSON payload is written to (overwritten if present).
        cid: identifier sent to the ``getArticleByCid`` endpoint.

    Returns:
        The decoded JSON payload.

    Raises:
        AssertionError: if the HTTP status is not 200.
    """
    payload = json.dumps({"cid": cid})
    response = requests.post(
        URL_BASE + "/consult/getArticleByCid", payload, headers=headers
    )
    assert response.status_code == 200, response.status_code

    article = json.loads(response.content)

    with open(path, "w") as cache_file:
        cache_file.write(json.dumps(article, indent=4))

    return article


def fetch_article_with_history(cid: str, id: str):
    """Return the payload of article ``cid``, served from the disk cache when possible.

    The cache entry is reused only if one of its ``listArticle`` versions has
    the given ``id``; otherwise it is considered outdated and fetched again.

    Args:
        cid: article identifier, also used as the cache file name.
        id: version id that must be present in the cached payload.

    Returns:
        The decoded article payload (cached or freshly fetched).
    """
    path = f"{CACHE_DIR}/articles/{cid}.json"

    # Only reading the cache is guarded. The refetch must stay outside the
    # `try`: requests' exceptions subclass IOError and JSON errors subclass
    # ValueError, so a failing refetch would otherwise be swallowed here and
    # silently retried.
    try:
        with open(path, "r") as f:
            article = json.load(f)
    except (IOError, ValueError):
        # Missing, unreadable or corrupt cache file.
        return _fetch_and_cache_article_with_history(path, cid)

    ids = {a["id"] for a in article["listArticle"]}
    if id not in ids:
        print(f"Outdated {cid}, refetching")
        return _fetch_and_cache_article_with_history(path, cid)

    return article


def fetch_articles(tm):
    """Return the payload of every article referenced by the table of contents ``tm``.

    The ids are materialised into a list first so that tqdm knows the total
    and can display progress.
    """
    article_ids = list(_yield_article_ids(tm))

    articles = []
    for cid, article_id in tqdm(article_ids):
        articles.append(fetch_article_with_history(cid, article_id))
    return articles

In [None]:
def get_commits(article):
    """Turn every version of an article into a pseudo-commit.

    Args:
        article: payload with a ``listArticle`` list; each version provides
            ``lienModifications``, ``dateDebut``, ``texteHtml`` and ``cid``.

    Returns:
        Dict mapping a commit id (``"<dateDebut>-<textCids joined by '-'>"``,
        or ``"<dateDebut>-???"`` when the version lists no modification) to
        ``{"commitTitle": str, "articles": {cid: html}, "date": dateDebut}``.

    Raises:
        AssertionError: if two versions of the article yield the same commit id.
    """
    commits = {}
    for version in article["listArticle"]:
        modifs = version["lienModifications"]
        date = version["dateDebut"]

        # De-duplicated and sorted so the id does not depend on listing order.
        textCids = sorted({m["textCid"] for m in modifs})
        if not textCids:
            textCids = ["???"]
            # TODO

        commitId = f"{date}-{'-'.join(textCids)}"

        # TODO add nota?
        # Sorted as well: joining a raw set of strings gives an order that
        # changes from one interpreter run to the next (hash randomisation),
        # which made commit titles non-reproducible.
        titles = sorted(
            {m["textTitle"] if m["textTitle"] is not None else "?TODO?" for m in modifs}
        )
        commitTitle = "Modifications par " + " & ".join(titles)
        text = version["texteHtml"]  # TODO html?

        assert commitId not in commits, commitId
        commits[commitId] = {
            "commitTitle": commitTitle,
            "articles": {version["cid"]: text},
            "date": date,
        }

    return commits


def merge_commits(all_commits):
    """Merge per-article commit dicts into one dict keyed by commit id.

    Commits sharing an id are combined by taking the union of their
    ``articles``; the title of the first occurrence is kept.

    Args:
        all_commits: iterable of dicts as returned by ``get_commits``.

    Returns:
        A new dict ``{commitId: {"commitTitle", "articles", "date"}}``. The
        input dicts are left untouched.

    Raises:
        AssertionError: if two commits with the same id disagree on ``date``
            or both contain the same article cid.
    """
    merged = {}
    for partial in all_commits:
        for commitId, c in partial.items():
            if commitId not in merged:
                # Copy the entry and its "articles" mapping: storing `c` itself
                # would make the later merges write into the caller's data.
                merged[commitId] = {**c, "articles": dict(c["articles"])}
                continue

            target = merged[commitId]
            assert target["date"] == c["date"]
            # TODO: humans ...
            # Titles for the same commit id are not required to match; the
            # first one seen wins.

            for articleCid, text in c["articles"].items():
                assert articleCid not in target["articles"]
                target["articles"][articleCid] = text

    return merged


def last_text(commits: list[dict], cid):
    """Return the text of article ``cid`` from the latest commit that contains it.

    ``commits`` is scanned from its end towards its start; ``"<TODO>"`` is
    returned when no commit mentions the article.
    """
    newest_first = (
        commit["articles"][cid]
        for commit in reversed(commits)
        if cid in commit["articles"]
    )
    return next(newest_first, "<TODO>")


def print_tm(tm, commits, file, level=1):
    """Write a table-of-contents node as markdown to ``file``.

    Nodes and articles whose ``etat`` is ``"ABROGE"`` are skipped. For the
    others the node title is printed as a heading with ``level`` ``#`` signs,
    followed by its articles (one heading level deeper, text taken from
    ``last_text(commits, cid)``), then its sub-sections recursively, and
    finally the node's ``commentaire`` when it has a non-null one.
    """
    if tm["etat"] == "ABROGE":
        return

    heading = "#" * level
    print(f"{heading} {tm['title']}", file=file)

    article_heading = "#" * (level + 1)
    for article in tm["articles"]:
        if article["etat"] == "ABROGE":
            continue
        print(f"{article_heading} Article {article['num']}", file=file)
        print(last_text(commits, article["cid"]), file=file)
        print("\n", file=file)

    for section in tm["sections"]:
        print_tm(section, commits, file=file, level=level + 1)

    note = tm.get("commentaire")
    if note is not None:
        print(note, file=file)
    # TODO

def process(
    tm: dict, articles: list[dict]
) -> Generator[tuple[str, float, str], None, None]:
    """Lazily render one full-text snapshot of the code per commit.

    Args:
        tm: table of contents, as consumed by ``print_tm``.
        articles: article payloads, as consumed by ``get_commits``.

    Yields:
        ``(markdown, date, title)`` for each merged commit in ascending
        ``date`` order, where ``markdown`` is the text rendered with all
        commits up to and including that one, and ``date`` is the commit's
        ``date`` divided by 1000.
    """
    all_commits = [get_commits(a) for a in articles]
    merged = merge_commits(all_commits)
    sorted_commits = sorted(merged.values(), key=lambda c: c["date"])

    # Iterate over every commit. The previous `range(0, len(...) - 1)` stopped
    # one short, so the most recent commit was never emitted and a history
    # with a single commit produced nothing at all.
    for i, commit in enumerate(sorted_commits):
        f = io.StringIO()
        print_tm(tm, sorted_commits[: i + 1], file=f)

        date = commit["date"] / 1000  # TODO ms vs s
        title = commit["commitTitle"]

        yield (f.getvalue(), date, title)

# Download the table of contents, then the payload of every article it references.
tm = fetch_tm()
articles = fetch_articles(tm)
# process() is a generator function: nothing is rendered until `commits` is
# iterated, and it can be iterated only once.
commits = process(tm, articles)

In [None]:
import os
import shutil
import subprocess
import pytz

# Start from an empty repository on every run. The path comes from
# OUTPUT_REPO_PATH (it used to be repeated as a literal in three shell
# commands), which is the directory the git commands below operate on.
shutil.rmtree(OUTPUT_REPO_PATH, ignore_errors=True)
os.makedirs(OUTPUT_REPO_PATH)
subprocess.run(["git", "init"], cwd=OUTPUT_REPO_PATH)

tz = pytz.timezone("UTC")

# Replay the history: write each rendered snapshot to the markdown file and
# commit it with the snapshot's date as both author and committer date.
for (full_code_text, date, title) in commits:
    # Explicit UTF-8 so the output does not depend on the platform's default
    # encoding (the source is French legal text, i.e. not ASCII-only).
    with open(f"{OUTPUT_REPO_PATH}/{tm['title']}.md", "w", encoding="utf-8") as f:
        f.write(full_code_text)

    date_dt = datetime.fromtimestamp(date, tz)

    # TODO
    # Dates from 2038 on are clamped (presumably to stay inside the 32-bit
    # timestamp range - confirm). The replacement is made timezone-aware like
    # every other date; a naive datetime has no UTC offset in isoformat().
    if date_dt.year >= 2038:
        date_dt = tz.localize(datetime(2038, 1, 1))

    date_str = date_dt.isoformat()
    date_with_format_str = "format:iso8601:" + date_str

    env = os.environ.copy()
    env["GIT_COMMITTER_DATE"] = date_with_format_str

    subprocess.run(["git", "add", "."], cwd=OUTPUT_REPO_PATH)
    subprocess.run(
        [
            "git",
            "commit",
            "--date",
            date_with_format_str,
            "-m",
            title,
        ],
        env=env,
        cwd=OUTPUT_REPO_PATH,
    )

# Register the GitHub remote, then force-push the local "main" branch to it.
# Exit codes are not checked.
for git_args in (
    ["remote", "add", "origin", "git@github.com:LexHub-project/legifrance.git"],
    ["push", "-f", "origin", "main"],
):
    subprocess.run(["git", *git_args], cwd=OUTPUT_REPO_PATH)