# [Design & Art Australia Online](https://www.daao.org.au/) Loading

## 0. Setting

In [1]:
import os
import pprint
from collections import Counter, defaultdict
from copy import deepcopy

import jsonlines
import pandas as pd
import pymongo
from bson import ObjectId
from bson.dbref import DBRef
from tqdm import tqdm

pp = pprint.PrettyPrinter(indent=2)

In [2]:
import sys

codefolder = "C:/ProjectCollections/Programs/Australia_Cultural_Data_Engine/codes"

sys.path.append(codefolder)
from acde import MongoDBManipulation as acde_manip
from daao import JsonProcessing as daao_jp
from daao import MongoDBManipulation as daao_manip
from general import GeneralFunctions as gen_gf
from general import JsonProcessing as gen_jp
from general import MongoDBManipulation as gen_manip

# data_folder = (
#     "D:/Program_Data/Australia_Cultural_Data_Engine_Data/design_art_australia_online"
# )
# js_folder = os.path.join(data_folder, "exported_json")
# csv_folder = os.path.join(data_folder, "exported_csv")

## 1. DAAO MongoDB Connection

In [3]:
daao_meta = daao_manip.DAAO_MetaCollection()
acde_opr = acde_manip.ACDE_Manipulation()

## 2. DAAO DB Meta Collection

1. get all collections and corresponding classes
2. get field information of each collection (Removed, All, Date Objects)

### 2.1 DAAO Class Extraction

In [4]:
class_names = daao_meta.extract_class()

In [5]:
# Display multi-class collection
print("List all multi-class collection:\n")
for coll, cls_name in class_names.items():
    if len(cls_name) > 1:
        print(f'【{coll}】\n{", ".join(cls_name)}\n')

List all multi-class collection:

【controlledvocab】
ControlledVocab.ANZSICOccupation, ControlledVocab.EventType, ControlledVocab.GroupType, ControlledVocab.IndigenousCountry, ControlledVocab.IndigenousLanguage, ControlledVocab.Language, ControlledVocab.Medium, ControlledVocab.RecognitionType, ControlledVocab.RecordStatus, ControlledVocab.ResourceType, ControlledVocab.Source, ControlledVocab.Tag

【xactivity】
VersionedDocument.XActivity.XEvent.Event, VersionedDocument.XActivity.XEvent.EventGroup, VersionedDocument.XActivity.XRecognition.Recognition

【xactivity_ori】
VersionedDocument.XActivity.XEvent.Event, VersionedDocument.XActivity.XEvent.EventGroup, VersionedDocument.XActivity.XRecognition.Recognition

【xparty】
VersionedDocument.XParty.Person, VersionedDocument.XParty.PersonGroup

【xparty_ori】
VersionedDocument.XParty.Person, VersionedDocument.XParty.PersonGroup

【xpredicate】
XPredicate.CollectionPredicate, XPredicate.EventEventPredicate, XPredicate.EventGroupPredicate, XPredicate.Eve

### 2.2 DAAO Field Name Extraction

In [6]:
daao_objs_info = daao_meta.extract_fieldName()

person: 100%|███████████████████████████████████████████████████████████████████| 30693/30693 [01:06<00:00, 460.17it/s]
personGroup: 100%|████████████████████████████████████████████████████████████████| 4660/4660 [00:05<00:00, 848.34it/s]
event: 100%|███████████████████████████████████████████████████████████████████| 21906/21906 [00:12<00:00, 1783.47it/s]
eventGroup: 100%|████████████████████████████████████████████████████████████████████| 94/94 [00:00<00:00, 2048.97it/s]
recognition: 100%|███████████████████████████████████████████████████████████████| 5481/5481 [00:01<00:00, 2962.73it/s]
work: 100%|████████████████████████████████████████████████████████████████████| 23780/23780 [00:09<00:00, 2636.38it/s]
collection: 100%|██████████████████████████████████████████████████████████████| 11747/11747 [00:03<00:00, 3475.86it/s]
ansicOcc_menu: 100%|██████████████████████████████████████████████████████████████| 825/825 [00:00<00:00, 17600.00it/s]
eventType_menu: 100%|███████████████████

### 2.3 Lookup Project Pipeline Setting

In [7]:
lookupProj_info = daao_meta.construct_LookUpPipelines_loading(daao_objs_info)

## 3 Data Extraction

1. Add is_locked 【Done】
2. Add json format for individual levels
3. Add empty attribute for each attribute? (For easier pd.json_normalize) Maybe shouldn't clean empty value?

### 3.1 Person

#### 3.1.0 Pipeline Setting

In [8]:
#####
# Set aggregation variables
#####

curr_collname = "person"
curr_lvl = "person"

bool_filters = {"is_deleted": False, "is_shadow": False, "is_locked": False}
value_filters = [daao_objs_info[curr_collname]["filter_cond"]]

person_initRename = {"primary_name.prefix": "primary_name.title"}

# set fields that need to be unwinded for complenmentary construction
# in person extraction, we need role, other_occupations and periods_active
# to construct all_roles
person_fields_convert = [
    "roles",
    "other_occupations",
    "periods_active",
    "alternative_names",
    "trainings",
]
# the following pipeline need to be implemented on the non-array object
person_fields_newFields = {
    "alternative_names.display_name": {
        "$trim": {
            "input": {
                "$concat": [
                    "$alternative_names.given_names",
                    " ",
                    "$alternative_names.family_name",
                ]
            }
        }
    },
    "career_periods_1": {
        "occupation": {"type": "$roles.broad_role", "title": "$roles.detailed_role"},
        "coverage_range": {"date_range": "$periods_active"},
    },
    "career_periods_2": {
        "occupation": {
            "title": "$other_occupations.title",
            "type": "$other_occupations.type",
        },
        "coverage_range": "$other_occupations.coverage_range",
    },
    "education_trainings": {
        "organization": {
            "course": "$trainings.course_name",
            "coverage": "$trainings.coverage_range",
        }
    },
}

# set final rename fields, the original fields will be dropped
person_fields_finalRenames = {
    "longterm_roles": "roles",
    "nla": "nla_id",
}
# set final additional field construction
person_fields_finalNewFields = {
    "ori_url": {"$concat": ["https://www.daao.org.au/bio/", "$slug"]},
    "display_name": {
        "$trim": {
            "input": {
                "$concat": [
                    "$primary_name.given_names",
                    " ",
                    "$primary_name.family_name",
                ]
            }
        }
    },
    "career.career_periods": {"$setUnion": ["$career_periods_1", "$career_periods_2"]},
    "data_source": "DAAO",
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "ori_dbid": "$_id",
}

#####
# Construct fields
#####
# get initial project fields: all first level fields
person_fields_initProj = daao_manip.daao_get_initProjFields(
    daao_objs_info, curr_collname
)

#####
# Get original renamed fields that need to be removed
#####
# remove all "related" attributes, meta attribute having underscore
# and the initial renamed attributes
person_initRemoval = set(
    f.split(".")[0] if f.startswith("related") else f
    for f in daao_objs_info[curr_collname]["remove_fields"]
)

# get name of first level of field having 'coverage' prefix
person_fields_levelUp_root = list(
    set(
        ".".join(f.split(".")[:2])
        for f in daao_objs_info[curr_collname]["date_objects"]
        if "coverage" in f
    )
)
# get name of first level of ARRAY field having 'coverage' prefix
person_fields_arrayDates = set(
    f.split(".", 1)[0]
    for f in person_fields_levelUp_root
    if f.split(".", 1)[0].endswith("s")
)
# get all fields need to be unwinded
person_fields_unwind = list(person_fields_arrayDates | set(person_fields_convert))

