In [1]:
import json
import numpy as np
from collections import Counter

In [2]:
# Knowledge-base file passed to the extraction functions in this notebook.
kb_json = '../dataset/kb.json'

# Dataset split files (train / validation / test).
train_json = '../dataset/train.json'
val_json = '../dataset/val.json'
test_json = '../dataset/test.json'

In [3]:
def string_clean(s: str) -> str:
    """Make a string safe to use as one field of a comma-joined row.

    Every comma becomes the word ``and`` (padded with spaces), after which any
    run of whitespace is collapsed to a single space and the ends are trimmed.
    """
    without_commas = s.replace(',', ' and ')
    return ' '.join(without_commas.split())

def find_name(kb, id):
    """Return the name stored for ``id``, looking in entities first, then concepts.

    Args:
        kb: Loaded knowledge base; ``kb['entities']`` and ``kb['concepts']`` are
            indexed by id and each record is read through its ``'name'`` key.
        id: Identifier to resolve.

    Returns:
        The ``'name'`` value of the matching entity or, failing that, concept.

    Raises:
        KeyError: If ``id`` cannot be resolved in either section.
    """
    try:
        return kb['entities'][id]['name']
    except KeyError:
        # Only a failed lookup triggers the fallback; any other error (bad kb
        # type, interrupt, ...) propagates instead of being masked.
        return kb['concepts'][id]['name']


In [None]:
# # Alternative version: sorts the qualifier pairs and the output rows, so its ordering differs from the version below
# def get_qualifier_relational_clean_fullname_combine(kb_json, output=False, file_name='kb_q_r_clean_fullname_combine.txt'):
#     qualifier = dict()
#     kb = json.load(open(kb_json))
#     for i in kb['entities']:
#         fullname = kb['entities'][i]['name']
#         for rel_dict in kb['entities'][i]['relations']:
#             # First: add fact key, also called triple pairs
#             statement = None
#             if rel_dict['direction'] == 'forward':
#                 statement = (string_clean(fullname), string_clean(rel_dict['predicate']), string_clean(find_name(kb, rel_dict['object'])))
#             elif  rel_dict['direction'] == 'backward':
#                 statement = (string_clean(find_name(kb, rel_dict['object'])), string_clean(rel_dict['predicate']), string_clean(fullname))
            
#             if not statement in qualifier: 
#                 qualifier[statement] = set()

#             for qk, qvs in rel_dict['qualifiers'].items():                
#                 # Second add qk - qv pairs, for qv that have more than one instance, seperate to single qk - qv pairs
#                 new_qvs = []
#                 for qv in qvs:
#                     if qv['type'] == 'string':
#                         new_qvs.append(string_clean(qv['value']))
                        
#                 if len(new_qvs) != 0:
#                     for qv in new_qvs:
#                         # Add as pairs so that can do duplication check
#                         qualifier[statement].add(tuple([string_clean(qk), qv]))
        
#     # Third: Make sure the statement is qualifier 
#     output_qualifier = set()
#     for statement, qkv_pairs in qualifier.items():    
#         if len(qkv_pairs) > 0:
#             new_qkv_pairs = sorted(qkv_pairs)
#             new_qkv_list = []
#             for pair in new_qkv_pairs:
#                 new_qkv_list += list(pair)
#             output_qualifier.add(tuple(list(statement) + new_qkv_list))

#     output_qualifier = sorted(output_qualifier)

#     if output:
#         str_q = [",".join(q)+'\n' for q in output_qualifier]
#         with open(file_name, 'w', encoding='utf-8') as f:
#             f.writelines(str_q)

#     return output_qualifier


## Main

