To begin, we import the necessary modules and connect to the portal.

In [1]:
import sys
from arcgis.gis import GIS
from arcgis.graph import KnowledgeGraph
import pandas as pd
import numpy as np
import csv
import gzip
from simpledbf import Dbf5

import os

# Credentials must not be stored in the notebook. The password is read from the
# ARCGIS_PASSWORD environment variable; when it is unset, None is passed and,
# per the arcgis GIS documentation, the user is prompted for it interactively.
# NOTE(review): the password that used to be hard-coded here should be treated
# as compromised and rotated.
gis = GIS("https://arcgis.edc.renci.org/portal", 'dariusmb', password=os.environ.get("ARCGIS_PASSWORD"))

PyTables is not installed. No support for HDF output.
SQLalchemy is not installed. No support for SQL output.


In [2]:
# Create a new hosted knowledge graph service named "Update" on the portal.
result = gis.content.create_service(
    name="Update",
    capabilities="Query,Editing,Create,Update,Delete",
    service_type="KnowledgeGraph",
)
# Alternative: connect to an already existing knowledge graph service instead of creating one.
#kg = KnowledgeGraph("https://arcgis.edc.renci.org/server/rest/services/Hosted/2_6_KG/KnowledgeGraphServer",gis = gis)

In [3]:
# Open the knowledge graph service by URL; the "Update" path segment presumably has to
# match the name given to create_service -- confirm if the service is renamed.
kg = KnowledgeGraph("https://arcgis.edc.renci.org/portal/rest/services/Hosted/Update/KnowledgeGraphServer", gis=gis)

This is a sanity check that your knowledge graph (KG) was created and is empty. It should contain only the Document entity type and the HasDocument relationship type.

In [4]:
# Print every entity type in the data model, then every relationship type.
for section in ('entity_types', 'relationship_types'):
    for type_name in kg.datamodel[section]:
        print(type_name)

Document
HasDocument


In [5]:
def read_csv_headers(file_path):
    """
    Read the header row of a CSV file and infer column dtypes from a sample.

    Parameters:
    - file_path: Path to a plain or gzip-compressed ('.gz') CSV file.

    Returns:
    - List of header names with 'label' prepended.
    - List of pandas dtypes inferred from the first five data rows.
    """
    # gzip.open and open share the same call shape in text mode
    opener = gzip.open if file_path.endswith('.gz') else open
    with opener(file_path, 'rt') as handle:
        header_row = next(csv.reader(handle))

    # pandas infers the compression from the extension on its own
    preview = pd.read_csv(file_path, nrows=5)
    return ['label'] + header_row, preview.dtypes.tolist()

def build_entity_types(entity_properties):
    """
    Build entity-type definitions for the knowledge graph data model.

    Parameters:
    - entity_properties: Dictionary mapping an entity type name to a list of
      property names.

    Returns:
    - List with one definition dictionary per entity type. Every listed
      property is declared with fieldType "esriFieldTypeInteger".
    """
    def _integer_property(prop_name):
        # All properties created here share the same integer definition
        return {
            "name": prop_name,
            "alias": prop_name,
            "fieldType": "esriFieldTypeInteger",
            "editable": True,
            "visible": True,
            "required": False,
            "IsSystemMaintained": False,
            "role": "esriGraphPropertyRegular",
        }

    return [
        {
            "name": entity_name,
            "alias": entity_name,
            "role": "esriGraphNamedObjectRegular",
            "strict": False,
            "properties": {
                prop_name: _integer_property(prop_name) for prop_name in prop_names
            },
        }
        for entity_name, prop_names in entity_properties.items()
    ]


def build_props(name, fieldType):
    """
    Build a single-element list holding one graph property definition.

    Parameters:
    - name: Property name (also used as its alias).
    - fieldType: esri field type string for the property.

    Returns:
    - List containing one property definition dictionary.
    """
    return [
        {
            "name": name,
            "alias": name,
            "fieldType": fieldType,
            "editable": True,
            "visible": True,
            "required": False,
            "IsSystemMaintained": False,
            "role": "esriGraphPropertyRegular",
        }
    ]

def build_relationship_types(relationships):
    """
    Build relationship-type definitions for the knowledge graph data model.

    Parameters:
    - relationships: Iterable of relationship type names.

    Returns:
    - List with one definition dictionary per name, in iteration order.
    """
    return [
        {
            "name": relation_name,
            "alias": relation_name,
            "role": "esriGraphNamedObjectRegular",
            "strict": False,
        }
        for relation_name in relationships
    ]

