This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.

If a copy of the MPL was not distributed with this file, You can obtain one at https://mozilla.org/MPL/2.0/.

# Populating the database

This notebook will guide you through the process of adding data to the database.

First we import the required libraries and check the connection works.

**Note for CGP**:
Because containers are spun up only when "poked", you may need to run this twice in order to give the container time to spin up if you receive a `TimeoutError` after making the request.

In [1]:
import httpx
import csv
import json
import os
import gzip
import asyncio
import time
import itertools
import pandas as pd
from datetime import datetime
from dateutil import parser

# whether to ingest synthetic data or real data
SYNTHETIC = True

AUDIENCE = os.getenv("AUTH0_AUDIENCE")
BASE_URL = os.getenv("INGESTION_BASE_URL")
AUTH0_DOMAIN = os.getenv("INGESTION_AUTH0_DOMAIN")
AUTH0_CLIENT_ID_UPDATER = os.getenv("INGESTION_AUTH0_CLIENT_ID_UPDATER")
AUTH0_CLIENT_SECRET_UPDATER = os.getenv("INGESTION_AUTH0_CLIENT_SECRET_UPDATER")
AUTH0_CLIENT_ID_ADMIN = os.getenv("INGESTION_AUTH0_CLIENT_ID_ADMIN")
AUTH0_CLIENT_SECRET_ADMIN = os.getenv("INGESTION_AUTH0_CLIENT_SECRET_ADMIN")


def log(response):
    to_print = f"{response.status_code}: " if response.status_code != 200 else ""
    if hasattr(response, "content") and response.content is not None and response.content != b"":
        try:
            to_print += json.dumps(json.loads(response.content), indent=4)
        except Exception as e:
            to_print += "<could not decode response>"
    else:
        to_print += "<no content>"
    print(to_print)


response = httpx.get(f"{BASE_URL}/heartbeat")

print(f"Welcome to {os.getenv('APP_NAME')}!")
log(response)

Welcome to flowkit-ui-backend!
{
    "datetime": "2023-03-17T15:23:46.249913+00:00",
    "docker_image": "flowminder/flowkit-ui-backend:890c0e0",
    "git_branch": "feature-scope-mapping",
    "git_commit": "890c0e0",
    "git_tag": null,
    "python_package": "flowkit-ui-backend",
    "python_version": "3.9.15",
    "api_version_url_appendix": "v1",
    "api_version": "1.2.1"
}


Then we obtain M2M tokens to execute the requests.

In [2]:
response = httpx.post(
    url=f"https://{AUTH0_DOMAIN}/oauth/token",
    headers={"Content-Type": "application/json"},
    data=f'{{"client_id":"{AUTH0_CLIENT_ID_ADMIN}","client_secret":"{AUTH0_CLIENT_SECRET_ADMIN}","audience":"{AUDIENCE}","grant_type":"client_credentials"}}',
)
admin_token = json.loads(response.content)["access_token"]
print(response)

response = httpx.post(
    url=f"https://{AUTH0_DOMAIN}/oauth/token",
    headers={"Content-Type": "application/json"},
    data=f'{{"client_id":"{AUTH0_CLIENT_ID_UPDATER}","client_secret":"{AUTH0_CLIENT_SECRET_UPDATER}","audience":"{AUDIENCE}","grant_type":"client_credentials"}}',
)
updater_token = json.loads(response.content)["access_token"]
print(response)

<Response [200 OK]>
<Response [200 OK]>


Now we get some info from the backend so we know what's already in the database.
If the database has been re-provisioned, this may come back empty. If that happens, don't worry and proceed to the next step where the cause for this issue will be rectified.

We'll do a quick check for `categories` but you can also check `languages`, `indicators` or any other top-level element in the `config.json` file.

In [3]:
response = httpx.get(
    url=f"{BASE_URL}/categories", headers={"Authorization": f"Bearer {admin_token}"}
)
log(response)
categories = json.loads(response.content)["categories"]

{
    "categories": []
}


If any of categories, indicators, spatial or temporal resolutions are missing, we need to load the config first and then repeat the data retrieval.

Since the payload can get quite large, we'll compress it before sending it to the API. The backend API supports both compressed and uncompressed requests; provided you set the appropriate encoding in the header:

```python
headers={
    # always send the type
    "Content-Type": "application/json",
    # encoding required for gzip-compressed payloads
    "Content-Encoding": "gzip",
    [...]
}
```

In [4]:
# get config directly from the resources
with open(f"{os.getenv('PACKAGE_NAME')}/src/impl/resources/config.json") as json_data:
    config = json.load(json_data)