In [4]:
# Version that keeps the KB's own ordering (nothing is sorted).
def get_qualifier_relational_clean_fullname_combine(kb_json, output=False, file_name='kb_q_r_clean_fullname_combine.txt'):
    """Collect relational facts that carry at least one string-valued qualifier.

    Every entity's relations are turned into a cleaned ``(subject, predicate,
    object)`` triple; a ``backward`` relation swaps subject and object. String
    qualifier values attached to the same triple are merged per qualifier key,
    keeping first-seen order and dropping repeats. Triples that end up with no
    string qualifier are left out.

    Args:
        kb_json: Path to the knowledge-base JSON file.
        output: When true, also write the rows to ``file_name``.
        file_name: Destination text file, one comma-joined row per line.

    Returns:
        A set of flat string tuples ``(subject, predicate, object, qk, qv, ...)``.
    """
    # Use a context manager so the KB file handle is closed right after parsing.
    with open(kb_json, encoding='utf-8') as kb_file:
        kb = json.load(kb_file)

    qualifier = dict()
    for entity in kb['entities'].values():
        fullname = entity['name']
        for rel_dict in entity['relations']:
            # First: build the fact key (the triple).
            direction = rel_dict['direction']
            if direction not in ('forward', 'backward'):
                # No triple can be formed for an unknown direction; skipping it
                # avoids carrying a None key that would break row flattening.
                continue
            subject = string_clean(fullname)
            predicate = string_clean(rel_dict['predicate'])
            target = string_clean(find_name(kb, rel_dict['object']))
            if direction == 'forward':
                statement = (subject, predicate, target)
            else:
                statement = (target, predicate, subject)

            pairs = qualifier.setdefault(statement, dict())

            # Second: add one qk - qv pair per string value of each qualifier.
            for qk, qvs in rel_dict['qualifiers'].items():
                clean_qk = string_clean(qk)
                for qv in qvs:
                    if qv['type'] != 'string':
                        continue
                    value = string_clean(qv['value'])
                    # The key is only created once a string value exists for it.
                    values = pairs.setdefault(clean_qk, [])
                    if value not in values:
                        values.append(value)

    # Third: keep only statements that really have qualifiers.
    rows = []
    for statement, qkv_pairs in qualifier.items():
        if not qkv_pairs:
            continue
        row = list(statement)
        for qk, qvs in qkv_pairs.items():
            for qv in qvs:
                row += [qk, qv]
        rows.append(tuple(row))

    if output:
        # Write in first-seen order so the file is identical from run to run
        # (iterating the returned set would depend on string hashing).
        with open(file_name, 'w', encoding='utf-8') as f:
            f.writelines(",".join(row) + '\n' for row in rows)

    # Statements are unique dict keys, so the set holds exactly these rows.
    return set(rows)


In [5]:
# Qualifier-bearing relational rows only; nothing is written to disk (output=False).
q = get_qualifier_relational_clean_fullname_combine(kb_json, output=False)

In [8]:
from collections import Counter
# Histogram of row lengths in q: maps each tuple length to the number of rows having it.
c = Counter(len(st) for st in q)

print(c)

Counter({5: 14377, 7: 6128, 9: 1395, 11: 851, 13: 327, 15: 220, 19: 114, 23: 59, 17: 30, 27: 21, 31: 19, 35: 11, 39: 11, 21: 11, 43: 11, 25: 9, 37: 9, 33: 8, 29: 7, 47: 7, 41: 5, 63: 4, 51: 4, 45: 3, 49: 2, 109: 1, 119: 1, 53: 1, 133: 1, 153: 1, 67: 1, 137: 1, 55: 1, 83: 1, 129: 1, 57: 1, 71: 1, 65: 1})