def build_spatial_props(geo_type):
    """
    Build a single-element list holding the definition of a "shape" geometry property.

    Parameters:
    - geo_type: Geometry type string stored under "geometryType".

    Returns:
    - List containing one geometry property definition (no Z, no M values).
    """
    return [
        {
            "name": "shape",
            "alias": "shape",
            "fieldType": "esriFieldTypeGeometry",
            "geometryType": geo_type,
            "hasZ": False,
            "hasM": False,
            "editable": True,
            "visible": True,
            "required": False,
            "IsSystemMaintained": False,
            "role": "esriGraphPropertyRegular",
        }
    ]

# Register extra properties on an entity or relationship type of the knowledge graph
def add_properties(type_name, properties):
    """
    Add properties to one type of the module-level knowledge graph `kg`.

    Parameters:
    - type_name: Name of the type that receives the properties.
    - properties: Iterable of (property name, esri field type) pairs.

    One graph_property_adds request is sent per pair.
    """
    for definition in properties:
        name, esri_type = definition
        kg.graph_property_adds(type_name=type_name, graph_properties=build_props(name, esri_type))


def convert_columns_to_string(dataframe, columns):
    """
    Cast the given columns of a DataFrame to strings, modifying it in place.

    Parameters:
    - dataframe: DataFrame to modify.
    - columns: Iterable of column names to convert.

    Returns:
    - None.
    """
    for column in columns:
        # Replace the whole column rather than writing through .loc[:, column]:
        # .loc tries to keep the existing (e.g. integer) dtype, which newer pandas
        # versions warn about or reject when the new values are strings.
        dataframe[column] = dataframe[column].astype(str)

def handle_nan_values(value, default="Unknown"):
    """
    Return `default` when `value` is missing according to pd.isna, else `value` unchanged.
    """
    if pd.isna(value):
        return default
    return value

def convert_values_to_string(*values):
    """
    Replace missing values among the arguments with the handle_nan_values default.

    Despite the name, no string conversion happens here: non-missing values are
    returned unchanged. The result is always a list with one item per argument,
    even for a single argument.
    """
    cleaned = []
    for item in values:
        cleaned.append(handle_nan_values(item))
    return cleaned

def add_entities_in_batches(entity_dicts, batch_size=100):
    """
    Add entities to the module-level knowledge graph `kg` in batches.

    Each entity dictionary is tagged in place with an "_index" key holding its
    position in entity_dicts; the tag is set before the batch is sent.

    Parameters:
    - entity_dicts: List of entity dictionaries to be added to the knowledge graph.
    - batch_size: Size of each batch for adding entities. Default is 100.

    Returns:
    - List with the apply_edits result of each batch, in batch order.
    - List of the (now tagged) entity dictionaries, in their original order.
    """
    total = len(entity_dicts)
    batch_count = (total + batch_size - 1) // batch_size  # ceiling division
    batch_results = []
    indexed_entities = []

    for batch_no in range(batch_count):
        first = batch_no * batch_size
        chunk = entity_dicts[first:min(first + batch_size, total)]

        # Global position = batch offset + position inside the batch
        for position, entity in enumerate(chunk, start=first):
            entity["_index"] = position
            indexed_entities.append(entity)

        batch_results.append(kg.apply_edits(adds=chunk))

    return batch_results, indexed_entities


def find_entity_from_result(results, index, entity_type, batch_size=50000):
    """
    Look up the id assigned to an added entity in batched apply_edits results.

    Parameters:
    - results: List of per-batch results, each shaped like
      {"editsResult": {<entity_type>: {"addResults": [{"id": ...}, ...]}}}.
    - index: Position of the entity in the list that was uploaded.
    - entity_type: Type name of the entity, e.g. "Household".
    - batch_size: Batch size that was used for the upload. Default is 50000.

    Returns:
    - The "id" value stored for that entity.
    """
    # Split the global position into (which batch, position inside that batch)
    batch_index, index_within_batch = divmod(index, batch_size)

    # Read from the `results` argument; the previous code read the global results_h
    # and therefore ignored the argument (and only worked for households).
    return results[batch_index]["editsResult"][entity_type]["addResults"][index_within_batch]["id"]

def search_edits_h_by_hh_id(edits, target_id, type_id):
    """
    Find the first entity whose property `type_id` equals `target_id`.

    Parameters:
    - edits: List of entity dictionaries, each with a '_properties' dictionary.
    - target_id: The property value to search for.
    - type_id: Name of the property to compare, e.g. "hh_id".

    Returns:
    - Index of the first matching entity, or None when nothing matches.
    - The matching entity, or None when nothing matches.
    """
    hits = (
        (position, candidate)
        for position, candidate in enumerate(edits)
        if candidate['_properties'][type_id] == target_id
    )
    return next(hits, (None, None))

