In [3]:
import config
import pymongo
import numpy as np
import pandas as pd
import json
import re
import copy
import ast


import sqlalchemy as db
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import reflection

In [4]:
# Open the MongoDB connection and resolve the databases/collections whose
# names are supplied by the project-local `config` module.
myclient = pymongo.MongoClient(str(config.MONGO_URI))
stagingDb = myclient[str(config.DB_STAGING)]
analyseDb = myclient[str(config.DB_ANALYSE)]
# Collections in the staging database.
stagingCol = stagingDb[config.COLL_PLAATJES]
stagingOud = stagingDb[config.COLL_STAGING_OUD]
stagingNieuw = stagingDb[config.COLL_STAGING_NIEUW]
# Collections in the analysis database.
analyseCol = analyseDb[config.COLL_ANALYSE]
analyseColClean = analyseDb[config.COLL_ANALYSE_CLEAN]

# This collection is addressed by a literal name instead of a `config` entry.
metaCollection = stagingDb['Kolominformatie']

In [5]:
# Relative path of the Excel workbook that is read below with pd.read_excel;
# its 'Objecten' sheet and the per-object sheets drive the pipeline builders.
AIRFLOW_WASSTRAAT_CONFIG = "./wasstraat_config/Wasstraat_Config_Harmonize.xlsx"

# Bare expression: only displays the configured input directory as cell output.
config.AIRFLOW_INPUTDIR

'/input'

In [6]:
# Read the 'Objecten' sheet of the harmonisation workbook and blank out the
# empty cells so the table displays without NaN values.
df = pd.read_excel(AIRFLOW_WASSTRAAT_CONFIG, sheet_name='Objecten').fillna(value="")
df

Unnamed: 0,Object,Tabellen,Overerven,Samenvoegen
0,Ignore,"["".*backup.*"", "".*kopie.*""]",,
1,Tekening,"[""TEKENING.*""]",,
2,Vondst,"[""VONDSTENLIJST"", ""VONDST"", ""VONDSTINHD""]",,
3,Spoor,"[""SPOREN"", ""SPOOR""]",,
4,Vulling,"[""VULLING.*""]",,
5,Dia,"[""DIA.*""]",,
6,Foto,"[""FOTO.*""]",,
7,Put,"[""PUT"", ""PUTTEN""]",,
8,Artefact,"[""ARTEFACT.*"", "".*AARDEWERK.*"", "".*steen.*"", ""...",,
9,Hout,"[""HOUT""]",Artefact,


In [36]:
# Template aggregation pipeline. createAggr() deep-copies it and rewrites the
# stages by position, so the order of the entries below matters:
#   [0] $match      - placeholder, replaced by the negative (ignore) table match
#   [1] $match      - placeholder, replaced by the positive table match
#   [2] $replaceRoot- keeps _id and nests the whole source document under 'brondata'
#   [3] $addFields  - base attributes; extended per object type
#   [4] $merge      - writes into config.DB_ANALYSE / config.COLL_ANALYSE keyed on _id
HARMONIZE_AGGR = [
    {'$match': {"table": {"$in" : ["XXX","YYY"]}}},
    {"$match": {"table": {"$in" : ["XXX","YYY"]}}},
    {"$replaceRoot" : {"newRoot" : {"_id" : "$_id", "brondata" : "$$ROOT"}}},
    {"$addFields" : { 
        "projectcd" : "$brondata.project"}}
    ,{ "$merge": { "into": { "db": config.DB_ANALYSE, "coll": config.COLL_ANALYSE }, "on": "_id",  "whenMatched": "replace", "whenNotMatched": "insert"}}
]

# Module-level cache for the loaded configuration; False means "not loaded yet".
# getAggr() replaces it with the DataFrame returned by loadHarmonizer().
HARMONIZER = False