In [None]:
def get_relational_clean_fullname_combine(kb_json, output=False, file_name='kb_r_clean_fullname_combine.txt'):
    """Collect instance-of facts and relational facts, with their string qualifiers.

    For every entity this emits one ``(name, 'instance of', concept)`` triple
    per ``instanceOf`` entry and one cleaned ``(subject, predicate, object)``
    triple per relation (a ``backward`` relation swaps subject and object).
    String qualifier values on relations are merged per qualifier key in
    first-seen order without repeats. Triples without qualifiers are kept.

    Args:
        kb_json: Path to the knowledge-base JSON file.
        output: When true, also write the rows to ``file_name``.
        file_name: Destination text file, one comma-joined row per line.

    Returns:
        A set of flat string tuples ``(subject, predicate, object[, qk, qv, ...])``.
    """
    # Use a context manager so the KB file handle is closed right after parsing.
    with open(kb_json, encoding='utf-8') as kb_file:
        kb = json.load(kb_file)

    qualifier = dict()
    for entity in kb['entities'].values():
        fullname = entity['name']

        # For instance of
        for concept_id in entity['instanceOf']:
            statement = (string_clean(fullname), 'instance of', string_clean(find_name(kb, concept_id)))
            qualifier.setdefault(statement, dict())

        for rel_dict in entity['relations']:
            # First: build the fact key (the triple).
            direction = rel_dict['direction']
            if direction not in ('forward', 'backward'):
                # No triple can be formed for an unknown direction; skipping it
                # avoids carrying a None key that would break row flattening.
                continue
            subject = string_clean(fullname)
            predicate = string_clean(rel_dict['predicate'])
            target = string_clean(find_name(kb, rel_dict['object']))
            if direction == 'forward':
                statement = (subject, predicate, target)
            else:
                statement = (target, predicate, subject)

            pairs = qualifier.setdefault(statement, dict())

            # Second: add one qk - qv pair per string value of each qualifier.
            for qk, qvs in rel_dict['qualifiers'].items():
                clean_qk = string_clean(qk)
                for qv in qvs:
                    if qv['type'] != 'string':
                        continue
                    value = string_clean(qv['value'])
                    # The key is only created once a string value exists for it.
                    values = pairs.setdefault(clean_qk, [])
                    if value not in values:
                        values.append(value)

    # Third: flatten every statement together with its qualifier pairs.
    rows = []
    for statement, qkv_pairs in qualifier.items():
        row = list(statement)
        for qk, qvs in qkv_pairs.items():
            for qv in qvs:
                row += [qk, qv]
        rows.append(tuple(row))

    if output:
        # Write in first-seen order so the file is identical from run to run
        # (iterating the returned set would depend on string hashing).
        with open(file_name, 'w', encoding='utf-8') as f:
            f.writelines(",".join(row) + '\n' for row in rows)

    # Statements are unique dict keys, so the set holds exactly these rows.
    return set(rows)

In [None]:
# Instance-of and relational rows; output=True also writes them to the function's default file.
q = get_relational_clean_fullname_combine(kb_json, output=True)

In [None]:
def get_attributes_clean_fullname(kb_json, output=False, file_name='kb_a_clean_fullname_combine.txt'):
    """Collect string-valued attribute facts, with their string qualifiers.

    For every entity attribute whose value has type ``'string'`` this emits a
    cleaned ``(name, attribute key, attribute value)`` triple; attributes of
    any other value type are ignored. String qualifier values are merged per
    qualifier key in first-seen order without repeats. Triples without
    qualifiers are kept.

    Args:
        kb_json: Path to the knowledge-base JSON file.
        output: When true, also write the rows to ``file_name``.
        file_name: Destination text file, one comma-joined row per line.

    Returns:
        A set of flat string tuples ``(name, key, value[, qk, qv, ...])``.
    """
    # Use a context manager so the KB file handle is closed right after parsing.
    with open(kb_json, encoding='utf-8') as kb_file:
        kb = json.load(kb_file)

    qualifier = dict()
    for entity in kb['entities'].values():
        fullname = entity['name']

        # For attribute
        for att_dict in entity['attributes']:
            # First: only string-typed attribute values are kept.
            if att_dict['value']['type'] != 'string':
                continue

            # Second: add the attribute triple.
            statement = (string_clean(fullname), string_clean(att_dict['key']), string_clean(att_dict['value']['value']))
            pairs = qualifier.setdefault(statement, dict())

            # Then add one qk - qv pair per string value of each qualifier.
            for qk, qvs in att_dict['qualifiers'].items():
                clean_qk = string_clean(qk)
                for qv in qvs:
                    if qv['type'] != 'string':
                        continue
                    value = string_clean(qv['value'])
                    # The key is only created once a string value exists for it.
                    values = pairs.setdefault(clean_qk, [])
                    if value not in values:
                        values.append(value)

    # Third: flatten every statement together with its qualifier pairs.
    rows = []
    for statement, qkv_pairs in qualifier.items():
        row = list(statement)
        for qk, qvs in qkv_pairs.items():
            for qv in qvs:
                row += [qk, qv]
        rows.append(tuple(row))

    if output:
        # Write in first-seen order so the file is identical from run to run
        # (iterating the returned set would depend on string hashing).
        with open(file_name, 'w', encoding='utf-8') as f:
            f.writelines(",".join(row) + '\n' for row in rows)

    # Statements are unique dict keys, so the set holds exactly these rows.
    return set(rows)