def add_relates_in_batch(relate_dict, batch_size):
    """
    Add relationships to the module-level knowledge graph `kg` in batches.

    Parameters:
    - relate_dict: List of relationship dictionaries to add.
    - batch_size: Number of relationships sent per apply_edits call.

    Returns:
    - List with the apply_edits result of each batch, in batch order.
    """
    total = len(relate_dict)
    batch_count = (total + batch_size - 1) // batch_size  # ceiling division
    return [
        kg.apply_edits(adds=relate_dict[n * batch_size:min((n + 1) * batch_size, total)])
        for n in range(batch_count)
    ]

def create_index_dict(edits, type_id):
    """
    Create a dictionary for fast lookup of entities by one of their properties.

    Parameters:
    - edits: List of entity dictionaries, each with a '_properties' dictionary.
    - type_id: Name of the property used as lookup key, e.g. "hh_id".

    Returns:
    - Dictionary where keys are property values and values are lists of the
      positions in `edits` that carry that value.
    """
    positions_by_value = {}
    for position, entity in enumerate(edits):
        key = entity['_properties'][type_id]
        positions_by_value.setdefault(key, []).append(position)
    return positions_by_value

def get_field_type(dtype):
    """
    Map pandas dtype to esriFieldType.

    'int64' maps to integer and 'float64' to double; 'object' and every other
    dtype map to string.
    """
    # Add more mappings here as needed for other data types
    known_types = (
        ('int64', "esriFieldTypeInteger"),
        ('float64', "esriFieldTypeDouble"),
        ('object', "esriFieldTypeString"),
    )
    for dtype_name, esri_type in known_types:
        if dtype == dtype_name:
            return esri_type
    return "esriFieldTypeString"  # Default to string if unknown dtype

def check_column_name(column_name):
    """
    Replace spaces and hyphens in a column name with underscores.

    Parameters:
    - column_name: String representing the column name.

    Returns:
    - String with every space and hyphen replaced by an underscore.
    """
    underscore_table = str.maketrans(" -", "__")
    return column_name.translate(underscore_table)

def add_properties_batch(entity_name, df):
    """
    Add one property per DataFrame column to the given type.

    Column names are cleaned with check_column_name and dtypes are mapped with
    get_field_type before the pairs are handed to add_properties.
    """
    property_specs = []
    for column, dtype in zip(df.columns, df.dtypes):
        property_specs.append((check_column_name(column), get_field_type(dtype)))
    add_properties(entity_name, property_specs)

def clean_column_names(df):
    """
    Clean the column names of a DataFrame with check_column_name.

    Parameters:
    - df: DataFrame whose column names need to be cleaned (modified in place).

    Returns:
    - The same DataFrame, with cleaned column names.
    """
    df.columns = list(map(check_column_name, df.columns))
    return df

In [6]:
# Entity types to create, each with the properties that build_entity_types
# declares as esriFieldTypeInteger (this includes "label").
# NOTE(review): "evelation" looks like a typo for "elevation". The same key is used
# when the Household entities are built, so rename both places together or not at all.
entity_properties = {
    "Household": ["label", "logrecno", "hh_age", "hh_income", "hh_race", "ethnicity",
                  "size", "state_fips", "county_fips", "tract_fips", "blkgrp_fips",
                  "puma_fips","evelation"],
    "Person": ["label", "p_id", "sporder", "relshipp", "rac1p", "agep",
               "sex", "hisp"],
    "Workplace": ["label"],
    "Gages": ['label','DOT_Div'],
    "Matthew_Flood_Impact": ["label","index","object_id"],
    "Florence_Flood_Impact": ["label","index","object_id"],
    "Pub_School": ["label","fips"],
    "Priv_School": ["label","COUNTY_FIPS","fips"]
}

entity_types = build_entity_types(entity_properties)

# A set: the relationship types are therefore created in no particular order.
relationships = {
    "LivesIn", "Attends","WorksAt","Holds", "EvacPath", "Within"
}
relate_types = build_relationship_types(relationships)

# Create all entity and relationship types in one request and show the service response.
res = kg.named_object_type_adds(entity_types, relate_types)
print(res)

{'entityAddResults': [{'name': 'Household'}, {'name': 'Person'}, {'name': 'Workplace'}, {'name': 'Gages'}, {'name': 'Matthew_Flood_Impact'}, {'name': 'Priv_School'}, {'name': 'Pub_School'}, {'name': 'Florence_Flood_Impact'}], 'relationshipAddResults': [{'name': 'LivesIn'}, {'name': 'Holds'}, {'name': 'Within'}, {'name': 'EvacPath'}, {'name': 'WorksAt'}, {'name': 'Attends'}]}


