In [None]:
import pickle
from typing import Generator, Optional, Tuple

import numpy as np
import pandas as pd

from cyclops.processors.aggregate import Aggregator
from cyclops.processors.cleaning import (
    normalize_categories,
    normalize_names,
    normalize_values,
)
from cyclops.processors.column_names import (
    ADMIT_TIMESTAMP,
    AGE,
    CARE_UNIT,
    DIAGNOSIS_CODE,
    DIAGNOSIS_TRAJECTORY,
    ENCOUNTER_ID,
    EVENT_CATEGORY,
    EVENT_NAME,
    EVENT_TIMESTAMP,
    EVENT_VALUE,
    EVENT_VALUE_UNIT,
    HOSPITAL_ID,
    SEX,
    SUBJECT_ID,
    TIMESTEP,
    YEAR,
)
from cyclops.processors.constants import (
    BINARY,
    BY,
    CATEGORICAL_INDICATOR,
    FEATURE_INDICATOR_ATTR,
    FEATURE_MAPPING_ATTR,
    FEATURE_TYPE_ATTR,
    FEATURE_TYPES,
    FEATURES,
    MEAN,
    MIN_MAX,
    MISSING_CATEGORY,
    NUMERIC,
    ORDINAL,
    STANDARD,
    STRING,
    TARGETS,
)
from cyclops.query import mimic
from cyclops.query import process as qp
from cyclops.utils.file import load_dataframe, save_dataframe

# Reference (SQLAlchemy wiki recipe for windowed range queries):
# https://github.com/sqlalchemy/sqlalchemy/wiki/RangeQuery-and-WindowedRangeQuery

In [None]:
# Calendar years covered by this notebook.
YEARS = list(range(2015, 2021))
# ISO date (January 1st of the earliest year) passed as `after_date` to queries.
AFTER_DATE = "{}-01-01".format(min(YEARS))
# Name of the death outcome; used below as an encounters column and an event name.
OUTCOME_DEATH = "outcome_death"

In [None]:
"""Utility functions for batching."""

import psutil

from cyclops.query.util import TableTypes


def infer_feasible_batch_size(
    query: TableTypes, use_mem_percent: float, test_size: int = 1000
) -> int:
    """Infer a feasible batch size for a given query.

    A test batch of up to ``test_size`` rows is fetched, its in-memory size is
    measured, and that per-row cost is scaled up to the share of currently
    available memory given by ``use_mem_percent``.

    Parameters
    ----------
    query: cyclops.query.util.TableTypes
        Query for which to get a feasible batch size.
    use_mem_percent: float
        A decimal percentage of the available memory to use for a batch.
    test_size: int, default = 1000
        The test size to query and evaluate the memory for.

    Returns
    -------
    int
        The batch size.

    Raises
    ------
    ValueError
        If ``test_size`` is not positive, or if the query returns no rows (so
        there is nothing to measure).

    """
    if test_size <= 0:
        raise ValueError("test_size must be a positive integer.")

    available = psutil.virtual_memory().available
    can_use = available * use_mem_percent

    # Use the query passed in (previously a notebook-level `events_query` global
    # was used and the `query` argument was silently ignored).
    # NOTE(review): the notebook-level `run_query_in_batches` takes `self` as its
    # first parameter; confirm the intended call form here.
    generator = run_query_in_batches(query, test_size)
    data = next(generator, None)
    if data is None or len(data) == 0:
        raise ValueError("Cannot infer a batch size: the query returned no rows.")

    # deep=True also counts the payload of object (e.g. string) columns, which
    # the shallow default would undercount.
    did_use = data.memory_usage(deep=True).sum()

    # Scale by the number of rows actually measured; the query may hold fewer
    # than test_size rows.
    batch_size = int((can_use / did_use) * len(data))
    return batch_size