def getAggrTables(root, tabellen, include):
    """Build the ``$match`` filter on the ``table`` field.

    Args:
        root: Existing filter dict. With zero or one pattern its ``"table"``
            key is overwritten in place and the same dict is returned.
        tabellen: String holding a Python list literal of regex patterns,
            e.g. ``'["PUT", "PUTTEN"]'``; parsed with ``ast.literal_eval``.
        include: True to match tables that fit a pattern, False to match
            tables that fit none of them.

    Returns:
        ``root`` (updated) for zero or one pattern; for two or more patterns a
        new ``{"$or": [...]}`` (include) or ``{"$and": [...]}`` (exclude) dict.
    """
    patronen = ast.literal_eval(tabellen)

    def _regex(patroon):
        # Case-insensitive regex condition for a single pattern.
        return {'$regex': patroon, '$options': 'i'}

    # Several patterns: one clause per pattern, combined with $or / $and.
    if len(patronen) >= 2:
        if include:
            return {"$or": [{"table": _regex(p)} for p in patronen]}
        return {"$and": [{"table": {"$not": _regex(p)}} for p in patronen]}

    # Zero or one pattern: a single condition stored on root["table"].
    if len(patronen) == 1:
        voorwaarde = _regex(patronen[0])
    else:
        voorwaarde = {"$in": []}

    if include:
        root["table"] = voorwaarde
    else:
        root["table"] = {"$not": voorwaarde}
    return root


def getKolomValues(kolommen):
    """Turn a list of source column names into a nested ``$ifNull`` expression.

    The first column has the highest priority; each following column is the
    fallback for the one before it.

    Args:
        kolommen: Sequence of column names found under ``brondata``. The
            sequence is not modified.

    Returns:
        None for an empty sequence, ``"$brondata.<col>"`` for a single column,
        otherwise ``{'$ifNull': ["$brondata.<first>", <expression for rest>]}``.
    """
    if len(kolommen) == 0:
        return None

    # Build from the innermost fallback outwards so the caller's list is left
    # untouched (no pop) and long lists do not recurse.
    expressie = "$brondata." + kolommen[-1]
    for kolom in reversed(kolommen[:-1]):
        expressie = {'$ifNull': ["$brondata." + kolom, expressie]}
    return expressie

    
def getAttributes(root, df_attributes):
    """Add one entry to ``root`` for every row of an attribute sheet.

    Args:
        root: Dict to fill; it is modified in place and also returned.
        df_attributes: DataFrame with the columns ``Attribute`` (target field
            name) and ``Kolommen`` (source definition).

    Returns:
        ``root``. A ``Kolommen`` cell whose text starts with a ``[...]`` list
        literal is parsed and converted with ``getKolomValues``; any other
        cell value is stored unchanged.
    """
    for _, rij in df_attributes.iterrows():
        bron = rij['Kolommen']
        if re.match(r"\[.*\]", str(bron)) is None:
            # Not a list literal: keep the cell value exactly as it is.
            waarde = bron
        else:
            waarde = getKolomValues(ast.literal_eval(bron))
        root[rij['Attribute']] = waarde

    return root

def initAttributes(xl):
    """Pre-compute the ``$addFields`` mapping of every object type.

    All mappings are built up front so that they can be used for inheritance
    when the individual pipelines are created.

    Args:
        xl: Mapping of sheet name to DataFrame; must contain an ``Objecten``
            sheet with an ``Object`` column.

    Returns:
        Dict of object name to attribute dict, for every object that has a
        sheet of the same name in ``xl``. Each dict starts as a copy of the
        base ``$addFields`` stage of ``HARMONIZE_AGGR``.
    """
    basis_velden = copy.deepcopy(HARMONIZE_AGGR)[3]['$addFields']

    per_object = {}
    for naam in xl['Objecten']['Object'].unique():
        # Objects without their own sheet get no entry.
        if naam in xl.keys():
            per_object[naam] = getAttributes(copy.deepcopy(basis_velden), xl[naam])
    return per_object



