In [13]:
from pathlib import Path
import json
import sys
import logging
import yaml
import requests
import jsonschema

In [14]:
# Notebook-wide logger, named after the current module (`__name__`).
logger = logging.getLogger(__name__)

In [15]:
# Base URL prepended to the `/datasets` query issued by `datasets_for_schema`.
DATA_GOUV_API = "https://www.data.gouv.fr/api/1"
# Value placed in the "score" field of every recommendation built below.
RECOMMENDATION_SCORE = 50
# JSON document whose "schemas" list is read by `consolidated_schemas`.
CATALOG_SCHEMAS = 'https://schema.data.gouv.fr/schemas/schemas.json'
# JSON Schema downloaded by `validate_recommendations` to check the output.
JSONSCHEMA_URL = "https://raw.githubusercontent.com/opendatateam/udata-recommendations/master/udata_recommendations/schema.json"

In [16]:
def consolidated_schemas():
    """Find TableSchema schemas that are consolidated.

    Downloads the catalog at ``CATALOG_SCHEMAS`` and keeps the entries whose
    ``schema_type`` is ``'tableschema'`` and whose
    ``consolidation_dataset_id`` is set to a truthy value.

    Returns:
        dict: Mapping of schema ``name`` to its ``consolidation_dataset_id``.

    Raises:
        requests.HTTPError: If the catalog request returns an error status.
        requests.Timeout: If the catalog does not answer within 10 seconds.
    """
    # Same timeout / status handling as the other fetchers in this notebook,
    # so a stalled or failing catalog request surfaces as a clear HTTP error.
    response = requests.get(CATALOG_SCHEMAS, timeout=10)
    response.raise_for_status()
    schemas = response.json()['schemas']
    return {
        s['name']: s['consolidation_dataset_id']
        for s in schemas
        # `.get` so that an entry lacking one of these keys is skipped
        # instead of aborting the whole run with a KeyError.
        if s.get('consolidation_dataset_id') and s.get('schema_type') == 'tableschema'
    }

In [17]:
def datasets_for_schema(schema):
    """Collect the ids of datagouv datasets whose schema attribute equals `schema`.

    Walks the paginated `/datasets` listing, following each response's
    "next_page" link until it is None.

    Returns:
        list: The "id" of every dataset found, in the order the API returned them.

    Raises:
        requests.HTTPError: If any page request returns an error status.
    """
    collected = []
    next_url = f"{DATA_GOUV_API}/datasets?schema={schema}&page_size=100"
    while next_url is not None:
        resp = requests.get(next_url, timeout=10)
        resp.raise_for_status()
        page = resp.json()

        for dataset in page["data"]:
            collected.append(dataset["id"])
        # A None "next_page" marks the last page and ends the loop.
        next_url = page["next_page"]

    return collected

In [18]:
def build_recommendation(consolidated_dataset_id, dataset_id):
    """Build the recommendation entry for one dataset.

    Returns:
        dict: ``{"id": dataset_id, "recommendations": [...]}`` where the list
        holds a single item pointing at ``consolidated_dataset_id`` with a
        score of ``RECOMMENDATION_SCORE``.
    """
    target = {"id": consolidated_dataset_id, "score": RECOMMENDATION_SCORE}
    return {"id": dataset_id, "recommendations": [target]}

In [19]:
def validate_recommendations(recommendations):
    """Validate recommendations according to the JSON schema.

    The schema is downloaded from ``JSONSCHEMA_URL`` on every call.

    Raises:
        requests.HTTPError: If the schema download returns an error status.
        jsonschema.ValidationError: If `recommendations` does not conform.
    """
    schema_response = requests.get(JSONSCHEMA_URL, timeout=10)
    schema_response.raise_for_status()

    jsonschema.validate(recommendations, schema=schema_response.json())

In [20]:
# For every consolidated schema, recommend its consolidation dataset on each
# dataset that declares that schema.
recommendations = []
for schema_id, consolidated_dataset_id in consolidated_schemas().items():
    logger.info(
        f"Working on schema {schema_id}, consolidated on {consolidated_dataset_id}"
    )

    dataset_ids = datasets_for_schema(schema_id)
    logger.info(f"Found {len(dataset_ids)} associated with schema {schema_id}")

    recommendations.extend(
        build_recommendation(consolidated_dataset_id, d) for d in dataset_ids
    )

# Keep only the first recommendation seen for each dataset id. Membership is
# tested against a set so the pass stays linear in the number of entries
# (assumes dataset ids are hashable, e.g. strings).
seen_ids = set()
recommendations_clean = []
for r in recommendations:
    if r["id"] not in seen_ids:
        seen_ids.add(r["id"])
        recommendations_clean.append(r)
validate_recommendations(recommendations_clean)

etalab/schema-irve
etalab/schema-lieux-covoiturage
etalab/schema-stationnement


In [None]:
# Write the de-duplicated list that was validated above, not the raw
# `recommendations` list, which may still contain several entries per dataset.
# NOTE(review): TMP_FOLDER is not defined in the cells shown here; it is
# presumably set in an earlier cell. Confirm it exists on a fresh kernel.
with open(Path(TMP_FOLDER) / 'recommendations.json', 'w', encoding='utf-8') as fp:
    json.dump(recommendations_clean, fp, indent=2)