In [7]:
# Geometry type names kept for reference:
#esriGeometryTypePoint
#esriGeometryTypePolyline
#esriGeometryTypePolygon
#esriGeometryTypeMultipoint
# NOTE(review): the value passed below ("esriGeometryPoint") is spelled differently from
# the names listed above -- confirm which spelling the service expects.
spatial_point = build_spatial_props("esriGeometryPoint")

# Bare expression: has no effect here because it is not the last statement of the cell.
spatial_point
# Add the point "shape" property to every entity type that carries a location.
kg.graph_property_adds(type_name='Household', graph_properties=spatial_point)
kg.graph_property_adds(type_name='Workplace', graph_properties=spatial_point)
kg.graph_property_adds(type_name='Gages', graph_properties=spatial_point)
kg.graph_property_adds(type_name='Pub_School', graph_properties=spatial_point)
kg.graph_property_adds(type_name='Priv_School', graph_properties=spatial_point)

{'propertyAddResults': [{'name': 'shape'}]}

Below is a work in progress (WIP) for adding polygon shapes to the knowledge graph.

In [8]:
#spatial_poly = build_spatial_props("esriGeometryTypePolygon")

#kg.graph_property_adds(type_name='Matthew_Flood_Impact', graph_properties=spatial_poly)
#kg.graph_property_adds(type_name='Florence_Flood_Impact', graph_properties=spatial_poly)

In [9]:
# Additional properties per type, as (property name, esri field type) pairs
_STR = "esriFieldTypeString"
_DBL = "esriFieldTypeDouble"
_INT = "esriFieldTypeInteger"

# Every extra Person property is stored as a string
person_properties = [
    (prop_name, _STR)
    for prop_name in (
        "hh_id", "wp_id", "job_income_bucket", "school_id",
        "school_type", "grade_lvl", "serlino",
    )
]

workplace_properties = [
    ("x", _DBL), ("y", _DBL),
    ("wp_id", _STR), ("NAICS", _STR),
    ("blkgrp_fips_wp", _DBL), ("elevation", _DBL),
]

household_properties = [
    ("x", _DBL), ("y", _DBL),
    ("hh_id", _STR), ("serlino", _STR),
]

gage_properties = [
    ("name", _STR),
    ("matthew_peak", _DBL), ("florence_peak", _DBL),
    ("site_id", _STR),
]

# Both flood-impact layers share the same attribute schema
matt_properties = [
    ('SITE_ID', _STR),
    ('USER_FLAG', _DBL), ('Shape_Leng', _DBL), ('Shape_Area', _DBL),
    ('LEVEL_ID', _STR), ('JOIN_ID', _STR),
    ('OBJECTID', _INT),
]
flor_properties = list(matt_properties)

pub_properties = [
    ('NCESSCH', _STR), ('NAME', _STR), ('STREET', _STR),
    ('CITY', _STR), ('STATE', _STR),
    ('ZIP', _DBL), ('STFIP', _DBL), ('CNTY', _DBL),
]

priv_properties = [
    ('SCHOOL_NAME', _STR), ('CITY', _STR), ('STATE', _STR),
    ('COUNTY_FIPS', _INT),
    ('COUNTY_NAME', _STR), ('PPIN', _STR),
]

# Register the properties type by type, in this fixed order
for _type_name, _type_properties in (
    ("Person", person_properties),
    ("Workplace", workplace_properties),
    ("Household", household_properties),
    ("Gages", gage_properties),
    ("Florence_Flood_Impact", flor_properties),
    ("Matthew_Flood_Impact", matt_properties),
    ("Pub_School", pub_properties),
    ("Priv_School", priv_properties),
):
    add_properties(_type_name, _type_properties)

In [10]:
# Load the household, person, workplace and school tables.
df_hh = pd.read_csv("2019_ver1_37/37/NC2019_Households.csv")
df_Person = pd.read_csv("2019_ver1_37/37/NC2019_Persons.csv.gz")
df_Work = pd.read_csv("2019_ver1_37/37/NC2019_Workplaces.csv.gz")
df_pub = pd.read_csv("2019_ver1_37/37/NC_schools_pub.csv.gz")
df_priv= pd.read_csv("2019_ver1_37/37/NC_schools_priv.csv.gz")

# Path to the .dbf file holding the gage records
dbf_path = 'coastalGages.dbf'
# Use simpledbf to read the .dbf file
dbf = Dbf5(dbf_path)
# Convert the .dbf file to a DataFrame
gages = dbf.to_dataframe()
# Load the two flood shapefiles as spatially enabled DataFrames.
matt_sdf = pd.DataFrame.spatial.from_featureclass('matthew.shp')
flor_sdf = pd.DataFrame.spatial.from_featureclass('florence.shp')