def merge_batches_grouping_id(
    batch_before: pd.DataFrame, batch_after: pd.DataFrame, id_col: str
) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
    """Merge two consecutive batches sorted by sample IDs.

    Rows at the start of ``batch_after`` that share the last sample ID of
    ``batch_before`` are moved onto the end of ``batch_before`` so that all
    rows of that ID end up in the same batch.

    Parameters
    ----------
    batch_before: pandas.DataFrame
        The batch coming consecutively before batch_after. Sorted by sample IDs.
    batch_after: pandas.DataFrame
        The batch coming consecutively after batch_before. Sorted by sample IDs.
    id_col: str
        Name of the sample ID column.

    Returns
    -------
    tuple
        A tuple of (pandas.DataFrame, pandas.DataFrame or None), where the first
        DataFrame consists of batch_before and any samples in batch_after
        with the same ID as the last ID in batch_before. The second DataFrame
        consists of the remaining samples, or None if all the IDs matched
        (or batch_after was empty).

    """
    # Nothing to merge and nothing left over.
    if batch_after.empty:
        return batch_before, None

    last_id = batch_before[id_col].iloc[-1]
    ids_after = batch_after[id_col].to_numpy()

    # The ID is not continued from the previous batch: leave both untouched.
    if ids_after[0] != last_id:
        return batch_before, batch_after

    continues = ids_after != last_id

    # The entire batch consists of the continued ID.
    if not continues.any():
        return pd.concat([batch_before, batch_after]), None

    # Positional index of the first row with a new ID. This is computed on the
    # raw values so it is valid for `.iloc` whatever index batch_after carries
    # (the previous `idxmax()` returned an index *label*).
    change_pos = int(continues.argmax())
    save_batch = pd.concat([batch_before, batch_after.iloc[:change_pos]])
    return save_batch, batch_after.iloc[change_pos:]

In [None]:
@table_params_to_type(Select)
def run_query_in_batches(
    self,
    query: TableTypes,
    batch_size: int,
) -> Generator[pd.DataFrame, None, None]:
    """Generate query batches.

    Parameters
    ----------
    query: cyclops.query.util.TableTypes
        Query to run.
    batch_size: int
        Batch size.

    Yields
    ------
    pandas.DataFrame
        A query batch.

    """
    # With `chunksize` set, read_sql_query returns an iterator of DataFrames;
    # delegate to it directly instead of re-implementing the iteration.
    yield from pd.read_sql_query(query, self.engine, chunksize=batch_size)


@table_params_to_type(Select)
def run_query_in_grouped_batches(
    self,
    query: TableTypes,
    batch_size: int,
    id_col: str,
) -> Generator[pd.DataFrame, None, None]:
    """Generate query batches with complete sets of sample IDs.

    Batches are rearranged such that the rows for a given sample ID are kept
    together in a single batch. The query is NOT sorted here: rows sharing an
    ID must already be adjacent in the query result (i.e., the query should be
    ordered by ``id_col``).

    Parameters
    ----------
    query: cyclops.query.util.TableTypes
        Query to run.
    batch_size: int
        Approximate batch size before rearranging based on sample IDs.
    id_col: str
        Name of the sample ID column by which to batch.

    Yields
    ------
    pandas.DataFrame
        A query batch with complete sets of sample IDs.

    """
    # TODO: sort the query by id_col here (e.g., with qp.OrderBy) so that same
    # IDs are kept together, except perhaps across the transitions of batches.
    generator = self.run_query_in_batches(query, batch_size)

    # An exhausted generator must end this generator cleanly; a bare next()
    # would surface as a RuntimeError inside a generator (PEP 479).
    batch_before = next(generator, None)
    if batch_before is None:
        return

    for batch_after in generator:
        save_batch, batch_before = merge_batches_grouping_id(
            batch_before, batch_after, id_col
        )

        # If batch_before is now None, all of batch_after was merged into
        # save_batch and it is necessary to check the next batch for the same ID
        if batch_before is None:
            batch_before = save_batch
        # If not all of the batch was merged, then we have all of the last ID
        # and we can yield
        else:
            yield save_batch
            # Drop the reference so the yielded batch can be freed while the
            # next one is being fetched.
            del save_batch

    # Yield the last batch
    yield batch_before

In [None]:
# Build the events query (events on/after AFTER_DATE) and call
# save_in_grouped_batches on it.
# NOTE(review): the arguments here are (path, id column, batch size), whereas
# later cells call save_in_grouped_batches(path, batch size, id column) —
# confirm which order the method expects.
events_interface = mimic.events(after_date=AFTER_DATE)
events_interface.save_in_grouped_batches("./test_batches", ENCOUNTER_ID, int(5e6))

In [None]:
# Load 66 batch files and record, for each, its row count and the number of
# rows per encounter ID.
# NOTE(review): the path has no file extension here but the cell further down
# loads "batch_0001.parquet" — presumably load_dataframe adds it; verify.
lens = []
value_counts = []
for i in range(66):
    df = load_dataframe("./test_batches_SAVE/" + f"batch_" + "{:04d}".format(i))
    lens.append(len(df))
    value_counts.append(df[ENCOUNTER_ID].value_counts())