#####
# Construct Stages
#####
#
person_InitStage_Match = gen_manip.construct_MatchStage(
    bool_filters=bool_filters, value_filters=value_filters,
)
person_InitStage_Rename = gen_manip.construct_ReconstructFieldsStage(
    rename_pairs=person_initRename
)
person_InitStage_Remove = gen_manip.construct_FinalProjectionStage(
    remove_list=person_initRemoval
)
person_Stage2_Unwinds = gen_manip.construct_unwindStage(person_fields_unwind)
person_Stage3_LevelUpDate = daao_manip.daao_construct_levelUpDateStage(
    daao_objs_info[curr_collname]["date_objects"]
)
person_Stage4_AddNewFields = [{"$set": person_fields_newFields}]
person_Stage5_Group = daao_manip.daao_construct_groupStage(
    tuple(
        list(person_fields_initProj)
        + [f.split(".")[0] for f in person_fields_newFields.keys()]
    ),
    person_fields_unwind + [f.split(".")[0] for f in person_fields_newFields.keys()],
)
person_FinalStage_Reconstruct = gen_manip.construct_ReconstructFieldsStage(
    rename_pairs=person_fields_finalRenames,
    complex_action_pairs=person_fields_finalNewFields,
    drop_fields=["career_periods_1", "career_periods_2", "trainings", "slug", "_cls",],
)

In [9]:
#####
#
# Construct person extraction pipeline
#
#####

person_agg_pipeline = (
    person_InitStage_Match
    + person_InitStage_Rename
    + person_InitStage_Remove
    + person_Stage2_Unwinds
    + person_Stage3_LevelUpDate
    + person_Stage4_AddNewFields
    + person_Stage5_Group
    + person_FinalStage_Reconstruct
)

pp.pprint(person_agg_pipeline)