In [None]:
# String-valued attribute rows; output=True also writes them to the function's default file.
q = get_attributes_clean_fullname(kb_json, output=True)

In [5]:
def get_all_clean_fullname(kb_json, output=False, file_name='kb_all_clean_fullname_combine.txt'):
    """Collect instance-of, relational and string-attribute facts in one pass.

    For every entity this emits, in this order:
      * one ``(name, 'instance of', concept)`` triple per ``instanceOf`` entry;
      * one cleaned ``(subject, predicate, object)`` triple per relation
        (a ``backward`` relation swaps subject and object);
      * one ``(name, key, value)`` triple per attribute whose value has type
        ``'string'``.
    String qualifier values on relations and attributes are merged per
    qualifier key in first-seen order without repeats. Triples without
    qualifiers are kept.

    Args:
        kb_json: Path to the knowledge-base JSON file.
        output: When true, also write the rows to ``file_name``.
        file_name: Destination text file, one comma-joined row per line.

    Returns:
        A set of flat string tuples ``(subject, predicate, object[, qk, qv, ...])``.
    """

    def add_string_qualifiers(pairs, raw_qualifiers):
        # Merge the string-typed values of raw_qualifiers into pairs, keeping
        # first-seen order and skipping values already recorded for a key.
        for qk, qvs in raw_qualifiers.items():
            clean_qk = string_clean(qk)
            for qv in qvs:
                if qv['type'] != 'string':
                    continue
                value = string_clean(qv['value'])
                # The key is only created once a string value exists for it.
                values = pairs.setdefault(clean_qk, [])
                if value not in values:
                    values.append(value)

    # Use a context manager so the KB file handle is closed right after parsing.
    with open(kb_json, encoding='utf-8') as kb_file:
        kb = json.load(kb_file)

    qualifier = dict()
    for entity in kb['entities'].values():
        fullname = entity['name']

        # For instance of
        for concept_id in entity['instanceOf']:
            statement = (string_clean(fullname), 'instance of', string_clean(find_name(kb, concept_id)))
            qualifier.setdefault(statement, dict())

        # For relation
        for rel_dict in entity['relations']:
            direction = rel_dict['direction']
            if direction not in ('forward', 'backward'):
                # No triple can be formed for an unknown direction; skipping it
                # avoids carrying a None key that would break row flattening.
                continue
            subject = string_clean(fullname)
            predicate = string_clean(rel_dict['predicate'])
            target = string_clean(find_name(kb, rel_dict['object']))
            if direction == 'forward':
                statement = (subject, predicate, target)
            else:
                statement = (target, predicate, subject)

            add_string_qualifiers(qualifier.setdefault(statement, dict()), rel_dict['qualifiers'])

        # For attribute
        for att_dict in entity['attributes']:
            # Only string-typed attribute values are kept.
            if att_dict['value']['type'] != 'string':
                continue

            statement = (string_clean(fullname), string_clean(att_dict['key']), string_clean(att_dict['value']['value']))
            add_string_qualifiers(qualifier.setdefault(statement, dict()), att_dict['qualifiers'])

    # Third: flatten every statement together with its qualifier pairs.
    rows = []
    for statement, qkv_pairs in qualifier.items():
        row = list(statement)
        for qk, qvs in qkv_pairs.items():
            for qv in qvs:
                row += [qk, qv]
        rows.append(tuple(row))

    if output:
        # Write in first-seen order so the file is identical from run to run
        # (iterating the returned set would depend on string hashing).
        with open(file_name, 'w', encoding='utf-8') as f:
            f.writelines(",".join(row) + '\n' for row in rows)

    # Statements are unique dict keys, so the set holds exactly these rows.
    return set(rows)