response = httpx.post(
    url=f"{BASE_URL}/setup",
    headers={
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
        "Authorization": f"Bearer {admin_token}",
    },
    data=gzip.compress(json.dumps(config).encode("utf-8")),
    timeout=3600,
)
log(response)

204: <no content>


Either way, the db should now have a basic setup.
Let's check if we have all the metadata we need before we proceed.
While we're at it, we save the categories so we can use them for the ingestion in the next step.

In [5]:
response = httpx.get(
    url=f"{BASE_URL}/categories", headers={"Authorization": f"Bearer {admin_token}"}
)
log(response)
categories = json.loads(response.content)["categories"]

{
    "categories": [
        {
            "category_id": "residents",
            "type": "single_location",
            "order": 1,
            "flowgeek_url": "https://www.flowgeek.org/methods/calculating-mobility-indicators/residents-indicators",
            "label": "Residents",
            "description": "Residents-class indicators describe long-term (monthly) changes in the number of people whose home location is within each area.",
            "label_fr": "R\u00e9sidents",
            "description_fr": "Les indicateurs relatifs aux r\u00e9sidents d\u00e9crivent les variations (mensuelles) \u00e0 long terme du nombre de personnes dont le lieu de r\u00e9sidence se trouve dans chaque zone."
        },
        {
            "category_id": "relocations",
            "type": "flow",
            "order": 2,
            "flowgeek_url": "https://www.flowgeek.org/methods/calculating-mobility-indicators/relocation-indicators/",
            "label": "Relocation",
            "description"

## Data cleaning

The data format - although agreed upon in principle - is not well-defined enough to guarantee successful insertion.
Before we can ingest it, we need to pre-process it in order to

- remove empty/invalid data
- correctly format the dates
- check all expected fields are present
- columns are named/ordered correctly
- data is sorted (which helps increase ingestion speed)

In [6]:
real_data_files = {
    "residents": {"category_id": "residents", "srid": 3, "trid": 2},
    "relocations": {"category_id": "relocations", "srid": 3, "trid": 2},
    "presence": {"category_id": "presence", "srid": 3, "trid": 4},
    "movements": {"category_id": "movements", "srid": 3, "trid": 4},
}
synthetic_files = {
    "residents_admin3_monthly_small": {"category_id": "residents", "srid": 3, "trid": 2},
    "relocations_admin3_monthly_small": {"category_id": "relocations", "srid": 3, "trid": 2},
    "presence_admin3_daily_small": {"category_id": "presence", "srid": 3, "trid": 4},
    "movements_admin3_daily_small": {"category_id": "movements", "srid": 3, "trid": 4},
}
files = synthetic_files if SYNTHETIC else real_data_files

parent_dir = f"{os.getenv('PACKAGE_NAME')}/src/impl/resources"
data_dir = f"{parent_dir}/data/synthetic" if SYNTHETIC else f"{parent_dir}/data"

for file_name in files.keys():
    file_path = f"{data_dir}/{file_name}.csv"
    preprocessed_path = f"/tmp/{file_name}_preprocessed.csv"
    df = pd.read_csv(file_path)

    # make sure only rows with data are kept
    # then sort by date, and spatial unit(s) if applicable
    # finally rename columns
    if file_name in ["residents", "presence"]:
        # min columns: date, spatial unit, one data column
        df = df.dropna(thresh=3)
        df = df.sort_values(by=[df.columns[0], df.columns[1]])
        df = df.rename(columns={df.columns[0]: "date", df.columns[1]: "spatial_unit"})
    elif file_name in ["relocations", "movements"]:
        # min columns: date, 2 spatial units, one data column
        df = df.dropna(thresh=4)
        df = df.sort_values(by=[df.columns[0], df.columns[1], df.columns[2]])
        df = df.rename(
            columns={df.columns[0]: "date", df.columns[1]: "origin", df.columns[1]: "destination"}
        )

    df.to_csv(preprocessed_path, index=False)
    print(f"Saved CSV to {preprocessed_path}")

Saved CSV to /tmp/residents_admin3_monthly_small_preprocessed.csv
Saved CSV to /tmp/relocations_admin3_monthly_small_preprocessed.csv
Saved CSV to /tmp/presence_admin3_daily_small_preprocessed.csv
Saved CSV to /tmp/movements_admin3_daily_small_preprocessed.csv


We can check the files before starting the ingestion:

In [7]:
%%bash -s /tmp/residents_admin3_monthly_small_preprocessed.csv
# check the file
wc -l "$1"
echo '--------------------'
head -n 3 "$1"
echo '--------------------'
tail -n 3 "$1"

3129 /tmp/residents_admin3_monthly_small_preprocessed.csv
--------------------
month,pcod,residents,residents_perKm2,arrived,departed,delta_arrived,residents_diffwithref,abnormality,residents_pctchangewithref
2020-02-01,HT0111-01,544650,29650,446860,427110,19760,-880.0,-0.68,-0.16
2020-02-01,HT0111-02,75340,10880,33420,31480,1940,1950.0,0.86,2.65
--------------------
2020-07-01,HT1031-05,9630,170,1040,1040,0,0.0,0.0,0.0
2020-07-01,HT1032-01,1770,60,160,160,0,0.0,0.0,0.0
2020-07-01,HT1032-02,3030,220,420,420,0,0.0,0.0,0.0


## Data ingestion

Now we can load the data we want to ingest.
We need to process the CSV files, which contain all indicators and dates for one category but the API ingests data in smaller chunks (one API call per indicator per date).
It's still recommended to compress the request body using `gzip`.

In [8]:
CHUNK_SIZE = 50


def chunked_iterable(iterable, size):
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk


async def post_async(ds, client):
    return await client.post(
        url=f"{BASE_URL}/data",
        headers={
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",
            "Authorization": f"Bearer {admin_token}",
        },
        data=gzip.compress(json.dumps(ds, default=str).encode("utf-8")),
        timeout=3600,
    )


async def ingest_data(file_name):
    start_time = time.monotonic()

    print(f"Processing file {file_name}.csv")
    f = files[file_name]

    # get the category object
    category_id = f["category_id"]
    category = [c for c in categories if c["category_id"] == category_id][0]
    # get all indicators for that category
    response = httpx.get(
        url=f"{BASE_URL}/indicators_for_category/{category_id}",
        headers={"Authorization": f"Bearer {admin_token}"},
    )
    indicators = json.loads(response.content)["indicators"]
    print(f"Found {len(indicators)} indicators for category {category_id}")

    # get column names and order from csv
    columns = []
    column_to_indicator_id = {}

    with open(f"/tmp/{file_name}_preprocessed.csv") as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=",")
        for row in csv_reader:
            columns = row
            break

        for col in columns:
            column_to_indicator_id[col] = f"{category_id}.{col}"

        # sort data by ID for easier processing
        indicators_by_id = {i["indicator_id"]: i for i in indicators}

        # collect all datasets in a dict by indicator ID and date - that way it doesn't matter whether the CSV file is ordered
        datasets = {}
        total_num = 0
        # we already consumed the first row so can continue here with the same reader
        for row in csv_reader:
            # check each column in each row
            for col in columns:
                if (
                    col in column_to_indicator_id
                    and column_to_indicator_id[col] in indicators_by_id
                ):
                    value = row[columns.index(col)]
                    # skip "None" values
                    if value in ["NaN", "Inf", "-Inf", ""]:
                        continue

                    # make sure to use the correct row for the date and the correct datetime format
                    date_string = row[0]
                    dt = parser.parse(date_string)

                    # not one per indicator but one per indicator per date
                    indicator = indicators_by_id[column_to_indicator_id[col]]
                    indicator_id = indicator["indicator_id"]
                    datasets.setdefault(indicator_id, {})

                    if date_string not in datasets[indicator_id]:
                        total_num += 1

                    datasets[indicator_id].setdefault(
                        date_string,
                        {
                            "metadata": {
                                "revision": "v0.1-demo",
                                # adding a date here which will be overwritten later when it is actually added to the db
                                # this is to avoid a fastapi.exceptions.RequestValidationError for checking the length of a "None" type
                                "date_added": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
                                "category_id": category_id,
                                "indicator_id": indicator_id,
                                "srid": f["srid"],
                                "trid": f["trid"],
                                "dt": dt,
                            },
                            "data_type": category["type"],
                            "data_input": [],
                        },
                    )

                    datasets[indicator_id][date_string]["data_input"].append(
                        {
                            "spatial_unit_ids": [row[1]]
                            if category["type"] == "single_location"
                            else [row[1], row[2]],
                            "data": value,
                        }
                    )

    print(f"Prepared requests in {round(time.monotonic() - start_time, 2)}s")
    print(f"Starting ingestion of data for {len(datasets)} indicators...")
    async with httpx.AsyncClient() as client:
        start_time = time.monotonic()
        num = 0
        for indicator_id in datasets:
            print(
                f"    Ingesting {len(datasets[indicator_id].values())} datasets for indicator {indicator_id}",
                end="",
                flush=True,
            )

            for chunk in chunked_iterable(datasets[indicator_id].values(), size=CHUNK_SIZE):
                print(".", end="", flush=True)
                responses = await asyncio.gather(*(post_async(ds, client) for ds in chunk))
                for response in responses:
                    if response.status_code not in [201, 204]:
                        print("")
                        log(response)
                    else:
                        num += 1
            print("")

        print(
            f"Ingested {num}/{total_num} datasets ({round(num/total_num*100, 2)}%) in {round(time.monotonic() - start_time, 2)}s"
        )


async def doit():
    for file_name in files.keys():
        # if file_name != 'residents':
        #    continue
        await ingest_data(file_name)

await doit()
#loop = asyncio.get_event_loop()
#task = loop.create_task(doit())
#if not loop.is_running():
#    loop.run_until_complete(task)

Processing file residents_admin3_monthly_small.csv
Found 8 indicators for category residents
Prepared requests in 0.72s
Starting ingestion of data for 8 indicators...
    Ingesting 6 datasets for indicator residents.residents.
    Ingesting 6 datasets for indicator residents.residents_perKm2.
    Ingesting 6 datasets for indicator residents.arrived.
    Ingesting 6 datasets for indicator residents.departed.
    Ingesting 6 datasets for indicator residents.delta_arrived.
    Ingesting 6 datasets for indicator residents.residents_diffwithref.
    Ingesting 6 datasets for indicator residents.abnormality.
    Ingesting 6 datasets for indicator residents.residents_pctchangewithref.
Ingested 48/48 datasets (100.0%) in 1.71s
Processing file relocations_admin3_monthly_small.csv
Found 4 indicators for category relocations
Prepared requests in 3.52s
Starting ingestion of data for 4 indicators...
    Ingesting 6 datasets for indicator relocations.relocations.
    Ingesting 6 datasets for indicato

Done! Provided you got all `201` or `204` responses (i.e. no errors), the data should now be in the database!

## Data permissions & access

The data is now in the database, but without access management, only administrators will be able to see the data by default.
To enable access by users depending on their roles, we need to define what scopes give access to which part of the data.
We use an "allow-list" style access management so we have to define each bit of data that will be accessible to users that aren't admins.
We do that using JSON. Each key is the name of a scope as defined in Auth0 (see also the API spec) and each value is a set of queries (as per API spec) that define a set of data.

In [10]:
data_access = {
    "read:free_data": [
        {
            "category_id": "residents",
            "indicator_id": "residents.residents",
            "srid": 3,
            "trid": 2,
            "start_date": "2020-02",
            "duration": 3
        }
    ],
    "read:premium_data": [
        {
            "category_id": "residents",
            "indicator_id": "residents.residents",
            "srid": 3,
            "trid": 2,
            "start_date": "2020-02",
            "duration": 6
        },
        {
            "category_id": "residents",
            "indicator_id": "residents.residents_perKm2",
            "srid": 3,
            "trid": 2,
            "start_date": "2020-02",
            "duration": 6
        }
    ]
}

Now we need to get the metadata IDs of the specified data:

In [19]:
scope_mappings = {}
for scope in data_access:
    scope_mappings[scope] = []
    for query in data_access[scope]:
        query['mdids_only'] = True
        response = httpx.post(
            url=f"{BASE_URL}/query",
            headers={
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
                "Authorization": f"Bearer {admin_token}",
            },
            data=gzip.compress(json.dumps(query).encode("utf-8")),
        )
        scope_mappings[scope] += json.loads(response.content)["mdids"]
print(scope_mappings)

{'read:free_data': ['3', '1', '4'], 'read:premium_data': ['3', '1', '4', '2', '5', '6', '9', '10', '7', '11', '12', '8']}


Next we can ingest the scope mappings using the `/scope_mapping` endpoint:

In [25]:
for scope in scope_mappings:
    for mdid in scope_mappings[scope]:
        scope_mapping = {
            "scope": scope,
            "mdid": mdid
        }
        response = httpx.post(
            url=f"{BASE_URL}/scope_mapping",
            headers={
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
                "Authorization": f"Bearer {admin_token}",
            },
            data=gzip.compress(json.dumps(scope_mapping).encode("utf-8")),
        )
        if response.status_code != 204:
            log(response)
print("Done.")

Done.


If not status codes other than `204` come back, then the ingestion of scope mappings worked and the data is now tagged.