[ { '$match': { '_cls': {'$eq': 'VersionedDocument.XParty.Person'},
                'is_deleted': False,
                'is_locked': False,
                'is_shadow': False}},
  {'$addFields': {'primary_name.prefix': '$primary_name.title'}},
  {'$project': {'primary_name.title': 0}},
  { '$project': { '_authors': 0,
                  '_legacy_bio_id': 0,
                  '_types': 0,
                  '_update_timestamp': 0,
                  'alternative_names._cls': 0,
                  'alternative_names._types': 0,
                  'arrivals._cls': 0,
                  'arrivals._types': 0,
                  'arrivals.date._cls': 0,
                  'arrivals.date._types': 0,
                  'arrivals.date.is_circa': 0,
                  'arrivals.date.precision': 0,
                  'biography._cls': 0,
                  'biography._types': 0,
                  'birth._cls': 0,
                  'birth._types': 0,
                  'birth.coverage._cls': 0,
              

#### 3.1.1 Extracton & Loading

【To Do】<br>
It's so silly to iterate all values twice.<br>
2 bottlenecks:<br>
1. How shall we get the proper attributes in an unknown structure object that has more than 2 levels? 
    - Here I can ignore the index of array.     
2. How shall we known lookup dbref without iteration.
    - Here I need to iterate all stuffs one by one. Is there any way that can get all dbref in one shoot?

In [10]:
person_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)
ori_individuals = []
lookup_individuals = []

# clear all person records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

for individual in tqdm(
    person_collection.aggregate(person_agg_pipeline, allowDiskUse=True,),
    total=person_collection.count_documents(person_InitStage_Match[0]["$match"]),
    position=0,
    leave=True,
):
    # get revision number
    version_no = daao_manip.daao_get_revision_no(
        daao_meta.daao_db,
        daao_objs_info[curr_collname]["collection_name"],
        individual.get("_id"),
    )
    individual.update(version_no)
    # clean biography text,
    if individual.get("summary"):
        ori_text = individual["summary"]
        cleaned_text = daao_jp.person_bioTextCleansing(ori_text)
        individual["summary"] = cleaned_text
    if individual.get("biography") and individual["biography"].get("text"):
        ori_text = individual["biography"]["text"]
        cleaned_text = daao_jp.person_bioTextCleansing(ori_text)
        individual["biography"]["text"] = daao_jp.person_bioTextCleansing(ori_text)
    # # store original record
    # ori_individuals.append(individual)
    lookup_indv = deepcopy(individual)
    ##########
    # look up all/the other DBRefs
    # There're 2 solutions for looking up the DBRefs
    # 1. Scan the full record out of MongoDB and look up all existing DBRefs dynamicly
    # 2. Hard code all DBRefs in MongoDB aggregation pipelines.
    #    This way can assign specific projections in each lookup, but we
    #    need to clearly understand all records.
    # For local single thread, it's acceptable to use Method1.
    # But it would be better to update to Method2 after migrating to Cloud
    ##########
    lookup_indv = daao_manip.daao_get_dbrefInfo(
        lookup_indv, lookupProj_info, db=daao_meta.daao_db
    )
    # we don't need _id for data loading
    lookup_indv.pop("_id")
    # remove all attributes having empty values
    lookup_indv = gen_jp.clean_empty_values(lookup_indv)
    # # store final record
    # lookup_individuals.append(lookup_indv)
    # insert final record to acde_db
    daao_meta.acde_db[curr_lvl].insert_one(lookup_indv)
    # if t.n == 100:
    #     break

100%|████████████████████████████████████████████████████████████████████████████| 17365/17365 [05:53<00:00, 49.17it/s]


### 3.2 Organization (PersonGroup)

#### 3.2.0 Pipeline Setting

In [11]:
#####
# Set aggregation variables
#####

curr_collname = "personGroup"
curr_lvl = "organization"

bool_filters = {"is_deleted": False, "is_shadow": False, "is_locked": False}
value_filters = [daao_objs_info[curr_collname]["filter_cond"]]

org_initRename = {}

# set fields that need to be unwinded for complenmentary construction
# in personGroup extraction, we need role, other_occupations and periods_active
# to construct operation
org_fields_convert = [
    "roles",
    "other_occupations",
    "periods_active",
    "residences",
]
# the following pipeline need to be implemented on the non-array object
org_fields_newFields = {
    "operation_periods_1": {
        "function": {"title": "$roles.detailed_role", "type": "$roles.broad_role"},
        "coverage_range": "$periods_active",
    },
    "operation_periods_2": {
        "function": {
            "title": "$other_occupations.title",
            "type": "$other_occupations.type",
        },
        "coverage_range": "$other_occupations.coverage_range",
    },
}

# set final rename fields, the original fields will be dropped
org_fields_finalRenames = {
    "longterm_roles": "roles",
    "nla": "nla_id",
    "locations": "residences",
}
# set final additional field construction
org_fields_finalNewFields = {
    "ori_url": {"$concat": ["https://www.daao.org.au/bio/group/", "$slug"]},
    "data_source": "DAAO",
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "ori_dbid": "$_id",
    "operation.operation_periods": {
        "$setUnion": ["$operation_periods_1", "$operation_periods_2"]
    },
    "types": {"$setUnion": ["$type", "$types"]},
    "ori_dbid": "_id",
}

#####
# Construct fields
#####
# get initial project fields: all first level fields
org_fields_initProj = daao_manip.daao_get_initProjFields(daao_objs_info, curr_collname)

#####
# Get original renamed fields that need to be removed
#####
# remove all "related" attributes, meta attribute having underscore
# and the initial renamed attributes
org_initRemoval = set(
    f.split(".")[0] if f.startswith("related") else f
    for f in daao_objs_info[curr_collname]["remove_fields"]
) | set(v.strip("$") for v in org_initRename.values())

# get name of first level of field having 'coverage' prefix
org_fields_levelUp_root = list(
    set(
        ".".join(f.split(".")[:2])
        for f in daao_objs_info[curr_collname]["date_objects"]
        if "coverage" in f
    )
)
# get name of first level of ARRAY field having 'coverage' prefix
org_fields_arrayDates = set(
    f.split(".", 1)[0]
    for f in org_fields_levelUp_root
    if f.split(".", 1)[0].endswith("s")
)
# get all fields need to be unwinded
org_fields_unwind = list(org_fields_arrayDates | set(org_fields_convert))

#####
# Construct Stages
#####
#
org_InitStage_Match = gen_manip.construct_MatchStage(
    bool_filters=bool_filters, value_filters=value_filters,
)
org_InitStage_Rename = [{"$addFields": org_initRename}]
org_InitStage_Remove = gen_manip.construct_FinalProjectionStage(
    remove_list=org_initRemoval
)
org_Stage2_Unwinds = gen_manip.construct_unwindStage(org_fields_unwind)
org_Stage3_LevelUpDate = daao_manip.daao_construct_levelUpDateStage(
    daao_objs_info[curr_collname]["date_objects"]
)
org_Stage4_AddNewFields = [{"$set": org_fields_newFields}]
org_Stage5_Group = daao_manip.daao_construct_groupStage(
    tuple(
        list(org_fields_initProj)
        + [f.split(".")[0] for f in org_fields_newFields.keys()]
    ),
    org_fields_unwind + [f.split(".")[0] for f in org_fields_newFields.keys()],
)
org_FinalStage_Reconstruct = gen_manip.construct_ReconstructFieldsStage(
    rename_pairs=org_fields_finalRenames,
    complex_action_pairs=org_fields_finalNewFields,
    drop_fields=["operation_periods_1", "operation_periods_2", "type", "slug", "_cls"],
)

In [12]:
#####
#
# Check personGroup extraction pipeline
#
#####
org_agg_pipeline = (
    org_InitStage_Match
    + org_InitStage_Rename
    + org_InitStage_Remove
    + org_Stage2_Unwinds
    + org_Stage3_LevelUpDate
    + org_Stage4_AddNewFields
    + org_Stage5_Group
    + org_FinalStage_Reconstruct
)
pp.pprint(org_agg_pipeline)

[ { '$match': { '_cls': {'$eq': 'VersionedDocument.XParty.PersonGroup'},
                'is_deleted': False,
                'is_locked': False,
                'is_shadow': False}},
  {'$addFields': {}},
  { '$project': { '_authors': 0,
                  '_legacy_bio_id': 0,
                  '_types': 0,
                  '_update_timestamp': 0,
                  'biography._cls': 0,
                  'biography._types': 0,
                  'locked_biographies._cls': 0,
                  'locked_biographies._types': 0,
                  'other_occupations._cls': 0,
                  'other_occupations._types': 0,
                  'periods_active._cls': 0,
                  'periods_active._types': 0,
                  'periods_active.end._cls': 0,
                  'periods_active.end._types': 0,
                  'periods_active.end.is_circa': 0,
                  'periods_active.end.precision': 0,
                  'periods_active.start._cls': 0,
                  'periods_activ

#### 3.2.1 Extracton & Loading

In [13]:
org_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)
ori_individuals = []
lookup_individuals = []

# clear all organization(personGroup) records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

with tqdm(
    total=org_collection.count_documents(org_InitStage_Match[0]["$match"]),
    position=0,
    leave=True,
) as t:
    for individual in org_collection.aggregate(org_agg_pipeline, allowDiskUse=True,):
        # get revision number
        version_no = daao_manip.daao_get_revision_no(
            daao_meta.daao_db,
            daao_objs_info[curr_collname]["collection_name"],
            individual.get("_id"),
        )
        individual.update(version_no)
        # clean biography text,
        if individual.get("summary"):
            ori_text = individual["summary"]
            cleaned_text = daao_jp.person_bioTextCleansing(ori_text)
            individual["summary"] = cleaned_text
        if individual.get("biography") and individual["biography"].get("text"):
            ori_text = individual["biography"]["text"]
            cleaned_text = daao_jp.person_bioTextCleansing(ori_text)
            individual["biography"]["text"] = daao_jp.person_bioTextCleansing(ori_text)
        # # store original record
        # ori_individuals.append(individual)
        lookup_indv = deepcopy(individual)
        ##########
        # look up all/the other DBRefs
        # There're 2 solutions for looking up the DBRefs
        # 1. Scan the full record out of MongoDB and look up all existing DBRefs dynamicly
        # 2. Hard code all DBRefs in MongoDB aggregation pipelines.
        #    This way can assign specific projections in each lookup, but we need to clearly understand
        #    all records.
        # For local single thread, it's acceptable to use Method1.
        # But it would be better to update to Method2 after migrating to Cloud
        ##########
        lookup_indv = daao_manip.daao_get_dbrefInfo(
            lookup_indv, lookupProj_info, db=daao_meta.daao_db
        )
        # we don't need _id for data loading
        lookup_indv.pop("_id")
        # remove all attributes having empty values
        lookup_indv = gen_jp.clean_empty_values(lookup_indv)
        # # store final record
        # lookup_individuals.append(lookup_indv)
        # insert final record to acde_db
        daao_meta.acde_db[curr_lvl].insert_one(lookup_indv)
        t.update(1)

100%|████████████████████████████████████████████████████████████████████████████████| 933/933 [00:10<00:00, 85.59it/s]


### 3.3 Work

#### 3.3.0 Pipeline Setting

In [14]:
#####
# Set aggregation variables
#####

curr_collname = "work"
curr_lvl = "work"

bool_filters = {"is_deleted": False, "is_locked": False}
value_filters = [daao_objs_info[curr_collname]["filter_cond"]]

# the following pipeline need to be implemented on the non-array object
work_fields_newFields = {
    "coverage_range": {"date_range": "$date",},
}

# set final additional field construction
work_fields_finalNewFields = {
    "description": {"$trim": {"input": "$description"}},
    "ori_url": {"$concat": ["https://www.daao.org.au/bio/work/", "$slug"]},
    "data_source": "DAAO",
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "ori_dbid": "$_id",
}

#####
# Get original renamed fields that need to be removed
#####
# remove all "related" attributes, meta attribute having underscore
# and the initial renamed attributes
work_initRemoval = set(
    f.split(".")[0] if f.startswith("related") else f
    for f in daao_objs_info[curr_collname]["remove_fields"]
)

#####
# Construct Stages
#####
#
work_InitStage_Match = gen_manip.construct_MatchStage(
    bool_filters=bool_filters, value_filters=value_filters,
)
work_InitStage_Remove = gen_manip.construct_FinalProjectionStage(
    remove_list=work_initRemoval
)
work_Stage2_LevelUpDate = daao_manip.daao_construct_levelUpDateStage(
    daao_objs_info[curr_collname]["date_objects"]
)
work_Stage3_AddNewFields = [{"$set": work_fields_newFields}]
work_FinalStage_Reconstruct = gen_manip.construct_ReconstructFieldsStage(
    complex_action_pairs=work_fields_finalNewFields,
    drop_fields=["date", "slug", "_cls"],
)

In [15]:
#####
#
# Check work extraction pipeline
#
#####
work_agg_pipeline = (
    work_InitStage_Match
    + work_InitStage_Remove
    + work_Stage2_LevelUpDate
    + work_Stage3_AddNewFields
    + work_FinalStage_Reconstruct
)
pp.pprint(work_agg_pipeline)

[ { '$match': { '_cls': {'$eq': 'VersionedDocument.XWork.Work'},
                'is_deleted': False,
                'is_locked': False}},
  { '$project': { '_authors': 0,
                  '_types': 0,
                  '_update_timestamp': 0,
                  'date._cls': 0,
                  'date._types': 0,
                  'date.end._cls': 0,
                  'date.end._types': 0,
                  'date.end.is_circa': 0,
                  'date.end.precision': 0,
                  'date.start._cls': 0,
                  'date.start._types': 0,
                  'date.start.is_circa': 0,
                  'date.start.precision': 0,
                  'manufacturers._cls': 0,
                  'manufacturers._types': 0,
                  'mediums._cls': 0,
                  'mediums._types': 0,
                  'related_collections': 0,
                  'related_events': 0,
                  'related_places': 0,
                  'related_recognitions': 0}},
  { '$addFields':

#### 3.3.1 Extracton & Loading

In [16]:
work_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)
ori_individuals = []
lookup_individuals = []

# clear all work records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

with tqdm(
    total=work_collection.count_documents(work_InitStage_Match[0]["$match"]),
    position=0,
    leave=True,
) as t:
    for individual in work_collection.aggregate(work_agg_pipeline, allowDiskUse=True,):
        # get revision number
        version_no = daao_manip.daao_get_revision_no(
            daao_meta.daao_db,
            daao_objs_info[curr_collname]["collection_name"],
            individual.get("_id"),
        )
        individual.update(version_no)
        # # store original record
        # ori_individuals.append(individual)
        lookup_indv = deepcopy(individual)
        ##########
        # look up all/the other DBRefs
        ##########
        lookup_indv = daao_manip.daao_get_dbrefInfo(
            lookup_indv, lookupProj_info, db=daao_meta.daao_db
        )
        # we don't need _id for data loading
        lookup_indv.pop("_id")
        # remove all attributes having empty values
        lookup_indv = gen_jp.clean_empty_values(lookup_indv)
        # # store final record
        # lookup_individuals.append(lookup_indv)
        # insert final record to acde_db
        daao_meta.acde_db[curr_lvl].insert_one(lookup_indv)
        t.update(1)

100%|███████████████████████████████████████████████████████████████████████████| 23729/23729 [01:43<00:00, 228.45it/s]


### 3.4 Event

#### 3.4.0 Pipeline Setting

In [17]:
#####
# Set aggregation variables
#####

curr_collname = "event"
curr_lvl = "event"

bool_filters = {"is_deleted": False, "is_locked": False}
value_filters = [daao_objs_info[curr_collname]["filter_cond"]]

event_initRename = {"types_old": "types"}

# set fields that need to be unwinded for complenmentary construction
# in personGroup extraction, we need role, other_occupations and periods_active
# to construct operation
event_fields_convert = ["coverages", "types"]
# the following pipeline need to be implemented on the non-array object
event_fields_newFields = {
    # "locations": {"coverage_range": "$coverages",},
    "coverage_ranges": "$coverages",
    "types": {"primary_type": "$types_old"},
}

# set final additional field construction
event_fields_finalNewFields = {
    "description": {"$trim": {"input": "$description"}},
    "ori_url": {"$concat": ["https://www.daao.org.au/bio/event/", "$slug"]},
    "data_source": "DAAO",
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "ori_dbid": "$_id",
}

#####
# Construct fields
#####
# get initial project fields: all first level fields
event_fields_initProj = daao_manip.daao_get_initProjFields(
    daao_objs_info, curr_collname
)
#####
# Get original renamed fields that need to be removed
#####
# remove all "related" attributes, meta attribute having underscore
# and the initial renamed attributes
event_initRemoval = set(
    f.split(".")[0] if f.startswith("related") else f
    for f in daao_objs_info[curr_collname]["remove_fields"]
)

# get name of first level of field having 'coverage' prefix
event_fields_levelUp_root = list(
    set(
        ".".join(f.split(".")[:2])
        for f in daao_objs_info[curr_collname]["date_objects"]
        if "coverage" in f
    )
)
# get name of first level of ARRAY field having 'coverage' prefix
event_fields_arrayDates = set(
    f.split(".", 1)[0]
    for f in event_fields_levelUp_root
    if f.split(".", 1)[0].endswith("s")
)
# get all fields need to be unwinded
event_fields_unwind = list(event_fields_arrayDates | set(event_fields_convert))

#####
# Construct Stages
#####
#
event_InitStage_Match = gen_manip.construct_MatchStage(
    bool_filters=bool_filters, value_filters=value_filters,
)
event_InitStage_Rename = gen_manip.construct_ReconstructFieldsStage(
    rename_pairs=event_initRename
)
event_InitStage_Remove = gen_manip.construct_FinalProjectionStage(
    remove_list=event_initRemoval
)
event_Stage2_Unwinds = gen_manip.construct_unwindStage(event_fields_unwind)
event_Stage3_LevelUpDate = daao_manip.daao_construct_levelUpDateStage(
    daao_objs_info[curr_collname]["date_objects"]
)
event_Stage4_AddNewFields = [{"$set": event_fields_newFields}]
event_Stage5_Group = daao_manip.daao_construct_groupStage(
    tuple(
        list(event_fields_initProj)
        + [f.split(".")[0] for f in event_fields_newFields.keys()]
    ),
    event_fields_unwind + [f.split(".")[0] for f in event_fields_newFields.keys()],
)
event_FinalStage_Reconstruct = gen_manip.construct_ReconstructFieldsStage(
    drop_fields=["slug", "_cls", "coverages"],
    complex_action_pairs=event_fields_finalNewFields,
)

In [18]:
#####
#
# Check event extraction pipeline
#
#####

event_agg_pipeline = (
    event_InitStage_Match
    + event_InitStage_Rename
    + event_InitStage_Remove
    + event_Stage2_Unwinds
    + event_Stage3_LevelUpDate
    + event_Stage4_AddNewFields
    + event_Stage5_Group
    + event_FinalStage_Reconstruct
)

pp.pprint(event_agg_pipeline)

[ { '$match': { '_cls': {'$eq': 'VersionedDocument.XActivity.XEvent.Event'},
                'is_deleted': False,
                'is_locked': False}},
  {'$addFields': {'types_old': '$types'}},
  {'$project': {'types': 0}},
  { '$project': { '_authors': 0,
                  '_legacy_event_group_pk': 0,
                  '_legacy_event_group_slug': 0,
                  '_types': 0,
                  '_update_timestamp': 0,
                  'coverages._cls': 0,
                  'coverages._types': 0,
                  'coverages.date_range._cls': 0,
                  'coverages.date_range._types': 0,
                  'coverages.date_range.end._cls': 0,
                  'coverages.date_range.end._types': 0,
                  'coverages.date_range.end.is_circa': 0,
                  'coverages.date_range.end.precision': 0,
                  'coverages.date_range.start._cls': 0,
                  'coverages.date_range.start._types': 0,
                  'coverages.date_range.start.is_c

#### 3.4.1 Extracton & Loading

In [19]:
event_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)
ori_individuals = []
lookup_individuals = []

# clear all event records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

with tqdm(
    total=event_collection.count_documents(event_InitStage_Match[0]["$match"]),
    position=0,
    leave=True,
) as t:
    for individual in event_collection.aggregate(
        event_agg_pipeline, allowDiskUse=True,
    ):
        # get revision number
        version_no = daao_manip.daao_get_revision_no(
            daao_meta.daao_db,
            daao_objs_info[curr_collname]["collection_name"],
            individual.get("_id"),
        )
        individual.update(version_no)
        # clean description text?
        # # store original record
        # ori_individuals.append(individual)
        lookup_indv = deepcopy(individual)
        ##########
        # look up all/the other DBRefs
        ##########
        lookup_indv = daao_manip.daao_get_dbrefInfo(
            lookup_indv, lookupProj_info, db=daao_meta.daao_db
        )
        # we don't need _id for data loading
        lookup_indv.pop("_id")
        # remove all attributes having empty values
        lookup_indv = gen_jp.clean_empty_values(lookup_indv)
        # # store final record
        # lookup_individuals.append(lookup_indv)
        # insert final record to acde_db
        daao_meta.acde_db[curr_lvl].insert_one(lookup_indv)
        t.update(1)

100%|███████████████████████████████████████████████████████████████████████████| 21838/21838 [02:52<00:00, 126.79it/s]


### 3.5 Recognition

#### 3.5.0 Pipeline Setting

In [20]:
#####
# Set aggregation variables
#####

curr_collname = "recognition"
curr_lvl = "recognition"

bool_filters = {"is_deleted": False, "is_locked": False}
value_filters = [daao_objs_info[curr_collname]["filter_cond"]]

# set fields that need to be unwinded for complenmentary construction
# in personGroup extraction, we need role, other_occupations and periods_active
# to construct operation
rec_fields_convert = ["coverages", "dates"]
# the following pipeline need to be implemented on the non-array object
rec_fields_newFields = {}

# set final additional field construction
rec_fields_finalNewFields = {
    "description": {"$trim": {"input": "$description"}},
    "ori_url": {"$concat": ["https://www.daao.org.au/bio/recognition/", "$slug"]},
    "data_source": "DAAO",
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "ori_dbid": "$_id",
}

#####
# Construct fields
#####
# get initial project fields: all first level fields
rec_fields_initProj = daao_manip.daao_get_initProjFields(daao_objs_info, curr_collname)
#####
# Get original renamed fields that need to be removed
#####
# remove all "related" attributes, meta attribute having underscore
# and the initial renamed attributes
rec_initRemoval = set(
    f.split(".")[0] if f.startswith("related") else f
    for f in daao_objs_info[curr_collname]["remove_fields"]
)

# get name of first level of field having 'coverage' prefix
rec_fields_levelUp_root = list(
    set(
        ".".join(f.split(".")[:2])
        for f in daao_objs_info[curr_collname]["date_objects"]
        if "coverage" in f
    )
)
# get name of first level of ARRAY field having 'coverage' prefix
rec_fields_arrayDates = set(
    f.split(".", 1)[0]
    for f in rec_fields_levelUp_root
    if f.split(".", 1)[0].endswith("s")
)
# get all fields need to be unwinded
rec_fields_unwind = list(rec_fields_arrayDates | set(rec_fields_convert))

#####
# Construct Stages
#####
#
rec_InitStage_Match = gen_manip.construct_MatchStage(
    bool_filters=bool_filters, value_filters=value_filters,
)

rec_InitStage_Remove = gen_manip.construct_FinalProjectionStage(
    remove_list=rec_initRemoval
)
rec_Stage2_Unwinds = gen_manip.construct_unwindStage(rec_fields_unwind)
rec_Stage3_LevelUpDate = daao_manip.daao_construct_levelUpDateStage(
    daao_objs_info[curr_collname]["date_objects"]
)
rec_Stage4_Group = daao_manip.daao_construct_groupStage(
    tuple(list(rec_fields_initProj)), rec_fields_unwind,
)
rec_FinalStage_Reconstruct = gen_manip.construct_ReconstructFieldsStage(
    drop_fields=["slug", "_cls"], complex_action_pairs=rec_fields_finalNewFields,
)

In [21]:
#####
#
# Check recognition extraction pipeline
#
#####
rec_agg_pipeline = (
    rec_InitStage_Match
    + rec_InitStage_Remove
    + rec_Stage2_Unwinds
    + rec_Stage3_LevelUpDate
    + rec_Stage4_Group
    + rec_FinalStage_Reconstruct
)
pp.pprint(rec_agg_pipeline)

[ { '$match': { '_cls': { '$eq': 'VersionedDocument.XActivity.XRecognition.Recognition'},
                'is_deleted': False,
                'is_locked': False}},
  { '$project': { '_authors': 0,
                  '_types': 0,
                  '_update_timestamp': 0,
                  'coverages._cls': 0,
                  'coverages._types': 0,
                  'coverages.date._cls': 0,
                  'coverages.date._types': 0,
                  'coverages.date.is_circa': 0,
                  'coverages.date.precision': 0,
                  'dates._cls': 0,
                  'dates._types': 0,
                  'dates.is_circa': 0,
                  'dates.precision': 0}},
  {'$unwind': {'path': '$dates', 'preserveNullAndEmptyArrays': True}},
  {'$unwind': {'path': '$coverages', 'preserveNullAndEmptyArrays': True}},
  { '$addFields': { 'coverages.date': '$coverages.date._date',
                    'dates': '$dates._date'}},
  { '$group': { '_cls': {'$first': '$_cls'},
        

#### 3.5.1 Extracton & Loading

In [22]:
rec_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)
ori_individuals = []
lookup_individuals = []

# clear all recognition records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

with tqdm(
    total=rec_collection.count_documents(rec_InitStage_Match[0]["$match"]),
    position=0,
    leave=True,
) as t:
    for individual in rec_collection.aggregate(rec_agg_pipeline, allowDiskUse=True,):
        # get revision number
        version_no = daao_manip.daao_get_revision_no(
            daao_meta.daao_db,
            daao_objs_info[curr_collname]["collection_name"],
            individual.get("_id"),
        )
        individual.update(version_no)
        # clean description text?
        # # store original record
        # ori_individuals.append(individual)
        lookup_indv = deepcopy(individual)
        ##########
        # look up all/the other DBRefs
        ##########
        lookup_indv = daao_manip.daao_get_dbrefInfo(
            lookup_indv, lookupProj_info, db=daao_meta.daao_db
        )
        # we don't need _id for data loading
        lookup_indv.pop("_id")
        # remove all attributes having empty values
        lookup_indv = gen_jp.clean_empty_values(lookup_indv)
        # # store final record
        # lookup_individuals.append(lookup_indv)
        # insert final record to acde_db
        daao_meta.acde_db[curr_lvl].insert_one(lookup_indv)
        t.update(1)

100%|█████████████████████████████████████████████████████████████████████████████| 5471/5471 [00:36<00:00, 149.94it/s]


### 3.6 Place

#### 3.6.0 Pipeline Setting

In [23]:
#####
# Set aggregation variables
#####

curr_collname = "place"
curr_lvl = "place"

#####
# Construct fields
#####

place_fields_newFields = {
    "ori_dbid": "$_id",
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "data_source": "DAAO",
}
place_fields_newFields.update(lookupProj_info[curr_collname]["complex_action"])

#####
# Construct Stages
#####

place_Pipeline = gen_manip.construct_FinalProjectionStage(
    keep_list=("url",),
    rename_pairs=lookupProj_info["place"]["rename"],
    remove_list=("_id",),
    complex_action_pairs=place_fields_newFields,
)

In [24]:
pp.pprint(place_Pipeline)

[ { '$project': { '_class': 'place',
                  '_class_ori': 'place',
                  'address': { 'city': { '$cond': { 'else': '$$REMOVE',
                                                    'if': { '$eq': [ { '$size': { '$ifNull': [ '$extracted_city_set_inferred',
                                                                                               [ ]]}},
                                                                     1]},
                                                    'then': { '$first': '$extracted_city_set_inferred'}}},
                               'country': { '$cond': { 'else': '$$REMOVE',
                                                       'if': { '$eq': [ { '$size': { '$ifNull': [ '$extracted_cty_set_inferred',
                                                                                                  [ ]]}},
                                                                        1]},
                                                    

#### 3.6.1 Extracton & Loading

In [25]:
place_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)
ori_individuals = []

# clear all place records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

with tqdm(total=place_collection.count_documents({}), position=0, leave=True,) as t:
    for individual in place_collection.aggregate(place_Pipeline, allowDiskUse=True,):
        # remove all attributes having empty values
        individual = gen_jp.clean_empty_values(individual)
        # # store original record
        # ori_individuals.append(individual)
        # insert final record to acde_db
        daao_meta.acde_db[curr_lvl].insert_one(individual)
        t.update(1)
        # if t.n == 100:
        #     break

100%|██████████████████████████████████████████████████████████████████████████| 16691/16691 [00:11<00:00, 1471.65it/s]


### 3.7 Resource (externalresource/asset)

#### 3.7.0 Pipeline Setting

In [26]:
#####
# Set aggregation variables
#####

curr_collname = "externalresource"
curr_lvl = "resource"

etlrsc_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)

#####
# Construct fields
#####

# set final rename fields, the original fields will be dropped
rscEtlRsc_fields_finalRenames = {
    "types": "type",
    "ori_dbid": "_id",
}
rscEtlRsc_fields_finalNewFields = {
    "authoring_info": {
        "authors": [{"name": "$author"}],
        "coverage": {"date": "$date._date", "place": "$place",},
    },
    "source_info": {"citation": "$citation", "url": "$url"},
    "description": {"$trim": {"input": "$content"}},
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "data_source": "DAAO",
}

rscEtlRsc_InitStage_Match = [{"$match": {"is_deleted": False}}]

rscEtlRsc_FinalStage_Proj = gen_manip.construct_FinalProjectionStage(
    keep_list=("note", "title"),
    rename_pairs=rscEtlRsc_fields_finalRenames,
    remove_list=("_id",),
    complex_action_pairs=rscEtlRsc_fields_finalNewFields,
)

rscEtlRsc_pipelines = rscEtlRsc_InitStage_Match + rscEtlRsc_FinalStage_Proj

pp.pprint(rscEtlRsc_pipelines)

[ {'$match': {'is_deleted': False}},
  { '$project': { '_class': 'resource',
                  '_class_ori': { '$toLower': { '$last': { '$split': [ '$_cls',
                                                                       '.']}}},
                  'authoring_info': { 'authors': [{'name': '$author'}],
                                      'coverage': { 'date': '$date._date',
                                                    'place': '$place'}},
                  'data_source': 'DAAO',
                  'description': {'$trim': {'input': '$content'}},
                  'note': 1,
                  'ori_dbid': '$_id',
                  'source_info': {'citation': '$citation', 'url': '$url'},
                  'title': 1,
                  'types': '$type'}},
  {'$project': {'_id': 0}}]


In [27]:
#####
# Set aggregation variables
#####

curr_collname = "asset"
curr_lvl = "resource"

asset_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)

#####
# Construct fields
#####

# set final rename fields, the original fields will be dropped
rscAsset_fields_finalRenames = {
    "ori_dbid": "_id",
}
rscAsset_fields_finalNewFields = {
    "title": "$caption",
    "contributors": ["$created_by"],
    "right_info": {
        "copyright": [{"owner": "$copytright_owner", "license": "$license"}],
    },
    "source_info": {
        "source_page": "$source_page",
        "url": "$url",
        "file_name": "filename",
    },
    "description": {"$trim": {"input": "$alt_text"}},
    "_class_ori": {"$toLower": {"$last": {"$split": ["$_cls", "."]}}},
    "_class": curr_lvl,
    "data_source": "DAAO",
}

rscAsset_FinalStage_Proj = gen_manip.construct_FinalProjectionStage(
    keep_list=("permission_obtained",),
    rename_pairs=rscAsset_fields_finalRenames,
    remove_list=("_id",),
    complex_action_pairs=rscAsset_fields_finalNewFields,
)

rscAsset_pipelines = rscAsset_FinalStage_Proj

pp.pprint(rscAsset_pipelines)

[ { '$project': { '_class': 'resource',
                  '_class_ori': { '$toLower': { '$last': { '$split': [ '$_cls',
                                                                       '.']}}},
                  'contributors': ['$created_by'],
                  'data_source': 'DAAO',
                  'description': {'$trim': {'input': '$alt_text'}},
                  'ori_dbid': '$_id',
                  'permission_obtained': 1,
                  'right_info': { 'copyright': [ { 'license': '$license',
                                                   'owner': '$copytright_owner'}]},
                  'source_info': { 'file_name': 'filename',
                                   'source_page': '$source_page',
                                   'url': '$url'},
                  'title': '$caption'}},
  {'$project': {'_id': 0}}]


#### 3.7.1 Extracton & Loading

In [28]:
# clear all resource records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

ori_individuals = []

for curr_collection, curr_pipelines, curr_filter in [
    (etlrsc_collection, rscEtlRsc_pipelines, rscEtlRsc_InitStage_Match[0]["$match"]),
    (asset_collection, rscAsset_pipelines, {}),
]:

    with tqdm(
        total=curr_collection.count_documents(curr_filter),
        position=0,
        leave=True,
        desc=curr_collection.name,
    ) as t:
        for individual in curr_collection.aggregate(curr_pipelines, allowDiskUse=True,):
            lookup_indv = deepcopy(individual)
            ##########
            # look up all/the other DBRefs
            ##########
            lookup_indv = daao_manip.daao_get_dbrefInfo(
                lookup_indv, lookupProj_info, db=daao_meta.daao_db
            )
            if lookup_indv.get("_id"):
                # we don't need _id for data loading
                lookup_indv.pop("_id")
            # remove all attributes having empty values
            lookup_indv = gen_jp.clean_empty_values(lookup_indv)
            # # store original record
            # ori_individuals.append(individual)
            # insert final record to acde_db
            daao_meta.acde_db[curr_lvl].insert_one(lookup_indv)
            t.update(1)
            # if t.n == 100:
            #     break

externalresource: 100%|█████████████████████████████████████████████████████████| 36663/36663 [00:40<00:00, 911.64it/s]
asset: 100%|█████████████████████████████████████████████████████████████████████| 4501/4501 [00:03<00:00, 1337.67it/s]


### 3.8 Relationship

#### 3.8.1 Load DAAO Relationships into ACDE Database

In [29]:
loading_objs = [obj for obj in acde_opr.Class_RelatedFN_mapping.keys()]

curr_dbid_mapping = acde_opr.acde_extRcd2dict(
    db=daao_meta.acde_db,
    data_source="DAAO",
    loading_objs=loading_objs,
    selected_fields=["_class_ori", "ori_dbid"],
)

person_fetching: 100%|███████████████████████████████████████████████████████| 17365/17365 [00:00<00:00, 100122.61it/s]
organization_fetching: 100%|███████████████████████████████████████████████████████| 933/933 [00:00<00:00, 9408.20it/s]
work_fetching: 100%|█████████████████████████████████████████████████████████| 23729/23729 [00:00<00:00, 115847.33it/s]
event_fetching: 100%|█████████████████████████████████████████████████████████| 21838/21838 [00:00<00:00, 57099.80it/s]
recognition_fetching: 100%|████████████████████████████████████████████████████| 5471/5471 [00:00<00:00, 109717.79it/s]
resource_fetching: 100%|██████████████████████████████████████████████████████| 41164/41164 [00:00<00:00, 92321.11it/s]
place_fetching: 100%|█████████████████████████████████████████████████████████| 16691/16691 [00:00<00:00, 58719.88it/s]


In [30]:
#####
# Set aggregation variables
#####

curr_collname = "relationship"
curr_lvl = "relationship"
relationship_collection = daao_meta.daao_db.get_collection(
    daao_objs_info[curr_collname]["collection_name"]
)

missing_objs = defaultdict(set)

# clear all relationship records from DAAO
daao_meta.acde_db[curr_lvl].delete_many({"data_source": "DAAO"})

with tqdm(
    total=relationship_collection.count_documents({}),
    position=0,
    leave=True,
    desc=curr_collname,
) as pbar:
    for relation_record in relationship_collection.aggregate(
        [
            {
                "$project": {
                    "_class": "relationship",
                    "data_source": "DAAO",
                    "ori_dbid": "$_id",
                    "subject": 1,
                    "object": 1,
                    "time": 1,
                    "predicate": {"term": "$predicate.term", "reverse_term": "$predicate.reverse_term"},
                    "relation_class": 1,
                    "_id": 0,
                }
            }
        ]
    ):
        is_inserted = True
        for e_type in ["subject", "object"]:
            if relation_record.get(e_type):
                et_cdbid = curr_dbid_mapping[relation_record[e_type]["_class"]].get(
                    (
                        relation_record[e_type]["_class_ori"],
                        relation_record[e_type]["ori_dbid"],
                    )
                )
                if et_cdbid is not None:
                    relation_record[e_type]["curr_dbid"] = et_cdbid
                else:
                    missing_objs[relation_record[e_type]["_class"]].add(
                        relation_record[e_type]["ori_dbid"]
                    )
                    is_inserted = False
                    # break
                # try:
                #     relation_record[e_type].update(
                #         daao_meta.acde_db[relation_record[e_type]["_class"]].find_one(
                #             {"ori_dbid": relation_record[e_type]["ori_dbid"]},
                #             {"curr_dbid": "$_id", "_id": 0},
                #         )
                #     )
                # except:
                #     missing_objs[relation_record[e_type]["_class"]].append(
                #         relation_record[e_type]["ori_dbid"]
                #     )
                #     is_inserted = False
                #     break
        if is_inserted:
            daao_meta.acde_db[curr_lvl].insert_one(relation_record)
        else:
            missing_objs["relationship"].add(relation_record["ori_dbid"])

        pbar.update(1)
        # if pbar.n > 100:
        #     break

relationship: 100%|██████████████████████████████████████████████████████████| 212505/212505 [02:07<00:00, 1663.62it/s]


In [31]:
for obj_type, obj_ids in missing_objs.items():
    if obj_type not in ("relationship", "resource", "organization"):
        total_doc_num = daao_meta.daao_db[
            daao_meta.level_filters[obj_type][0]
        ].count_documents({"_cls": daao_meta.level_filters[obj_type][1]})
    elif obj_type == "relationship":
        total_doc_num = daao_meta.daao_db[obj_type].count_documents({})
    elif obj_type == "organization":
        total_doc_num = daao_meta.daao_db["xparty"].count_documents(
            {"_cls": "VersionedDocument.XParty.PersonGroup"}
        )
    else:
        total_doc_num = "unknown number of"
    missing_doc_num = len(obj_ids)
    print("#" * 20)
    print(
        f"""
{obj_type} was found {total_doc_num} records.
It has missing/invalid {missing_doc_num} records according to relationships.
        """
    )

####################

person was found 30693 records.
It has missing/invalid 12076 records according to relationships.
        
####################

relationship was found 212505 records.
It has missing/invalid 42971 records according to relationships.
        
####################

resource was found unknown number of records.
It has missing/invalid 2 records according to relationships.
        
####################

organization was found 4660 records.
It has missing/invalid 4404 records according to relationships.
        
####################

work was found 23780 records.
It has missing/invalid 10 records according to relationships.
        
####################

collection was found 11747 records.
It has missing/invalid 3013 records according to relationships.
        
####################

event was found 21906 records.
It has missing/invalid 13 records according to relationships.
        
####################

eventGroup was found 94 records.
It has missing/invalid 89 records 

#### 3.8.2 Add All Related Objects to Root Objects

In [32]:
#####
# Clean the existing related_XXXX fields from 'DAAO'
#####

gen_manip.mdb_remove_fields(
    daao_meta.acde_db,
    "relationship",
    {"data_source": "DAAO",},
    remove_fields=list(acde_opr.Class_RelatedFN_mapping.values()),
)

In [33]:
#####
# Update the original records with references
# to the relationships of their related records using DBRefs.
#####
acde_opr.acde_update_related_DBRef(data_source="DAAO", db=daao_meta.acde_db)

Extracting related objects from relationship collection: 100%|██████████████| 169534/169534 [00:06<00:00, 25978.63it/s]
person_related_objects_update: 100%|████████████████████████████████████████████| 16767/16767 [00:17<00:00, 978.18it/s]
resource_related_objects_update: 100%|█████████████████████████████████████████| 39400/39400 [00:32<00:00, 1219.39it/s]
work_related_objects_update: 100%|█████████████████████████████████████████████| 23088/23088 [00:20<00:00, 1147.71it/s]
event_related_objects_update: 100%|████████████████████████████████████████████| 21337/21337 [00:21<00:00, 1004.55it/s]
recognition_related_objects_update: 100%|████████████████████████████████████████| 5314/5314 [00:04<00:00, 1181.80it/s]
place_related_objects_update: 100%|████████████████████████████████████████████| 15352/15352 [00:14<00:00, 1050.23it/s]


The DBrefs of the relationships have been successfully updated to `related_XXX` fields!


In [34]:
loading_colls = set(coll for coll in acde_opr.Class_RelatedFN_mapping.keys())
proj_cond = {
    "predicate": 1,
    "subject": 1,
    "object": 1,
    "_id": 0,
    "relationship_dbid": "$_id",
    "relation_class": 1,
    "data_source": 1,
}

data_source = "DAAO"

#####
# Update the original records having relationship DBRefs with lookup documents.
#####

# # Method 1
# acde_opr.acde_add_related_objects(
#     daao_meta.acde_db, data_source="DAAO", loading_objs=loading_objs
# )

# # Method 2
acde_opr.acde_update_related_fields(
    data_source=data_source,
    proj_cond=proj_cond,
    loading_colls=loading_colls,
    db=daao_meta.acde_db,
)

event_related_places: 100%|████████████████████████████████████████████████████| 20139/20139 [00:18<00:00, 1091.97it/s]
event_related_people: 100%|█████████████████████████████████████████████████████| 20588/20588 [00:21<00:00, 970.60it/s]
event_related_events: 100%|█████████████████████████████████████████████████████████| 858/858 [00:00<00:00, 871.78it/s]


event_related_organizations doesn't have any records.


event_related_works: 100%|██████████████████████████████████████████████████████████| 349/349 [00:00<00:00, 615.61it/s]
event_related_resources: 100%|██████████████████████████████████████████████████████| 265/265 [00:00<00:00, 835.63it/s]


event_related_recognitions doesn't have any records.


recognition_related_places: 100%|█████████████████████████████████████████████████████| 21/21 [00:00<00:00, 800.61it/s]
recognition_related_people: 100%|████████████████████████████████████████████████| 5299/5299 [00:04<00:00, 1092.78it/s]


recognition_related_events doesn't have any records.
recognition_related_organizations doesn't have any records.


recognition_related_works: 100%|█████████████████████████████████████████████████████| 27/27 [00:00<00:00, 1289.38it/s]
recognition_related_resources: 100%|████████████████████████████████████████████████████| 5/5 [00:00<00:00, 715.75it/s]


recognition_related_recognitions doesn't have any records.
organization_related_places doesn't have any records.
organization_related_people doesn't have any records.
organization_related_events doesn't have any records.
organization_related_organizations doesn't have any records.
organization_related_works doesn't have any records.
organization_related_resources doesn't have any records.
organization_related_recognitions doesn't have any records.
place_related_places doesn't have any records.


place_related_people: 100%|███████████████████████████████████████████████████████| 9165/9165 [00:09<00:00, 961.44it/s]
place_related_events: 100%|███████████████████████████████████████████████████████| 6694/6694 [00:07<00:00, 879.75it/s]


place_related_organizations doesn't have any records.


place_related_works: 100%|█████████████████████████████████████████████████████████████| 7/7 [00:00<00:00, 1175.82it/s]
place_related_resources: 100%|██████████████████████████████████████████████████████| 121/121 [00:00<00:00, 460.47it/s]
place_related_recognitions: 100%|█████████████████████████████████████████████████████| 21/21 [00:00<00:00, 684.10it/s]
person_related_places: 100%|██████████████████████████████████████████████████████| 9940/9940 [00:10<00:00, 932.56it/s]
person_related_people: 100%|██████████████████████████████████████████████████████| 6729/6729 [00:08<00:00, 760.72it/s]
person_related_events: 100%|██████████████████████████████████████████████████████| 9600/9600 [00:13<00:00, 727.19it/s]


person_related_organizations doesn't have any records.


person_related_works: 100%|███████████████████████████████████████████████████████| 6202/6202 [00:08<00:00, 773.13it/s]
person_related_resources: 100%|█████████████████████████████████████████████████| 10151/10151 [00:11<00:00, 857.15it/s]
person_related_recognitions: 100%|████████████████████████████████████████████████| 1999/1999 [00:03<00:00, 519.20it/s]
work_related_places: 100%|████████████████████████████████████████████████████████████| 11/11 [00:00<00:00, 919.16it/s]
work_related_people: 100%|██████████████████████████████████████████████████████| 22141/22141 [00:24<00:00, 912.07it/s]
work_related_events: 100%|███████████████████████████████████████████████████████| 2291/2291 [00:02<00:00, 1058.23it/s]


work_related_organizations doesn't have any records.
work_related_works doesn't have any records.


work_related_resources: 100%|████████████████████████████████████████████████████| 3024/3024 [00:02<00:00, 1222.89it/s]
work_related_recognitions: 100%|█████████████████████████████████████████████████████| 24/24 [00:00<00:00, 1093.86it/s]
resource_related_places: 100%|███████████████████████████████████████████████████| 2138/2138 [00:01<00:00, 1315.94it/s]
resource_related_people: 100%|█████████████████████████████████████████████████| 34203/34203 [00:27<00:00, 1260.85it/s]
resource_related_events: 100%|█████████████████████████████████████████████████████| 277/277 [00:00<00:00, 1263.92it/s]


resource_related_organizations doesn't have any records.


resource_related_works: 100%|████████████████████████████████████████████████████| 4031/4031 [00:03<00:00, 1181.33it/s]


resource_related_resources doesn't have any records.


resource_related_recognitions: 100%|████████████████████████████████████████████████████| 5/5 [00:00<00:00, 835.29it/s]

The lookup documents of DBRefs have been successfully updated to `related_XXX` fields!





## Learning Notes

0. [DAAO folder sharing](https://cloudstor.aarnet.edu.au/plus/s/SAZARhWR6QAMhIr)

1. [MongoEngine _types and _cls fields](https://stackoverflow.com/questions/13824569/mongoengine-types-and-cls-fields)

2. [MongoDB: Using aggregation pipeline to extract DBref using $lookup operator](https://dev.to/saurabh73/mongodb-using-aggregation-pipeline-to-extract-dbref-using-lookup-operator-4ekl)

3. [removing null values from a dictionary](https://stephenweiss.dev/python-remove-none-from-dictionary/)

4. [How to remove all empty fields in a nested dict?](https://stackoverflow.com/questions/27973988/how-to-remove-all-empty-fields-in-a-nested-dict)

5. https://stackoverflow.com/questions/36019713/mongodb-nested-lookup-with-3-levels

6. https://stackoverflow.com/questions/7811163/query-for-documents-where-array-size-is-greater-than-1

7. https://blog.csdn.net/wyg_031113/article/details/107040405

8. https://stackoverflow.com/questions/34055609/how-to-show-data-from-2-collections-in-mongodb-with-dbref

9. https://stackoverflow.com/questions/7061339/how-to-convert-u00e9-into-a-utf8-char-in-mysql-or-php

10. [Python encoding and json dumps](https://stackoverflow.com/questions/35582528/python-encoding-and-json-dumps)

11. [Mongodb group and push with empty arrays](https://stackoverflow.com/questions/56312636/mongodb-group-and-push-with-empty-arrays)

12. [Elegant way to check if a nested key exists in a dict?](https://stackoverflow.com/questions/43491287/elegant-way-to-check-if-a-nested-key-exists-in-a-dict)

13. [Safe method to get value of nested dictionary](https://stackoverflow.com/questions/25833613/safe-method-to-get-value-of-nested-dictionary)

14. [Convert field from list of objectids to list of string](https://stackoverflow.com/questions/67902748/convert-field-from-list-of-objectids-to-list-of-string)

15. [How to convert array of objects into nested object in MongoDB aggregation pipeline](https://stackoverflow.com/questions/69684136/how-to-convert-array-of-objects-into-nested-object-in-mongodb-aggregation-pipeli)

16. [Return actual type of a field in MongoDB](https://stackoverflow.com/questions/3208538/return-actual-type-of-a-field-in-mongodb)
    * [Get names of all keys in the collection](https://stackoverflow.com/questions/2298870/get-names-of-all-keys-in-the-collection)