# Fetching data from CBS using OData v3 API into parquet format

## A fast and convenient way to fetch CBS data

- CBS API is well documented. Features that we use for `nl-open-data` are:
  - catalog function: quickly search through available datasets
  - well-typed datasets, including metadata with definitions of dimensions

- We currently use v3, v4 is in beta. Both return paginated results with 10,000 records per page. An `odata.nextLink` is provided at the end of each page (which is null if there are no more pages)
- Fetching the pages for a single dataset can take a long time if the dataset is large. Although you could hack your way around it by looking up the total number of rows in the catalog and run processes in parallel, we prefer to keep it simple and sequential.
- [cbsodata](https://github.com/J535D165/cbsodata) is an unofficial Python library for using the API. The add an extra processing step by combining the paginated results into a single parquet file, which is suitable for uploading into a blob storage service like Azure Storage, Google Cloud Storage or AWS Simple Cloud Storage (S3)

## Converting CBS OData to parquet

- json with list should be converted to newline delimited json (ndjson)
- ndjson can be read into pyarrow.Table and converted to parquet

In [1]:
from pathlib import Path
import json

import cbsodata
import pyarrow.json
import pyarrow.parquet
import pandas as pd


TEMP = "test-cbs-to-parquet"
TEMP_NDJSON = Path(TEMP + ".ndjson")
TEMP_PARQUET = Path(TEMP + ".parquet")

TESTDATA = "82070ENG"

In [2]:
# fetch data and parse into ndjson
data = cbsodata.get_data(TESTDATA)
with open(TEMP_NDJSON, "w+") as ndjson:
    for record in data:
        ndjson.write(json.dumps(record) + "\n")

table = pyarrow.json.read_json(TEMP_NDJSON)
pyarrow.parquet.write_table(table, TEMP_PARQUET)

table

pyarrow.Table
ID: int64
Gender: string
PersonalCharacteristics: string
CaribbeanNetherlands: string
Periods: string
EmployedLabourForceInternatDef_1: int64
EmployedLabourForceNationalDef_2: int64

In [3]:
df = pd.read_parquet(TEMP_PARQUET)
df.Periods

0      2012
1      2012
2      2012
3      2012
4      2012
       ... 
310    2012
311    2012
312    2012
313    2012
314    2012
Name: Periods, Length: 315, dtype: object

## TO DO

The `cbsodata` method already flattens the dimensions in the TypedDataSet (see [here in the repo](https://github.com/dataverbinders/cbsodata/blob/4922b102a72f5e1d5c67c06b73d827a338305e66/cbsodata/cbsodata3.py#L488)). In the example dataset, the period '2012 JJ00' is converted to the string "2012". 

We want to keep the original data (we will flatten the dimensions later), hence optimize and remove unneccesary intermediate steps in `cbsodata` methods:
- fetching a page returns json --> store this and immediately start downloading `nextLink`
- `value` in json contains list of dicts --> use Prefect/Dask to convert into ndjson and concatenate into parquet


Example `TypedDataSet`: https://opendata.cbs.nl/ODataApi/OData/37506wwm/TypedDataSet 

In [4]:
TEMP_NDJSON.unlink()
TEMP_PARQUET.unlink()