def createAggr(soort, tabellen, tabellen_Ignore, xl, attrs):
    """Build the aggregation pipeline for one object type.

    Args:
        soort: Object name as listed in the ``Object`` column of ``Objecten``.
        tabellen: List literal (string) of table patterns belonging to ``soort``.
        tabellen_Ignore: List literal (string) of table patterns to exclude.
        xl: Mapping of sheet name to DataFrame (the loaded workbook).
        attrs: Mapping of object name to its ``$addFields`` dict, as produced
            by ``initAttributes``. It is only read, never modified.

    Returns:
        A new pipeline (list of stage dicts) based on ``HARMONIZE_AGGR``. It
        shares no dict with ``attrs`` or with other pipelines.

    Raises:
        IndexError: if ``soort`` has no row in ``xl['Objecten']``.
        KeyError: if the object it inherits from has no entry in ``attrs``.
    """
    aggr = copy.deepcopy(HARMONIZE_AGGR)

    idx_addfields = 3
    # Phase: negative match (tables to ignore)
    aggr[0]["$match"] = getAggrTables(aggr[0]["$match"], tabellen_Ignore, False)
    # Phase: positive match (tables of this object type)
    aggr[1]["$match"] = getAggrTables(aggr[1]["$match"], tabellen, True)

    # Phase: look up the object this one inherits from
    objecten = xl['Objecten']
    overerven_van = objecten[objecten['Object'] == soort]['Overerven'].values[0]

    # Phase: inherited fields go into an extra $addFields stage placed before
    # the object's own stage, so the own attributes override inherited ones.
    if overerven_van != "" and pd.notna(overerven_van):
        # Deep copy: several children inherit from the same parent and must
        # not end up sharing (or altering) the parent's attribute dict.
        aggr.insert(idx_addfields, {'$addFields': copy.deepcopy(attrs[overerven_van])})
        idx_addfields += 1

    # Phase: the object's own fields
    if soort in xl.keys():
        # Deep copy so that adding 'soort' below does not leak into `attrs`.
        aggr[idx_addfields]['$addFields'] = copy.deepcopy(attrs[soort])
    aggr[idx_addfields]['$addFields']['soort'] = soort

    return aggr
   
    
def loadHarmonizer():
    """Load the harmonisation workbook and build a pipeline per object type.

    Reads every sheet of ``AIRFLOW_WASSTRAAT_CONFIG``, strips whitespace from
    the object names, pre-computes the attribute mappings and adds an ``aggr``
    column holding the pipeline created by ``createAggr`` for each row.

    Returns:
        The ``Objecten`` DataFrame extended with the ``aggr`` column.

    Raises:
        IndexError: if the ``Objecten`` sheet has no row named ``Ignore``.
    """
    xl = pd.read_excel(AIRFLOW_WASSTRAAT_CONFIG, sheet_name=None)

    df = xl['Objecten']
    # Normalise the names BEFORE the attributes are initialised: initAttributes
    # matches object names against sheet names, so a padded name would get no
    # attribute entry while createAggr later looks it up by its stripped name.
    df['Object'] = df['Object'].apply(lambda x: x.strip())
    attrs = initAttributes(xl)

    # The ignore patterns are the same for every row; look them up once.
    tabellen_ignore = df[df.Object == 'Ignore']['Tabellen'].values[0]
    df['aggr'] = df.apply(
        lambda x: createAggr(x['Object'], x['Tabellen'], tabellen_ignore, xl, attrs),
        axis=1)

    return df
  
    
def getAggr(soort, reload=False):
    """Return the aggregation pipeline for one object type.

    The configuration is loaded on first use and cached in the module-level
    ``HARMONIZER``.

    Args:
        soort: Object name as listed in the ``Object`` column.
        reload: When True, re-read the configuration even if it is cached.

    Returns:
        The pipeline stored in the ``aggr`` column for ``soort``.

    Raises:
        Exception: if ``soort`` is not present in the loaded configuration.
    """
    global HARMONIZER

    # A DataFrame has no unambiguous truth value, so `not HARMONIZER` raises
    # ValueError once the cache is filled; test for the type instead.
    if reload or not isinstance(HARMONIZER, pd.DataFrame):
        HARMONIZER = loadHarmonizer()

    if soort not in HARMONIZER['Object'].unique():
        raise Exception('Error while loading harmonizer, ' + str(soort) + ' does not exist in Excel.')
    return HARMONIZER[HARMONIZER.Object == soort]['aggr'].values[0]
        
# Fetch the pipeline for the 'Glas' object; the first call loads the Excel
# configuration through loadHarmonizer().
aggr = getAggr('Glas')

