# Create schema mapping files for LDBC initial snapshot and batch inserts (CSV)
This notebook walks the LDBC CSV directory tree, loads every directory that contains CSV files as a dataframe, and generates header (schema mapping) files from the directory names and column dtypes.

In [1]:
import os
from pathlib import Path
import pandas as pd
import dask.dataframe as dd
import numpy as np
from dataclasses import dataclass

# Root directory that holds the LDBC datasets.
# NOTE(review): absolute, machine-specific path; adjust before running elsewhere.
base_path = Path('/Users/andrew/datasets')
# Dataset variant processed by this notebook; it is the root that gets scanned for CSV directories.
dataset_path = base_path / 'sf-0.003/csv/bi/composite-projected-fk'
# Output directory for the generated header files and batches.json.
headers_path = dataset_path / 'headers-v2'

In [2]:
@dataclass
class TYPE:
    """One discovered CSV directory: its dataframe plus the CSV files backing it."""
    # Dask dataframe built from the *.csv files of the directory.
    df: dd.DataFrame
    # Paths of those CSV files, stored as relative path strings.
    files: list
    
def csv_to_df(path:Path):
    """Load every ``*.csv`` file directly inside ``path`` as one dask dataframe.

    The files are read as '|'-delimited text. ``assume_missing=True`` tells dask
    to treat integer columns that are not listed in ``dtype`` as floats, so a
    missing value in a later partition cannot break type inference.

    Args:
        path: Directory whose ``*.csv`` files are read together.

    Returns:
        A ``dask.dataframe.DataFrame`` covering all matched files.
    """
    return dd.read_csv(path / '*.csv',
                       delimiter='|',
                       assume_missing=True,
                       # Pin the columns whose type must not be inferred.
                       # 'language' and 'email' are kept as raw strings; an
                       # earlier revision split them on ';', so they presumably
                       # hold ';'-separated lists (TODO confirm).
                       dtype={
                           'id': int,
                           'length': np.dtype('long'),
                           'content': np.dtype('O'),
                           'imageFile': np.dtype('O'),
                           'classYear': np.dtype('long'),
                           'workFrom': np.dtype('long'),
                           'language': np.dtype('O'),
                           'email': np.dtype('O')
                       },
                       # Date-like columns are parsed with pandas per value.
                       converters={
                           'creationDate': lambda x: pd.to_datetime(x, unit='ns'),
                           'deletionDate': lambda x: pd.to_datetime(x, unit='ns'),
                           'birthday': lambda x: pd.to_datetime(x),
                       })

def csv_files_in_dir(path, directory):
    """Return the ``*.csv`` entries directly inside ``path``, relative to ``directory``.

    Args:
        path: Directory to list (not recursive).
        directory: Ancestor directory the returned paths are made relative to.

    Returns:
        A sorted list of path strings, so the result does not depend on the
        arbitrary order in which the file system lists directory entries.

    Raises:
        ValueError: If ``path`` is not located under ``directory``.
    """
    # iterdir() already yields entries prefixed with ``path``. Joining them onto
    # ``path`` again duplicates the prefix whenever ``path`` is relative.
    return sorted(str(entry.relative_to(directory))
                  for entry in path.iterdir()
                  if entry.match('*.csv'))

def dfs_from_directory(directory:Path):
    """Discover every subdirectory of ``directory`` that directly contains CSV files.

    The tree below ``directory`` is walked recursively. ``directory`` itself is
    not inspected, only its descendants.

    Args:
        directory: Root of the tree to scan.

    Returns:
        A dict mapping each CSV-bearing subdirectory (as a path string relative
        to ``directory``) to a ``TYPE`` holding its dataframe and its CSV file
        paths, also relative to ``directory``.
    """
    dfs = {}
    for root, dirs, _ in os.walk(directory):
        for name in dirs:
            path = Path(root) / name
            if any(entry.match('*.csv') for entry in path.iterdir()):
                # Relativise against the ``directory`` argument rather than a
                # notebook global, so keys and file paths share the same root.
                dfs[str(path.relative_to(directory))] = TYPE(df=csv_to_df(path),
                                                             files=csv_files_in_dir(path, directory))
    return dfs

