Skip to content

Commit

Permalink
fixes for bugs associating display IDs across cells (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
shouples committed Dec 14, 2022
1 parent ed9e1e7 commit c516580
Show file tree
Hide file tree
Showing 8 changed files with 456 additions and 206 deletions.
1 change: 1 addition & 0 deletions src/dx/comms/assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def handle_assignment_comm(msg: dict, ipython_shell: Optional[InteractiveShell]
display_id=data["display_id"],
sql_filter=sql_filter,
filters=filters,
assign_subset=False,
)

ipython = ipython_shell or get_ipython()
Expand Down
26 changes: 19 additions & 7 deletions src/dx/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
from dx.sampling import get_df_dimensions
from dx.settings import get_settings, settings_context
from dx.types.filters import DEXFilterSettings, DEXResampleMessage
from dx.utils.tracking import DXDF_CACHE, SUBSET_TO_DISPLAY_ID, generate_df_hash, get_db_connection
from dx.utils.tracking import (
DXDF_CACHE,
SUBSET_HASH_TO_PARENT_DATA,
generate_df_hash,
get_db_connection,
)

logger = structlog.get_logger(__name__)
db_connection = get_db_connection()
Expand Down Expand Up @@ -49,6 +54,8 @@ def resample_from_db(
display_id: str,
sql_filter: str,
filters: Optional[list] = None,
cell_id: Optional[str] = None,
assign_subset: bool = True,
ipython_shell: Optional[InteractiveShell] = None,
) -> pd.DataFrame:
"""
Expand Down Expand Up @@ -85,12 +92,16 @@ def resample_from_db(
col = ", ".join(col)
new_df[col] = new_df[col].astype(dtype)

# this is associating the subset with the original dataframe,
# which will be checked when the DisplayFormatter.format() is called
# during update_display(), which will prevent re-registering the display ID to the subset
new_df_hash = generate_df_hash(new_df)
logger.debug(f"assigning subset {new_df_hash} to {display_id=}")
SUBSET_TO_DISPLAY_ID[new_df_hash] = display_id
if assign_subset:
# this is associating the subset with the original dataframe,
# which will be checked when the DisplayFormatter.format() is called
# during update_display(), which will prevent re-registering the display ID to the subset
new_df_hash = generate_df_hash(new_df)
logger.debug(f"assigning subset {cell_id}+{new_df_hash} to {display_id=}")
SUBSET_HASH_TO_PARENT_DATA[new_df_hash] = {
"cell_id": cell_id,
"display_id": display_id,
}

return new_df

Expand All @@ -106,6 +117,7 @@ def handle_resample(
"display_id": msg.display_id,
"sql_filter": f"SELECT * FROM {{table_name}} LIMIT {sample_size}",
"filters": raw_filters,
"cell_id": msg.cell_id,
}

if raw_filters:
Expand Down
49 changes: 41 additions & 8 deletions src/dx/formatters/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
normalize_index_and_columns,
to_dataframe,
)
from dx.utils.tracking import DXDF_CACHE, SUBSET_TO_DISPLAY_ID, DXDataFrame, get_db_connection
from dx.utils.tracking import DXDF_CACHE, SUBSET_HASH_TO_PARENT_DATA, DXDataFrame, get_db_connection

logger = structlog.get_logger(__name__)
db_connection = get_db_connection()
Expand All @@ -44,13 +44,7 @@ def datalink_processing(
extra_metadata: Optional[dict] = None,
):
dxdf = DXDataFrame(df)

parent_display_id = SUBSET_TO_DISPLAY_ID.get(dxdf.hash)
if parent_display_id is None:
DXDF_CACHE[dxdf.display_id] = dxdf
else:
logger.debug(f"df is subset of existing {parent_display_id=}")

parent_display_id = determine_parent_display_id(dxdf)
payload, metadata = format_output(
dxdf.df,
update=parent_display_id,
Expand Down Expand Up @@ -218,6 +212,45 @@ def format_output(
return (payload, metadata)


def determine_parent_display_id(dxdf: DXDataFrame) -> Optional[str]:
    """
    Work out which existing display, if any, this DataFrame's output should update.

    The DataFrame's hash is looked up in SUBSET_HASH_TO_PARENT_DATA, whose entries
    record the display ID and cell ID a subset was associated with. Outcomes:
    - No display ID is recorded for the hash: this is treated as a new dataframe;
      `dxdf` is stored in DXDF_CACHE under its own display ID and None is returned.
    - A display ID is recorded and the recorded cell ID equals `dxdf.cell_id`:
      the recorded display ID is returned so the existing display gets updated.
    - A display ID is recorded but the recorded cell ID differs from `dxdf.cell_id`:
      the recorded display ID is disregarded and None is returned, since a display
      that belongs to another cell should not be re-rendered from this one.
      (`dxdf` is not added to DXDF_CACHE in this case.)
    """
    parent_info = SUBSET_HASH_TO_PARENT_DATA.get(dxdf.hash, {})

    parent_display_id = parent_info.get("display_id")
    logger.debug(f"{dxdf.display_id=} & {parent_display_id=}")
    if parent_display_id is not None:
        logger.debug(f"df is subset of existing {parent_display_id=}")
    else:
        # nothing points at this hash, so track it as a dataframe in its own right
        DXDF_CACHE[dxdf.display_id] = dxdf

    parent_cell_id = parent_info.get("cell_id")
    rendered_in_other_cell = parent_cell_id != dxdf.cell_id
    logger.debug(f"{dxdf.cell_id=} & {parent_cell_id=}")

    if parent_display_id is None or not rendered_in_other_cell:
        return parent_display_id

    # doesn't matter if this dataset was associated with another,
    # we shouldn't be re-rendering the display ID from another cell ID
    logger.debug(
        f"disregarding {parent_display_id=} and using {dxdf.display_id=} since this is a new cell_id",
        parent_cell_id=parent_cell_id,
        cell_id=dxdf.cell_id,
    )
    return None


def dev_display(payload, metadata):

from IPython.display import JSON, display
Expand Down
41 changes: 41 additions & 0 deletions src/dx/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pandas as pd
import structlog
from IPython import get_ipython
from IPython.core.interactiveshell import InteractiveShell
from pandas import set_option as pandas_set_option
from pydantic import BaseSettings, validator
Expand Down Expand Up @@ -218,10 +219,50 @@ def set_option(
if key == "LOG_LEVEL":
set_log_level(value)

# allow enabling/disabling comms based on settings
enable_disable_comms(
setting_name=key,
enabled=value,
ipython_shell=ipython_shell,
)

return
raise ValueError(f"`{key}` is not a valid setting")


def enable_disable_comms(
    setting_name: str,
    enabled: bool,
    ipython_shell: Optional[InteractiveShell] = None,
) -> None:
    """
    Register or unregister the comm target associated with a setting name.

    Settings that have no associated comm target are ignored, as are shells
    that have no kernel (nothing can be registered without a comm manager).
    Disabling a target that is not currently registered is a no-op.

    For example, the following will unregister the "datalink_resample" comm:
    >>> enable_disable_comms("ENABLE_DATALINK", False)
    And to re-register it:
    >>> enable_disable_comms("ENABLE_DATALINK", True)

    Parameters
    ----------
    setting_name:
        Name of the setting that was changed, e.g. "ENABLE_DATALINK".
    enabled:
        Truthy to register the comm target, falsy to unregister it.
    ipython_shell:
        Shell whose kernel owns the comm manager; falls back to `get_ipython()`.
    """
    # imported inside the function rather than at module level
    # (presumably to avoid a circular import with dx.comms -- TODO confirm)
    from dx import comms

    comm_setting_targets = {
        "ENABLE_DATALINK": ("datalink_resample", comms.resample.resampler),
        "ENABLE_RENAMER": ("rename", comms.rename.renamer),
        "ENABLE_ASSIGNMENT": ("datalink_assignment", comms.assignment.dataframe_assignment),
    }
    if setting_name not in comm_setting_targets:
        return

    ipython_shell = ipython_shell or get_ipython()
    if getattr(ipython_shell, "kernel", None) is None:
        return

    comm_target, comm_callback = comm_setting_targets[setting_name]
    comm_manager = ipython_shell.kernel.comm_manager
    if enabled:
        comm_manager.register_target(comm_target, comm_callback)
        return

    try:
        comm_manager.unregister_target(comm_target, comm_callback)
    except KeyError:
        # unregister_target raises KeyError when the target is not registered;
        # disabling an already-disabled comm should leave things as they are
        # instead of failing the settings update.
        pass


@contextmanager
def settings_context(ipython_shell: Optional[InteractiveShell] = None, **option_kwargs):
global settings
Expand Down
24 changes: 20 additions & 4 deletions src/dx/utils/tracking.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
import os
import uuid
from functools import lru_cache
from typing import List, Optional, Union
Expand All @@ -19,10 +20,8 @@

# should be (display_id: DXDataFrame) pairs
DXDF_CACHE = {}
# not currently used -- will be needed to disambiguate subsets across different cells
CELL_ID_TO_DISPLAY_ID = {}
# used to track when a filtered subset should be tied to an existing display ID
SUBSET_TO_DISPLAY_ID = {}
SUBSET_HASH_TO_PARENT_DATA = {}


@lru_cache
Expand Down Expand Up @@ -68,7 +67,9 @@ def __init__(

self.df = normalize_index_and_columns(df)
self.hash = generate_df_hash(self.df)
self.display_id = SUBSET_TO_DISPLAY_ID.get(self.hash, str(uuid.uuid4()))

self.cell_id = self.get_cell_id()
self.display_id = self.get_display_id()

self.metadata: dict = generate_metadata(
df=self.df,
Expand All @@ -86,6 +87,21 @@ def __repr__(self):
)
return f"<DXDataFrame {attr_str}>"

def get_cell_id(self) -> Optional[str]:
    """
    Return the cell ID to associate with this DataFrame.

    If this DataFrame's hash has an entry in SUBSET_HASH_TO_PARENT_DATA with a
    "cell_id" key, that recorded value is returned (even when it is None).
    Otherwise the value of the LAST_EXECUTED_CELL_ID environment variable is
    used, which is None when the variable is not set.
    """
    last_executed_cell_id = os.environ.get("LAST_EXECUTED_CELL_ID")
    parent_data = SUBSET_HASH_TO_PARENT_DATA.get(self.hash, {})
    # a recorded cell ID deliberately wins over the last executed cell
    cell_id = parent_data.get("cell_id", last_executed_cell_id)
    logger.debug(f"{last_executed_cell_id=} / last associated {cell_id=}")
    return cell_id

def get_display_id(self) -> str:
    """
    Return the display ID to use for this DataFrame.

    The display ID recorded for this DataFrame's hash in
    SUBSET_HASH_TO_PARENT_DATA is reused when one exists; otherwise a newly
    generated UUID4 string is returned.
    """
    parent_data = SUBSET_HASH_TO_PARENT_DATA.get(self.hash, {})
    # generated up front; only used when no display ID is recorded for the hash
    fresh_id = str(uuid.uuid4())
    display_id = parent_data.get("display_id", fresh_id)
    logger.debug(f"{display_id=}")
    return display_id


def generate_df_hash(df: pd.DataFrame) -> str:
"""
Expand Down
3 changes: 3 additions & 0 deletions tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def test_assignment_handled(
"display_id": display_id,
"sql_filter": f"SELECT * FROM {{table_name}} LIMIT {sample_size}",
"filters": [],
"assign_subset": False,
}
mock_resample.assert_called_once_with(**resample_params)
assert "new_df" in get_ipython.user_ns
Expand Down Expand Up @@ -199,6 +200,7 @@ def test_assignment_handled_with_filters(
"display_id": display_id,
"sql_filter": sql_filter,
"filters": filters,
"assign_subset": False,
}
mock_resample.assert_called_once_with(**resample_params)
assert "new_df" in get_ipython.user_ns
Expand Down Expand Up @@ -239,6 +241,7 @@ def test_assignment_handled_with_existing_variable(
"display_id": display_id,
"sql_filter": f"SELECT * FROM {{table_name}} LIMIT {sample_size}",
"filters": [],
"assign_subset": False,
}
mock_resample.assert_called_once_with(**resample_params)

Expand Down
Loading

0 comments on commit c516580

Please sign in to comment.