In [None]:
# takes 30m to process 500,000 player-session files.
# Performs following steps:
# 1) Load players/**/*.session.json files (searched recursively).
# 2) Create sql-friendly dictionaries from the json files.
# 3) Iterate over json dictionaries producing a sql file for each json file.
# 4) Read all the sql files to create an in-memory database.
#    Creating an in-memory db and then writing it to disk is 100x faster than directly creating a db on disk.
# 5) Write the in-memory database to a single disk file (>4GB).

# next steps:
# acbl-sql-to-tournament-sessions.ipynb (not written yet) reads SQL tables

# previous steps:
# acbl_download_tournaments.ipynb

# todo:
# Clean up sql tables. Change NULL to NOT NULL. Add/remove PRIMARY KEY? Change VARCHAR to INT, REAL, etc.
# Improve autogeneration of sql table file.
# Improve how json_to_sql_walk() and CreateSqlFile() handle ids; this affects the generated ON CONFLICT clauses.

In [None]:
import config # contains configurations/settings.import pandas as pd
import pathlib
import time
import json
from collections import defaultdict
import requests
import mlBridgeLib

import sqlalchemy
import sqlalchemy_utils

import traceback

In [None]:
# override pandas display options
# NOTE(review): pd_options_display() is defined in the project library mlBridgeLib (not shown here);
# presumably it sets pandas display options via pd.set_option() — confirm there.
mlBridgeLib.pd_options_display()

In [None]:
# Root of the local bridge data tree; ACBL data lives in the 'acbl' subdirectory.
rootPath = pathlib.Path('e:/bridge/data')
acblPath = rootPath / 'acbl'

In [None]:
# simple function to walk parsed json, printing one "path=value" line per leaf value.
# usage: json_walk_print('main',data_json) where 'main' is the root name and data_json is the
# parsed json (e.g. the result of json.load()).

def json_walk_print(key,value):
    """Print every leaf of *value* as ``<path>=<value>``, starting from path *key*.

    Dict members extend the path with ``.<name>`` and list items with ``[<index>]``.
    String leaves are printed wrapped in double quotes; other leaves use str().
    isinstance() is used so dict/list subclasses (e.g. an OrderedDict produced by
    ``json.load(..., object_pairs_hook=OrderedDict)``) are walked as well.

    Returns None.
    """
    if isinstance(value, dict):
        for name, child in value.items():
            # f-string rather than '+' so non-str keys don't raise TypeError
            json_walk_print(f"{key}.{name}", child)
    elif isinstance(value, list):
        for index, child in enumerate(value):
            json_walk_print(f"{key}[{index}]", child)
    else:
        if isinstance(value, str):
            value = f'"{value}"'
        print(f"{key}={value}")
    return

In [None]:
# walk parsed json (e.g. from json.load()) building tables suitable for generating SQL statements.
# usage: json_to_sql_walk(tables,'main',"",[],data_json,primary_keys) where 'main' is the first (root) table,
#        tables is defaultdict(lambda :defaultdict(dict)) and data_json is the parsed json (not a string).

def sql_create_tables(tables,key,value):
    """Store *value* in *tables* at the row/column addressed by the dotted *key*.

    Only the last three dot-separated components of *key* are used, as
    ``<table>.<row id>.<column>``; e.g. 'sessions.123.name' stores *value* in
    tables['sessions']['123']['name']. *tables* is expected to be
    ``defaultdict(lambda :defaultdict(dict))`` so missing tables/rows are created.

    The first write to a column stores *value* (a list value is copied so later
    appends never mutate the caller's list). Any later write to the same column
    requires the stored value to be a list: a list *value* is appended element-wise,
    any other value is appended as one element, and duplicates are then removed
    keeping first-seen order.

    Raises:
        IndexError: *key* has fewer than three components.
        AssertionError: a second value is written to a column holding a non-list.
    """
    parts = key.split('.')
    table_name, row_id, column = parts[-3], parts[-2], parts[-1]
    # removed asserts on the component shapes as they were json schema specific
    row = tables[table_name][row_id]
    if column not in row:
        row[column] = list(value) if isinstance(value, list) else value
        return
    existing = row[column]
    assert isinstance(existing, list), [table_name, row_id, column, existing, value]
    if isinstance(value, list):
        existing += value
    else:
        existing.append(value)
    # Force unique values (award pigment issue). dict.fromkeys() keeps first-seen order;
    # list(set(...)) produced an order that changed between runs (string hash randomization),
    # making the generated SQL nondeterministic. Values must be hashable either way.
    row[column] = list(dict.fromkeys(existing))
    return

