In [47]:
import math
from json import loads, dumps
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
from requests import get, post
from sqlalchemy import create_engine, types

In [48]:
# SQLAlchemy engine for the rdt_dev Postgres database, used by the to_sql writes below.
postgres_pw = TokenLibrary.getSecret("redacted", "redacted", "redacted")

# URL-encode the password before embedding it in the connection URL: characters
# such as "@", ":", "/" or "%" in the raw secret would otherwise be parsed as
# URL delimiters and produce a wrong host/password.
url_str=f"postgresql://redacted:{quote_plus(postgres_pw)}@redacted/rdt_dev"
engine = create_engine(url_str)

Get a bearer token from the Matrixian platform and send a first request to read the total item count

In [49]:
# Matrixian account credentials, fetched through TokenLibrary.getSecret
matrixian_user = TokenLibrary.getSecret("redacted", "matrixian-account-user", "redacted")
matrixian_pw = TokenLibrary.getSecret("redacted", "matrixian-account-pass", "redacted")
data = {
    'grant_type': 'password',   # Must always be "password"
    'username': matrixian_user,
    'password': matrixian_pw,   # Your login password
}
# Authenticate by generating a token
token_resp = post('https://api.matrixiangroup.com/token', data=data)
# Stop here with the HTTP status when the login is rejected, instead of failing
# on the missing "access_token" key below.
token_resp.raise_for_status()
token = loads(token_resp.text)["access_token"]
# Use the token in the headers in the requests to the company API
headers = {'accept': 'application/json', 'Authorization': f'Bearer {token}' }

# first request, to calculate total number of pages/items
params = ()
# Send a request to the company API using our token
resp = get('https://api.matrixiangroup.com/logistic/geo-object/vehicle-restriction', headers=headers, params=params, verify=False)
# An error response has no "totalCount"; surface the HTTP error in this cell
# rather than as a KeyError in the paging cell.
resp.raise_for_status()
restrictions = loads(resp.text)

Functions that write the data to Postgres

In [50]:
def write_or_finderror(df, table, if_exists=None):
    """Write ``df`` to table ``table`` in schema ``stg_bordenanalyse`` with pandas ``to_sql``.

    When the write fails, each column is written on its own to the scratch table
    ``stg_bordenanalyse.error`` (replaced on every attempt) so that the column
    holding the problematic data can be identified. The failure is never
    swallowed: either the failing column's error or the original error is
    re-raised, so the caller cannot report a page as inserted when it was not.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to write.
    table : str
        Name of the target table.
    if_exists : str, optional
        Value passed to ``to_sql(if_exists=...)``. When omitted, the
        notebook-level variable ``mode`` is used, which is how the existing
        two-argument calls work.

    Raises
    ------
    Exception
        Whatever ``to_sql`` raised, after the per-column diagnostics were printed.
    """
    # Fall back to the notebook-level `mode` so existing two-argument calls keep working.
    write_mode = mode if if_exists is None else if_exists
    try:
        df.to_sql(table, engine, schema="stg_bordenanalyse", if_exists=write_mode, index=False, index_label=None, dtype={"constraint.and": types.JSON})
    except Exception:
        # `except Exception` (not a bare except) so Ctrl-C / kernel interrupts
        # are not turned into a column-by-column write attempt.
        print('Error writing data to table: '+table)
        for col in df.columns:
            print(col)
            try:
                df[col].to_sql("error", engine, schema="stg_bordenanalyse", if_exists='replace', index=False, index_label=None)
            except Exception:
                # This column reproduces the failure: show it and stop.
                print(df[col])
                print(df)
                df.to_clipboard()
                raise
        # No single column reproduced the failure (for example a connection or
        # table-level problem). Re-raise the original error instead of
        # returning as if the write had succeeded.
        raise