In [6]:
# Instance-of, relational and string-attribute rows together; output=True also writes
# them to the function's default file. This rebinds `q`.
q = get_all_clean_fullname(kb_json, output=True)

## Others

In [10]:
def random_sampling(s: set, split: list=[0.85, 0.15]):
    """Shuffle the rows of ``s`` and write a disjoint train/test split to disk.

    Each row is joined with commas into one line. The first
    ``round(len(s) * split[0])`` shuffled lines go to ``train.txt`` and all
    remaining lines go to ``test.txt`` (both in the current working directory),
    so every row lands in exactly one file.

    Args:
        s: Collection of rows, each an iterable of strings.
        split: ``[train_fraction, test_fraction]``; the two must sum to 1.
            The default list is only read, never modified.

    Raises:
        AssertionError: If the two fractions do not sum to 1.
    """
    # Check the fractions themselves: rounding two sizes independently and
    # comparing their sum to the total rejects valid splits (5 rows at
    # 0.7 / 0.3 round to 4 + 2).
    assert abs(split[0] + split[1] - 1.0) < 1e-8, "split fractions must sum to 1"

    lines = [",".join(q) + '\n' for q in s]
    length = len(lines)
    permutation = np.random.permutation(length)

    # The test part is whatever the train part leaves over.
    trn_length = int(np.round(length * split[0]))
    trn = [lines[i] for i in permutation[:trn_length]]
    tst = [lines[i] for i in permutation[trn_length:]]

    # Explicit utf-8 so the output does not depend on the platform's locale.
    with open("train.txt", 'w', encoding='utf-8') as f:
        f.writelines(trn)
    with open("test.txt", 'w', encoding='utf-8') as f:
        f.writelines(tst)

In [1]:
def generate_fake_test(s: set, split: float=0.01):
    """Write every row to ``train.txt`` and a small sample of them to ``test.txt``.

    Each row is joined with commas into one line and the lines are shuffled.
    All shuffled lines are written to ``train.txt``; the first
    ``round(len(s) * split)`` of those same lines are written to ``test.txt``,
    so the test file is a subset of the train file. Both files are created in
    the current working directory.

    Args:
        s: Collection of rows, each an iterable of strings.
        split: Fraction of the rows to copy into the test file.
    """
    lines = [",".join(q) + '\n' for q in s]
    length = len(lines)
    permutation = np.random.permutation(length)
    tst_length = int(np.round(length * split))

    trn = [lines[i] for i in permutation]
    # The test lines are the head of the shuffled train lines.
    tst = trn[:tst_length]

    # Explicit utf-8 so the output does not depend on the platform's locale.
    with open("train.txt", 'w', encoding='utf-8') as f:
        f.writelines(trn)
    with open("test.txt", 'w', encoding='utf-8') as f:
        f.writelines(tst)

In [None]:
# Write train.txt (all rows of `q`, shuffled) and test.txt (a 1% sample of those same rows).
generate_fake_test(q)