In [31]:
# *** INITIALISATION ***#

import os
from pathlib import Path
from dotenv import load_dotenv
import datetime

load_dotenv()

# Timestamp
date_now = datetime.date.today().isoformat()
datetime_now = datetime.datetime.now().isoformat()

root_dir = os.path.abspath(os.curdir)
print(root_dir)

/home/colin/git/decp-airflow


In [32]:
# *** TÉLÉCHARGEMENT ***#

import pandas as pd
from requests import get

# decp_augmente_valides_file: Path = Path(f'data/decp_augmente_valides_{date_now}.csv')
decp_augmente_valides_file: Path = Path(f"data/decp_augmente_valides_2024-04-12.csv")


if not (os.path.exists(decp_augmente_valides_file)):
    request = get(os.getenv("DECP_ENRICHIES_VALIDES_URL"))
    with open(decp_augmente_valides_file, "wb") as file:
        file.write(request.content)
else:
    print("DECP d'aujourd'hui déjà téléchargées")

df: pd.DataFrame = pd.read_csv(
    decp_augmente_valides_file, sep=";", dtype="object", index_col=None
)

DECP d'aujourd'hui déjà téléchargées


In [33]:
# *** ANALYSE DE BASE ***#

# df.info(verbose=True)
# obsolete number of rows 994123
# valid number of rows 837115

In [34]:
from numpy import NaN

df.replace([NaN, None], "", inplace=True, regex=False)
# df[['id', 'datePublicationDonnees']].loc[df['datePublicationDonnees'].str.contains("September")]

In [35]:
# *** REDRESSEMENT ***#

# Dates

columns_date = ["datePublicationDonnees", "dateNotification"]

date_replacements = {
    # ID marché invalide et SIRET de l'acheteur
    "0002-11-30": "",
    "September, 16 2021 00:00:00": "2021-09-16",  # 20007695800012 19830766200017 (plein !)
    "16 2021 00:00:00": "",
    "0222-04-29": "2022-04-29",  # 202201L0100
    "0021-12-05": "2022-12-05",  # 20222022/1400
    "0001-06-21": "",  # 0000000000000000 21850109600018
    "0019-10-18": "",  # 0000000000000000 34857909500012
    "5021-02-18": "2021-02-18",  # 20213051200 21590015000016
    "2921-11-19": "",  # 20220057201 20005226400013
    "0022-04-29": "2022-04-29",  # 2022AOO-GASL0100 25640454200035
}

for col in columns_date:
    df[col] = df[col].replace(date_replacements, regex=False)


# Nombres

df["dureeMois"] = df["dureeMois"].replace("", NaN)
df["montant"] = df["montant"].replace("", NaN)

# Identifiants de marchés

id_replacements = {"[,\./]": "_"}

df["id"] = df["id"].replace(id_replacements, regex=True)

# Nature

nature_replacements = {"Marche": "Marché", "subsequent": "subséquent"}

df["nature"] = df["nature"].str.capitalize()
df["nature"] = df["nature"].replace(nature_replacements, regex=True)

In [7]:
# ***  TYPES DE DONNÉES ***#

numeric_dtypes = {
    "dureeMois": "Int64",  # contrairement à int64, Int64 autorise les valeurs nulles https://pandas.pydata.org/docs/user_guide/integer_na.html
    "montant": "float64",
}

for column in numeric_dtypes:
    df[column] = df[column].astype(numeric_dtypes[column])


date_dtypes = ["datePublicationDonnees", "dateNotification"]