In [76]:
def addToTable(data,mode):
    """Flatten one API response and write it to the three staging tables.

    ``data`` is a decoded response holding an ``items`` list. Three frames are
    derived from it:

    * the fully flattened items, restricted to a fixed list of columns,
    * the entries under each item's ``constraint.and``, with the item ``id``
      attached as ``items.id``,
    * the item ``id`` together with its ``geometry`` converted to a string.

    ``mode`` is accepted for the caller's convenience but is not forwarded here:
    ``write_or_finderror`` is called with the frame and table name only.
    """
    # Fixed column layout for the main table; pages may have fewer or more columns.
    expected_columns = [
        'id', 'cityName', 'remarks', 'createdDate', 'updatedDate', 'trafficDecision',
        'report.id', 'report.name', 'report.type', 'report.description',
        'report.files', 'report.status', 'report.comments', 'report.createdDate',
        'report.updatedDate', 'report.user.id', 'report.areaOfResponsibility',
        'constraint.type',
        'administrativeReference.maximumSingleAxleWeight',
        'administrativeReference.lzv',
        'administrativeReference.municipalityName',
        'type.type',
    ]

    # Flatten the response in three different ways
    items_flat = pd.json_normalize([data], record_path=['items'])
    constraints_flat = pd.json_normalize(
        data,
        record_path=['items', 'constraint', 'and'],
        meta=[['items', 'id']],
    )
    items_top_level = pd.json_normalize(data, record_path=['items'], max_level=0)
    geometry_as_text = items_top_level['geometry'].apply(str)
    coords_flat = pd.concat([items_top_level['id'], geometry_as_text], axis=1)

    # Add the columns this page is missing ...
    for name in expected_columns:
        if name in items_flat:
            continue
        items_flat[name] = None
        print("Adding col "+name+" to df")

    # ... and drop the ones that are not part of the fixed layout
    items_flat = items_flat[expected_columns]

    # Write the three frames, main table first
    outputs = (
        (items_flat, "matrixian_vehiclerestrictions"),
        (constraints_flat, "matrixian_vehiclerestrictions_constraints"),
        (coords_flat, "matrixian_vehiclerestrictions_geom"),
    )
    for frame, table_name in outputs:
        write_or_finderror(frame, table_name)

In [52]:
# The Matrixian endpoint doesn't support certificate verification, so the requests
# are sent with verify=False and generate a ton of warnings. Silence only that
# specific warning category.
import warnings
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter(action='ignore', category=InsecureRequestWarning)

The actual API call

In [77]:
# Results are paged, 25 items per page
PAGE_SIZE = 25
total_pages = math.ceil(int(restrictions["totalCount"]) / PAGE_SIZE)
# print(total_pages)

# The first page written replaces the tables, every later page is appended.
# `mode` has to stay a notebook-level variable: write_or_finderror reads it.
mode = 'replace'

# Request pages 1..total_pages inclusive. The previous range(1, total_pages)
# stopped one page early, so the last page was never loaded.
# NOTE(review): assumes the API numbers pages starting at 1 - confirm.
for page in range(1, total_pages + 1):
    # Set the GET parameters
    params = (('page', str(page)),)
    # Send a request to the company API using our token
    resp = get('https://api.matrixiangroup.com/logistic/geo-object/vehicle-restriction', headers=headers, params=params, verify=False)
    # Fail on an HTTP error instead of trying to flatten an error payload
    resp.raise_for_status()
    restrictions = loads(resp.text)

    addToTable(restrictions, mode)
    mode = 'append'

    print("\r","Values of page "+str(page)+"/"+str(total_pages)+" inserted")

Updating metadata

In [78]:
import psycopg2
# Open a psycopg2 connection and cursor for the metadata update below.
# autocommit is enabled, so statements run without an explicit commit.
try:
    postgres_pw = TokenLibrary.getSecret("redacted", "redacted", "redacted")
    conn = psycopg2.connect(dbname="rdt_dev",user="redacted",host="redacted",password=postgres_pw)
    conn.autocommit = True
    cur = conn.cursor()
    print("Database connected")
except Exception:
    print("I am unable to connect to the database")
    # Re-raise: continuing would leave `conn`/`cur` undefined (or pointing at a
    # stale connection from an earlier run) and hide the real cause.
    raise

In [79]:
# One update_metadata call per table loaded by this notebook; the calls are
# sent to the database together in a single execute.
metadata_call = (
    "CALL update_metadata('rdt_dev'::VARCHAR,'stg_bordenanalyse'::VARCHAR,'{table}'::VARCHAR,"
    "NOW(),'RDT_Bordenanalyse_MatrixianApiCall'::VARCHAR,'data ingest'::VARCHAR);"
)
loaded_tables = (
    'matrixian_vehiclerestrictions',
    'matrixian_vehiclerestrictions_constraints',
    'matrixian_vehiclerestrictions_geom',
)
# Every statement is preceded by a newline, as in the original literal
sql_meta = ''.join('\n' + metadata_call.format(table=table_name) for table_name in loaded_tables)
cur.execute(sql_meta)