In [11]:
# County FIPS codes of the study area
county_list = [37129, 37065, 37019, 37155, 37139]
# .copy() makes the subset an independent frame: its columns are converted in place
# later, which on a plain slice of df_hh triggers pandas' chained-assignment warning.
df_hh_subset = df_hh[df_hh['county_fips'].isin(county_list)].copy()
hh_subset_id_list = df_hh_subset['hh_id'].unique()

In [12]:
# Cast the household 'serialno' column to strings in place.
columns_to_convert = ['serialno']
convert_columns_to_string(df_hh_subset, columns_to_convert)

In [13]:
# Persons living in the selected households (.copy(): columns are converted in place below)
df_person_subset = df_Person[df_Person['hh_id'].isin(hh_subset_id_list)].copy()
# Workplaces referenced by the *subset* persons. Taking the ids from the full df_Person
# frame selected the workplaces of everybody, which made the later workplace subset pointless.
# NOTE(review): switch back to df_Person if loading every workplace was intended.
workplace_id_list = df_person_subset['workplace_id'].unique()
columns_to_convert = ['school_id', 'school_type', 'grade_level', 'job_income_bucket', 'serialno']
convert_columns_to_string(df_person_subset, columns_to_convert)

In [14]:
# Workplaces referenced by workplace_id_list (.copy(): the id column is converted in place below)
df_work_subset = df_Work[df_Work['workplace_id'].isin(workplace_id_list)].copy()
columns_to_convert = ['workplace_id']
convert_columns_to_string(df_work_subset, columns_to_convert)

In [15]:
# Restrict both school tables to the study-area counties, derived from county_list so the
# three filters cannot drift apart. The private-school table appears to store only the
# 3-digit county part of the FIPS code, hence the "% 1000".
# .copy(): these frames are modified in place afterwards.
df_priv = df_priv[df_priv['COUNTY_FIPS'].isin([fips % 1000 for fips in county_list])].copy()
df_pub = df_pub[df_pub['CNTY'].isin(county_list)].copy()

In [16]:
# Cast the public-school id column to strings in place.
columns_to_convert=['NCESSCH']
convert_columns_to_string(df_pub, columns_to_convert)

In [17]:
# Display the converted column to check the result of the string conversion.
df_pub['NCESSCH']

179     370004002349
241     370010202506
247     370010802534
291     370014102927
358     370032303256
            ...     
2018    370393002243
2019    370393002244
2020    370393002245
2021    370393002246
2022    370393002247
Name: NCESSCH, Length: 139, dtype: object

In [18]:
# Test individual df ingest
# Initialize lists to store entity dictionaries (persons, households, workplaces)
edits_p = []
edits_h = []
edits_w = []

# Iterate over df_person_subset to build entity dictionaries for persons
for per in df_person_subset.itertuples():
    # Replace missing values with the handle_nan_values default ("Unknown");
    # convert_values_to_string leaves every other value unchanged.
    school_id, school_type, grade_lvl, income, serialno, workplace_id = convert_values_to_string(
    per.school_id, per.school_type, per.grade_level, per.job_income_bucket, per.serialno, per.workplace_id)
    # Construct person_edit dictionary
    person_edit = {
            "_objectType": "entity",
            "_typeName": "Person",
            "_properties": {
                "hh_id": per.hh_id,
                "sex": per.sex,
                "agep": per.agep,
                "rac1p": per.rac1p,
                "wp_id": workplace_id,
                "hisp": per.hisp,
                "school_id": school_id,
                "school_type": school_type,
                "grade_lvl": grade_lvl,
                "job_income_bucket": income,
                "relshipp": per.relshipp,
                "sporder": per.sporder,
                "serlino": serialno,
                "p_id": per.person_id_numeric
            }
    }

    # Add person_edit dictionary to the edits_p list
    edits_p.append(person_edit)

# edits_p now holds one entity dictionary per person.

# Iterate over df_work_subset to build entity dictionaries for workplaces.
# The former `workplace_id = convert_values_to_string(...)` line was removed: it bound a
# one-element list that was never used (wp_id is read straight from the row).
for work in df_work_subset.itertuples():
    # Construct workplace_edit dictionary
    workplace_edit = {
        "_objectType": "entity",
        "_typeName": "Workplace",
        "_properties": {
            "NAICS": work.NAICS,
            "blkgrp_fips_wp": work.blkgrp_fips_workplace,
            "elevation": work.elevation,
            "wp_id": work.workplace_id,
            "shape": {
                'x': float(work.lon_workplace),
                'y': float(work.lat_workplace),
                '_objectType': 'geometry'
            }
        }
    }
    edits_w.append(workplace_edit)