In [3]:
# Build the dataframe for every CSV-bearing subdirectory of the dataset, keyed by its path relative to the dataset root.
dfs = dfs_from_directory(dataset_path)

In [4]:
# Preview the discovered keys, truncated to the first 500 characters.
print(f'{", ".join(dfs)[:500]}...')

examples, deletes/dynamic/Post/batch_id=2012-12-02, deletes/dynamic/Post/batch_id=2012-12-11, deletes/dynamic/Post/batch_id=2012-12-06, deletes/dynamic/Post/batch_id=2012-12-24, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-10-13, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-11-25, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-12-02, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-12-28, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-12-19, deletes/dynamic/Forum_...


In [5]:
# Keep only the entries whose key starts with 'initial_snapshot'.
initial_snapshot_dfs = {
    key: entry
    for key, entry in dfs.items()
    if key.startswith('initial_snapshot')
}
print(", ".join(initial_snapshot_dfs)[:500] + "...")

initial_snapshot/dynamic/Post, initial_snapshot/dynamic/Forum_hasMember_Person, initial_snapshot/dynamic/Post_hasTag_Tag, initial_snapshot/dynamic/Comment_hasTag_Tag, initial_snapshot/dynamic/Person, initial_snapshot/dynamic/Comment_replyOf_Post, initial_snapshot/dynamic/Comment, initial_snapshot/dynamic/Comment_replyOf_Comment, initial_snapshot/dynamic/Comment_hasCreator_Person, initial_snapshot/dynamic/Post_isLocatedIn_Place, initial_snapshot/dynamic/Person_hasInterest_Tag, initial_snapshot/dy...


In [6]:
# Keep only the entries whose key starts with 'inserts'.
inserts_dfs = {
    key: entry
    for key, entry in dfs.items()
    if key.startswith('inserts')
}
print(", ".join(inserts_dfs)[:500] + "...")

inserts/dynamic/Post/batch_id=2012-10-24, inserts/dynamic/Post/batch_id=2012-10-23, inserts/dynamic/Post/batch_id=2012-10-15, inserts/dynamic/Post/batch_id=2012-10-13, inserts/dynamic/Post/batch_id=2012-10-14, inserts/dynamic/Post/batch_id=2012-10-25, inserts/dynamic/Post/batch_id=2012-11-13, inserts/dynamic/Post/batch_id=2012-09-20, inserts/dynamic/Post/batch_id=2012-09-29, inserts/dynamic/Post/batch_id=2012-12-05, inserts/dynamic/Post/batch_id=2012-09-16, inserts/dynamic/Post/batch_id=2012-11-...


In [7]:
# Keep only the entries whose key starts with 'deletes'.
deletes_dfs = {
    key: entry
    for key, entry in dfs.items()
    if key.startswith('deletes')
}
print(", ".join(deletes_dfs)[:500] + "...")

deletes/dynamic/Post/batch_id=2012-12-02, deletes/dynamic/Post/batch_id=2012-12-11, deletes/dynamic/Post/batch_id=2012-12-06, deletes/dynamic/Post/batch_id=2012-12-24, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-10-13, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-11-25, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-12-02, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-12-28, deletes/dynamic/Forum_hasMember_Person/batch_id=2012-12-19, deletes/dynamic/Forum_hasMember_...


In [8]:
import re
# Matches a foreign-key column such as 'Person1.id': a label made of non-digit
# word characters, an optional numeric suffix, then the literal '.id'.
fk_id_re = re.compile(r'(?P<label>[^\d\W]+)(?P<number>\d*)\.id')
# Matches an edge type name of the form <start>_<type>_<end>, e.g. 'Person_knows_Person'.
edge_type_re = re.compile(r'(?P<start>\w+)_(?P<type>\w+)_(?P<end>\w+)')

# Type keyword written to the header for each column dtype. Timezone-aware (UTC)
# datetimes map to DATETIME, naive datetime64[ns] columns map to DATE.
dtype_to_katana_type = {np.dtype('int64'): 'LONG',
                        np.dtype('O'): 'STRING',
                        np.dtype('float64'): 'FLOAT',
                        pd.DatetimeTZDtype(tz='UTC'): 'DATETIME',
                        np.dtype('<M8[ns]'): 'DATE'}