In [None]:
value_counts

In [None]:
# Count in how many batches each encounter ID appears; a count above 1 means
# that ID's rows are spread over more than one batch.
all_value_counts = pd.concat(value_counts)
all_value_counts.groupby(all_value_counts.index).count()

In [None]:
# Total number of rows across all loaded batches.
sum(lens)

In [None]:
df = load_dataframe("./test_batches_SAVE/batch_0001.parquet")
df

In [None]:
# Events query restricted to 10000 rows, used by the experiments below.
events_interface = mimic.events(after_date=AFTER_DATE, limit=10000)  # , limit=100000)
# events_query = events_interface.query
# events_query = select(events_query).limit(10000)

In [None]:
# Order the (unlimited) events query by encounter ID and fetch the first 10 rows.
query = mimic.events(after_date=AFTER_DATE).query
query = qp.OrderBy(ENCOUNTER_ID)(query)
mimic.get_interface(query).run(limit=10)

In [None]:
from sqlalchemy import select

# Same ordered query, skipping the first 10 rows before fetching 10 more.
mimic.get_interface(select(query).offset(10)).run(limit=10)

In [None]:
# NOTE(review): `sess`, `Object`, `lazyload` and `joinedload` are not defined
# or imported anywhere before this cell in the visible notebook — this looks
# like a pasted example snippet rather than runnable code; confirm or remove.
q = (
    sess.query(Object)
    .yield_per(100)
    .options(lazyload("*"), joinedload(Object.some_related))
)

In [None]:
from sqlalchemy.orm import Session

# Wrap the events query in an ORM Query that requests rows with yield_per(100).
query = mimic.events(after_date=AFTER_DATE).query

e = mimic._db.engine
sess = Session(e)
q = sess.query(query).yield_per(100).enable_eagerloads(False)

In [None]:
# Print the length of the first item produced by the query, then stop.
for i, data in enumerate(q):
    print(len(data))
    break

In [None]:
# Same query executed on a Core connection with the yield_per execution
# option, consuming the result partition by partition.
with e.connect() as conn:
    result = conn.execution_options(yield_per=100).execute(query)

    for partition in result.partitions():
        ## partition is an iterable that will be at most 100 items
        # for row in partition:
        #    print(f"{row}")
        print(len(partition))

In [None]:
# Iterate over the grouped batches, printing each batch's length and its
# sorted unique encounter IDs, and accumulate the total row count in `s`.
s = 0
for data in events_interface.run_in_grouped_batches(5000, ENCOUNTER_ID):
    unique = data[ENCOUNTER_ID].unique()
    unique.sort()
    print(len(data), unique)
    s += len(data)
s

In [None]:
import sqlalchemy
from sqlalchemy import and_, func, select

from cyclops.query.mimic import get_interface
from cyclops.query.util import get_column

# Build and run a query that counts the rows per encounter ID
# (SELECT id, COUNT(id) AS count ... GROUP BY id).
col = get_column(events_interface.query, ENCOUNTER_ID)

# table = select(col)

table = select(col, func.count(col).label("count")).group_by(col)

# col = get_column(table, ENCOUNTER_ID)
# table = table.order_by(col)

# table = qp.GroupByAggregate(ENCOUNTER_ID, {ENCOUNTER_ID: "count"})(table)
df = get_interface(table).run()

In [None]:
df = df.sort_values(ENCOUNTER_ID)
df

In [None]:
# Running total of row counts in encounter-ID order.
df = df.sort_values(ENCOUNTER_ID)
df["cumsum"] = df["count"].cumsum()
df

In [None]:
# 26067901, 515179

In [None]:
# A batch limit smaller than the largest single-encounter row count cannot
# hold that encounter in one batch, so reject it.
maximum = int(5e7)

max_count = df["count"].max()

if maximum < max_count:
    raise ValueError(f"Maximum must be at least {max_count}.")