# edits_w now holds one entity dictionary per workplace.

# Iterate over df_hh_subset to build entity dictionaries for households.
# The former `serialno = convert_values_to_string(...)` line was removed: it bound a
# one-element list that was never used (serlino is read straight from the row).
for hh in df_hh_subset.itertuples():
    # Construct house_edit dictionary
    house_edit = {
        "_objectType": "entity",
        "_typeName": "Household",
        "_properties": {
            "hh_age": hh.hh_age,
            "size": hh.size,
            "hh_race": hh.hh_race,
            "ethnicity": hh.ethnicity,
            "hh_income": hh.hh_income,
            "state_fips": hh.state_fips,
            "county_fips": hh.county_fips,
            "tract_fips": hh.tract_fips,
            "blkgrp_fips": hh.blkgrp_fips,
            # Key spelling matches the property name declared for Household
            "evelation": hh.elevation,
            "serlino": hh.serialno,
            "hh_id": hh.hh_id,
            "shape": {
                'x': float(hh.LON),
                'y': float(hh.LAT),
                '_objectType': 'geometry'
            }
        }
    }
    edits_h.append(house_edit)


In [19]:
# Build one Pub_School entity dictionary per public school row.
# The former `ncessch = convert_values_to_string(...)` line was removed: it bound a
# one-element list that was never used (NCESSCH is read straight from the row).
edits_pub = []
for pub in df_pub.itertuples():
    pub_edit = {
        "_objectType": "entity",
        "_typeName": "Pub_School",
        "_properties": {
            "CNTY": pub.CNTY,
            "STFIP": pub.STFIP,
            "ZIP": pub.ZIP,
            "STATE": pub.STATE,
            "CITY": pub.CITY,
            "STREET": pub.STREET,
            "NAME": pub.NAME,
            "fips": pub.fips,
            "NCESSCH": pub.NCESSCH,
            "shape": {
                'x': float(pub.LON),
                'y': float(pub.LAT),
                '_objectType': 'geometry'
            }
        }
    }
    edits_pub.append(pub_edit)

In [20]:
# Build one Priv_School entity dictionary per private school row.
edits_priv = []
for priv in df_priv.itertuples():
    priv_edit = {
        "_objectType": "entity",
        "_typeName": "Priv_School",
        "_properties": {
            "COUNTY_NAME": priv.COUNTY_NAME,
            "COUNTY_FIPS": priv.COUNTY_FIPS,
            "fips": priv.fips,
            "STATE": priv.STATE,
            "CITY": priv.CITY,
            "SCHOOL_NAME": priv.SCHOOL_NAME,
            "PPIN": priv.PPIN,
            "shape": {
                # Use this row's own coordinates. The previous code read pub.LON / pub.LAT,
                # i.e. the leftover loop variable of the public-school loop, so every
                # private school received the location of the last public school.
                # Assumes df_priv has LON/LAT columns like df_pub -- TODO confirm.
                'x': float(priv.LON),
                'y': float(priv.LAT),
                '_objectType': 'geometry'
            }
        }
    }
    edits_priv.append(priv_edit)

In [21]:
# Build one Gages entity per gage record, then add them all with a single apply_edits call.
edits_g = []
for gage in gages.itertuples():
    gage_props = {
        'DOT_Div': gage.DOT_Divisi,
        'matthew_peak': gage.MATTHEW_PE,
        'florence_peak': gage.FLORENCE_P,
        'name': gage.NAME,
        'site_id': gage.SITE_ID,
    }
    # Point geometry built from the gage's longitude / latitude
    gage_props['shape'] = {
        'x': float(gage.LONGITUDE),
        'y': float(gage.LATITUDE),
        '_objectType': 'geometry'
    }
    edits_g.append({
        "_objectType": "entity",
        "_typeName": "Gages",
        "_properties": gage_props
    })
result_g = kg.apply_edits(adds=edits_g)

In [22]:
import time
# Upload the Household entities in batches of 50000 and time the upload.
# The UUID lookups in the relationship-building cells must divide by this same batch size.
start_time = time.time()
results_h, edits_h_id = add_entities_in_batches(edits_h, 50000)
end_time = time.time()
# Calculate and print the elapsed time
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 87.81038975715637 seconds


In [23]:
# Upload the Person entities in batches of 50000 and time the upload.
# The UUID lookups in the relationship-building cells must divide by this same batch size.
start_time = time.time()
results_p, edits_p_id = add_entities_in_batches(edits_p, 50000)
end_time = time.time()
# Calculate and print the elapsed time
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 206.46732091903687 seconds