# Node types whose default label differs from their own name.
default_label = {
    'Post' : 'Message',
    'Comment' : 'Message',
}

def get_default_label(c:str):
    """Return the default label for the node or edge type named ``c``.

    Edge type names (``<start>_<type>_<end>``) yield their ``type`` part
    converted from camelCase to UPPER_SNAKE_CASE. Node type names are looked up
    in ``default_label`` and fall back to the name itself.
    """
    edge_match = edge_type_re.search(c)
    if edge_match is None:
        # Not an edge: use the override when one exists, else the name as-is.
        return default_label.get(c, c)
    edge_type = edge_match.group('type')
    # Put '_' before every capital that is not at the start, then upper-case.
    snake_cased = re.sub(r'(?<!^)(?=[A-Z])', '_', edge_type)
    return snake_cased.upper()

def get_type(c:str, dtype, label: str, include_fk=False):
    """Build the header entry for column ``c`` of the node or edge type ``label``.

    Args:
        c: Column name.
        dtype: Dtype of the column; looked up in ``dtype_to_katana_type``.
        label: Type name. It is treated as an edge type when it matches
            ``edge_type_re`` (``<start>_<type>_<end>``).
        include_fk: When true, edge endpoint entries keep the column name in
            front of ``:START_ID(...)`` / ``:END_ID(...)``; otherwise the name
            is omitted from those entries.

    Returns:
        The header entry string, e.g. ``'id:ID(Person)'``,
        ``'Person1.id:START_ID(Person)'``, ``'Tag.id:IGNORE'`` or
        ``'content:STRING'``.

    Raises:
        KeyError: If a plain property column has a dtype that is missing from
            ``dtype_to_katana_type``.
    """
    if c == 'id':
        return c + ':ID(' + label + ')'

    label_match = edge_type_re.search(label)
    fk_label_match = fk_id_re.search(c)

    if label_match and fk_label_match:
        # Edge endpoint column. The optional name goes into its own variable so
        # that ``c`` is still intact if neither endpoint matches below.
        prefix = c if include_fk else ''
        fk_label = fk_label_match.group('label')
        start = label_match.group('start')
        end = label_match.group('end')
        if fk_label == start and fk_label == end:
            # Both endpoints share a type (e.g. Person_knows_Person): the
            # numeric suffix decides the role, '1' being the start.
            if fk_label_match.group('number') == '1':
                return prefix + ':START_ID(' + fk_label + ')'
            return prefix + ':END_ID(' + fk_label + ')'
        if fk_label == start:
            return prefix + ':START_ID(' + fk_label + ')'
        if fk_label == end:
            return prefix + ':END_ID(' + fk_label + ')'

    # Foreign keys of node types, or ones matching neither edge endpoint, are ignored.
    if fk_label_match:
        return c + ':IGNORE'
    # 'birthday' is always emitted as DATE, whatever dtype was inferred.
    if c == 'birthday':
        return c + ':DATE'
    return c + ':' + dtype_to_katana_type[dtype]

def get_header(df:dd.DataFrame, label:str):
    """Return one header entry per column of ``df`` for the type ``label``.

    Each column is passed to ``get_type`` together with its dtype, with
    foreign-key column names included.
    """
    header = []
    for column in df:
        header.append(get_type(column, df[column].dtype, label, include_fk=True))
    return header

In [9]:
# Sanity check for fk_id_re: print the label group of a foreign-key column name.
s = 'Person1.id'
if m := fk_id_re.search(s):
    print(m.group('label'))

# Sanity check for edge_type_re: print the three named groups of an edge type name.
s = 'Person_knows_Person'
if m := edge_type_re.search(s):
    print('start: ' + m.group('start'))
    print('end: ' + m.group('end'))
    print('type: ' + m.group('type'))

Person
start: Person
end: Person
type: knows