def json_to_sql_walk(tables,key,last_id,uid,value,primary_keys):
    """Recursively flatten parsed json *value* into *tables* using sql_create_tables().

    Args:
        tables: mapping filled in place as tables[table][row_id][column] = value
            (the commented example below creates it as defaultdict(lambda :defaultdict(dict))).
        key: dotted path of *value*. It starts as the root table name; descending into a
            non-numeric-keyed dict appends '.<row id>.<field name>', so a field name becomes
            the table name of any rows found beneath it.
        last_id: last component of the key of the most recent dict that contained a
            primary key; rows without a primary key get a column of this name holding uid[0].
        uid: list of id strings for the current row, joined with '-' to form its row id.
            It may be "" at the top level, which is only safe if the root dict contains a
            primary key (otherwise `uid+[...]` raises TypeError and `uid[0]` IndexError).
        value: parsed json: dict, list or scalar.
        primary_keys: field names whose value identifies a row.

    Returns None. AssertionError propagates from sql_create_tables() when a second value
    is written to a column that already holds a non-list value.
    """
    #print(tables,key,last_id,uid,value)
    if type(value) is dict:
        #print('dict:',key,uid)
        if any([pk in value for pk in primary_keys]):
            # Dict carrying a primary key: it becomes its own row, keyed by the pk value.
            # NOTE(review): if several primary keys are present, each one overwrites uid (the
            # last match in primary_keys order wins) and, below the root, each writes the
            # parent column again; the second write trips sql_create_tables' list assertion.
            for pk in primary_keys:
                if pk in value:
                    last_id = key.split('.')[-1]
                    uid = [str(value[pk])]
                    # Below the root, record this row's id in the parent row's column named
                    # after the field (stored as a scalar string).
                    if key.count('.') > 0:
                        sql_create_tables(tables,key,'-'.join(uid))
        elif all(not k.isdigit() for k in value.keys()):
            # Dict with no primary key and no numeric keys: its row id is the inherited uid.
            sql_create_tables(tables,key+'.'+'-'.join(uid)+'.id','-'.join(uid)) # create PRIMARY KEY column of 'id'
            sql_create_tables(tables,key+'.'+'-'.join(uid)+'.'+last_id,uid[0]) # create parent column using last_id and uid[0] (first id)
            if key.count('.') > 0:
                # One-element list so sibling rows can append their ids to the same column.
                sql_create_tables(tables,key,['-'.join(uid)])
        for k,v in value.items():
            # A dict whose keys are all numeric is treated like a list: children keep the same
            # key and the numeric key is appended to uid. NOTE(review): unlike the list branch,
            # nothing initialises the parent column to [] here, so two or more children that
            # carry a primary key (below the root) would trip sql_create_tables' list assertion.
            # The all() test is re-evaluated for every item.
            if all(kk.isdigit() for kk in value.keys()):
                json_to_sql_walk(tables,key,last_id,uid+[k],v,primary_keys)
            else:
                json_to_sql_walk(tables,key+'.'+'-'.join(uid)+'.'+k,last_id,uid,v,primary_keys)
    elif type(value) is list:
        #print('list:',key,uid)
        # Non-empty lists initialise the parent column to [] so each element can append its
        # value or id (sql_create_tables requires a list for repeated writes). Empty lists
        # create no column, so the column is simply absent from the row's INSERT.
        # (The commented-out print says "empty list", but this branch runs for NON-empty lists.)
        if len(value) > 0: # turn empty lists into NULL?
            #print("empty list:",key)
            sql_create_tables(tables,key,[])
        # Elements keep the list's key; their index is appended to uid.
        for n,v in enumerate(value):
            json_to_sql_walk(tables,key,last_id,uid+[str(n)],v,primary_keys)
    else:
        # Scalar leaf: stored in the column named by the last component of key.
        sql_create_tables(tables,key,value)
    return

