In [None]:
import os, sys, json
import itertools, importlib
import asyncpg, asyncio, threading

from edcompanion.eddbreader import edc_dbfilereader, edc_dbfile_process

In [None]:
pgsql_params = dict(
    dsn=os.getenv("PGSQL_URL"),
    server_settings={'search_path': "eddb"}
)
pgpool = await asyncpg.create_pool(**pgsql_params)

In [None]:
# File names of the compressed systems dumps, one per time window.
# The cells that read them join each name onto the local 'data' directory.
dbname_full = "systems.json.gz"
dbname_1day = "systems_1day.json.gz"
dbname_1week = "systems_1week.json.gz"
dbname_2week = "systems_2weeks.json.gz"
dbname_1month = "systems_1month.json.gz"
dbname_6months = "systems_6months.json.gz"

In [None]:
# Load the one-day dump fully into memory for interactive inspection.
# The file name constant for that dump is `dbname_1day`.
dataset = list(edc_dbfilereader(os.path.join('data', dbname_1day)))

# Show the first record as a sample; display nothing if the dump was empty.
dataset[0] if dataset else None

In [None]:
# Map each letter of 'OBAFGKMN' to its position in that sequence (O=0 ... N=7).
classifications = {letter: index for index, letter in enumerate('OBAFGKMN')}
#print(json.dumps(classifications, indent=2))

# Restore previously collected main-star types from the JSON cache.
# The cache is stored as data/main_star_types.json, so that is the file to read.
main_star_types = {}
try:
    with open(os.path.join('data', 'main_star_types.json'), 'rt') as jsonfile:
        main_star_types.update(json.load(jsonfile))
except FileNotFoundError:
    # No cache yet: start with an empty mapping.
    pass
except (OSError, ValueError) as err:
    # Unreadable or corrupt cache: keep going with an empty mapping, but say so
    # instead of hiding the problem. json.JSONDecodeError is a ValueError.
    print(f"Could not load cached main star types: {err}")

#print(json.dumps(main_star_types, indent=2))


In [None]:
# Rebuild the main-star type table when fewer than 40 entries are known.
# NOTE(review): 40 looks like a "table is complete enough" threshold - confirm.
if len(main_star_types) < 40:
    for item in edc_dbfilereader(os.path.join('data', dbname_2week)):
        # The reader may yield empty items; skip them.
        if not item:
            continue

        main_star = item.get('mainStar')

        # Membership test rather than truthiness, so an already known type
        # whose stored value is 0 is not overwritten.
        if main_star and main_star not in main_star_types:
            # Every newly seen type gets the first index past the known
            # classification letters.
            # NOTE(review): all new types share this one value - confirm intended.
            main_star_types[main_star] = len(classifications)

    # Persist the table so later runs can reuse it. Saving is best-effort:
    # a failed write is reported but does not stop the notebook.
    try:
        with open(os.path.join('data', 'main_star_types.json'), 'wt') as jsonfile:
            json.dump(main_star_types, jsonfile, indent=3)
    except OSError as err:
        print(f"Could not save main star types: {err}")


## Create & Build

In [None]:
#await pgpool.execute(f"DROP TABLE IF EXISTS systems;")
await pgpool.execute(f"""
    CREATE TABLE IF NOT EXISTS systems (
        id64 BIGINT NOT NULL,
        x DOUBLE PRECISION  NOT NULL,
        y DOUBLE PRECISION  NOT NULL,
        z DOUBLE PRECISION  NOT NULL,
        name TEXT NOT NULL
    );
""")


In [None]:

async def push_records(data):
    return await pgpool.copy_records_to_table("systems", records=data)

records = []
count=0
for item in edc_dbfilereader(os.path.join('data', dbname_full), verbose=True):
    
    if not item:
        continue

    coords = item.get('coords')
    coordinates = [coords[k] for k in ['x','y','z']]
    records.append(
        [item.get("id64")] + coordinates + [item.get('name')] 
    )
    count +=1
    if count > 10000:
        qr = await pgpool.copy_records_to_table("systems", records=records)
        records = []
        count = 0

if count > 0:
    qr = await pgpool.copy_records_to_table("systems", records=records)
    records = []
    count = 0


In [None]:
print(f"Adding indexes ...")
await pgpool.execute(f"""
    CREATE INDEX IF NOT EXISTS systems_x_idx ON systems (x);
    CREATE INDEX IF NOT EXISTS systems_y_idx ON systems (y);
    CREATE INDEX IF NOT EXISTS systems_z_idx ON systems (z); 
    CREATE INDEX IF NOT EXISTS systems_name_idx ON systems (name);
""")

In [None]:
print("Removing duplicates by system name")
await pgpool.execute("""
    DELETE FROM systems a
    WHERE   a.ctid <> (SELECT min(b.ctid)
                     FROM   systems b
                     WHERE  a.name = b.name );"""
)

In [None]:
print(f"Adding unique index on system name ...")
await pgpool.execute(f"""
    DROP INDEX systems_name_idx ;
    CREATE UNIQUE INDEX IF NOT EXISTS systems_name_unique ON eddb.systems (name)
""")

## Update

In [None]:
def get_coordinates_from_item(item):
    """Return the item's ``coords`` entry as an ``[x, y, z]`` list.

    Args:
        item: mapping with a ``'coords'`` entry that holds ``'x'``, ``'y'``
            and ``'z'`` keys.

    Returns:
        A three-element list with the x, y and z values, in that order.
    """
    position = item.get('coords')
    return [position['x'], position['y'], position['z']]

async def process_data(datachunk):
    """Insert one chunk of system items into ``systems``, skipping conflicts.

    Args:
        datachunk: iterable of item mappings; each is turned into an
            ``[id64, x, y, z, name]`` row.

    Returns:
        Whatever ``pgpool.executemany`` returns.
    """
    rows = []
    for item in datachunk:
        rows.append(
            [item.get("id64")] + get_coordinates_from_item(item) + [item.get('name')]
        )

    # ON CONFLICT DO NOTHING leaves existing rows untouched when an insert
    # would violate a unique constraint.
    return await pgpool.executemany(
            """INSERT INTO systems (id64, x, y, z, name) 
                VALUES ($1, $2, $3, $4, $5) 
                ON CONFLICT DO NOTHING
            """,
            rows,
    )

await edc_dbfile_process(
    os.path.join('data', dbname_1month),
    process_data,
    verbose=True
)