# Bare expression: displays the generated pipeline as cell output.
aggr

[{'$match': {'$and': [{'table': {'$not': {'$regex': '.*backup.*',
       '$options': 'i'}}},
    {'table': {'$not': {'$regex': '.*kopie.*', '$options': 'i'}}}]}},
 {'$match': {'$or': [{'table': {'$regex': 'GLAS', '$options': 'i'}},
    {'table': {'$regex': 'ARTF_GLS', '$options': 'i'}}]}},
 {'$replaceRoot': {'newRoot': {'_id': '$_id', 'brondata': '$$ROOT'}}},
 {'$addFields': {'projectcd': '$brondata.project',
   'putnr': {'$ifNull': ['$brondata.PUT', '$brondata.PUTNO']},
   'vlaknr': {'$ifNull': ['$brondata.VLAK', '$brondata.VLAKNO']},
   'spoornr': {'$ifNull': ['$brondata.SPOOR', '$brondata.SPOORNO']},
   'vondstnr': {'$ifNull': ['$brondata.VONDSTNO', '$brondata.Vondst']},
   'doosnr': {'$ifNull': ['$brondata.DOOSNO', '$brondata.10a']},
   'datering': {'$ifNull': ['$brondata.3',
     {'$ifNull': ['$brondata.1c', '$brondata.DATERING']}]},
   'artefactnr': '$brondata.SUBNO',
   'beschrijving': '$brondata.4b',
   'publicatiecode': '$brondata.1b',
   'dateringvanaf': '$brondata.3a',
   'd

In [38]:
# Run the pipeline in `aggr` against the stagingOud collection and show
# whatever the cursor yields as a DataFrame.
# NOTE(review): the HARMONIZE_AGGR template ends with a $merge stage, so this
# presumably writes into the analyse collection and yields no documents here
# - confirm before re-running.
pd.DataFrame(list(stagingOud.aggregate(aggr)))

In [22]:
# Scratch cell: repeats the steps of initAttributes() at module level.
# NOTE(review): it rebinds the notebook-level names `aggr`, `xl`, `df` and
# `attrs`; afterwards `aggr` is the base $addFields dict, not a pipeline.
aggr = copy.deepcopy(HARMONIZE_AGGR)

# None is passed as the second positional argument; the result is indexed by
# sheet name on the next line.
xl = pd.read_excel(AIRFLOW_WASSTRAAT_CONFIG, None);
df = xl['Objecten']

# Same assignment as at the top of this cell.
aggr = copy.deepcopy(HARMONIZE_AGGR)
lst_objecten = df['Object'].unique()
aggr = aggr[3]['$addFields']
# One attribute dict per object that has a sheet of the same name.
attrs = { obj:getAttributes(copy.deepcopy(aggr), xl[obj]) for obj in lst_objecten if obj in xl.keys() }

# Bare expressions that are not the last statement of the cell are not displayed.
attrs['Vondst']

df
# Earlier attempt, kept commented out:
#xl['Objecten'][xl['Object'] == soort]['Overerven'].values[0]
# Last expression of the cell: shows the 'Objecten' row(s) for 'Leer'.
xl['Objecten'][xl['Objecten']['Object'] == 'Leer']

Unnamed: 0,Object,Tabellen,Overerven,Samenvoegen
10,Leer,"[""LEER""]",Artefact,


In [32]:
# List the distinct object names in the cached configuration. This only works
# after getAggr() has replaced the initial `HARMONIZER = False` with a DataFrame.
HARMONIZER['Object'].unique()

array(['Ignore', 'Tekening', 'Vondst', 'Spoor', 'Vulling', 'Dia', 'Foto',
       'Put', 'Artefact', 'Hout', 'Leer', 'Kleipijp', 'Steen', 'Glas',
       'Metaal', 'Aardewerk', 'Skelet', 'Spijker', 'Munt', 'Keramiek',
       'Bot'], dtype=object)

In [None]:
# Evaluates the expression `Kolommen`, i.e. looks up a variable of that name.
# NOTE(review): no such name is assigned in the visible cells and this cell
# has no execution count; presumably leftover scratch - confirm and remove.
eval('Kolommen')