# example:
#tables = defaultdict(lambda :defaultdict(dict))
#json_to_sql_walk(tables,'events',"",[],data_json,primary_keys)

In [None]:
# Create a file of SQL INSERT ... ON CONFLICT DO UPDATE (upsert) commands from table

def _sql_identifier(name):
    """Return *name* as a double-quoted SQL identifier (embedded double quotes are doubled)."""
    return '"' + str(name).replace('"', '""') + '"'


def _sql_literal(value):
    """Return the SQL literal text for one column value produced by json_to_sql_walk().

    str   -> single-quoted literal, embedded single quotes doubled (SQL escaping).
    None  -> NULL.
    list  -> one text literal '[a,b,...]' (items converted with str(), single quotes doubled).
    float NaN/Infinity -> NULL (json.load() accepts them but SQL has no such literal;
             they used to be written as bare nan/inf, which SQLite reads as unknown columns).
    other -> str(value), e.g. 12, 1.5, True.
    """
    import math
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    if value is None:
        return "NULL"
    if isinstance(value, list):
        return "'[" + ",".join(str(item).replace("'", "''") for item in value) + "]'"
    if isinstance(value, float) and not math.isfinite(value):
        return "NULL"
    return str(value)


def CreateSqlFile(tables,f,primary_keys):
    """Write one SQLite upsert statement per row of *tables* to the text stream *f*.

    Args:
        tables: mapping of table name -> mapping of row id -> dict of column -> value,
            as built by json_to_sql_walk().
        f: writable text stream.
        primary_keys: column names; each row must contain at least one of them, and one
            ``ON CONFLICT(<pk>) DO UPDATE`` clause is emitted per key present in the row.

    Rows that carry both created_at and updated_at only overwrite an existing row when
    the incoming updated_at is newer (ties broken by created_at).

    Raises:
        AssertionError: a row contains no primary key, or only one of created_at/updated_at.
    """
    print("PRAGMA foreign_keys = OFF;", file=f) # is this still necessary????

    for table_name, rows in tables.items():
        assert isinstance(rows, dict), type(rows)
        #print(f"DELETE FROM [{table_name}];", file=f) # delete all rows
        for row in rows.values():
            assert isinstance(row, dict), type(row)
            columns = ','.join(_sql_identifier(column) for column in row)
            print(f"INSERT INTO {_sql_identifier(table_name)} ({columns})", file=f)
            print(f"VALUES({','.join(_sql_literal(value) for value in row.values())})", file=f)
            # DO UPDATE SET "col"=excluded."col",... : overwrite every column with the new value
            assignments = ','.join(f"{_sql_identifier(column)}=excluded.{_sql_identifier(column)}" for column in row)
            row_keys = [pk for pk in primary_keys if pk in row]
            assert row_keys, [primary_keys, row]
            # One upsert clause per primary key present, each on its own line
            # (they used to be concatenated with no separator).
            clauses = [f"ON CONFLICT({_sql_identifier(pk)}) DO UPDATE SET {assignments}" for pk in row_keys]
            print('\n'.join(clauses), file=f, end='')
            assert ('created_at' in row) == ('updated_at' in row)
            if 'created_at' in row:
                # Attaches to the last DO UPDATE clause: only overwrite with newer data.
                print("\nWHERE excluded.\"updated_at\" > \"updated_at\" OR (excluded.\"updated_at\" = \"updated_at\" AND excluded.\"created_at\" > \"created_at\")", file=f, end='')
            print(";\n",file=f)
    return

In [None]:
# takes 7m to 140m to process 320,000 files.
# takes 2m, or 7m to update 1 new month of 12,000 files or 11000s if starting from scratch (3h?).
# todo:
# how to get more exception info in try/except?
# move acbl_tournament_sessions.sql into options
# move partial directory and rglob() into options
# create cookbook of sql queries.
# write sql script that creates tables and columns similar to Excel implementations.
# initially delete all urls or just filtered urls. if filtered, then need to pass to execute_sql() below.
# group urls by directory so number of files in directory can be displayed.
# show overall file progress, dir to process, files within each directory to process:(1234/20000) dir:(3/266) file:(123/300)
# if skip_existing_files and file exists, do a rglob for .sql files, create list of .json files not in .sql list.
#     ... will be faster than a for loop.

