In [1]:
import numpy
import pandas
import io
import uuid
import psycopg.sql
import pyarrow
import pyarrow.parquet

import jobqueue
from jobqueue.connection_manager import ConnectionManager



import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import os
import datetime

from typing import Callable, List

from psycopg import sql

import dmp.keras_interface.model_serialization as model_serialization
from dmp.task.experiment.training_experiment.training_epoch import TrainingEpoch
from dmp.postgres_interface.element.column import Column
from dmp.postgres_interface.element.table import Table
from dmp.postgres_interface.element.column_group import ColumnGroup

from dmp.util.butter_e_export import *

pd.options.display.max_seq_items = None

%load_ext autoreload
%autoreload 2
credentials = jobqueue.load_credentials("dmp")


2023-11-20 16:50:52.248194: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


TODO: convert this into two steps:
    1) Get the ids of the runs we want to extract (in energy batches) ordered by experiment_id

          SELECT
                  {run}.{run_id}
          FROM
                  {run},
                  {job_status},
                  {job_data}
          WHERE TRUE
                  AND {run}.batch like {pattern}
                  AND {job_status}.id = {run}.run_id
                  AND {job_status}.id = {job_data}.id
              AND {job_status}.status = 2
          ORDER BY experiment_id, run_id;

    2) In parallel (using the multiprocessing lib), extract blocks of ids into a partitioned parquet file
      -> partition by the attributes we care about querying by (dataset, size, shape, depth)

              SELECT
                  {columns}
              FROM
                  {run},
                  {job_status},
                  {job_data}
              WHERE TRUE
                  AND {job_status}.id = {run}.run_id
                  AND {job_status}.id = {job_data}.id
                  AND {job_status}.status = 2
                  AND {run}.{run_id} IN ({ids})

        pool = multiprocessing.ProcessPool(multiprocessing.cpu_count())
        results = pool.uimap(download_chunk, chunks)
        for num_rows, chunk in results:
            num_stored += 1
            print(f"Stored {num_rows} in chunk {chunk}, {num_stored} / {len(chunks)}.")

    extra credit) extract butter data matching this as well into a new dataset
    extra credit) make a summary dataset that summarizes the quartiles of # epochs to reach target test loss levels
    

In [2]:
from psycopg import ClientCursor


print(f"run vars {vars(run)}")

columns = (
    run
    + ColumnGroup(*[c for c in job_status.columns if c.name != "id"])
    + job_data.command
)
print(columns.names)


def passthrough(row, index, value, column, data):
    data[column.name] = value


column_converters: List[Callable] = [passthrough for _ in columns]


def flatten_json(json_obj, destination=None, parent_key="", separator="_"):
    if isinstance(destination, dict):
        flattened = destination
    else:
        flattened = {}

    for key, value in json_obj.items():
        new_key = f"{parent_key}{separator}{key}" if parent_key else key
        if isinstance(value, dict):
            flattened.update(flatten_json(value, new_key, separator=separator))
        else:
            flattened[new_key] = value
    return flattened


column_converters[
    columns.get_index_of(job_data.command)
] = lambda row, index, value, column, data: flatten_json(value, destination=data)
column_converters[
    columns.get_index_of(run.run_data)
] = lambda row, index, value, column, data: flatten_json(value, destination=data)


def parquet_to_dataframe(row, index, value, column, data):
    with io.BytesIO(value) as buffer:
        data[column.name] = (
            pyarrow.parquet.read_table(pyarrow.PythonFile(buffer, mode="r"))
            .to_pandas()
            .sort_values(by="epoch")
        )


column_converters[columns.get_index_of(run.run_history)] = parquet_to_dataframe
column_converters[columns.get_index_of(run.run_extended_history)] = parquet_to_dataframe


dfs = []