In [None]:
def compute_query_dividers(query, id_col, maximum):
    """Compute ID values at which to split a query into batches of bounded size.

    The row count of every distinct ``id_col`` value is queried, and the IDs
    are walked in sorted order while accumulating row counts. Whenever adding
    the next ID would push the running batch above ``maximum`` rows, the
    previous ID is recorded as a divider, i.e., as the last ID belonging to
    that batch.

    Parameters
    ----------
    query:
        Query containing the ``id_col`` column.
    id_col: str
        Name of the ID column whose rows must stay together.
    maximum: int
        Maximum number of rows allowed in a batch.

    Returns
    -------
    list
        The divider IDs in ascending order; each is the last ID of a batch.
        Empty if everything fits in a single batch.

    Raises
    ------
    ValueError
        If a single ID has more than ``maximum`` rows, since that ID could not
        fit in any batch.

    """
    # Compute the row count for each unique value
    col = get_column(query, id_col)
    table = select(col, func.count(col).label("count")).group_by(col)
    count_data = get_interface(table).run()

    if count_data.empty:
        return []

    # Without this guard the first ID could exceed `maximum` on its own, and
    # the `i - 1` lookups below would wrap around to the last row.
    max_count = count_data["count"].max()
    if maximum < max_count:
        raise ValueError(f"Maximum must be at least {max_count}.")

    # Sort and create a cumulative sum of row counts
    count_data = count_data.sort_values(id_col)
    ids = count_data[id_col].to_numpy()
    cumsum = count_data["count"].cumsum().to_numpy()

    # Create query dividers
    last_sum = 0  # cumulative row count at the end of the previous batch
    dividers = []
    for i, running_total in enumerate(cumsum):
        if running_total - last_sum > maximum:
            # ID i does not fit any more: close the batch at ID i - 1
            dividers.append(ids[i - 1])
            last_sum = cumsum[i - 1]

    return dividers

In [None]:
# Dividers for batches of at most 5 million rows; the function defined in this
# notebook is named compute_query_dividers (compute_dividers is not defined).
compute_query_dividers(events_interface.query, ENCOUNTER_ID, int(5e6))

In [None]:
# NOTE(review): `minimum` is not assigned anywhere in the visible notebook
# (only `maximum` is) — confirm where it is meant to come from.
minimum
maximum

In [None]:
import numpy as np