initially_delete_all_output_files = False
skip_existing_files = True
stop_on_error = True # True: print the error and re-raise (stop). False: print the error and continue with the next file.
starting_nfile = 0
ending_nfile = 0
event_types = [] # 'PAIRS', 'HOME_STYLE_PAIRS', 'INDIVIDUAL', 'TEAMS' (currently unused: the type filter below is commented out)
#event_types = ['TEAMS']

urls = list(acblPath.joinpath('players').rglob('*.session.json')) # fyi: PurePathPosix doesn't support glob/rglob

total_execution_time = 0
total_files_written = 0
if ending_nfile == 0: ending_nfile = len(urls)
filtered_urls = urls[starting_nfile:ending_nfile]
total_urls = len(filtered_urls)
start_time = time.time()

# delete files first, using filtered list of urls
if initially_delete_all_output_files:
    for url in filtered_urls:
        url.with_suffix('.sql').unlink(missing_ok=True)

primary_keys = ['acbl_number_session_id','id']
for nfile,url in enumerate(filtered_urls, start=1):
    json_file = url
    sql_file = url.with_suffix('.sql')
    print(f"Processing ({nfile}/{total_urls}): file:{json_file.as_posix()}")
    # The conversion used to be (accidentally) indented under `if skip_existing_files:`,
    # so with skip_existing_files=False no file was ever converted.
    if skip_existing_files and sql_file.exists():
        print(f"Skipping: File exists:{sql_file.as_posix()}")
        continue
    # Write to a temporary name and rename on success, so an error part-way through never
    # leaves a truncated .sql file (which would then be skipped on reruns and executed later).
    tmp_sql_file = sql_file.with_name(sql_file.name+'.tmp')
    data_json = None
    try:
        with open(json_file, 'r') as f:
            data_json = json.load(f)
        # hmmm, the obvious id, acbl_number, is not in the json file. However, it is the name of a parent directory so use it.
        data_json['acbl_number'] = json_file.parent.parent.stem
        # f-string so a numeric session_id doesn't raise TypeError on concatenation.
        data_json['acbl_number_session_id'] = f"{data_json['acbl_number']}_{data_json['session_id']}"
        #if len(event_types) > 0 and data_json['type'] not in event_types:
        #    #print(f"Skipping type:{data_json['type']}: file{json_file.as_posix()}")
        #    continue
        tables = defaultdict(lambda :defaultdict(dict))
        json_to_sql_walk(tables,"sessions","","",data_json,primary_keys) # "sessions" is the main table.
        with open(tmp_sql_file,'w') as f:
            CreateSqlFile(tables,f,primary_keys)
        tmp_sql_file.replace(sql_file)
    except Exception as e:
        tmp_sql_file.unlink(missing_ok=True)
        session_id = data_json.get('session_id') if isinstance(data_json, dict) else None
        print(f"Error: {type(e).__name__}: {e}: session_id:{session_id} file:{url.as_posix()}")
        print(traceback.format_exc())
        if stop_on_error:
            raise
        continue
    total_files_written += 1
    print(f"Writing: session_id:{data_json['session_id']} file:{sql_file.as_posix()}")

print(f"All files processed:{total_urls} files written:{total_files_written} total time:{round(time.time()-start_time,2)}")


In [None]:
# Automatically create sql tables file.
# The generated schema is valid SQL but still a draft: most fields need manual editing
# (make NOT NULL, change VARCHAR to INT, REAL, review the PRIMARY KEY choice).

