<a href="https://colab.research.google.com/github/fsn-capital/ilevel-to-bigquery-python-script/blob/main/ilevel_to_bigquery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# iLevel to BigQuery

## Information
This script reads all data from iLevel and pushes it to BigQuery.
`fetch_method = "full"` refreshes all data and takes about 1 hour to run (as of 2022, with ~2,000,000 records). This is run in Google Colab or similar.
`fetch_method = "append"` only loads new records since the last run. This is run in GCP Cloud Functions, triggered by a Pub/Sub topic. As Cloud Functions limits a script to a maximum of 10 minutes, the append is what runs there, and it fits comfortably (less than 2 minutes).

### Checklist when deploying:
0. First run the script with `fetch_method = "full"` from Colab, Jupyter Notebook, or another environment that supports more than 1 hour of run time. After this initial load, you can move it to a microservice with a shorter timeout.
1. Set the environment variables `client_id` and `client_secret`. These can be found by logging into iLevel (https://clients.ilevelsolutions.com/).
2. Delete the code cell near the top called `Running in Colabs (to be deleted when running in Cloud functions)`.
3. Delete the very last line, `main()`, as Cloud Functions calls `main` through its entry point.
4. Change `fetch_method = "full"` to `fetch_method = "append"`, or your script will most likely time out.
5. Change `bq_dataset_name = "fsn_insight_test"` to `bq_dataset_name = "fsn_insight_ldg"`, if you want to write to the prod tables.
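Steps 4 and 5 are manual code edits. As an alternative, here is a minimal sketch assuming you are free to add two extra environment variables to the deployment. The names `FETCH_METHOD` and `BQ_DATASET_NAME` and the helper `load_config` are hypothetical (the script currently hardcodes both values), and the defaults are the Colab/test settings:

```python
import os

# Hypothetical configuration helper; the script itself hardcodes these values.
VALID_FETCH_METHODS = {"full", "append"}

def load_config(env=os.environ):
    """Read fetch method and target dataset, defaulting to the Colab/test setup."""
    fetch_method = env.get("FETCH_METHOD", "full")
    if fetch_method not in VALID_FETCH_METHODS:
        raise ValueError(
            f"FETCH_METHOD must be one of {sorted(VALID_FETCH_METHODS)}, got {fetch_method!r}")
    bq_dataset_name = env.get("BQ_DATASET_NAME", "fsn_insight_test")
    return fetch_method, bq_dataset_name

# In Cloud Functions you would set FETCH_METHOD=append and BQ_DATASET_NAME=fsn_insight_ldg
fetch_method, bq_dataset_name = load_config()
```

With this, the same notebook code could run unchanged in Colab and in Cloud Functions; only the deployment settings differ.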

### iLevel API

iLevel API documentation (which is surprisingly good) can be found here:
[https://sand-docs.ilevelsolutions.eu/](https://sand-docs.ilevelsolutions.eu/)

The script is fairly straightforward, except for two quirks:
1. The biggest table, `periodicData`, returns at most 1000 results per page and at most 100 pages (=100,000 records) per query. To get around this (we currently have 2,000,000 records in this table), we keep adding filters until every chunk has at most 100,000 records. This is described in detail further down.
2. Some endpoints support pagination, some don't. Some support sorting, some don't. Some have a date field, some have a datetime field, and some have neither. It's a bit of a mess, and the API is extremely picky about what it will accept. The block `main` handles each supported feature per endpoint.

Because of the behavior described in (2) above, the append works like this:
- For the `periodicData` table, we fetch only the records that were added or modified after the latest push.
- For all the other tables, we check whether the number of records from the latest load in BigQuery equals the count served by the API endpoint. If it does, we move on; if it doesn't, we do a full refresh.
- Oh, why don't we purge the tables before doing a full refresh of mostly static data, I hear you ask? I'll tell you why: BigQuery panics when you do a purge+write, and you end up with an error message telling you the data stream is busy. This is super hard to replicate consistently, and testing shows that sometimes you need to wait 10+ minutes after a purge before you can write to the table. So we skip the purge and accept some duplicates in these tables. This is mostly static data, so most of the time nothing is written anyway...
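The append rules above boil down to one decision per table. A sketch of that decision as a pure function; the name `plan_table_load` and its return strings are hypothetical, and in the script the counts would come from `bq_get_number_of_latest_records()` and the API's `meta.count`:

```python
def plan_table_load(table, bq_count, api_count, fetch_method="append"):
    """Decide what to do with one table, following the append rules above.

    Returns "full" (re-read and insert everything), "append" (only records
    modified since the last run) or "skip" (nothing changed).
    """
    if fetch_method == "full":
        return "full"
    if table == "periodicData":
        # Only periodicData is fetched incrementally (filter on lastModifiedDate)
        return "append"
    # Mostly static tables: compare the size of the latest load with the API count.
    # There is no purge before a refresh, so a refresh can leave duplicates behind.
    if bq_count == api_count:
        return "skip"
    return "full"
```

A table that has never been loaded has no BigQuery count (`None`), which never equals the API count, so it gets a full load.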

### Final words

Generate `requirements.txt` by running `pip freeze` in a cell.<br>
```
google-cloud-bigquery==1.21.0
requests==2.23.0
urllib3==1.23  # Script crashed with "SSL ERROR" on the latest release (known bug)
```
<br>

To trigger the live script, use:
```
gcloud pubsub topics publish trigger_ilevel-to-bigquery --message=None
```
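The published message is delivered to the function as a Pub/Sub event. A minimal sketch of a 1st-gen Python background function that receives it and calls `main()`, assuming that is how the deployed entry point is wired (the name `pubsub_entry_point` is hypothetical, and `main` here is a stand-in for the notebook's `main()`):

```python
import base64

def main():
    # Stand-in for the notebook's main(); hypothetical here
    return "ran"

def pubsub_entry_point(event, context):
    """Hypothetical 1st-gen background function triggered by a Pub/Sub topic.

    event["data"] holds the base64-encoded message body (the literal string
    "None" sent by the gcloud command above); the body is only logged.
    """
    data = event.get("data")
    message = base64.b64decode(data).decode("utf-8") if data else ""
    print(f"Triggered with message: {message!r}")
    return main()
```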

Also, just to be sure, the link to the Colab page for this script is:
[https://colab.research.google.com/drive/1Uh3szpV7JtxUEsl2jY3b-YHa1ZxxU6yM?usp=sharing](https://colab.research.google.com/drive/1Uh3szpV7JtxUEsl2jY3b-YHa1ZxxU6yM?usp=sharing)

-Chris (cc@fsncapital.com)

## Imports

In [None]:
import os
import requests
import json
import sys
import timeit
import calendar
import urllib.parse
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from urllib.parse import urlparse
from urllib.parse import parse_qs
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import secretmanager

## Running in Colabs (to be deleted when running in Cloud functions)

In [None]:
# TO BE DELETED WHEN RUNNING IN THE CLOUD

# GET SECRETS
!pip install google-cloud-secret-manager

from google.cloud import secretmanager
from google.oauth2 import service_account
from google.colab import auth

credentials = service_account.Credentials.from_service_account_file(
   r'/content/drive/MyDrive/Colab Notebooks/fsn-insight-b64b318df8c7.json')

secret_client = secretmanager.SecretManagerServiceClient(credentials=credentials)
project_id = 'fsn-insight' 

secret_name = "ilevel_client_id" 
resource_name = f"projects/{project_id}/secrets/{secret_name}/versions/latest" 
response = secret_client.access_secret_version(request={"name": resource_name})
os.environ["client_id"] = response.payload.data.decode('UTF-8')

secret_name = "ilevel_client_secret" 
resource_name = f"projects/{project_id}/secrets/{secret_name}/versions/latest" 
response = secret_client.access_secret_version(request={"name": resource_name})
os.environ["client_secret"] = response.payload.data.decode('UTF-8')

# Set other variables
os.environ["runtime"] = "local" # "gcp" for gcp

# Authenticate
auth.authenticate_user()
print('Authenticated')

## Set variables and declare functions

In [None]:
# Set environmental variables
start = timeit.default_timer()
runtime = os.environ.get("runtime")
url_base = "https://api.ilevelsolutions.com/v1/"
fetched_at = datetime.utcnow().isoformat(timespec="milliseconds") + "Z" # UTC with millisecond precision, e.g. 2022-01-01T12:00:00.000Z
fetched_by = "ilevel-to-bigquery script on GCP Cloud Functions"
fetch_method = "full" # Extracts all data. Takes more than 1 hour to run
#fetch_method = "append" # Refreshes a table only when its record count has changed, and appends new/modified records to periodicData. Takes < 10 minutes to run
access_token = {}
total_count = 0
bq_project_name = "fsn-insight"
#bq_dataset_name = "fsn_insight_ldg" # prod dataset
bq_dataset_name = "fsn_insight_test" # testing dataset
bq_client = bigquery.Client(bq_project_name)
read_buffer_size = 10000
ilevel_page_size = 1000 # Pagination limit by ilevel

# Get secrets: client_id and client_secret are read from environment variables,
# set by the Colab cell above or by the Cloud Function's configuration
client_id = os.environ.get("client_id")
client_secret = os.environ.get("client_secret")


# Helpers for each API endpoint for fetching data
read_buffer = []

# Helpers for filtering periodicData
list_assets = []
list_scenarios = []
list_dataItems = []

## Shared functions

### get_access_token()

In [None]:
# Get access token
def get_access_token():
    global access_token
    if("expires_at" in access_token and access_token["expires_at"] > datetime.now() + timedelta(seconds = 30)): # adding some seconds in margin
        return access_token
    try:
        r = requests.post(
            url_base + "token",
            headers={"Accept": "application/json",
                "Content-Type":"application/x-www-form-urlencoded"},
            data={"grant_type": "client_credentials"},
            auth=(client_id, client_secret)
        )
        r.raise_for_status()  # Surface HTTP errors (e.g. wrong credentials) instead of a KeyError below
        token = r.json()
        expires_at = datetime.now() + timedelta(seconds = token["expires_in"])
        access_token = {"access_token":token["access_token"], "expires_in":token["expires_in"], "expires_at":expires_at}
        return access_token
    except Exception as e:
        raise Exception(f"Error while refreshing access token: {e}") from e
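The caching rule in `get_access_token()` (reuse the token until 30 seconds before it expires) can be checked in isolation. A sketch with the clock passed in so it is testable; `token_is_fresh` and `make_token` are hypothetical helpers, not part of the script:

```python
from datetime import datetime, timedelta

SAFETY_MARGIN = timedelta(seconds=30)  # same margin as get_access_token()

def token_is_fresh(token, now):
    """True if a cached token dict can still be reused at time `now`."""
    return "expires_at" in token and token["expires_at"] > now + SAFETY_MARGIN

def make_token(access_token, expires_in, now):
    """Build the cache entry the same way get_access_token() does."""
    return {"access_token": access_token,
            "expires_in": expires_in,
            "expires_at": now + timedelta(seconds=expires_in)}
```

For a token valid for 3600 seconds, the cached value is reused for the first 3569 seconds and refreshed from second 3570 onward.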

### get_next_page()

In [None]:
# Check if there are more pages, and return the next page number if there is one
def get_next_page(r):
    try:
        url_next = r.json()["links"].get("next")
        if url_next:
            next_page_number = int(parse_qs(urlparse(url_next).query)['page[number]'][0])
            return {"page[number]" : str(next_page_number)}
    except (KeyError, IndexError, ValueError, AttributeError, TypeError):
        # No usable "links.next" (or no page number in it): treat as the last page
        return None
    return None
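The `while True` / `get_next_page()` loop used for assets and dataItems could also be written as a generator. A sketch where `fetch_page` is a hypothetical stand-in for the `requests.get(...).json()` call, so the loop runs without the API; the `links.next` / `page[number]` shape is the one `get_next_page()` already relies on:

```python
from urllib.parse import urlparse, parse_qs

def next_page_number(body):
    """Return the page number from body["links"]["next"], or None on the last page."""
    url_next = (body.get("links") or {}).get("next")
    if not url_next:
        return None
    return parse_qs(urlparse(url_next).query)["page[number]"][0]

def iter_records(fetch_page, params):
    """Yield every record across pages, advancing page[number] like get_next_page()."""
    params = dict(params, **{"page[number]": "1"})  # copy; don't mutate the caller's dict
    while True:
        body = fetch_page(params)
        yield from body["data"]
        number = next_page_number(body)
        if number is None:
            break
        params["page[number]"] = number

# Usage with a fake two-page API (hypothetical data and URL)
pages = {
    "1": {"data": [{"id": 1}, {"id": 2}],
          "links": {"next": "https://api.example/v1/dataItems?page[number]=2&page[size]=2"}},
    "2": {"data": [{"id": 3}], "links": {}},
}
records = list(iter_records(lambda p: pages[p["page[number]"]], {"page[size]": "2"}))
```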

### timer()

In [None]:
# Get a string with the current running time
def timer():
    total_time = timeit.default_timer() - start

    # Output running time as HH:MM:SS
    hours, rem = divmod(total_time, 3600)
    minutes, seconds = divmod(rem, 60)

    return "{:0>2}:{:0>2}:{:0>2}".format(int(hours), int(minutes), int(seconds))

### log()

In [None]:
# Log function. To be updated with Stackdriver
def log(msg):
    print(timer() + " " + str(msg)) 

### load_data_to_bigquery()

In [None]:
# Post data to BigQuery
def load_data_to_bigquery(table, records, truncate=False):
    bq_table_full_path = f"""{bq_project_name}.{bq_dataset_name}.{table}"""
    rows_to_insert = records
    global total_count

    try:
        errors = bq_client.insert_rows_json(bq_table_full_path, rows_to_insert)
        total_count += len(rows_to_insert)
    except Exception as e:
        raise Exception(f"Failed to post to BQ: {e}") from e
    
    if errors == []:
        log("Loaded " + str(len(rows_to_insert)) + " rows into " + bq_table_full_path + " (" + f'{total_count:,}' + " in total)" )
    else:
        log("Encountered errors while inserting rows to BigQuery: {}".format(errors))
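`read_buffer_size = 10000` suggests rows are buffered before each `insert_rows_json()` call. The buffering code is not part of this section, so the following is only a sketch, assuming the goal is to cap each streaming insert at that many rows; `batched` is a hypothetical helper:

```python
def batched(records, size):
    """Split a list of row dicts into consecutive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [records[i:i + size] for i in range(0, len(records), size)]

# e.g. for chunk in batched(rows, read_buffer_size): load_data_to_bigquery(table, chunk)
```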

### bq_get_number_of_latest_records()

In [None]:
def bq_get_number_of_latest_records(table):
    bq_table_full_path = f"""{bq_project_name}.{bq_dataset_name}.{table}"""
    latest_fetch_in_bq = ""

    try:
        # Find the latest load date time from BigQuery
        latest_fetch_in_bq = get_last_upload(table)

        # Count how many records were loaded in the last run
        query = "SELECT COUNT(fetched_at) as number_of_records FROM `" + str(bq_table_full_path) + "` WHERE fetched_at = \"" + str(latest_fetch_in_bq) + "\""
        result = bq_client.query(query)
        rows = result.result()
        for row in rows:
            return row.number_of_records
    except Exception as e:
        raise Exception(f"Failed to query BQ: {e}") from e

### get_last_upload()

In [None]:
def get_last_upload(table):
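    # Backticks around the table path, as in bq_get_number_of_latest_records(); reuse the shared client
    query = "SELECT fetched_at FROM `" + bq_project_name + "." + bq_dataset_name + "." + table + "` ORDER BY fetched_at DESC LIMIT 1"
    try:
        result = bq_client.query(query).result()  # Make an API request and wait for the result
        for row in result:
            return row[0]
    except Exception:
        # E.g. the table does not exist yet on the first run: no previous upload
        return None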

## Head

In [None]:
def head():
    log("Starting extract/load iLevel/bigquery")
    log("Time (UTC): " + fetched_at)
    log("Fetch method: " + fetch_method)

## periodicData filtering

### Explanation of filtering


https://sand-docs.ilevelsolutions.eu/<br/>
Get the list of all filter items to be used for periodicData. As iLevel has a limit of 100 pages with 1000 records each, we need to divide the query. We build three lists (assets, scenarios, and data items) and, from them, one master list of filters covering the whole dataset. If we could sort and filter by ID (we cannot), we could just read as far as we got, filter, and repeat. But since each query can return at most 100,000 records, we need to get below that number at the filter level of the query.

It is worth noting that this limitation only applies to the periodicData endpoint.

Tested on a dataset of ~2,000,000 records, we found ~50 assets, ~10 scenarios, and ~2,000 dataItems: 
- Downloading the entire dataset took ~2,200 queries and ~1 hour.
- With the three filters plus splitting on years or months, we stay comfortably within iLevel's limitations.
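These numbers can be sanity-checked. Each chunk tops out at 100 pages × 1,000 records, so ~2,000,000 records need at least 20 chunks, and at 1,000 records per page at least 2,000 data requests. The remaining ~200 of the ~2,200 queries would then be count probes and partially filled last pages, though that split is our inference rather than a measurement. A back-of-the-envelope sketch (the constant and function names are ours):

```python
import math

PAGE_SIZE = 1000                       # ilevel_page_size
MAX_PAGES = 100                        # iLevel's page limit on periodicData
MAX_PER_QUERY = PAGE_SIZE * MAX_PAGES  # 100,000 records per filtered query

def min_chunks(total_records):
    """Lower bound on the number of filters needed to cover the table."""
    return math.ceil(total_records / MAX_PER_QUERY)

def min_page_requests(total_records):
    """Lower bound on data requests, ignoring the count probes."""
    return math.ceil(total_records / PAGE_SIZE)
```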


### The way this script works: 
It starts by extracting all data for the first investment. 
 1. If it returns at most 100,000 records, it moves on to the next investment in the `list_assets` list. 
 2. If it returns more than 100,000 records, it keeps adding filters until it gets below 100,000.
<br>The order of filters is `[investment]` -> `[scenarios]` -> `[periodEnd (Year then month)]` -> `[dataItem]`
<br>Which can look like this `[Acme Inc]` -> `[Budget]` -> `[ge(2020-01-01),le(2020-12-31)]` -> `[Revenue]`
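In `get_filter_list()` this is written as nested `if`/`else` blocks, one level per filter. The same idea can be sketched as a recursion over an ordered list of filter dimensions. This is not the script's implementation, only an illustration under simplifying assumptions: `count` is a hypothetical callback standing in for the `page[size]=1` request that reads `meta.count`, each dimension has a fixed list of values (the script derives the months from the year being split), and a chunk that is still too big at the last level is kept as-is, much like the script's blind dataItem step.

```python
LIMIT = 100_000  # 100 pages x 1,000 records

def split_filters(count, dimensions, base=None, limit=LIMIT):
    """Return a list of filter dicts that together cover every record.

    count(filters) -> int: number of records matching `filters`.
    dimensions: list of (name, values) pairs, split in order.
    """
    base = dict(base or {})
    n = count(base)
    if n == 0:
        return []          # nothing to fetch for this combination
    if n <= limit or not dimensions:
        return [base]      # fits, or nothing left to split on
    name, values = dimensions[0]
    result = []
    for value in values:
        result.extend(split_filters(count, dimensions[1:], {**base, name: value}, limit))
    return result

# Usage on a tiny fake dataset, with a limit of 10 instead of 100,000
data = ([{"investment": "Small", "scenario": "Actual"}] * 5
        + [{"investment": "Large", "scenario": "Actual"}] * 8
        + [{"investment": "Large", "scenario": "Budget"}] * 12)

def fake_count(filters):
    return sum(all(r[k] == v for k, v in filters.items()) for r in data)

dims = [("investment", ["Small", "Medium", "Large"]), ("scenario", ["Actual", "Budget"])]
chunks = split_filters(fake_count, dims, limit=10)
```

Here "Small" fits as a whole, "Medium" has no records and is dropped, and "Large" is split by scenario; its `Budget` chunk (12 records) is still over the limit but has no further dimension, so it is kept as-is.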

#### Example of list of filters
For three companies named "**Small**", "**Medium**", and "**Large**", the `filter_params` list could look something like this:


*Entire asset "Small" fits in 100k records. Sweet!*

```
filter_params[0]  {"page[size]":"1000", "filter[investment]":"Small"}
```

*Asset "Medium" needs to be split into scenarios to fit in 100k records*

```
filter_params[1]  {"page[size]":"1000", "filter[investment]":"Medium", "filter[scenario]":"Actual"}
filter_params[2]  {"page[size]":"1000", "filter[investment]":"Medium", "filter[scenario]":"Budget"}
```

*Asset "Large" is split into scenarios too. `Actual` is less than 100k, but `Budget` is not, so Budget needs to be split into years*

```
filter_params[3]  {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Actual"}
filter_params[4]  {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2020-01-01),le(2020-12-31)]"}
```

*`Budget` for `2020` is less than 100k records, but `2021` has more, which is therefore split by month*

```
filter_params[5]  {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-01-01),le(2021-01-31)]"}
```

*Budget for `Jan 2021` is less than 100k records, and so is each following month...*<br> 
```
filter_params[6]  {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-02-01),le(2021-02-28)]"}
filter_params[7]  {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-03-01),le(2021-03-31)]"}
filter_params[8]  {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-04-01),le(2021-04-30)]"}
filter_params[9]  {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-05-01),le(2021-05-31)]"}
filter_params[10] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-06-01),le(2021-06-30)]"}
```
*...except for `July`, which is broken down into `dataItems` to stay below 100k* <br> 

```
filter_params[11] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-07-01),le(2021-07-31)]", "filter[dataItem]":"Revenue" }
filter_params[12] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-07-01),le(2021-07-31)]", "filter[dataItem]":"EBITDA" }
filter_params[13] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-08-01),le(2021-08-31)]"}
filter_params[14] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-09-01),le(2021-09-30)]"}
filter_params[15] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-10-01),le(2021-10-31)]"}
filter_params[16] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-11-01),le(2021-11-30)]"}
filter_params[17] {"page[size]":"1000", "filter[investment]":"Large", "filter[scenario]":"Budget", "filter[periodEnd]":"[ge(2021-12-01),le(2021-12-31)]"}
```
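In the script, the month filters end on the real last day of each month, which `get_filter_list()` gets from `calendar.monthrange()`. A sketch of that step on its own (`period_end_filter` is a hypothetical name). The list value is what `requests` sends as two repeated `filter[periodEnd]` parameters:

```python
import calendar

def period_end_filter(year, month=None):
    """Return the [ge(...), le(...)] pair for a whole year or a single month."""
    if month is None:
        return [f"ge({year}-01-01)", f"le({year}-12-31)"]
    # monthrange returns (weekday of the 1st, number of days in the month)
    last_day = calendar.monthrange(year, month)[1]
    return [f"ge({year}-{month:02d}-01)", f"le({year}-{month:02d}-{last_day})"]
```

For example, `period_end_filter(2021, 2)` ends on `2021-02-28`, and `period_end_filter(2020, 2)` on the leap day `2020-02-29`.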

By breaking it down like this, we minimize the number of queries needed.

### Build list of assets, scenarios, and dataItems

In [None]:
# Build list of available Assets, Scenarios, and DataItems from iLevel
def build_lists_of_assets_scenarios_dataitems():

    headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
    params = {}

    log("Starting to extract PeriodicData. As this API endpoint has a limit of 100 pages from the iLevel API, we need to build lists of filters, using Assets as the root filter.")

    # Build list of all Assets
    log("Building list of all assets")
    url_self = url_base + "entities/assets"
    params = {"page[size]":"1000", "page[number]":"1", "fields[assets]":"\"name\"", "sort":"name"}
    try:
        while True:
            # Loop all pages - if any
            r = requests.get(url=url_self, headers=headers, params=urllib.parse.urlencode(params, safe='[]'))
            for x in r.json()["data"]:
                list_assets.append(x["attributes"]["name"])
            next_page = get_next_page(r)
            if(not next_page): break
            params.update(get_next_page(r))
            
    except Exception as e:
        raise Exception(f"Error while getting list of asset names: {e}") from e  


    # Build list of all Scenarios
    log("Building list of all scenarios")
    headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}

    try:
        url_self = url_base + "scenarios"  
        r = requests.get(url=url_self, headers=headers)
        for x in r.json()["data"]:
            list_scenarios.append(x["attributes"]["name"])
    except Exception as e:
        raise Exception(f"Error while getting list of scenario names: {e}") from e
        

    # Build list of dataItems
    log("Building list of all dataItems")
    headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
    url_self = url_base + "dataItems"
    params = {"page[size]":"1000", "page[number]":"1", "fields[dataItems]":"\"name\"", "sort":"name"}

    try:
        while True:
            # Loop all pages - if any
            r = requests.get(url=url_self, headers=headers, params=urllib.parse.urlencode(params, safe='[]'))
            for x in r.json()["data"]:
                list_dataItems.append(x["attributes"]["name"])
            next_page = get_next_page(r)
            if(not next_page): break
            params.update(get_next_page(r))
                
    except Exception as e:
        raise Exception(f"Error while getting list of dataItems names: {e}") from e
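The asset and dataItem requests pass a pre-encoded string built with `urllib.parse.urlencode(params, safe='[]')` instead of a plain dict. The sketch below shows that the difference lies only in the square brackets: by default they are escaped to `%5B`/`%5D`, and `requests` does the same when given a dict. That the picky API (quirk 2 above) is the reason for `safe='[]'` is our assumption. List values such as the `filter[periodEnd]` range need `doseq=True` to become repeated keys:

```python
from urllib.parse import urlencode

params = {"page[size]": "1000", "page[number]": "1", "fields[assets]": "\"name\"", "sort": "name"}

# Default encoding escapes the brackets in the parameter names
default = urlencode(params)
# safe='[]' leaves them literal, as in build_lists_of_assets_scenarios_dataitems()
literal = urlencode(params, safe="[]")
# A list value becomes repeated keys only with doseq=True
ranged = urlencode({"filter[periodEnd]": ["ge(2021-01-01)", "le(2021-01-31)"]},
                   doseq=True, safe="[]()")
```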

### Build master list of parameters for extracting data from periodicData

In [None]:
def get_filter_list(list_assets, list_scenarios, list_dataItems):
    # Start building master list to filter_params

    filter_params = []
    subtotal_count = 0
    params = {}

    start_time = get_last_upload("ilevel_periodicData_ldg")

    if(fetch_method=="append"):
        log("Fetching data modified after: " + str(start_time))
    log("Building master list of all filters")
    
    try:
        headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
        url_self = url_base + "periodicData"
        params = {"page[size]":"1", "page[number]":"1", "sort":"lastModifiedDate"}
        r = requests.get(url=url_self, headers=headers, params=params)
        count = r.json()["meta"]["count"]

        if(count <= 100000):
            params.update({"page[size]":"1000", "page[number]":"1"})
            verbose = "(" + f'{count:,}' + ")"
            filter_params.append({"params":params,"count":count,"url":r.url,"verbose":verbose})
            log("Added (" + f'{count:,}' + ") to list of filters")
            subtotal_count += count

        elif(count > 100000): 
            log("Periodic data has more than 100,000 items. (" + f'{count:,}' + " found) Splitting query into assets.")

            for a in list_assets:
                # Reset params for each loop
                params = {"page[size]":"1", "page[number]":"1", "sort":"lastModifiedDate", "filter[investment]":"\"" + a + "\""}
                if(start_time and fetch_method == "append"):
                    params.update({"filter[lastModifiedDate]":"ge(" + start_time + ")"})

                # Check to see if more than 100,000 records with filter "investment"
                headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
                r = requests.get(url=url_self, headers=headers, params=params)
                count = r.json()["meta"]["count"]
                
                if(count == 0):
                    continue
                
                # If less than 100,000 records, append the filter to the list
                if(count <= 100000):
                    params.update({"page[size]":"1000", "page[number]":"1"})
                    verbose = "[" + a + "] (" + f'{count:,}' + ")"
                    filter_params.append({"params":params,"count":count,"url":r.url,"verbose":verbose})
                    log("Added [" + a + "] (" + f'{count:,}' + ") to list of filters")
                    subtotal_count += count
                
                # If more than 100,000 records, add another filter "scenario"
                else:
                    log("Periodic data for [" + a + "] has more than 100,000 items. (" + f'{count:,}' + " found) Splitting query into scenarios.")
                    for b in list_scenarios:
                        # Reset params for each loop
                        params = {"page[size]":"1", "page[number]":"1", "sort":"lastModifiedDate,scenario", "filter[investment]":"\"" + a + "\""}
                        if(start_time and fetch_method == "append"):
                            params.update({"filter[lastModifiedDate]":"ge(" + start_time + ")"})

                        # Add filter "investment" and "scenario" and check to see if we get more than 100,000 records with this filter 
                        headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
                        params.update({"page[size]":"1", "filter[scenario]":"\"" + b + "\""})
                        r = requests.get(url=url_self, headers=headers, params=params)
                        count = r.json()["meta"]["count"]

                        if(count == 0):
                            continue
                            
                        # If less than 100,000 records, append the filter to the list
                        if(count <= 100000):
                            params.update({"page[size]":"1000", "page[number]":"1"})
                            verbose = "[" + a + "] [" + b + "] (" + f'{count:,}' + ")"
                            filter_params.append({"params":params,"count":count,"url":r.url,"verbose":verbose})
                            log("Added [" + a + "] [" + b + "] (" + f'{count:,}' + ") to list of filters")
                            subtotal_count += count

                        # If more than 100,000 records, add another filter "year" (going one year at a time)
                        else:
                            # Find first date with data
                            headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
                            params.update({"page[size]":"1", "sort":"periodEnd"})
                            r = requests.get(url=url_self, headers=headers, params=params)
                            first_date = r.json()["data"][0]["attributes"]["periodEnd"]

                            # Find last date with data
                            params.update({"sort":"-periodEnd"})
                            r = requests.get(url=url_self, headers=headers, params=params)
                            last_date = r.json()["data"][0]["attributes"]["periodEnd"]

                            first_year = int(first_date[0:4])
                            last_year = int(last_date[0:4])

                            log("Periodic data for [" + a + "] [" + b + "] has more than 100,000 items. (" + f'{count:,}' + " found) Splitting query into years. (" + str(first_year) + "-" + str(last_year) + ")")

                            for y in range(first_year, last_year+1):
                                # Reset params for each loop
                                headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
                                params = {"page[size]":"1", "page[number]":"1", "filter[investment]":"\"" + a + "\"", "filter[scenario]":"\"" + b + "\""}
                                
                                if(start_time and fetch_method == "append"):
                                    params.update({"filter[lastModifiedDate]":"ge(" + start_time + ")"})
                                
                                params.update({"filter[periodEnd]":["ge(" + str(y) + "-01-01)", "le(" + str(y) + "-12-31)"]})
                                
                                # Count records in the year
                                r = requests.get(url=url_self, headers=headers, params=params)
                                count = r.json()["meta"]["count"]

                                if(count == 0):
                                    continue
                                
                                # If year has less than 100,000 records, append the filter to the list
                                if(count <= 100000):
                                    params.update({"page[size]":"1000"})
                                    verbose = "[" + a + "] [" + b + "] [" + str(y) + "] (" + f'{count:,}' + ")"
                                    filter_params.append({"params":params,"count":count,"url":r.url,"verbose":verbose})
                                    log("Added [" + a + "] [" + b + "] [" + str(y) + "] (" + f'{count:,}' + ") to list of filters")
                                    subtotal_count += count

                                # If more than 100,000 records, add another filter "month" (going one month at a time)
                                else:
                                    log("Periodic data for [" + a + "] [" + b + "] [" + str(y) + "] has more than 100,000 items. (" + f'{count:,}' + " found) Splitting query into months.")
                                    for m in range(1,13):
                                        # Reset params for each loop
                                        headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
                                        params = {"page[size]":"1", "page[number]":"1", "filter[investment]":"\"" + a + "\"", "filter[scenario]":"\"" + b + "\""}
                                        if(start_time and fetch_method == "append"):
                                            params.update({"filter[lastModifiedDate]":"ge(" + start_time + ")"})

                                        # Update with filter per month and count records for each month
                                        params.update({"filter[periodEnd]":["ge(" + str(y) + "-" + str(m).zfill(2) + "-01)", 
                                            "le(" + str(y) + "-" + str(m).zfill(2) + "-" + str(calendar.monthrange(y, m)[1]) + ")"]})
                                        r = requests.get(url=url_self, headers=headers, params=params)
                                        count = r.json()["meta"]["count"]

                                        if(count == 0):
                                            continue
                                        
                                        # If less than 100,000 records, append the filter to the list
                                        if(count <= 100000):
                                            params.update({"page[size]":"1000"})
                                            verbose = "[" + a + "] [" + b + "] [" + str(y) + "] [" + str(m).zfill(2) + "] (" + f'{count:,}' + ")"
                                            filter_params.append({"params":params,"count":count,"url":r.url,"verbose":verbose})
                                            log("Added [" + a + "] [" + b + "] [" + str(y) + "] [" + str(m).zfill(2) + "] (" + f'{count:,}' + ") to list of filters")
                                            subtotal_count += count

                                        # If more than 100,000 records, add the dataItems filter
                                        else:
                                            log("Periodic data for [" + a + "] [" + b + "] [" + str(y) + "] [" + str(m) + "] has more than 100,000 items. (" + f'{count:,}' + " found) Splitting query into data items.")
                                            # Per-dataItem counts are not requested (see note below), so count the month once
                                            subtotal_count += count
                                            for c in list_dataItems:
                                                # Reset params for each loop
                                                params = {"page[size]":"1000", "page[number]":"1", "filter[investment]":"\"" + a + "\"", "filter[scenario]":"\"" + b + "\""}
                                                params.update({"filter[periodEnd]":["ge(" + str(y) + "-" + str(m).zfill(2) + "-01)", 
                                                                "le(" + str(y) + "-" + str(m).zfill(2) + "-" + str(calendar.monthrange(y, m)[1]) + ")"]})
                                                if(start_time and fetch_method == "append"):
                                                    params.update({"filter[lastModifiedDate]":"ge(" + start_time + ")"})

                                                # Update with filter per data item
                                                params.update({"filter[dataItem]":"\"" + c + "\""})
                                                verbose = "[" + a + "] [" + b + "] [" + str(y) + "] [" + str(m).zfill(2) + "] [" + c + "] (count not checked)"
                                                filter_params.append({"params":params,"count":None,"url":None,"verbose":verbose})
                                                log("Added " + verbose + " to list of filters")

                                                # If you have more than 100,000 elements when you filter on "investment", "scenario", "year",
                                                # "month", and "dataItem", I can't do anything more for you. You need to fix your broken iLevel implementation.
                                                # As data items can be a very long list, the script does not make a request to count each one; instead it
                                                # runs them blind, assuming none of the data items returns more than 100,000 records.
                                                # If one does return more than 100,000 records, the script will just cut off and continue like nothing
                                                # happened, but you will be missing that data in your target.
    except Exception as e:
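        # r is unbound if the failure happened before the first request was made
        last_request = f"{r} {r.url} {r.text[:500]}" if "r" in locals() else "no request made"
        raise Exception(f"Error while building the periodicData filter list. Last request: {last_request}: {e}") from e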
    
    log("Finished splitting data in chunks of < 100,000. Total items found: " + f'{subtotal_count:,}')
    
    return filter_params

## Define superclass for endpoints

In [None]:
class ilevel_to_bigquery(ABC):
    def __init__(self, endpoint, sort = None, append_supported = False, pagination_supported = True, filters=None,deleted_data=False):
        self.endpoint = endpoint
        self.bigquery_table = "ilevel_" + endpoint.rsplit('/', 1)[-1] + "_ldg"
        self.sort = sort
        self.append_supported = append_supported
        self.pagination_supported = pagination_supported
        self.read_buffer = []
        self.subtotal_count = 0
        self.url_self = url_base + self.endpoint
        self.deleted_data = deleted_data
        self.start_time = get_last_upload(self.bigquery_table)
        self.params = {}
        self.headers = {}
        self.fetched_by = fetched_by
        self.fetched_at = fetched_at
        self.fetch_method = fetch_method
        self.truncate = False
        
        # If a list of filters exists (used for extracting periodicData), extract one chunk per filter
        if(filters):
            for i, f in enumerate(filters, start=1):
                v = str(f["verbose"]) + " - Progress: " + str(i) + "/" + str(len(filters))
                self.extract_data_from_ilevel_to_bigquery(params=f["params"], verbose=v)
        else:
            self.extract_data_from_ilevel_to_bigquery()
    
    def extract_data_from_ilevel_to_bigquery(self, params=None, verbose=None):
        # Extract data from iLevel and load it into BigQuery

        s = "Extracting data from " + self.endpoint
        if(verbose):
            s += " " + str(verbose)
        log(s)

        # Endpoints that don't support append are always fetched in full and replace (truncate)
        # the BigQuery table. To avoid reloading data that rarely changes, the size check below
        # skips the reload when nothing has changed.
        if(not self.append_supported):
            self.fetch_method = "full"
            self.truncate = True

        # Build params
        self.params = {}

        if(params):
            self.params.update(params)
        if(self.pagination_supported):
            self.params.update({"page[size]":ilevel_page_size,"page[number]":"1"})
        if(self.sort):
            self.params.update({"sort":self.sort})
        if(self.start_time and self.append_supported and self.fetch_method=="append"):
            if(self.deleted_data):
                self.params.update({"filter[deletionDate]":"ge(" + str(self.start_time) + ")"})
            else:
                self.params.update({"filter[lastModifiedDate]":"ge(" + str(self.start_time) + ")"})
        
        # Check whether a full reload is needed by comparing row counts in iLevel and BigQuery
        # (only for endpoints without append support, so periodicData is not affected)
        if(self.fetch_method == "full" and not self.append_supported):
            if(self.pagination_supported):
                self.params.update({"page[size]":"1", "page[number]":"1"})
            self.headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
            r = None
            try:
                if(self.pagination_supported):
                    r = requests.get(url=self.url_self, headers=self.headers, params=self.params)
                    ilevel_size = r.json()["meta"]["count"]
                else:
                    r = requests.get(url=self.url_self, headers=self.headers, params=None)
                    ilevel_size = len(r.json()["data"])
                bq_size = bq_get_number_of_latest_records(self.bigquery_table)
            except Exception as e:
                url = r.url if r is not None else self.url_self
                raise Exception(url + f" Error while comparing BigQuery and iLevel sizes: {e}") from e

            # If no update is needed, return
            if(ilevel_size == bq_size):
                log("No new data found in " + self.bigquery_table + " (" + str(ilevel_size) + " records), not loading to BigQuery.")
                return
            # Otherwise restore the normal page size before fetching
            if(self.pagination_supported):
                self.params.update({"page[size]":ilevel_page_size})
        
        # Loop through each page of results
        r = None
        while True:
            try:
                self.headers = {"Authorization": "Bearer " + get_access_token()["access_token"]}
                r = requests.get(url=self.url_self, headers=self.headers, params=self.params)
                payload = r.json()
                data = payload.get("data")

                # If there are no results, push anything left in the buffer and end
                if(not data):
                    if(len(self.read_buffer) > 0):
                        self.push_to_bigquery()
                        self.read_buffer.clear()
                    break

                for record in data:
                    self.add_record_to_read_buffer(record)

                # Running total of records extracted by this object
                self.subtotal_count += len(data)

                s = "Extracted " + str(len(data)) + " rows from iLevel " + self.endpoint
                if(verbose):
                    s += " " + verbose
                if(self.pagination_supported):
                    s += " " + parse_qs(urlparse(payload["links"]["self"]).query)['page[number]'][0] + "/" + parse_qs(urlparse(payload["links"]["last"]).query)['page[number]'][0]
                log(s)

                # Check for more pages under this filter
                next_page = get_next_page(r)

                # If the read buffer is full, load it to BigQuery
                if(len(self.read_buffer) >= read_buffer_size):
                    self.push_to_bigquery()
                    self.read_buffer.clear()

                # End the loop if there are no more pages
                if(not next_page or not self.pagination_supported):
                    # Push remaining items to BigQuery before ending
                    if(len(self.read_buffer) > 0):
                        self.push_to_bigquery()
                        self.read_buffer.clear()
                    break

                self.params.update(next_page)

            except Exception as e:
                url = r.url if r is not None else self.url_self
                body = r.text if r is not None else ""
                raise Exception(f"Last request: {r} {url} {body} {e}") from e

        log("Successfully extracted and loaded records from " + self.endpoint + " (" + self.fetch_method + " records) to BigQuery. Running total: " + f'{self.subtotal_count:,}' + " records")
    
    def add_record_to_read_buffer(self, record):
        # Must be implemented by each endpoint subclass: map one iLevel record to one BigQuery row
        raise NotImplementedError

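    def push_to_bigquery(self):
        load_data_to_bigquery(table=self.bigquery_table, records=self.read_buffer, truncate=self.truncate)
        # Truncate at most once per load: later batches of the same reload must be appended,
        # otherwise each batch would replace the one before it (assumes truncate=True replaces
        # the table contents)
        self.truncate = False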
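Stripped of the HTTP and BigQuery calls, the superclass's main loop is page-then-buffer: read pages until one is empty or there is no next page, and flush the buffer to BigQuery whenever it reaches `read_buffer_size`, plus once more at the end. A minimal, dependency-free sketch of that flow follows. `fetch_page` and `push` are hypothetical stand-ins for the iLevel request and `load_data_to_bigquery`, and the `links.next` check assumes JSON:API-style pagination links rather than the script's own `get_next_page`.

```python
from typing import Callable, Dict, Iterable, Iterator, List


def paginate(fetch_page: Callable[[int], Dict]) -> Iterator[Dict]:
    """Yield records page by page until a page is empty or has no 'next' link."""
    page = 1
    while True:
        payload = fetch_page(page)
        data = payload.get("data") or []
        if not data:
            return
        yield from data
        if not payload.get("links", {}).get("next"):
            return
        page += 1


def load_in_batches(records: Iterable[Dict], push: Callable[[List[Dict]], None], buffer_size: int) -> int:
    """Buffer records and push them in batches of `buffer_size`; return the record count."""
    buffer: List[Dict] = []
    total = 0
    for record in records:
        buffer.append(record)
        total += 1
        if len(buffer) >= buffer_size:
            push(list(buffer))  # pass a copy, since the buffer is reused
            buffer.clear()
    if buffer:
        push(list(buffer))  # flush the remainder, however the pagination ended
    return total


# Example: 5 fake records served 2 per page, pushed in batches of 3
RECORDS = [{"id": str(i)} for i in range(5)]


def fake_fetch(page: int, size: int = 2) -> Dict:
    start = (page - 1) * size
    has_next = start + size < len(RECORDS)
    links = {"next": f"?page[number]={page + 1}"} if has_next else {}
    return {"data": RECORDS[start:start + size], "links": links}


batches: List[List[Dict]] = []
print(load_in_batches(paginate(fake_fetch), batches.append, buffer_size=3))  # 5
print([len(b) for b in batches])  # [3, 2]
```

Keeping paging and batching in separate functions means the flush-at-the-end rule holds regardless of whether the last page was short or came back empty.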


## Define endpoints (concrete subclasses)

### periodicData

class periodicData(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                    "fetched_at":       str(self.fetched_at),
                    "fetched_by":       str(self.fetched_by),
                    "fetched_method":   str(self.fetch_method),
                    
                    "type":             str(record.get("type")),
                    "id":               str(record.get("id")),
                    
                    "value":            str(record["attributes"].get("value")),
                    "dataItem":         str(record["attributes"].get("dataItem")),
                    "dataValueType":    str(record["attributes"].get("dataValueType")),
                    "owner":            str(record["attributes"].get("owner")),
                    "investment":       str(record["attributes"].get("investment")),
                    "relationshipPath": str(record["attributes"].get("relationshipPath")),
                    "scenario":         str(record["attributes"].get("scenario")),
                    "currency":         str(record["attributes"].get("currency")),
                    "security":         str(record["attributes"].get("security")),
                    "segment":          str(record["attributes"].get("segment")),
                    "periodEnd":        str(record["attributes"].get("periodEnd")),
                    "periodLength":     str(record["attributes"].get("periodLength")),
                    "asOfDate":         str(record["attributes"].get("asOfDate")),
                    "lastModifiedDate": str(record["attributes"].get("lastModifiedDate"))
                })
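Every endpoint class in this section repeats the same shape: three fetch metadata columns, `type` and `id`, then one stringified column per attribute. If that duplication becomes a maintenance burden, a table-driven helper is one option. The sketch below is hypothetical (`flatten_record` and the field tuples are not part of the script). Like the classes, it turns missing attributes into the string `"None"`, and `renames` covers cases such as `securities`, which stores the attributes `type` and `subtype` as `securityType` and `securitySubType`. Endpoints that also store `relationships` would add that column separately.

```python
from typing import Any, Dict, Iterable, Optional


def flatten_record(
    record: Dict[str, Any],
    attributes: Iterable[str],
    metadata: Dict[str, Any],
    renames: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Flatten one JSON:API record into a row of strings (hypothetical helper)."""
    renames = renames or {}
    attrs = record.get("attributes") or {}
    row = {key: str(value) for key, value in metadata.items()}
    row["type"] = str(record.get("type"))
    row["id"] = str(record.get("id"))
    for name in attributes:
        # str(None) == "None", matching what the endpoint classes store for missing fields
        row[renames.get(name, name)] = str(attrs.get(name))
    return row


# Hypothetical field list mirroring the securities class
SECURITY_FIELDS = ("name", "shortName", "type", "subtype", "hasOwnership", "isActive", "lastModifiedDate")
SECURITY_RENAMES = {"type": "securityType", "subtype": "securitySubType"}

meta = {"fetched_at": "2023-01-31 12:00:00", "fetched_by": "colab", "fetched_method": "full"}
record = {"type": "securities", "id": "42", "attributes": {"name": "Common", "type": "Equity", "isActive": True}}
row = flatten_record(record, SECURITY_FIELDS, meta, SECURITY_RENAMES)
print(row["securityType"], row["isActive"], row["shortName"])  # Equity True None
```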

### assets

class assets(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":                 str(self.fetched_at),
                "fetched_by":                 str(self.fetched_by),
                "fetched_method":             str(self.fetch_method),
                
                "type":                       str(record["type"]),
                "id":                         str(record["id"]),
                
                "name":                       str(record["attributes"].get("name")),
                "excelTicker":                str(record["attributes"].get("excelTicker")),
                "status":                     str(record["attributes"].get("status")),
                "assetType":                  str(record["attributes"].get("assetType")),
                "reportingCurrency":          str(record["attributes"].get("reportingCurrency")),
                "geography":                  str(record["attributes"].get("geography")),
                "industry":                   str(record["attributes"].get("industry")),
                "calendarType":               str(record["attributes"].get("calendarType")),
                "longDescription":            str(record["attributes"].get("longDescription")),
                "shortDescription":           str(record["attributes"].get("shortDescription")),
                "acquisitionDate":            str(record["attributes"].get("acquisitionDate")),
                "acquisitionAsOf":            str(record["attributes"].get("acquisitionAsOf")),
                "exitDate":                   str(record["attributes"].get("exitDate")),
                "exitAsOf":                   str(record["attributes"].get("exitAsOf")),
                "leadInvestmentProfessional": str(record["attributes"].get("leadInvestmentProfessional")),
                "initialPeriod":              str(record["attributes"].get("initialPeriod")),
                "isPublic":                   str(record["attributes"].get("isPublic")),
                "website":                    str(record["attributes"].get("website")),
                "lastModifiedDate":           str(record["attributes"].get("lastModifiedDate"))
                })

### cashTransactions

class cashTransactions(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":          str(self.fetched_at),
                "fetched_by":          str(self.fetched_by),
                "fetched_method":      str(self.fetch_method),

                "type":                str(record["type"]),
                "id":                  str(record["id"]),

                "baseAmount":          str(record["attributes"].get("baseAmount")),
                "baseCurrency":        str(record["attributes"].get("baseCurrency")),
                "owner":               str(record["attributes"].get("owner")),
                "investment":          str(record["attributes"].get("investment")),
                "security":            str(record["attributes"].get("security")),
                "scenario":            str(record["attributes"].get("scenario")),
                "transactionType":     str(record["attributes"].get("transactionType")),
                "transactionCategory": str(record["attributes"].get("transactionCategory")),
                "transactionDate":     str(record["attributes"].get("transactionDate")),
                "asOfDate":            str(record["attributes"].get("asOfDate")),
                "localAmount":         str(record["attributes"].get("localAmount")),
                "localCurrency":       str(record["attributes"].get("localCurrency")),
                "groupId":             str(record["attributes"].get("groupId")),
                "originalId":          str(record["attributes"].get("originalId")),
                "description":         str(record["attributes"].get("description")),
                "lastModifiedBy":      str(record["attributes"].get("lastModifiedBy")),
                "lastModifiedDate":    str(record["attributes"].get("lastModifiedDate"))
                })

### cashTransactionTypes

class cashTransactionTypes(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":          str(self.fetched_at),
                "fetched_by":          str(self.fetched_by),
                "fetched_method":      str(self.fetch_method),

                "type":                str(record["type"]),
                "id":                  str(record["id"]),

                "name":                str(record["attributes"].get("name")),
                "category":            str(record["attributes"].get("category")),
                "canBeRenamed":        str(record["attributes"].get("canBeRenamed")),
                "canBeDeleted":        str(record["attributes"].get("canBeDeleted")),
                "dataItemName":        str(record["attributes"].get("dataItemName")),
                "isExposed":           str(record["attributes"].get("isExposed")),
                "isPositiveOperator":  str(record["attributes"].get("isPositiveOperator")),
                "relationships":       str(record.get("relationships")).replace("None","\'None\'")
                })
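Several classes, starting with `cashTransactionTypes`, store `relationships` as `str(...)` with every `None` replaced by `'None'`. Because that replacement is a plain substring substitution, it also rewrites `None` inside string values and can produce unbalanced quotes. If the column is meant to be parsed later, serializing it with the standard library's `json.dumps` is a more robust alternative. The sketch below compares the two on a made-up relationships object; `relationships_as_text` is a hypothetical name. Switching would change the stored format (double quotes, `null`), so existing queries on these columns would need updating.

```python
import json
from typing import Any


def relationships_as_text(relationships: Any) -> str:
    """Serialize a JSON:API relationships object as JSON text (None becomes null)."""
    return json.dumps(relationships, sort_keys=True)


# Made-up example: one empty relationship and one value that happens to contain "None"
rel = {"owner": {"data": None}, "note": {"data": {"id": "None of the above"}}}

legacy = str(rel).replace("None", "'None'")
print(legacy)
# {'owner': {'data': 'None'}, 'note': {'data': {'id': ''None' of the above'}}}

as_json = relationships_as_text(rel)
print(as_json)
# {"note": {"data": {"id": "None of the above"}}, "owner": {"data": null}}
assert json.loads(as_json) == rel  # round-trips losslessly
```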

### currencies

class currencies(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":     str(self.fetched_at),
                "fetched_by":     str(self.fetched_by),
                "fetched_method": str(self.fetch_method),

                "type":           str(record["type"]),
                "id":             str(record["id"]),

                "name":           str(record["attributes"].get("name")),
                "symbol":         str(record["attributes"].get("symbol"))
                })
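Small reference tables such as `currencies`, whose class stores no `lastModifiedDate` to filter on, presumably run with `append_supported=False` and therefore take the superclass's full-reload path, which first compares row counts and skips the reload when iLevel and BigQuery agree. Below, that decision is isolated as a pure function for illustration. The helper names are hypothetical; the payload shapes follow what the superclass reads: `meta.count` for paginated endpoints, and the length of `data` otherwise.

```python
from typing import Any, Dict


def ilevel_row_count(payload: Dict[str, Any], pagination_supported: bool) -> int:
    """Row count as the superclass derives it from one iLevel response."""
    if pagination_supported:
        # Requested with page[size]=1, so only meta.count is meaningful
        return int(payload["meta"]["count"])
    # Unpaginated endpoints return everything at once
    return len(payload.get("data") or [])


def needs_reload(payload: Dict[str, Any], pagination_supported: bool, bq_count: int) -> bool:
    """Reload only when the row counts differ (hypothetical helper)."""
    return ilevel_row_count(payload, pagination_supported) != bq_count


print(needs_reload({"meta": {"count": 3}, "data": [{}]}, True, bq_count=3))  # False
print(needs_reload({"data": [{}, {}, {}, {}]}, False, bq_count=3))  # True
```

The trade-off is that a count comparison cannot see edits that leave the number of rows unchanged, such as a renamed currency; those edits are only picked up the next time the count changes.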

### dataItemCategories

class dataItemCategories(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":     str(self.fetched_at),
                "fetched_by":     str(self.fetched_by),
                "fetched_method": str(self.fetch_method),

                "type":           str(record["type"]),
                "id":             str(record["id"]),

                "name":           str(record["attributes"].get("name")),
                "sequenceNumber": str(record["attributes"].get("sequenceNumber")),
                "relationships":  str(record.get("relationships")).replace("None","\'None\'")
                })

### deals

class deals(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":       str(self.fetched_at),
                "fetched_by":       str(self.fetched_by),
                "fetched_method":   str(self.fetch_method),

                "type":             str(record["type"]),
                "id":               str(record["id"]),

                "dealType":         str(record["attributes"].get("dealType")),
                "acquisitionDate":  str(record["attributes"].get("acquisitionDate")),
                "investmentName":   str(record["attributes"].get("investmentName")),
                "investmentAmount": str(record["attributes"].get("investmentAmount")),
                "currency":         str(record["attributes"].get("currency")),
                "exitDate":         str(record["attributes"].get("exitDate")),
                "status":           str(record["attributes"].get("status")),
                "ownerName":        str(record["attributes"].get("ownerName")),
                "ownership":        str(record["attributes"].get("ownership")),
                "securityName":     str(record["attributes"].get("securityName")),
                "lastModifiedDate": str(record["attributes"].get("lastModifiedDate")),
                "relationships":    str(record.get("relationships")).replace("None","\'None\'")
                })

### documents

class documents(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":             str(self.fetched_at),
                "fetched_by":             str(self.fetched_by),
                "fetched_method":         str(self.fetch_method),

                "type":                   str(record["type"]),
                "id":                     str(record["id"]),

                "name":                   str(record["attributes"].get("name")),
                "extension":              str(record["attributes"].get("extension")),
                "size":                   str(record["attributes"].get("size")),
                "tags":                   str(record["attributes"].get("tags")),
                "periodEnd":              str(record["attributes"].get("periodEnd")),
                "periodLength":           str(record["attributes"].get("periodLength")),
                "eventDate":              str(record["attributes"].get("eventDate")),
                "location":               str(record["attributes"].get("location")),
                "uploadedBy":             str(record["attributes"].get("uploadedBy")),
                "uploadDate":             str(record["attributes"].get("uploadDate")),
                "availableForPublishing": str(record["attributes"].get("availableForPublishing")),
                "isAvailableToAllUsers":  str(record["attributes"].get("isAvailableToAllUsers")),
                "lastModifiedBy":         str(record["attributes"].get("lastModifiedBy")),
                "lastModifiedDate":       str(record["attributes"].get("lastModifiedDate")),
                "relationships":          str(record.get("relationships")).replace("None","\'None\'")
                })

### entityGroupCategories

class entityGroupCategories(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":      str(self.fetched_at),
                "fetched_by":      str(self.fetched_by),
                "fetched_method":  str(self.fetch_method),

                "type":            str(record["type"]),
                "id":              str(record["id"]),

                "name":            str(record["attributes"].get("name")),
                "entityType":      str(record["attributes"].get("entityType")),
                "groupDefinition": str(record["attributes"].get("groupDefinition")),
                "groupDefinedBy":  str(record["attributes"].get("groupDefinedBy")),
                "applicableTo":    str(record["attributes"].get("applicableTo")),
                "relationships":   str(record.get("relationships")).replace("None","\'None\'")
                })

### entityGroups

class entityGroups(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":         str(self.fetched_at),
                "fetched_by":         str(self.fetched_by),
                "fetched_method":     str(self.fetch_method),

                "type":               str(record["type"]),
                "id":                 str(record["id"]),

                "name":               str(record["attributes"].get("name")),
                "entityCategoryType": str(record["attributes"].get("entityCategoryType")),
                "entityIds":          str(record["attributes"].get("entityIds")),
                "applicableTo":       str(record["attributes"].get("applicableTo")),
                "entitiesCount":      str(record["attributes"].get("entitiesCount")),
                "relationships":      str(record.get("relationships")).replace("None","\'None\'")
                })

### folders

class folders(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":       str(self.fetched_at),
                "fetched_by":       str(self.fetched_by),
                "fetched_method":   str(self.fetch_method),

                "type":             str(record["type"]),
                "id":               str(record["id"]),

                "name":             str(record["attributes"].get("name")),
                "createdDate":      str(record["attributes"].get("createdDate")),
                "lastModifiedDate": str(record["attributes"].get("lastModifiedDate")),
                "relationships":    str(record.get("relationships")).replace("None","\'None\'")
                })

### funds

class funds(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":                 str(self.fetched_at),
                "fetched_by":                 str(self.fetched_by),
                "fetched_method":             str(self.fetch_method),

                "type":                       str(record["type"]),
                "id":                         str(record["id"]),

                "name":                       str(record["attributes"].get("name")),
                "longName":                   str(record["attributes"].get("longName")),
                "status":                     str(record["attributes"].get("status")),
                "ownerType":                  str(record["attributes"].get("ownerType")),
                "reportingCurrency":          str(record["attributes"].get("reportingCurrency")),
                "vintage":                    str(record["attributes"].get("vintage")),
                "totalCommittedCapital":      str(record["attributes"].get("totalCommittedCapital")),
                "initialCloseDate":           str(record["attributes"].get("initialCloseDate")),
                "finalCloseDate":             str(record["attributes"].get("finalCloseDate")),
                "includeInWorkflow":          str(record["attributes"].get("includeInWorkflow")),
                "description":                str(record["attributes"].get("description")),
                "calendarType":               str(record["attributes"].get("calendarType")),
                "initialPeriod":              str(record["attributes"].get("initialPeriod")),
                "defaultPmeIndex":            str(record["attributes"].get("defaultPmeIndex")),
                "commitment":                 str(record["attributes"].get("commitment")),
                "defaultPmeLiquidityPremium": str(record["attributes"].get("defaultPmeLiquidityPremium")),
                "generalPartner":             str(record["attributes"].get("generalPartner")),
                "subStrategy":                str(record["attributes"].get("subStrategy")),
                "strategy":                   str(record["attributes"].get("strategy")),
                "fundSize":                   str(record["attributes"].get("fundSize")),
                "benchmarkVintage":           str(record["attributes"].get("benchmarkVintage")),
                "lastModifiedDate":           str(record["attributes"].get("lastModifiedDate")),
                "relationships":              str(record.get("relationships")).replace("None","\'None\'")
                })

### scenarios

class scenarios(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":       str(self.fetched_at),
                "fetched_by":       str(self.fetched_by),
                "fetched_method":   str(self.fetch_method),

                "type":             str(record["type"]),
                "id":               str(record["id"]),

                "name":             str(record["attributes"].get("name")),
                "shortName":        str(record["attributes"].get("shortName")),
                "lastModifiedDate": str(record["attributes"].get("lastModifiedDate")),
                })

### securities

class securities(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":       str(self.fetched_at),
                "fetched_by":       str(self.fetched_by),
                "fetched_method":   str(self.fetch_method),

                "type":             str(record["type"]),
                "id":               str(record["id"]),

                "name":             str(record["attributes"].get("name")),
                "shortName":        str(record["attributes"].get("shortName")),
                "securityType":     str(record["attributes"].get("type")),
                "securitySubType":  str(record["attributes"].get("subtype")),
                "hasOwnership":     str(record["attributes"].get("hasOwnership")),
                "isActive":         str(record["attributes"].get("isActive")),
                "lastModifiedDate": str(record["attributes"].get("lastModifiedDate")),
                "relationships":    str(record.get("relationships")).replace("None","\'None\'")
                })

### segments

class segments(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":       str(self.fetched_at),
                "fetched_by":       str(self.fetched_by),
                "fetched_method":   str(self.fetch_method),

                "type":             str(record["type"]),
                "id":               str(record["id"]),

                "name":             str(record["attributes"].get("name")),
                "isActive":         str(record["attributes"].get("isActive")),
                "isAllowRollup":    str(record["attributes"].get("isAllowRollup")),
                "nullReplacement":  str(record["attributes"].get("nullReplacement")),
                "isGlobal":         str(record["attributes"].get("isGlobal")),
                "lastModifiedDate": str(record["attributes"].get("lastModifiedDate")),
                "relationships":    str(record.get("relationships")).replace("None","\'None\'")
                })

### deletedCashTransactions

class deletedCashTransactions(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":          str(self.fetched_at),
                "fetched_by":          str(self.fetched_by),
                "fetched_method":      str(self.fetch_method),

                "type":                str(record["type"]),
                "id":                  str(record["id"]),

                "baseAmount":          str(record["attributes"].get("baseAmount")),
                "baseCurrency":        str(record["attributes"].get("baseCurrency")),
                "owner":               str(record["attributes"].get("owner")),
                "investment":          str(record["attributes"].get("investment")),
                "security":            str(record["attributes"].get("security")),
                "scenario":            str(record["attributes"].get("scenario")),
                "transactionType":     str(record["attributes"].get("transactionType")),
                "transactionCategory": str(record["attributes"].get("transactionCategory")),
                "transactionDate":     str(record["attributes"].get("transactionDate")),
                "asOfDate":            str(record["attributes"].get("asOfDate")),
                "localAmount":         str(record["attributes"].get("localAmount")),
                "localCurrency":       str(record["attributes"].get("localCurrency")),
                "valuePerShare":       str(record["attributes"].get("valuePerShare")),
                "shares":              str(record["attributes"].get("shares")),
                "costPerShare":        str(record["attributes"].get("costPerShare")),
                "custom1":             str(record["attributes"].get("custom1")),
                "custom2":             str(record["attributes"].get("custom2")),
                "custom3":             str(record["attributes"].get("custom3")),
                "custom4":             str(record["attributes"].get("custom4")),
                "custom5":             str(record["attributes"].get("custom5")),
                "custom6":             str(record["attributes"].get("custom6")),
                "groupId":             str(record["attributes"].get("groupId")),
                "originalId":          str(record["attributes"].get("originalId")),
                "description":         str(record["attributes"].get("description")),
                "deletedBy":           str(record["attributes"].get("deletedBy")),
                "deletionDate":        str(record["attributes"].get("deletionDate")),
                "relationships":       str(record.get("relationships")).replace("None","\'None\'")
                })
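Deleted-record endpoints such as `deletedCashTransactions` are presumably constructed with `deleted_data=True`, in which case an append run filters on `deletionDate` instead of `lastModifiedDate`. A sketch of that choice as a pure function follows; `incremental_filter` is a hypothetical name, and the `ge(...)` syntax and `str()` formatting of the timestamp mirror the params built in `extract_data_from_ilevel_to_bigquery`.

```python
from datetime import datetime
from typing import Dict, Optional


def incremental_filter(start_time: Optional[datetime], deleted_data: bool) -> Dict[str, str]:
    """Filter params for an append run: rows changed (or deleted) since start_time."""
    if start_time is None:
        return {}  # no previous upload recorded, so fetch everything
    field = "deletionDate" if deleted_data else "lastModifiedDate"
    # Assuming ge() means greater-than-or-equal, rows stamped exactly at start_time are fetched again
    return {f"filter[{field}]": f"ge({start_time})"}


cutoff = datetime(2023, 1, 31, 12, 0)
print(incremental_filter(cutoff, deleted_data=True))
# {'filter[deletionDate]': 'ge(2023-01-31 12:00:00)'}
print(incremental_filter(cutoff, deleted_data=False))
# {'filter[lastModifiedDate]': 'ge(2023-01-31 12:00:00)'}
```

Because the bound is inclusive, a row modified exactly at the previous cut-off can be loaded twice, so downstream queries should tolerate duplicates. It may also be worth checking `deletedDeals`, whose class reads a `deletedDate` attribute rather than `deletionDate`, to confirm the `deletionDate` filter applies to that endpoint.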

### deletedDataItems

class deletedDataItems(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":     str(self.fetched_at),
                "fetched_by":     str(self.fetched_by),
                "fetched_method": str(self.fetch_method),

                "type":           str(record["type"]),
                "id":             str(record["id"]),

                "name":           str(record["attributes"].get("name")),
                "dataItemType":   str(record["attributes"].get("dataItemType")),
                "isCarryOver":    str(record["attributes"].get("isCarryOver")),
                "isGlobal":       str(record["attributes"].get("isGlobal")),
                "deletedBy":      str(record["attributes"].get("deletedBy")),
                "deletionDate":   str(record["attributes"].get("deletionDate"))
                })

### deletedDeals

class deletedDeals(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":     str(self.fetched_at),
                "fetched_by":     str(self.fetched_by),
                "fetched_method": str(self.fetch_method),

                "type":           str(record["type"]),
                "id":             str(record["id"]),

                "deletedBy":      str(record["attributes"].get("deletedBy")),
                "deletedDate":    str(record["attributes"].get("deletedDate")),
                "relationships":  str(record.get("relationships")).replace("None","\'None\'")
                })

### deletedEntities

class deletedEntities(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":     str(self.fetched_at),
                "fetched_by":     str(self.fetched_by),
                "fetched_method": str(self.fetch_method),

                "type":           str(record["type"]),
                "id":             str(record["id"]),

                "name":           str(record["attributes"].get("name")),
                "deletedBy":      str(record["attributes"].get("deletedBy")),
                "deletionDate":   str(record["attributes"].get("deletionDate")),
                "entityType":     str(record["attributes"].get("entityType"))
                })

### deletedPeriodicData

class deletedPeriodicData(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":       str(self.fetched_at),
                "fetched_by":       str(self.fetched_by),
                "fetched_method":   str(self.fetch_method),

                "type":             str(record["type"]),
                "id":               str(record["id"]),

                "value":            str(record["attributes"].get("value")),
                "dataItem":         str(record["attributes"].get("dataItem")),
                "dataValueType":    str(record["attributes"].get("dataValueType")),
                "owner":            str(record["attributes"].get("owner")),
                "investment":       str(record["attributes"].get("investment")),
                "relationshipPath": str(record["attributes"].get("relationshipPath")),
                "scenario":         str(record["attributes"].get("scenario")),
                "currency":         str(record["attributes"].get("currency")),
                "security":         str(record["attributes"].get("security")),
                "segment":          str(record["attributes"].get("segment")),
                "periodEnd":        str(record["attributes"].get("periodEnd")),
                "periodLength":     str(record["attributes"].get("periodLength")),
                "asOfDate":         str(record["attributes"].get("asOfDate")),
                "lastModifiedDate": str(record["attributes"].get("lastModifiedDate")),
                "relationships":    str(record.get("relationships")).replace("None", "'None'")
                })
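
The `relationships` column is stored as the Python `repr` of the nested dict, with every bare `None` replaced by the quoted string `'None'`. The short demonstration below shows what that expression produces on a hypothetical relationships payload, and on a record with no `relationships` key at all. Two things it makes visible: `str.replace` works on substrings, so any text value containing `None` would also be quoted; and the stored string can be turned back into a dict with `ast.literal_eval`, where the quoted `'None'` comes back as a string, not as `None`. The helper name `serialize_relationships` is illustrative only.

```python
import ast


def serialize_relationships(record):
    # Same expression as in the classes above: repr of the dict, with None quoted
    return str(record.get("relationships")).replace("None", "'None'")


# Hypothetical payloads
with_rel = {"relationships": {"owner": {"data": None}, "investment": {"data": {"id": "7"}}}}
without_rel = {}

print(serialize_relationships(with_rel))
# {'owner': {'data': 'None'}, 'investment': {'data': {'id': '7'}}}
print(serialize_relationships(without_rel))
# 'None'

# Parsing the stored string back into Python objects
parsed = ast.literal_eval(serialize_relationships(with_rel))
print(parsed["investment"]["data"]["id"])  # 7
```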

### deletedScenarios

In [None]:
class deletedScenarios(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":     str(self.fetched_at),
                "fetched_by":     str(self.fetched_by),
                "fetched_method": str(self.fetch_method),

                "type":           str(record["type"]),
                "id":             str(record["id"]),

                "name":           str(record["attributes"].get("name")),
                "deletedBy":      str(record["attributes"].get("deletedBy")),
                "deletionDate":   str(record["attributes"].get("deletionDate"))
                })

### deletedSecurities

In [None]:
class deletedSecurities(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
                "fetched_at":     str(self.fetched_at),
                "fetched_by":     str(self.fetched_by),
                "fetched_method": str(self.fetch_method),

                "type":           str(record["type"]),
                "id":             str(record["id"]),

                "name":           str(record["attributes"].get("name")),
                "shortName":      str(record["attributes"].get("shortName")),
                "recordType":     str(record["attributes"].get("type")),
                "subType":        str(record["attributes"].get("subType")),
                "isActive":       str(record["attributes"].get("isActive")),
                "hasOwnership":   str(record["attributes"].get("hasOwnership")),
                
                "deletedBy":      str(record["attributes"].get("deletedBy")),
                "deletionDate":   str(record["attributes"].get("deletionDate"))
                })

### deletedSegments

In [None]:
class deletedSegments(ilevel_to_bigquery):
    def add_record_to_read_buffer(self, record):
        self.read_buffer.append({
            "fetched_at":     str(self.fetched_at),
            "fetched_by":     str(self.fetched_by),
            "fetched_method": str(self.fetch_method),

            "type":           str(record["type"]),
            "id":             str(record["id"]),

            "name":           str(record["attributes"].get("name")),
            
            "deletedBy":      str(record["attributes"].get("deletedBy")),
            "deletionDate":   str(record["attributes"].get("deletionDate"))
            })
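
The deleted* classes above differ only in which attributes they copy, and `deletedSecurities` also renames the attribute `type` to `recordType` so it does not clash with the top-level `type` column. If the repetition becomes a maintenance burden, one option is to drive the mapping from a tuple of (column, attribute) pairs. The sketch below does that against a hypothetical stub base class (`IlevelToBigQueryStub`) instead of the real `ilevel_to_bigquery`, whose constructor is not shown in this section; every class name ending in `Sketch` is illustrative only.

```python
class IlevelToBigQueryStub:
    """Hypothetical stand-in for the notebook's ilevel_to_bigquery base class.

    Only the attributes read by add_record_to_read_buffer are modelled;
    the real class also pages through the API and writes to BigQuery.
    """

    def __init__(self, fetched_at, fetched_by, fetch_method):
        self.fetched_at = fetched_at
        self.fetched_by = fetched_by
        self.fetch_method = fetch_method
        self.read_buffer = []


class DeletedEndpointSketch(IlevelToBigQueryStub):
    # (BigQuery column, iLevel attribute key) pairs, defined by each subclass
    attribute_columns = ()

    def add_record_to_read_buffer(self, record):
        attributes = record["attributes"]
        row = {
            "fetched_at":     str(self.fetched_at),
            "fetched_by":     str(self.fetched_by),
            "fetched_method": str(self.fetch_method),
            "type":           str(record["type"]),
            "id":             str(record["id"]),
        }
        # Missing attributes become the string "None", matching the classes above
        for column, key in self.attribute_columns:
            row[column] = str(attributes.get(key))
        self.read_buffer.append(row)


class DeletedSegmentsSketch(DeletedEndpointSketch):
    attribute_columns = (
        ("name", "name"),
        ("deletedBy", "deletedBy"),
        ("deletionDate", "deletionDate"),
    )


class DeletedSecuritiesSketch(DeletedEndpointSketch):
    # The attribute "type" is stored as "recordType" to avoid clashing with "type"
    attribute_columns = (
        ("name", "name"),
        ("shortName", "shortName"),
        ("recordType", "type"),
        ("subType", "subType"),
        ("isActive", "isActive"),
        ("hasOwnership", "hasOwnership"),
        ("deletedBy", "deletedBy"),
        ("deletionDate", "deletionDate"),
    )


segments = DeletedSegmentsSketch("2022-06-01T00:00:00", "colab", "full")
segments.add_record_to_read_buffer(
    {"type": "deletedSegments", "id": 9, "attributes": {"name": "Nordics"}}
)

securities = DeletedSecuritiesSketch("2022-06-01T00:00:00", "colab", "full")
securities.add_record_to_read_buffer(
    {"type": "deletedSecurities", "id": "s1", "attributes": {"type": "Equity"}}
)
print(segments.read_buffer[0])
print(securities.read_buffer[0]["recordType"])  # Equity
```

The trade-off is explicitness: the hand-written dicts in the notebook show each table's columns at a glance, while the tuple version keeps the shared metadata columns in one place.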

## Tail

In [None]:
def tail():
    log(f"Script finished adding {total_count:,} records to BigQuery")

    stop = timeit.default_timer()
    total_time = stop - start

    # Split the elapsed seconds into hours, minutes and seconds for logging.
    mins, secs = divmod(total_time, 60)
    hours, mins = divmod(mins, 60)

    log("Total running time: %d hours, %d minutes, %d seconds." % (hours, mins, secs))
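
`tail()` relies on two formatting idioms: the `,` format specifier, which inserts a thousands separator, and two chained `divmod` calls that split the elapsed seconds. The snippet below isolates both so they can be checked without running the pipeline. `format_duration` is a hypothetical helper name, and 3725.4 seconds is an arbitrary input.

```python
def format_duration(total_time):
    """Split a number of seconds into hours, minutes and seconds, as tail() does."""
    mins, secs = divmod(total_time, 60)
    hours, mins = divmod(mins, 60)
    # %d truncates the float parts, so 5.4 seconds prints as 5
    return "Total running time: %d hours, %d minutes, %d seconds." % (hours, mins, secs)


print(format_duration(3725.4))
# Total running time: 1 hours, 2 minutes, 5 seconds.

# The "," format specifier used for the record count
print(f"{2_000_000:,}")  # 2,000,000
```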

## Main

In [None]:
def main(event, context):
    # Cloud Functions calls the entry point with (event, context); neither is used here.
    head()

    build_lists_of_assets_scenarios_dataitems()

    # Run all the endpoints
    periodicData           (endpoint="periodicData",           sort="-lastModifiedDate",append_supported=True, filters=get_filter_list(list_assets, list_scenarios, list_dataItems))
    assets                 (endpoint="entities/assets",        sort="name")
    cashTransactions       (endpoint="cashTransactions",       sort="-lastModifiedDate")
    cashTransactionTypes   (endpoint="cashTransactionTypes",   sort=None,pagination_supported=False)
    currencies             (endpoint="currencies",             sort=None,pagination_supported=False)
    dataItemCategories     (endpoint="dataItemCategories",     sort="name")
    deals                  (endpoint="deals",                  sort="-lastModifiedDate")
    documents              (endpoint="documents",              sort="-lastModifiedDate")
    entityGroupCategories  (endpoint="entityGroupCategories",  sort=None,pagination_supported=False)
    entityGroups           (endpoint="entityGroups",           sort=None,pagination_supported=False)
    folders                (endpoint="folders",                sort="name")
    funds                  (endpoint="entities/funds",         sort="-lastModifiedDate")
    scenarios              (endpoint="scenarios",              sort=None,pagination_supported=False)
    securities             (endpoint="securities",             sort="-lastModifiedDate")
    segments               (endpoint="segments",               sort="-lastModifiedDate")
    deletedCashTransactions(endpoint="deletedCashTransactions",sort="deletionDate",deleted_data=True)
    deletedDataItems       (endpoint="deletedDataItems",       sort="deletionDate",deleted_data=True)
    deletedDeals           (endpoint="deletedDeals",           sort="deletionDate",deleted_data=True)
    deletedEntities        (endpoint="deletedEntities",        sort="deletionDate",deleted_data=True)
    deletedPeriodicData    (endpoint="deletedPeriodicData",    sort="-lastModifiedDate")
    deletedScenarios       (endpoint="deletedScenarios",       sort="deletionDate",deleted_data=True)
    deletedSecurities      (endpoint="deletedSecurities",      sort="deletionDate",deleted_data=True)
    deletedSegments        (endpoint="deletedSegments",        sort="deletionDate",deleted_data=True)

    tail()
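
`main` takes two positional arguments only because Cloud Functions calls the entry point with them. For a 1st-gen background function triggered by Pub/Sub, these are the event payload and a context object, which is why the notebook can call `main("x","y")` with dummy strings. The script ignores both. If you ever want to log the message that triggered a run, a sketch could look like the one below. It assumes the 1st-gen event format, where `event["data"]` holds the base64-encoded message body; `describe_trigger` and the sample message are hypothetical.

```python
import base64


def describe_trigger(event, context=None):
    """Hypothetical helper: decode the Pub/Sub message that triggered the function.

    Assumes a 1st-gen background Cloud Function, where event["data"] holds the
    base64-encoded message body. Returns "" when there is nothing to decode,
    e.g. when the notebook calls main("x", "y") with dummy strings.
    """
    data = event.get("data") if isinstance(event, dict) else None
    if data is None:
        return ""
    return base64.b64decode(data).decode("utf-8")


# Simulated Pub/Sub event, shaped like what a 1st-gen function receives
fake_event = {"data": base64.b64encode(b"run ilevel append").decode("ascii")}
print(describe_trigger(fake_event))  # run ilevel append

# The manual notebook call passes plain strings, which yields an empty result
print(repr(describe_trigger("x", "y")))  # ''
```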

## To be deleted when running in Cloud Functions

In [None]:
main("x","y")