for column in date_dtypes:
    df[column] = pd.to_datetime(df[column], format="mixed", dayfirst=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 837135 entries, 0 to 837134
Data columns (total 54 columns):
 #   Column                              Non-Null Count   Dtype         
---  ------                              --------------   -----         
 0   titulaire_denominationSociale_1     837135 non-null  object        
 1   titulaire_id_1                      837135 non-null  object        
 2   titulaire_typeIdentifiant_1         837135 non-null  object        
 3   titulaire_denominationSociale_2     837135 non-null  object        
 4   titulaire_id_2                      837135 non-null  object        
 5   titulaire_typeIdentifiant_2         837135 non-null  object        
 6   titulaire_denominationSociale_3     837135 non-null  object        
 7   titulaire_id_3                      837135 non-null  object        
 8   titulaire_typeIdentifiant_3         837135 non-null  object        
 9   procedure                           837135 non-null  object        
 10  nature  

array(['Marché', 'Accord-cadre', 'MARCHE', 'Marché subséquent',
       'ACCORD-CADRE', 'MARCHE SUBSEQUENT', '', 'Marché de partenariat',
       'MARCHE DE PARTENARIAT'], dtype=object)

In [8]:
# ***  ANALYSES AVANCÉES ***#

# Les sources d'où proviennent les données
sources = df["source"].unique()

for source in sources:
    print(
        f"""
# {source}
Nombre de marchés : {df[["source"]].loc[df["source"]==source].index.size}
Nombre d'acheteurs uniques : {len(df[["acheteur.id", "source"]].loc[df["source"]==source]['acheteur.id'].unique())}
"""
    )


# megalis-bretagne
Nombre de marchés : 21728
Nombre d'acheteurs uniques : 707


# data.gouv.fr_pes
Nombre de marchés : 279345
Nombre d'acheteurs uniques : 10298


# e-marchespublics
Nombre de marchés : 75614
Nombre d'acheteurs uniques : 1630


# data.gouv.fr_aife
Nombre de marchés : 255022
Nombre d'acheteurs uniques : 3422


# decp_aws
Nombre de marchés : 125312
Nombre d'acheteurs uniques : 3759


# marches-publics.info
Nombre de marchés : 71327
Nombre d'acheteurs uniques : 4787


# ternum-bfc
Nombre de marchés : 8747
Nombre d'acheteurs uniques : 385


# 
Nombre de marchés : 40
Nombre d'acheteurs uniques : 1



In [9]:
# VERS LE FORMAT DECP-TABLE-SCHEMA #

# Explosion des champs titulaires sur plusieurs lignes

df["titulaire.id"] = [[] for r in range(len(df))]
df["titulaire.denominationSociale"] = [[] for r in range(len(df))]
df["titulaire.typeIdentifiant"] = [[] for r in range(len(df))]

for num in range(1, 4):
    mask = df[f"titulaire_id_{num}"] != ""
    df.loc[mask, "titulaire.id"] += df.loc[mask, f"titulaire_id_{num}"].apply(
        lambda x: [x]
    )
    df.loc[mask, "titulaire.denominationSociale"] += df.loc[
        mask, f"titulaire_denominationSociale_{num}"
    ].apply(lambda x: [x])
    df.loc[mask, "titulaire.typeIdentifiant"] += df.loc[
        mask, f"titulaire_typeIdentifiant_{num}"
    ].apply(lambda x: [x])

df = df.explode(
    ["titulaire.id", "titulaire.denominationSociale", "titulaire.typeIdentifiant"],
    ignore_index=True,
)

In [10]:
# Ajout colonnes manquantes

df["uid"] = df["acheteur.id"] + df["id"]
df["donneesActuelles"] = ""  # TODO
df["anomalies"] = ""  # TODO

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 930599 entries, 0 to 930598
Data columns (total 60 columns):
 #   Column                              Non-Null Count   Dtype         
---  ------                              --------------   -----         
 0   titulaire_denominationSociale_1     930599 non-null  object        
 1   titulaire_id_1                      930599 non-null  object        
 2   titulaire_typeIdentifiant_1         930599 non-null  object        
 3   titulaire_denominationSociale_2     930599 non-null  object        
 4   titulaire_id_2                      930599 non-null  object        
 5   titulaire_typeIdentifiant_2         930599 non-null  object        
 6   titulaire_denominationSociale_3     930599 non-null  object        
 7   titulaire_id_3                      930599 non-null  object        
 8   titulaire_typeIdentifiant_3         930599 non-null  object        
 9   procedure                           930599 non-null  object        
 10  nature  

In [11]:
# *** ENREGISTREMENT AU FORMAT CSV ET PARQUET ***#

import shutil

distdir = Path("./dist")
if os.path.exists(distdir):
    shutil.rmtree("dist")
os.mkdir("dist")
os.chdir(distdir)

# CSV

# Schéma cible
df_standard = pd.read_csv(
    "https://raw.githubusercontent.com/ColinMaudry/decp-table-schema/main/exemples/exemple-valide.csv",
    index_col=None,
)

cible_colonnes = df_standard.columns
df = df[cible_colonnes]
# df_sans_titulaires = df.drop(columns=['titulaire.id', 'titulaire.denominationSociale', 'titulaire.typeIdentifiant'])

df.to_csv("decp.csv", index=None)
df_sans_titulaires = df.drop(
    columns=[
        "titulaire.id",
        "titulaire.denominationSociale",
        "titulaire.typeIdentifiant",
    ]
)
df_sans_titulaires = df.drop_duplicates()
df_sans_titulaires.to_csv("decp-sans-titulaires.csv", index=None)


df.to_parquet("decp.parquet", index=None)

ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.

In [None]:
# VALIDATION #

from tableschema import Table, CastError

table = Table(
    "decp.csv",
    schema="https://raw.githubusercontent.com/ColinMaudry/decp-table-schema/main/schema.json",
)
try:
    table.read(limit=50)
except CastError as exception:
    print(exception.errors)

In [None]:
# DATA PACKAGE #

from frictionless import Package, Resource, Pipeline, steps

decp_resource: Resource = Resource(path="decp.csv")

# Cette méthode détecte les caractéristiques du CSV et tente de deviner les datatypes
decp_resource.infer()

decp_resource = decp_resource.transform(
    Pipeline(
        steps=[
            steps.field_update(name="acheteur.id", descriptor={"type": "string"}),
            steps.field_update(name="acheteur.nom", descriptor={"type": "string"}),
            steps.field_update(name="titulaire.id", descriptor={"type": "string"}),
        ]
    )
)

decp_sans_titulaire_resource = Resource(path="decp-sans-titulaires.csv")
decp_sans_titulaire_resource.infer()


decp_sans_titulaire_resource = decp_sans_titulaire_resource.transform(
    Pipeline(
        steps=[
            steps.field_update(name="acheteur.id", descriptor={"type": "string"}),
            steps.field_update(name="acheteur.nom", descriptor={"type": "string"}),
            steps.field_remove(name="titulaire.id"),
            steps.field_remove(name="titulaire.denominationSociale"),
            steps.field_remove(name="titulaire.typeIdentifiant"),
        ]
    )
)

package = Package(
    name="decp",
    title="DECP CSV",
    description="Données essentielles de la commande publique (FR) au format CSV.",
    resources=[decp_resource],
    # it's possible to provide all the official properties like homepage, version, etc
)

package.to_json("datapackage.json")

In [20]:
# *** SQLITE ***#

from datapackage_to_datasette import datapackage_to_datasette

if os.path.exists("decp.sqlite"):
    os.remove("decp.sqlite")

datapackage_to_datasette(
    dbname="decp.sqlite",
    data_package="datapackage.json",
    metadata_filename="datasette_metadata.json",
    write_mode="replace",
)

os.chdir("/home/git/decp-airflow")

FrictionlessException: [package-error] The data package has an error: cannot retrieve metadata "datapackage.json" because "[Errno 2] No such file or directory: 'datapackage.json'" 

In [None]:
# *** PUBLICATION SUR DATA.GOUV.FR ***#

import requests
import json

api_key = os.getenv("DATAGOUVFR_API_KEY")

api = "https://www.data.gouv.fr/api/1"
dataset_id = "608c055b35eb4e6ee20eb325"
resource_id_decp = "8587fe77-fb31-4155-8753-f6a3c5e0f5c9"
# resource_id_sans_titulaires="834c14dd-037c-4825-958d-0a841c4777ae"
resource_id_datapackage = "65194f6f-e273-4067-8075-56f072d56baf"
resource_id_sqlite = "c6b08d03-7aa4-4132-b5b2-fd76633feecc"


def update_resource(api, dataset_id, resource_id, file_path, api_key):
    url = f"{api}/datasets/{dataset_id}/resources/{resource_id}/upload/"
    headers = {"X-API-KEY": api_key}
    files = {"file": open(file_path, "rb")}
    response = requests.post(url, files=files, headers=headers)
    return response.json()


print("Mise à jour de decp.csv...")
print(
    json.dumps(
        update_resource(api, dataset_id, resource_id_decp, "decp.csv", api_key),
        indent=4,
    )
)

# print("\nMise à jour de decp-sans-titulaires.csv...")
# print(json.dumps(update_resource(api, dataset_id, resource_id_sans_titulaires, "decp-sans-titulaires.csv", api_key), indent=4))

print("\nMise à jour de datapackage.json...")
print(
    json.dumps(
        update_resource(
            api, dataset_id, resource_id_datapackage, "datapackage.json", api_key
        ),
        indent=4,
    )
)

print("\nMise à jour de decp.sqlite...")
print(
    json.dumps(
        update_resource(api, dataset_id, resource_id_sqlite, "decp.sqlite", api_key),
        indent=4,
    )
)