def CreateSqlTablesFile(f,tables,primary_keys):
    """Write a draft SQLite schema (one DROP TABLE/CREATE TABLE pair per table) to *f*.

    Args:
        f: writable text stream.
        tables: mapping of table name -> mapping of row id -> dict of column -> value,
            as built by json_to_sql_walk().
        primary_keys: candidate key column names.

    Each table's columns are the union of the column names of all its rows, in
    first-seen order (previously every row re-emitted its columns, producing duplicate
    column names). The first column found in *primary_keys* becomes
    ``INT NOT NULL PRIMARY KEY``. Further key columns become ``INT NOT NULL UNIQUE``
    because SQLite allows one PRIMARY KEY and ON CONFLICT(<column>) needs a uniqueness
    constraint. All other columns are ``VARCHAR NULL``. A column whose name is also a
    table name gets a FOREIGN KEY to that table's id column.

    Raises:
        AssertionError: the input is not shaped as above, a row id does not start with
            a digit, or a value is a dict.
    """
    assert isinstance(tables, dict), type(tables)
    print('PRAGMA journal_mode=WAL;', file=f)
    print(file=f)
    for table_name, rows in tables.items():
        assert isinstance(rows, dict), type(rows)
        columns = {} # dict used as an insertion-ordered set
        for row_id, row in rows.items():
            assert row_id[:1].isdigit(), row_id
            assert isinstance(row, dict), type(row)
            for column, value in row.items():
                # json_to_sql_walk() stores scalars and lists, never nested dicts.
                assert not isinstance(value, dict), [table_name, row_id, column]
                columns.setdefault(column, None)
        definitions = []
        has_primary_key = False
        for column in columns:
            if column not in primary_keys:
                definitions.append(f'"{column}" VARCHAR NULL') # or NOT
            elif not has_primary_key:
                definitions.append(f'"{column}" INT NOT NULL PRIMARY KEY')
                has_primary_key = True
            else:
                definitions.append(f'"{column}" INT NOT NULL UNIQUE')
        # Table constraints must follow every column definition.
        for column in columns:
            if column in tables:
                definitions.append(f'-- list of VARCHAR\nFOREIGN KEY ("{column}") REFERENCES "{column}"(id) ON DELETE NO ACTION')
        print(f'DROP TABLE IF EXISTS "{table_name}";', file=f)
        print(file=f)
        print(f'CREATE TABLE "{table_name}" (', file=f)
        # Comma-separated with no trailing comma, so the statement parses as-is.
        print(',\n'.join(definitions), file=f)
        print(');', file=f)
        print(file=f)

# only create file if it doesn't exist. Must manually delete file if new version is wanted.
# Uses the globals `tables` and `primary_keys` left by the last session converted in the
# json-to-sql loop above, so the schema reflects that one session's data only.
create_tables_sql_file = pathlib.Path("acbl_tournament_sessions.sql")
if not create_tables_sql_file.exists():
    try:
        with open(create_tables_sql_file,'w') as f:
            CreateSqlTablesFile(f,tables,primary_keys) # primary_keys used to be missing (TypeError)
    except BaseException:
        # Don't leave an empty/partial file behind: its existence would block regeneration.
        create_tables_sql_file.unlink(missing_ok=True)
        raise

In [None]:
# notes:
# takes 9h to process 300,000 files into 40GB sql file. Each sql file is processed in about .05s. Windows 11 2700x system.
# 1) creating in memory db, executing scripts, then writing memory db to disk file is 100x faster than direct writes to file.
# 2) Can be incredibly slow unless "PRAGMA journal_mode=WAL;" is used. Much, much faster with PRAGMA.
# You can manually set, using sqlite3 command, journal_mode to WAL once and it will be persisted.
# .open <file>
# PRAGMA journal_mode=WAL;
# .quit
# Alternatively, connection can be opened in WAL mode. e.g. new SQLiteConnection("Data Source=" + file + ";PRAGMA journal_mode=WAL;")
# None of these statements improve performance:
#   "PRAGMA locking_mode=exclusive;"
#   "PRAGMA synchronous=NORMAL;"
#   db.commit()

# todo:
# Should .sql files use TRANSACTION and COMMIT to prevent partial updates?
# collect errors, report at finish, sort by error type.
# run VALIDATE or CHECK before/after file save?

# options
starting_nfile = 0 # beginning slice
ending_nfile = 0 # ending slice. 0 means all files
write_direct_to_disk = False # write directly to disk. Otherwise will write to memory and then backup to disk (100x faster).
create_tables = True # drop and recreate all tables. Will wipe out all data.
delete_db = True
create_engine_echo = False
perform_integrity_checks = False
stop_on_error = True # True: stop at the first failing .sql script. False: delete the failing .sql file (fix issues, regenerate, rerun) and continue.
db_file = 'acbl_tournament_sessions.sqlite'
db_memory_connection_string = 'sqlite://'
create_tables_sql_file = "acbl_tournament_sessions.sql"

db_file_connection_string = 'sqlite:///'+acblPath.joinpath(db_file).as_posix()
db_file_path = acblPath.joinpath(db_file)