In [24]:
# Upload the Workplace entities in batches of 50000 and time the upload.
# The UUID lookups in the relationship-building cells must divide by this same batch size.
start_time = time.time()
results_w, edits_w_id = add_entities_in_batches(edits_w, 50000)
end_time = time.time()
# Calculate and print the elapsed time
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 113.09378552436829 seconds


In [25]:
# Upload the Pub_School entities in batches of 10000 and time the upload.
# The UUID lookup in the Attends relationship cell must divide by this same batch size.
start_time = time.time()
results_pub, edits_pub_id = add_entities_in_batches(edits_pub, 10000)
end_time = time.time()
# Calculate and print the elapsed time
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 0.34399938583374023 seconds


In [26]:
# Upload the Priv_School entities in batches of 10000 and time the upload.
# The UUID lookup in the Attends relationship cell must divide by this same batch size.
start_time = time.time()
results_priv, edits_priv_id = add_entities_in_batches(edits_priv, 10000)
end_time = time.time()
# Calculate and print the elapsed time
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 0.11199259757995605 seconds


In [27]:
start_time = time.time()
# Map hh_id -> positions of the matching households in edits_h_id
index_dict_h = create_index_dict(edits_h_id, "hh_id")

# Must equal the batch size used when the Household and Person entities were added:
# positions are translated back into (batch, offset) pairs to find the assigned ids.
batch_size = 50000
relates_h = []
# A single pass over the persons; slicing them into batches first changed nothing.
for person_entity in edits_p_id:
    matching_indices = index_dict_h.get(person_entity['_properties']['hh_id'], [])
    if not matching_indices:
        continue

    # The person's id does not depend on the matched household, so look it up once
    person_batch, person_offset = divmod(person_entity['_index'], batch_size)
    person_uuid = results_p[person_batch]['editsResult']['Person']['addResults'][person_offset]['id']

    for matching_index in matching_indices:
        hh_batch, hh_offset = divmod(matching_index, batch_size)
        household_uuid = results_h[hh_batch]['editsResult']['Household']['addResults'][hh_offset]['id']

        # Person -LivesIn-> Household
        relates_h.append({
            "_objectType": "relationship",
            "_typeName": "LivesIn",
            "_originEntityId": person_uuid,
            "_destinationEntityId": household_uuid,
            "_properties": {}
        })
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

print("Adding to knowledge graph")
start_time = time.time()
p_h_relates = add_relates_in_batch(relates_h, batch_size)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

start_time = time.time()
# Map wp_id -> positions of the matching workplaces in edits_w_id
index_dict_w = create_index_dict(edits_w_id, "wp_id")

# Must equal the batch size used when the Workplace and Person entities were added:
# positions are translated back into (batch, offset) pairs to find the assigned ids.
batch_size = 50000
relates_w = []
# A single pass over the persons; slicing them into batches first changed nothing.
for person_entity in edits_p_id:
    matching_indices = index_dict_w.get(person_entity['_properties']['wp_id'], [])
    if not matching_indices:
        continue

    # The person's id does not depend on the matched workplace, so look it up once
    person_batch, person_offset = divmod(person_entity['_index'], batch_size)
    person_uuid = results_p[person_batch]['editsResult']['Person']['addResults'][person_offset]['id']

    for matching_index in matching_indices:
        wp_batch, wp_offset = divmod(matching_index, batch_size)
        workplace_uuid = results_w[wp_batch]['editsResult']['Workplace']['addResults'][wp_offset]['id']

        # Person -WorksAt-> Workplace
        relates_w.append({
            "_objectType": "relationship",
            "_typeName": "WorksAt",
            "_originEntityId": person_uuid,
            "_destinationEntityId": workplace_uuid,
            "_properties": {}
        })
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

print("Adding to knowledge graph")
start_time = time.time()
p_w_relates = add_relates_in_batch(relates_w, batch_size)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 3.598994731903076 seconds
Adding to knowledge graph
Elapsed time: 357.2031171321869 seconds
Elapsed time: 1.6330063343048096 seconds
Adding to knowledge graph
Elapsed time: 142.0149028301239 seconds


In [28]:
start_time = time.time()
# Map NCESSCH -> positions of the matching public schools in edits_pub_id
index_dict_pub = create_index_dict(edits_pub_id, "NCESSCH")