run vars {'_name': 'run', '_columns': (), '_index': None}
('experiment_id', 'run_timestamp', 'run_id', 'job_id', 'seed', 'slurm_job_id', 'task_version', 'num_nodes', 'num_cpus', 'num_gpus', 'gpu_memory', 'host_name', 'batch', 'run_data', 'run_history', 'run_extended_history', 'queue', 'status', 'priority', 'start_time', 'update_time', 'worker', 'error_count', 'error', 'parent', 'command')
{Column(_name='experiment_id', type_name='uuid'): 0, Column(_name='run_timestamp', type_name='timestamp'): 1, Column(_name='run_id', type_name='uuid'): 2, Column(_name='job_id', type_name='uuid'): 3, Column(_name='seed', type_name='bigint'): 4, Column(_name='slurm_job_id', type_name='bigint'): 5, Column(_name='task_version', type_name='smallint'): 6, Column(_name='num_nodes', type_name='smallint'): 7, Column(_name='num_cpus', type_name='smallint'): 8, Column(_name='num_gpus', type_name='smallint'): 9, Column(_name='gpu_memory', type_name='integer'): 10, Column(_name='host_name', type_name='text'): 11,

In [3]:
import json
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import multiprocessing 
import tqdm

def get_ids(**kwargs):
    with ConnectionManager(credentials) as connection:
        query = psycopg.sql.SQL(
            """
                SELECT
                    {run}.run_id
                FROM
                    {run},
                    {job_status},
                    {job_data}
                WHERE TRUE
                    AND {run}.batch like {pattern}
                    AND {job_status}.id = {run}.run_id
                    AND {job_status}.id = {job_data}.id
                    AND {job_status}.status = 2
                    AND {job_data}.command @> {json_data}::jsonb
                ORDER BY experiment_id, run_id;
            """ 
        ).format(
            run=run.identifier,
            job_status=job_status.identifier,
            job_data=job_data.identifier,
            pattern=sql.Literal("%energy%"),
            json_data=sql.Literal(get_json(**kwargs)
            )
        )

        with ClientCursor(connection) as c:
           c.mogrify(query)

        with connection.cursor(binary=True) as cursor:
            cursor.execute(query, binary=True)
            ids = cursor.fetchall()
            rows = []
            for id in ids:
                rows.append(str(id[0]))
            return rows
        
def get_json(**kwargs):
    # Given dictionary
    given_dict = {}
   
    # Update the given dictionary with user input
    given_dict.update(kwargs)

    # Convert the updated dictionary to JSON
    json_result = json.dumps(given_dict, indent=2)

    return json_result

In [4]:
def get_keys(d):
    keys = []
    for k, v in d.items():
        if isinstance(v, dict):
            keys.extend(get_keys(v))
        else:
            keys.append((k, v))
    return keys

def convert_bytes_to_dataframe(bytearray):
    # data = None
    # with io.BytesIO(bytearray) as buffer:
    #     data = (
    #         pyarrow.parquet.read_table(pyarrow.PythonFile(buffer, mode="r"))
    #         .to_pandas()
    #         .sort_values(by="epoch")
    #     )
    return str(bytearray)

def save_data(run_ids):
    with ConnectionManager(credentials) as connection:
        query = psycopg.sql.SQL(
            """
                SELECT
                    {columns}
                FROM
                    {run}, 
                    {job_status},
                    {job_data}
                WHERE
                	{job_status}.id = {run}.run_id
                    AND {job_status}.id = {job_data}.id
                    AND {job_status}.status = 2
                    AND {run}.run_id IN ({run_ids});
            """ 
        ).format(
            columns=columns.columns_sql,
            run_ids=sql.SQL(', ').join(map(sql.Literal, run_ids)),
            run=run.identifier,
            job_data=job_data.identifier,
            job_status=job_status.identifier,
        )
       
        with ClientCursor(connection) as c:
            c.mogrify(query)
        
        with connection.cursor(binary=True) as cursor:
            cursor.execute(query, binary=True)
            rows = cursor.fetchall()
            
        # cast rows to a dataframe
        df = pd.DataFrame(rows, columns=columns.names)
        
        data = []
        data_names = []
        # iterate through the rows
        for index, row in df.iterrows():
            # get the keys from the command column
            keys = get_keys(row['command'])
            data_names = [key[0] for key in keys]
            data.append([key[1] for key in keys])

        # convert run_history and run_extended_history to dataframes
        df['run_history'] = df['run_history'].apply(convert_bytes_to_dataframe)
        df['run_extended_history'] = df['run_extended_history'].apply(convert_bytes_to_dataframe)
        

        # create a dataframe from the data
        df2 = pd.DataFrame(data, columns=data_names)
        # get intersection of columns
        intesection = list(set(df.columns) & set(df2.columns))
        # drop the intersection from the first dataframe
        df2 = df2.drop(intesection, axis=1)


        # merge the dataframes
        df = pd.concat([df, df2], axis=1)

        # for each row in the dataframe if the column is a UUID convert it to a string
        for index, row in df.iterrows():
            for column in df.columns:
                if isinstance(row[column], uuid.UUID):
                    df.at[index, column] = str(row[column])
        
        # drop duplicate columns
        df = df.loc[:,~df.columns.duplicated()]

        # convert to pyarrow table
        table = pa.Table.from_pandas(df)

        # write to distributed parquet file saved as ['name','depth','size','shape']
        pq.write_to_dataset(table, root_path='.', partition_cols=['name','shape','depth','size'])
        


In [5]:
def combine_chunk_lists(lists, elements_per_chunk):
    combined = []
    for l in lists:
        combined.extend(l)
    # remove duplicates
    combined = list(set(combined))
    # split into chunks
    chunks = len(combined) // elements_per_chunk
    combined = np.array_split(combined, chunks)
    return combined

def parallel_save_data(chunks):
    # TODO: use multiprocessing to parallelize saving the data
    for chunk in tqdm.tqdm(chunks):
        save_data(chunk)

In [6]:

#  get some example data
ids_rec_depth5_sleep = get_ids(model={"shape":"rectangle", "depth":5},
                dataset={"name":"sleep"}
        )

ids_rec_depth6_sleep = get_ids(model={"shape":"rectangle", "depth":6},
                dataset={"name":"sleep"}
        )

print(f"Number of ids: {len(ids_rec_depth5_sleep)}")
print(f"Number of ids: {len(ids_rec_depth6_sleep)}")

batches = combine_chunk_lists([ids_rec_depth5_sleep, ids_rec_depth6_sleep], 30)

print(f"Number of batches: {len(batches)}")

parallel_save_data(batches)

Number of ids: 32
Number of ids: 30
Number of batches: 2


100%|██████████| 2/2 [00:06<00:00,  3.05s/it]