if write_direct_to_disk:
    db_connection_string = db_file_connection_string # disk file based db
else:
    db_connection_string = db_memory_connection_string # memory based db

if delete_db and sqlalchemy_utils.functions.database_exists(db_file_connection_string):
    print(f"Deleting db:{db_file_connection_string}")
    sqlalchemy_utils.functions.drop_database(db_file_connection_string) # warning: can't delete file if in use by another app (restart kernel).

if not sqlalchemy_utils.functions.database_exists(db_connection_string):
    print(f"Creating db:{db_connection_string}")
    sqlalchemy_utils.functions.create_database(db_connection_string)
    create_tables = True

engine = sqlalchemy.create_engine(db_connection_string, echo=create_engine_echo)
raw_connection = engine.raw_connection()
# Always release the connection, even when a step below raises; otherwise a disk db
# stays locked until the kernel is restarted.
try:
    if create_tables:
        print(f"Creating tables from:{create_tables_sql_file}")
        with open(create_tables_sql_file, 'r') as f:
            create_sql = f.read()
        raw_connection.executescript(create_sql) # create tables

    urls = list(acblPath.joinpath('players').rglob('*.session.sql')) # fyi: PurePathPosix doesn't support glob/rglob

    total_script_execution_time = 0
    total_scripts_executed = 0
    canceled = False
    if ending_nfile == 0: ending_nfile = len(urls)
    filtered_urls = urls[starting_nfile:ending_nfile]
    total_filtered_urls = len(filtered_urls)
    start_time = time.time()
    for nfile,url in enumerate(filtered_urls, start=1):
        sql_file = url
        print(f"Executing SQL script ({nfile}/{total_filtered_urls}): file:{sql_file.as_posix()}")

        try:
            with open(sql_file, 'r') as f:
                sql_script = f.read()
            start_script_time = time.time()
            raw_connection.executescript(sql_script)
        except Exception as e:
            print(f"Error: {type(e).__name__} while processing file:{url.as_posix()}")
            print(traceback.format_exc())
            # A stray `error` (NameError) used to crash here, so the file was never removed
            # even though "Removing" was printed.
            if stop_on_error:
                raise
            print(f"Removing {url.as_posix()}")
            sql_file.unlink(missing_ok=True) # delete any bad files, fix issues, rerun.
            continue # todo: log error.
        except KeyboardInterrupt as e:
            print(f"Error: {type(e).__name__} while processing file:{url.as_posix()}")
            print(traceback.format_exc())
            canceled = True
            break
        else:
            script_execution_time = time.time()-start_script_time
            print(f"SQL script executed: file:{url.as_posix()}: time:{round(script_execution_time,2)}")
            total_script_execution_time += script_execution_time
            total_scripts_executed += 1

    print(f"SQL scripts executed ({total_scripts_executed}/{total_filtered_urls}/{len(urls)}): total changes:{raw_connection.total_changes} total script execution time:{round(time.time()-start_time,2)}: avg script execution time:{round(total_script_execution_time/max(1,total_scripts_executed),2)}")
    # if using memory db, write memory db to disk file.
    if not canceled:
        if perform_integrity_checks:
            # todo: research which checks are needed.
            # The result rows used to be discarded; print them. quick_check/integrity_check
            # return a single ('ok',) row when no problems are found; foreign_key_check
            # returns one row per violation (none when clean).
            # Measured on disk: quick_check 7m, foreign_key_check 3m, integrity_check 25m.
            for pragma in ("quick_check", "foreign_key_check", "integrity_check"):
                print(f"Performing {pragma} on file")
                check_results = raw_connection.execute(f"PRAGMA {pragma};").fetchall()
                print(f"{pragma} returned {len(check_results)} row(s): {check_results[:100]}")
        if not write_direct_to_disk:
            print(f"Writing memory db to file:{db_file_connection_string}")
            engine_file = sqlalchemy.create_engine(db_file_connection_string)
            raw_connection_file = engine_file.raw_connection()
            try:
                raw_connection.backup(raw_connection_file.connection)
            finally:
                raw_connection_file.close()
                engine_file.dispose()
finally:
    raw_connection.close()
    engine.dispose()
print("Done.")