# Positions are translated back into (batch, offset) pairs, so each divisor must equal the
# batch size that was used when that entity list was added: 50000 for persons, 10000 for
# public schools. The previous code divided school positions by 50000 as well, which only
# worked while there were fewer than 10000 schools.
batch_size = 50000
pub_batch_size = 10000
relates_pub = []
# A single pass over the persons; slicing them into batches first changed nothing.
for person_entity in edits_p_id:
    pub_id = person_entity['_properties']['school_id']
    # Persons without a school carry the text 'nan' after the string conversion
    if pub_id == 'nan':
        continue

    matching_indices = index_dict_pub.get(pub_id, [])
    if not matching_indices:
        continue

    # The person's id does not depend on the matched school, so look it up once
    person_batch, person_offset = divmod(person_entity['_index'], batch_size)
    person_uuid = results_p[person_batch]['editsResult']['Person']['addResults'][person_offset]['id']

    for matching_index in matching_indices:
        pub_batch, pub_offset = divmod(matching_index, pub_batch_size)
        pub_uuid = results_pub[pub_batch]['editsResult']['Pub_School']['addResults'][pub_offset]['id']

        # Person -Attends-> Pub_School
        relates_pub.append({
            "_objectType": "relationship",
            "_typeName": "Attends",
            "_originEntityId": person_uuid,
            "_destinationEntityId": pub_uuid,
            "_properties": {}
        })
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

print("Adding to knowledge graph")
start_time = time.time()
p_pub_relates = add_relates_in_batch(relates_pub, batch_size)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 0.4629979133605957 seconds
Adding to knowledge graph
Elapsed time: 42.99869632720947 seconds


In [29]:
start_time = time.time()
# Map PPIN -> positions of the matching private schools in edits_priv_id
index_dict_priv = create_index_dict(edits_priv_id, "PPIN")

# Positions are translated back into (batch, offset) pairs, so each divisor must equal the
# batch size that was used when that entity list was added: 50000 for persons, 10000 for
# private schools. The previous code divided school positions by 50000 as well, which only
# worked while there were fewer than 10000 schools.
batch_size = 50000
priv_batch_size = 10000
relates_priv = []
# A single pass over the persons; slicing them into batches first changed nothing.
for person_entity in edits_p_id:
    priv_id = person_entity['_properties']['school_id']
    # Persons without a school carry the text 'nan' after the string conversion
    if priv_id == 'nan':
        continue

    matching_indices = index_dict_priv.get(priv_id, [])
    if not matching_indices:
        continue

    # The person's id does not depend on the matched school, so look it up once
    person_batch, person_offset = divmod(person_entity['_index'], batch_size)
    person_uuid = results_p[person_batch]['editsResult']['Person']['addResults'][person_offset]['id']

    for matching_index in matching_indices:
        priv_batch, priv_offset = divmod(matching_index, priv_batch_size)
        priv_uuid = results_priv[priv_batch]['editsResult']['Priv_School']['addResults'][priv_offset]['id']

        # Person -Attends-> Priv_School
        relates_priv.append({
            "_objectType": "relationship",
            "_typeName": "Attends",
            "_originEntityId": person_uuid,
            "_destinationEntityId": priv_uuid,
            "_properties": {}
        })
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

print("Adding to knowledge graph")
start_time = time.time()
# Stored under its own name; it used to overwrite the public-school result p_pub_relates
p_priv_relates = add_relates_in_batch(relates_priv, batch_size)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Elapsed time: 0.27098774909973145 seconds
Adding to knowledge graph
Elapsed time: 0.7270524501800537 seconds


In [30]:
#edits_p_id[29675]['_properties']['school_id']

In [31]:
#edits_p_id[29675]['_index']

Below is an example of mapping the KG on a GIS map. There is a limit of 2000 entries here, so the entire graph will not be displayed. The KG can be accessed from the ArcGIS Pro GUI for full visualization.

In [32]:
from arcgis.features import FeatureSet, Feature
from arcgis.geometry import Geometry

# Fetch Household entities from the knowledge graph.
query_results = kg.query("MATCH (n:Household) RETURN n")
house_list = []

# Each result row holds the entity in its first column; wrap its shape in a Feature.
for house in query_results:
    geom = Geometry(house[0]['_properties']['shape'])
    house_feat = Feature(geometry=geom)
    house_list.append(house_feat)
    
# Collect the point features, using the spatial reference reported by the knowledge graph.
house_fs = FeatureSet(features= house_list, geometry_type= 'Point', spatial_reference= kg.datamodel['spatial_reference'])
house_sdf = house_fs.sdf

# Draw the households on a map widget centered on North Carolina.
new_map = gis.map("North Carolina")
#new_map.center
new_map.zoom = 6
new_map.basemap = 'gray-vector'
house_sdf.spatial.plot(map_widget = new_map, renderer_type = 's', markersize = 3, symbol_type = 'simple')
# Uncomment to display the map widget in the notebook:
#new_map

True