def get_dividers(series: pd.Series, partitions: int):
    """Pick start values that split the unique values of a series evenly.

    The unique values of ``series`` are sorted and divided into ``partitions``
    consecutive groups of (near-)equal size; the first value of each group is
    returned.

    Parameters
    ----------
    series: pandas.Series
        Values (e.g., IDs) to partition.
    partitions: int
        Number of partitions to create. Must be at least 1.

    Returns
    -------
    list
        The first value of each partition, in ascending order. Contains fewer
        than ``partitions`` values when there are fewer unique values than
        partitions, and is empty for an empty series.

    Raises
    ------
    ValueError
        If ``partitions`` is less than 1.

    """
    if partitions < 1:
        raise ValueError("partitions must be at least 1.")

    # Use the series passed in (previously a notebook-level `encounters` frame
    # was read and the argument ignored).
    ids = np.sort(np.asarray(series.unique()))
    if len(ids) == 0:
        return []

    # Integer arithmetic avoids float rounding at the split points; np.unique
    # drops repeated positions when there are more partitions than values.
    split_points = np.unique((np.arange(partitions) * len(ids)) // partitions)
    return [ids[i] for i in split_points]

In [None]:
import sqlalchemy
from sqlalchemy import and_, func, select


def column_windows(session, column, windowsize):
    """Yield WHERE clauses against a given column that break it into windows.

    Each yielded clause selects ``start <= column < end`` for two consecutive
    interval start values; the clause for the last start value is open-ended
    (``column >= start``). Only the clause itself is yielded, not a
    ``((start, end), whereclause)`` tuple.

    NOTE(review): ``session`` and ``windowsize`` are not used in this version.

    NOTE(review): ``intervals`` is bound to the ``get_dividers`` function
    object rather than to a list of start values, so ``intervals.pop(0)``
    cannot work as written. Presumably ``get_dividers(...)`` was meant to be
    called — confirm with which series and number of partitions.

    """

    def int_for_range(start_id, end_id):
        # A falsy end_id (None, but also 0) selects the open-ended clause.
        if end_id:
            return and_(column >= start_id, column < end_id)
        else:
            return column >= start_id

    # NOTE(review): this assigns the function, not its result (see docstring).
    intervals = get_dividers

    # Pair each start value with the next one; the last start has no end.
    while intervals:
        start = intervals.pop(0)
        if intervals:
            end = intervals[0]
        else:
            end = None
        yield int_for_range(start, end)


def windowed_query(q, column, windowsize, engine):
    """Break a Query into windows on a given column.

    For every WHERE clause produced by ``column_windows``, the query is
    filtered by that clause, ordered by ``column``, and read through
    ``engine``. One pandas.DataFrame is yielded per window.

    """
    session = q.session
    for window_clause in column_windows(session, column, windowsize):
        windowed = q.filter(window_clause).order_by(column)
        statement = select(windowed.subquery())
        yield pd.read_sql_query(statement, engine)

In [None]:
import sqlalchemy
from sqlalchemy import and_, func, select


def column_windows(session, column, windowsize):
    """Yield WHERE clauses against a given column that break it into windows.

    Each yielded clause selects ``start <= column < end`` for two consecutive
    interval start values; the clause for the last start value is open-ended
    (``column >= start``). Only the clause itself is yielded, not a
    ``((start, end), whereclause)`` tuple.

    NOTE(review): this is a debugging variant and cannot run as written. The
    statements that created ``q`` and ``intervals`` are commented out, yet
    both names are assigned further down in this function, so Python treats
    them as locals and reading them first raises UnboundLocalError (``q``
    when ``windowsize > 1``, ``intervals`` in the de-duplication loop).
    ``session`` is unused.

    """

    def int_for_range(start_id, end_id):
        # A falsy end_id (None, but also 0) selects the open-ended clause.
        if end_id:
            return and_(column >= start_id, column < end_id)
        else:
            return column >= start_id

    # q = session.query(
    #    column,
    #    func.row_number().over(partition_by=column, order_by=column).label('rownum')
    # ).from_self(column)

    # print("H")
    # table = select(col, func.count(col).label("count")).group_by(col)

    # print(len(pd.read_sql_query(table.subquery(), engine)))

    # print("H2")

    # return None

    # Keep only every `windowsize`-th row number as an interval start.
    # NOTE(review): `q` has not been assigned at this point (see docstring).
    if windowsize > 1:
        print("TEXT")
        print(sqlalchemy.text("rownum %% %d=1" % windowsize))
        q = q.filter(sqlalchemy.text("rownum %% %d=1" % windowsize))

    # print("\n")
    # print(pd.read_sql_query(q.subquery(), engine))

    # intervals = [id for id, in q]

    # Remove duplicates
    # NOTE(review): `intervals` has not been assigned at this point.
    result = []
    for item in intervals:
        if item not in result:
            result.append(item)

    intervals = result

    # Debug output: the interval starts and the gaps between consecutive ones.
    print("intervals\n", intervals)
    print([intervals[i + 1] - intervals[i] for i in range(len(intervals) - 1)])
    print("\n")

    # Pair each start value with the next one; the last start has no end.
    while intervals:
        start = intervals.pop(0)
        if intervals:
            end = intervals[0]
        else:
            end = None
        yield int_for_range(start, end)


def windowed_query(q, column, windowsize, engine):
    """Break a Query into windows on a given column.

    The WHERE clauses come from ``column_windows``. Each one is applied to
    ``q``, the result is ordered by ``column`` and loaded via ``engine``, and
    the resulting pandas.DataFrame is yielded (one per window).

    """
    clauses = column_windows(q.session, column, windowsize)
    for clause in clauses:
        subquery = q.filter(clause).order_by(column).subquery()
        frame = pd.read_sql_query(select(subquery), engine)
        yield frame

In [None]:
from cyclops.query.mimic import _db

# Session and engine of the MIMIC database object, used by the cells below.
session = _db.session
engine = _db.engine

In [None]:
from cyclops.query.util import get_column

q = session.query(events_interface.query)
column = get_column(events_interface.query, ENCOUNTER_ID)

# Run the windowed query with a window size of 3, printing per window the row
# count and the number of unique encounters, and summing both over all windows.
s = 0
num_encounters = 0
for data in windowed_query(q, column, 3, engine):
    print(len(data), len(data[ENCOUNTER_ID].unique()))
    s += len(data)
    num_encounters += len(data[ENCOUNTER_ID].unique())

print("Total length:", s)
print("Num encounters:", num_encounters)

In [None]:
import numpy as np

# Re-run the events query 100 times (clearing the interface's data before each
# run) and average the number of unique encounter IDs returned.
lens = []
for i in range(100):
    events_interface.clear_data()
    data = events_interface.run()
    lens.append(len(data[ENCOUNTER_ID].unique()))

lens = np.array(lens)
lens.mean()

In [None]:
# Unique encounter count and ID range of the last run.
len(data[ENCOUNTER_ID].unique())

In [None]:
data[ENCOUNTER_ID].min()

In [None]:
data[ENCOUNTER_ID].max()

In [None]:
# len(events_interface.run())

In [None]:
# Print the length of every plain batch.
for df in events_interface.run_in_batches(10000):
    print(len(df))

In [None]:
# Print the length and encounter IDs of every grouped batch.
for df in events_interface.run_in_grouped_batches(10000, ENCOUNTER_ID):
    print("LENGTH:", len(df))
    print(df[ENCOUNTER_ID])
    print("\n")

In [None]:
events_interface.save_in_grouped_batches("./test_batches", 10000, ENCOUNTER_ID)

In [None]:
# NOTE(review): 5e6 is a float here, while an earlier cell passes int(5e6) —
# confirm the method accepts a float batch size.
events_interface.save_in_grouped_batches("./test_batches", 5e6, ENCOUNTER_ID)

In [None]:
from typing import Generator, List, Union

from sqlalchemy.sql.schema import Column

from cyclops.utils.batching import query_batch_conditions


@table_params_to_type(Select)
def windowed_query(
    self, query, column, window_size
) -> Generator[pd.DataFrame, None, None]:
    """Break a query into batches by segmenting a given column into value ranges.

    Parameters
    ----------
    query:
        Query to run in batches.
    column:
        Column of the query whose values are segmented into ranges.
    window_size:
        Window size passed on to ``query_batch_conditions``.

    Yields
    ------
    pandas.DataFrame
        The rows of ``query`` satisfying one batch condition.

    """
    # Opportunity for multi-processing/parallelization here!
    for condition in query_batch_conditions(self.session, column, window_size):
        # Filter the ORIGINAL query for every condition. Reassigning `query`
        # here would stack the conditions on top of each other.
        yield pd.read_sql_query(query.where(condition), self.engine)

In [None]:
session = mimic._db.session
session

In [None]:
engine = mimic._db.engine
engine

In [None]:
from sqlalchemy import select

from cyclops.query.util import get_column

query = events_interface.query

# Sum the lengths of all windows of the events query.
# NOTE(review): this call passes five positional arguments
# (query, session, engine, column, window size), which matches none of the
# windowed_query definitions in this notebook — confirm the intended signature.
s = 0
for data in windowed_query(
    session.query(query), session, engine, get_column(query, ENCOUNTER_ID), int(1e7)
):
    print(len(data), len(data[ENCOUNTER_ID].unique()))
    s += len(data)
s

In [None]:
select(query)

In [None]:
events_interface

In [None]:
df = events_interface.run()
df

In [None]:
df[ENCOUNTER_ID].unique()

In [None]:
# NOTE(review): in top-to-bottom order `events` is only assigned in the next
# cell; this cell relies on out-of-order execution.
events

In [None]:
# Query all events on/after AFTER_DATE, dropping the "warning", "itemid" and
# "storetime" columns, and load the full result into memory.
events_interface = mimic.events(after_date=AFTER_DATE)
events_query = events_interface.query
events_query = qp.Drop(["warning", "itemid", "storetime"])(events_query)
events_interface = mimic.get_interface(events_query)
events = events_interface.run()  # limit=1000000)

In [None]:
# Reverse the deidentified dating
# Attach each encounter's "anchor_year_difference" to its events.
# NOTE(review): `encounters` is not created anywhere in the visible notebook —
# confirm where it is loaded.
events = pd.merge(
    encounters[[ENCOUNTER_ID, "anchor_year_difference"]], events, on=ENCOUNTER_ID
)


def add_offset(row):
    """Shift the row's event timestamp by its "anchor_year_difference" years."""
    row[EVENT_TIMESTAMP] += pd.DateOffset(years=row["anchor_year_difference"])
    return row


# Apply the shift row by row, then drop the helper column.
events = events.apply(add_offset, axis=1)
events = events.drop("anchor_year_difference", axis=1)

In [None]:
# Create the target as a timeseries event
# One event per encounter with the death outcome, timestamped at "deathtime",
# named OUTCOME_DEATH, categorized as TARGETS, with value 1.
target_events = encounters[encounters[OUTCOME_DEATH] == True]
target_events = target_events[[ENCOUNTER_ID, "deathtime"]]
target_events = target_events.rename({"deathtime": EVENT_TIMESTAMP}, axis=1)
target_events[EVENT_NAME] = OUTCOME_DEATH
target_events[EVENT_CATEGORY] = TARGETS
target_events[EVENT_VALUE] = 1
target_events.head(5)

In [None]:
# Include target
events = pd.concat([events, target_events])

In [None]:
# Preprocessing
events[EVENT_NAME] = normalize_names(events[EVENT_NAME])
events[EVENT_CATEGORY] = normalize_categories(events[EVENT_CATEGORY])
# events[EVENT_VALUE] = normalize_values(events[EVENT_VALUE])

# Concatenate event name and category since some names are the same in
# different categories, e.g., 'flow' for categories 'heartware' and 'ecmo'
events[EVENT_NAME] = events[EVENT_CATEGORY] + " - " + events[EVENT_NAME]
events.head(5)

In [None]:
# Persist the preprocessed events.
save_dataframe(events, "events.parquet")

In [None]:
# TODO: Sort the events by encounter.
# TODO: Break batches between encounters (not in the middle of an encounter).

In [None]:
# Column whose values must not be split across batches.
batch_over = ENCOUNTER_ID

In [None]:
# Batch size: 10 million (10e6 is a float, hence the int conversion).
batch_size = int(10e6)
batch_size

In [None]:
import random

import sqlalchemy
from sqlalchemy import Column, Integer, and_, create_engine, func, select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

from cyclops.query import mimic
from cyclops.query.util import get_column


def column_windows(session, column, windowsize):
    """Yield WHERE clauses against a given column that break it into windows.

    The column's rows are numbered in column order with a ``row_number()``
    window function, and the column value of every ``windowsize``-th row is
    taken as the start of a window. Each yielded clause selects
    ``start <= column < end`` for two consecutive start values; the clause
    for the last start value is open-ended (``column >= start``).

    Requires a database that supports window functions,
    i.e. Postgresql, SQL Server, Oracle.

    Parameters
    ----------
    session:
        SQLAlchemy session used to query the window start values.
    column:
        Column to break into windows.
    windowsize: int
        Number of rows between two consecutive window start values.

    Yields
    ------
    A SQLAlchemy WHERE clause for one window (the clause only, not a
    ``((start, end), whereclause)`` tuple).

    """

    def int_for_range(start_id, end_id):
        # Compare against None explicitly: a legitimate end value of 0 must
        # still produce a bounded window.
        if end_id is not None:
            return and_(column >= start_id, column < end_id)
        return column >= start_id

    # NOTE(review): Query.from_self() is deprecated since SQLAlchemy 1.4 and
    # removed in 2.0 — this needs porting before upgrading.
    q = session.query(
        column, func.row_number().over(order_by=column).label("rownum")
    ).from_self(column)
    if windowsize > 1:
        q = q.filter(sqlalchemy.text("rownum %% %d=1" % windowsize))

    # When the column holds repeated values, several start rows can carry the
    # same value; drop the repeats (keeping order) so that no empty
    # `column >= x AND column < x` windows are produced.
    intervals = list(dict.fromkeys(value for (value,) in q))

    # Pair each start with the next start; the last window has no upper bound.
    for start, end in zip(intervals, intervals[1:] + [None]):
        yield int_for_range(start, end)


def windowed_query(q, column, windowsize):
    """Break a Query into windows on a given column.

    For each WHERE clause from ``column_windows``, the query is filtered by
    the clause and ordered by ``column``, and its rows are yielded one by one.

    """
    for window_clause in column_windows(q.session, column, windowsize):
        yield from q.filter(window_clause).order_by(column)


# Open a session on the MIMIC engine and wrap the events query (limited to
# 10000 rows) in an ORM Query.
e = mimic._db.engine
s = Session(e)

query = mimic.events(after_date=AFTER_DATE, limit=10000).query
q = s.query(query)

# Walk the query in windows of 1000 rows on the encounter ID column and
# collect the first field of every row.
# NOTE(review): presumably field 0 is the encounter ID — verify against the
# query's column order.
encounter_ids = []
for data in windowed_query(q, get_column(query, ENCOUNTER_ID), 1000):
    encounter_ids.append(data[0])

print(len(encounter_ids))
np.unique(encounter_ids)