In [10]:
def generate_header_files(batches, directory:Path):
    """Write the node and edge header files of every batch into ``directory``.

    Each type in a batch contributes one section: a ``KATANA_DEFAULT_LABEL=``
    line, the '|'-joined column header, and the CSV file paths (one per line),
    followed by a blank line. Types whose directory name contains '_' go to
    ``<batch>-edge_headers.txt``, all others to ``<batch>-node_headers.txt``.
    A file is only written when it has at least one section.

    Args:
        batches: Mapping of batch name to a mapping of type path to an object
            with ``df`` and ``files`` attributes.
        directory: Output directory; created, including parents, if missing.
    """
    directory.mkdir(parents=True, exist_ok=True)

    for batch_name, batch in batches.items():
        node_sections = []
        edge_sections = []
        for type_path, type_info in batch.items():
            label = Path(type_path).name
            section = ('KATANA_DEFAULT_LABEL=' + get_default_label(label) + '\n'
                       + '|'.join(get_header(type_info.df, label)) + '\n'
                       + '\n'.join(type_info.files) + '\n\n')
            # Edge directories are named <start>_<type>_<end>; node directory names have no '_'.
            if '_' in label:
                edge_sections.append(section)
            else:
                node_sections.append(section)

        for suffix, sections in (('-node_headers.txt', node_sections),
                                 ('-edge_headers.txt', edge_sections)):
            if sections:
                # The context manager closes the file even if the write fails.
                with open(directory / (batch_name + suffix), 'w') as header_file:
                    header_file.write(''.join(sections))

## Generate Initial Snapshot Header file

In [11]:
# The initial snapshot forms a single batch, keyed by type name (the last path component).
batch = 'initial_snapshot'
initial_snapshot_batches = {}
for snapshot_key, snapshot_type in initial_snapshot_dfs.items():
    initial_snapshot_batches.setdefault(batch, {})[Path(snapshot_key).name] = snapshot_type

output_path = headers_path / 'initial_snapshot'
output_path.mkdir(parents=True, exist_ok=True)
generate_header_files(initial_snapshot_batches, output_path)

## Generate Inserts Header file

In [12]:
# Group the insert dataframes by batch: the last path component is the batch id,
# the remaining parent path identifies the type.
insert_batches = {}
for insert_key, insert_type in inserts_dfs.items():
    insert_path = Path(insert_key)
    insert_batches.setdefault(insert_path.name, {})[str(insert_path.parent)] = insert_type

output_path = headers_path / 'inserts'
output_path.mkdir(parents=True, exist_ok=True)
generate_header_files(insert_batches, output_path)

## Generate Deletes Header file

In [13]:
# Group the delete dataframes by batch: the last path component is the batch id,
# the remaining parent path identifies the type.
delete_batches = {}
for delete_key, delete_type in deletes_dfs.items():
    delete_path = Path(delete_key)
    delete_batches.setdefault(delete_path.name, {})[str(delete_path.parent)] = delete_type

output_path = headers_path / 'deletes'
output_path.mkdir(parents=True, exist_ok=True)
generate_header_files(delete_batches, output_path)

## Collect and write out combined insert and delete batches

In [14]:
import json
from datetime import datetime

# Every batch id that has inserts and/or deletes, in chronological order.
batches_list = sorted(set(insert_batches.keys()) | set(delete_batches.keys()),
                      key=lambda date: datetime.strptime(date, 'batch_id=%Y-%m-%d'))

# (JSON key, headers subdirectory, file name suffix) for each header file a batch
# may have. The order here is the key order inside each JSON entry.
header_lists = (
    ('insert_edge_list', 'inserts', '-edge_headers.txt'),
    ('insert_node_list', 'inserts', '-node_headers.txt'),
    ('delete_edge_list', 'deletes', '-edge_headers.txt'),
    ('delete_node_list', 'deletes', '-node_headers.txt'),
)

output = []

for b in batches_list:
    batch = {}
    for list_key, subdirectory, suffix in header_lists:
        header_path = headers_path / subdirectory / (b + suffix)
        # Only reference header files that were actually generated.
        if header_path.exists():
            batch[list_key] = str(header_path.relative_to(dataset_path))
    output.append(batch)

batches_file_name = 'batches.json'
# The context manager closes the file even if serialising or writing fails.
with open(headers_path / batches_file_name, 'w') as batches_file:
    batches_file.write(json.dumps(output, indent=1))