# Persisted incremental DataSlice basics

In [None]:
# @title Imports
import os
import pytz
import tempfile

from koladata import kd
from koladata import kd_ext

In [None]:
# @title Utilities, e.g. for printing diagnostics

# Short aliases for the two kd_ext.persisted_data classes that are used
# throughout this notebook.
DataSliceManagerView = kd_ext.persisted_data.DataSliceManagerView
PersistedIncrementalDataSliceManager = (
    kd_ext.persisted_data.PersistedIncrementalDataSliceManager
)


def get_bag_name_to_size_on_disk_in_mb_dict(
    manager: PersistedIncrementalDataSliceManager,
) -> dict[str, float]:
  """Returns the on-disk size of each bag file, keyed by bag name.

  This is a diagnostics helper. It reaches into private attributes of the
  manager and of its underlying bag manager, so it is tied to their current
  internals.

  Args:
    manager: The manager whose bag files should be measured.

  Returns:
    A dict that maps each bag name listed in the bag manager's metadata to the
    size of the corresponding file, in units of 1024 * 1024 bytes.
  """
  bag_manager = manager._data_bag_manager
  names = [entry.name for entry in bag_manager._metadata.data_bag_metadata]
  return {
      name: os.path.getsize(bag_manager._get_bag_filepath(name)) / 1024 / 1024
      for name in names
  }


def format_percent(numerator, denominator) -> str:
  """Formats `numerator / denominator` as a percentage with two decimals.

  Args:
    numerator: The part.
    denominator: The whole.

  Returns:
    A string such as '12.50%'. If `denominator` is zero, the ratio is undefined
    and 'n/a' is returned instead of raising ZeroDivisionError, so that
    diagnostics can still be printed when a total happens to be zero.
  """
  if denominator == 0:
    return 'n/a'
  return f'{numerator / denominator * 100:.2f}%'


def print_diagnostics(
    manager: PersistedIncrementalDataSliceManager, heading: str
):
  """Prints how much of the managed data is currently loaded.

  The report shows the number of bags, the approximate number of triples and
  the size on disk, both for all bags that the manager knows about and for the
  subset of bags that is currently loaded.

  This is a diagnostics helper. It uses private attributes of the manager and
  of its bag manager. It calls `_read_bag_from_file` for every available bag
  (presumably reading each bag from disk), so it is only meant for small
  examples such as the ones in this notebook.

  Args:
    manager: The manager to inspect.
    heading: A title that is printed above the report and underlined with '='.
  """
  bag_manager = manager._data_bag_manager
  # Take one snapshot of the names and reuse it for all statistics below, so
  # that the bag count, the triple count and the size on disk all refer to the
  # same set of bags.
  available_bag_names = bag_manager.get_available_bag_names()
  loaded_bag_names = bag_manager.get_loaded_bag_names()

  approximate_available_triples = sum(
      bag_manager._read_bag_from_file(bag_name).get_approx_size()
      for bag_name in available_bag_names
  )
  approximate_loaded_triples = sum(
      bag_manager.get_minimal_bag({bag_name}).get_approx_size()
      for bag_name in loaded_bag_names
  )

  bag_name_to_size_on_disk_in_mb = get_bag_name_to_size_on_disk_in_mb_dict(
      manager
  )
  loaded_bags_size_on_disk_mb = sum(
      bag_name_to_size_on_disk_in_mb[bag_name] for bag_name in loaded_bag_names
  )
  available_bags_size_on_disk_mb = sum(
      bag_name_to_size_on_disk_in_mb[bag_name]
      for bag_name in available_bag_names
  )

  print(heading)
  print('=' * len(heading))
  print(
      f'Managed bags: {len(available_bag_names)}   triples:'
      f' {approximate_available_triples}   size on disk:'
      f' {available_bags_size_on_disk_mb:.2f} Mb'
  )

  # The percentages are computed only now, i.e. after the totals were printed,
  # so that the totals are still shown if formatting a percentage fails.
  loaded_bags_percent = format_percent(
      len(loaded_bag_names), len(available_bag_names)
  )
  loaded_triples_percent = format_percent(
      approximate_loaded_triples, approximate_available_triples
  )
  loaded_size_percent = format_percent(
      loaded_bags_size_on_disk_mb, available_bags_size_on_disk_mb
  )
  print(
      'Loaded bags: '
      f' {len(loaded_bag_names)} ({loaded_bags_percent})'
      '   triples:'
      f' {approximate_loaded_triples} ({loaded_triples_percent})'
      f'  size on disk: {loaded_bags_size_on_disk_mb:.2f} Mb'
      f' ({loaded_size_percent})'
  )
  print()

## Creating a persisted incremental DataSlice

Consider the following plain Koda DataSlice:

In [None]:
# Named schemas for the two kinds of entities in the example, together with
# shorthands for creating entities with these schemas.
query_schema = kd.named_schema('query')
new_query = query_schema.new
doc_schema = kd.named_schema('doc')
new_doc = doc_schema.new

# A DataSlice with two queries. Each query has an id, a text, and a list of
# docs stored in the attribute `doc`. The `content` of each doc is a
# 30,000-character string, presumably so that the doc content accounts for
# most of the data size in the diagnostics printed later on.
query_ds = kd.slice([
    new_query(
        id=1,
        text='How high is the Eiffel tower',
        doc=kd.list([
            new_doc(id=10, title='Attractions of Paris', content='foo' * 10000)
        ]),
    ),
    new_query(
        id=2,
        text='How high is the empire state building',
        doc=kd.list([
            new_doc(
                id=11, title='Attractions of New York', content='bar' * 10000
            ),
            new_doc(
                id=12,
                title="The world's tallest buildings",
                content='baz' * 10000,
            ),
        ]),
    ),
])

It is simple to turn it into a version that is persisted to disk and whose data can be loaded incrementally on demand.

Persistence and incremental loading are useful for situations where the full data does not fit into RAM. All the data is persisted to disk, and only the parts that are needed get loaded into RAM.

The examples in this notebook all have tiny data, but the techniques shown here also apply to real-world applications where the data is large.

In [None]:
# The manager persists its state in the given directory. Note that directories
# created by tempfile.mkdtemp() are not removed automatically.
manager = PersistedIncrementalDataSliceManager(tempfile.mkdtemp())

# The root view is the entry point for navigating and updating the managed
# data.
root = DataSliceManagerView(manager)
# The right-hand side is a (value, description) tuple. implode(ndim=-1) packs
# the DataSlice of queries into a list, so that `root.query` has a LIST schema.
root.query = query_ds.implode(ndim=-1), 'Added initial "query" data'

The assignment to `root.query` received the full query data `query_ds.implode(...)` in
memory, alongside a description that acts like a commit message. The manager decomposed the query data into smaller DataBags, persisted the bags,
and kept them in a cache. Below, we first print diagnostics while everything is still cached, and then clear the cache to make the memory footprint of
the manager tiny.

In [None]:
# @test {"output": "ignore"}

print_diagnostics(manager, 'After adding initial "query" data')

After adding initial "query" data
Managed bags: 10   triples: 30   size on disk: 0.01 Mb
Loaded bags:  10 (100.00%)   triples: 30 (100.00%)  size on disk: 0.01 Mb (100.00%)



In [None]:
# @test {"output": "ignore"}

manager.clear_cache()
print_diagnostics(manager, 'After clearing the cache')

After clearing the cache
Managed bags: 10   triples: 30   size on disk: 0.01 Mb
Loaded bags:  1 (10.00%)   triples: 0 (0.00%)  size on disk: 0.00 Mb (2.10%)



## Getting data from a persisted incremental DataSlice

At this point no data triples are loaded, but the manager always knows the schema of the data.

In [None]:
manager.get_schema()

DataItem(ENTITY(
  query=LIST[query(doc=LIST[doc(content=STRING, id=INT32, title=STRING)], id=INT32, text=STRING)],
), schema: SCHEMA)

In [None]:
root.get_schema()

DataItem(ENTITY(
  query=LIST[query(doc=LIST[doc(content=STRING, id=INT32, title=STRING)], id=INT32, text=STRING)],
), schema: SCHEMA)

Asking for data will load it into memory.

In [None]:
root.query[:].doc[:].id.get_data_slice()

DataSlice([[10], [11, 12]], schema: INT32)

In [None]:
# @test {"output": "ignore"}

print_diagnostics(manager, 'After getting the DataSlice with doc ids')

After getting the DataSlice with doc ids
Managed bags: 10   triples: 30   size on disk: 0.01 Mb
Loaded bags:  6 (60.00%)   triples: 16 (53.33%)  size on disk: 0.00 Mb (23.29%)



By default, `get_data_slice()` loads only stubs of the data, i.e. the entities without their attributes.

In [None]:
# @test {"output": "ignore"}

root.query[:].doc[:].get_data_slice()

DataSlice([
  [Entity():$002UhSVy7ootLyoijpwCQS],
  [Entity():$002UhSVy7ootLyoijpwCQV, Entity():$002UhSVy7ootLyoijpwCQW],
], schema: doc())

You can ask explicitly to populate the descendants.

In [None]:
root.query[:].doc[:].get_data_slice(with_descendants=True)

DataSlice([
  [
    Entity(
      content='foofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofo'...'oofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoo',
      id=10,
      title='Attractions of Paris',
    ),
  ],
  [
    Entity(
      content='barbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarba'...'arbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbar',
      id=11,
      title='Attractions of New York',
    ),
    Entity(
      content='bazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazba'...'azbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazba

In [None]:
# @test {"output": "ignore"}

print_diagnostics(manager, 'After getting all doc data')

After getting all doc data
Managed bags: 10   triples: 30   size on disk: 0.01 Mb
Loaded bags:  8 (80.00%)   triples: 24 (80.00%)  size on disk: 0.01 Mb (91.52%)



You can also ask for the data of the ancestors up to the root. What you get is a minimal skeleton of the ancestors. For example, in the next cell you do not get the query text.

In [None]:
root.query[:].doc[:].get_data_slice(with_ancestors=True, with_descendants=True)

DataItem(Entity(
  query=List[
    Entity(
      doc=List[
        Entity(
          content='foofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofo'...'oofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoo',
          id=10,
          title='Attractions of Paris',
        ),
      ],
    ),
    Entity(
      doc=List[
        Entity(
          content='barbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarba'...'arbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbarbar',
          id=11,
          title='Attractions of New York',
        ),
        Entity(
          content='bazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazbazba

In [None]:
# @test {"output": "ignore"}

print_diagnostics(manager, 'After getting all doc data and the minimal skeleton of doc ancestors up to the root')

After getting all doc data and the minimal skeleton of doc ancestors up to the root
Managed bags: 10   triples: 30   size on disk: 0.01 Mb
Loaded bags:  8 (80.00%)   triples: 24 (80.00%)  size on disk: 0.01 Mb (91.52%)



## Updating a persisted incremental DataSlice

It is simple to add or override data.

In [None]:
# A view of all the queries, i.e. of the items of the list `root.query`.
query = root.query[:]

# Adds a new attribute `locale` to the queries: one value per query, together
# with a description of the update.
query.locale = (
    kd.slice(['en-GB', 'en-US']),
    'Added locale information to queries',
)

# Overrides the existing attribute `text`. The `|` operator keeps the new text
# where one is given and falls back to the existing text where the new value is
# missing (None), so only the first query changes.
query.text = (
    kd.slice(['How high is the Eiffel tower in Paris', None])
    | query.text.get_data_slice(),
    'Updated the text of the first query about the Eiffel tower',
)

In [None]:
query.locale.get_data_slice()

DataSlice(['en-GB', 'en-US'], schema: STRING)

In [None]:
query.text.get_data_slice()

DataSlice(['How high is the Eiffel tower in Paris', 'How high is the empire state building'], schema: STRING)

## Looking at the revision history

In [None]:
# @test {"output": "ignore"}

manager.get_action_history(tz=pytz.timezone('Europe/Zurich'))

[CreationMetadata(description='Initial state with an empty root DataSlice', timestamp='2025-09-19 13:15:19 CEST'),
 AttributeUpdateMetadata(description='Added initial "query" data', timestamp='2025-09-19 13:15:19 CEST', at_path='', attr_name='query'),
 AttributeUpdateMetadata(description='Added locale information to queries', timestamp='2025-09-19 13:19:56 CEST', at_path='.query[:]', attr_name='locale'),
 AttributeUpdateMetadata(description='Updated the text of the first query about the Eiffel tower', timestamp='2025-09-19 13:19:56 CEST', at_path='.query[:]', attr_name='text')]

## Branching

Any manager can be branched. Branches are useful for rolling back updates, for what-if experiments, and for filtered versions of the data.

### Branching to roll back updates

In [None]:
query.text.get_data_slice()

DataSlice(['How high is the Eiffel tower in Paris', 'How high is the empire state building'], schema: STRING)

In [None]:
before_updating_query_text = manager.branch(
    tempfile.mkdtemp(),
    # Note that we pass action_history_index=2. The action with index 2 in
    # manager.get_action_history(...) from above has a description that says:
    # "Added locale information to queries". We will create a branch on top of
    # that, i.e. the branch will contain the locale information, but it will not
    # include the effect of the action with index 3 (whose description is
    # "Updated the text of the first query about the Eiffel tower").
    action_history_index=2,
    description='Branch to roll back state',
)

In [None]:
root_before_updating_query_text = DataSliceManagerView(before_updating_query_text)

root_before_updating_query_text.query[:].text.get_data_slice()

DataSlice(['How high is the Eiffel tower', 'How high is the empire state building'], schema: STRING)

Updates to the branch do not affect the original manager, and vice versa. The two managers evolve independently.

In [None]:
root_before_updating_query_text.query[:].text = (
    kd.slice(['How heavy is the Eiffel Tower', None])
    | root_before_updating_query_text.query[:].text.get_data_slice(),
    'Updated the text of the first query to be about weight',
)

In [None]:
root_before_updating_query_text.query[:].text.get_data_slice()

DataSlice(['How heavy is the Eiffel Tower', 'How high is the empire state building'], schema: STRING)

In [None]:
root.query[:].text.get_data_slice()

DataSlice(['How high is the Eiffel tower in Paris', 'How high is the empire state building'], schema: STRING)

### Branching to filter data

Creating a branch for filtering data is useful because the filtering updates are then applied to the branch only, and the original manager and its data remain unchanged.

In [None]:
# The branch gets its own persistence directory. No action_history_index is
# passed here, in contrast to the rollback example above.
branch_manager = manager.branch(
    tempfile.mkdtemp(), description='Branch to filter data'
)

# Views of the branch: its root, all its queries, and all docs of all queries.
branch_root = DataSliceManagerView(branch_manager)
branch_query = branch_root.query[:]
branch_doc = branch_query.doc[:]

# Mask of the docs whose title contains 'Paris'. Only the doc titles are
# requested here, not e.g. the doc contents.
doc_mask = kd.strings.contains(branch_doc.title.get_data_slice(), 'Paris')
# The docs that pass the filter, still grouped by query.
filtered_docs = branch_doc.get_data_slice().select(doc_mask)
# Mask of the queries that have at least one remaining doc.
query_mask = kd.agg_any(kd.has(filtered_docs))
filtered_queries = branch_query.get_data_slice().select(query_mask)

# First replace the doc list of each query with its filtered docs. Then replace
# the list of queries with the filtered queries.
branch_query.doc = (
    filtered_docs.implode(),
    'Filtered the docs to keep only ones with "Paris" in the title',
)
branch_root.query = (
    filtered_queries.implode(),
    (
        'Filtered the queries to keep only those with at least one doc with'
        ' "Paris" in the title'
    ),
)

The filtering code above loaded only the data that was needed to perform the filtering.

In [None]:
# @test {"output": "ignore"}

print_diagnostics(branch_manager, 'After filtering')

After filtering
Managed bags: 16   triples: 45   size on disk: 0.01 Mb
Loaded bags:  10 (62.50%)   triples: 25 (55.56%)  size on disk: 0.00 Mb (32.81%)



In [None]:
branch_root.get_data_slice(with_descendants=True)

DataItem(Entity(
  query=List[
    Entity(
      doc=List[
        Entity(
          content='foofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofo'...'oofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoo',
          id=10,
          title='Attractions of Paris',
        ),
      ],
      id=1,
      locale='en-GB',
      text='How high is the Eiffel tower in Paris',
    ),
  ],
), schema: ENTITY(
  query=LIST[query(
    doc=LIST[doc(content=STRING, id=INT32, title=STRING)],
    id=INT32,
    locale=STRING,
    text=STRING,
  )],
))

In [None]:
for index, action in enumerate(
    branch_manager.get_action_history(tz=pytz.timezone('Europe/Zurich'))
):
  print(f'{index}. {type(action).__name__}: {action.description}')

0. BranchMetadata: Branch to filter data
1. AttributeUpdateMetadata: Filtered the docs to keep only ones with "Paris" in the title
2. AttributeUpdateMetadata: Filtered the queries to keep only those with at least one doc with "Paris" in the title


## Views are aware of the full schema

Each view knows about its parent (unless it is the root), its children and the root.

In [None]:
assert query.get_grandparent() == root

In [None]:
for child in query.get_children():
  print(child.get_path_from_root())
  assert child.get_parent() == query

.query[:].doc
.query[:].id
.query[:].locale
.query[:].text


In [None]:
doc = query.doc[:]

assert doc.title.get_parent() == doc

Operations on views are checked immediately against the schema. Errors are flagged immediately.

In [None]:
try:
  root[:]
except ValueError as e:
  print(e)

invalid data slice path: '[:]'


In [None]:
try:
  query.foo
except ValueError as e:
  print(e)

invalid data slice path: '.query[:].foo'


Views give you the ability to talk about an entire feature/column. Views do not support operations that involve anything less than that. Instead, such operations have to be performed on the DataSlices, i.e. after calling `view.get_data_slice()`.

In [None]:
try:
  root.query[0]
except ValueError as e:
  print(e)

only the [:] syntax is supported; got a request for [0]


In [None]:
root.query.get_data_slice(with_descendants=True)[0]

DataItem(Entity(
  doc=List[
    Entity(
      content='foofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofo'...'oofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoofoo',
      id=10,
      title='Attractions of Paris',
    ),
  ],
  id=1,
  locale='en-GB',
  text='How high is the Eiffel tower in Paris',
), schema: query(
  doc=LIST[doc(content=STRING, id=INT32, title=STRING)],
  id=INT32,
  locale=STRING,
  text=STRING,
))

When you type `view.<tab>` in a notebook, auto-complete will suggest a schema-aware list of methods.

Colab has some limitations: after long chains of operations on a view, it may not give the proper completions anymore. Saving the long chain in a variable seems to solve that. Auto-complete will suggest good completions for the variable and short chains of operations on it.

## Loading data sequentially and in parallel

In [None]:
def get_data_sequential(
    manager: PersistedIncrementalDataSliceManager,
) -> dict[str, kd.types.DataSlice]:
  """Fetches the query texts, doc titles and doc ids one feature at a time.

  Args:
    manager: The manager to get the data from.

  Returns:
    A dict with the keys 'query_text', 'doc_title' and 'doc_id', each mapped to
    the DataSlice of the corresponding feature.
  """
  query_view = DataSliceManagerView(manager).query[:]
  doc_view = query_view.doc[:]

  # Each get_data_slice() call below finishes before the next one starts, i.e.
  # the calls are run in sequence. However, the index structure of queries and
  # docs is cached across the calls.
  query_text = query_view.text.get_data_slice()
  doc_title = doc_view.title.get_data_slice()
  doc_id = doc_view.id.get_data_slice()
  return {
      'query_text': query_text,
      'doc_title': doc_title,
      'doc_id': doc_id,
  }


def convert_to_pure_python(data: dict[str, kd.types.DataSlice]):
  """Converts named DataSlices into plain Python data.

  Args:
    data: Maps feature names to DataSlices. The DataSlices are passed to
      kd.new() as keyword arguments, so they must be compatible with that.

  Returns:
    The result of flattening the new entities and converting them with
    to_py(obj_as_dict=True).
  """
  combined = kd.new(**data)
  flat = combined.flatten()
  return flat.to_py(obj_as_dict=True)

In [None]:
manager.clear_cache()

In [None]:
# @test {"output": "ignore"}

%%time

data_sequential = get_data_sequential(manager)

CPU times: user 5.48 ms, sys: 1.15 ms, total: 6.62 ms
Wall time: 6.07 ms


In [None]:
def get_data_parallel(
    manager: PersistedIncrementalDataSliceManager,
) -> dict[str, kd.types.DataSlice]:
  """Fetches the query texts, doc titles and doc ids with a single request.

  Args:
    manager: The manager to get the data from.

  Returns:
    A dict with the keys 'query_text', 'doc_title' and 'doc_id', each mapped to
    the DataSlice of the corresponding feature.
  """
  query_view = DataSliceManagerView(manager).query[:]
  doc_view = query_view.doc[:]

  feature_views = (
      ('query_text', query_view.text),
      ('doc_title', doc_view.title),
      ('doc_id', doc_view.id),
  )
  paths_by_name = {}
  for feature_name, feature_view in feature_views:
    paths_by_name[feature_name] = feature_view.get_path_from_root()

  # The crux of parallel loading: the manager is asked directly for one
  # DataSlice that is populated with an arbitrary set of features. The manager
  # will load the needed bags from disk in parallel.
  populated_ds = manager.get_data_slice(populate=paths_by_name.values())

  # An alternative is to treat the call above purely as a command to preload
  # data in parallel, i.e. to discard its return value and to do
  # `return get_data_sequential(manager)` afterwards. Getting the data directly
  # from the populated DataSlice is arguably more elegant, though.
  features = {}
  for feature_name, path in paths_by_name.items():
    features[feature_name] = path.evaluate(populated_ds)
  return features

Because our data is so small, the results are very noisy, and the parallel loading isn't always faster. But on bigger examples the difference is much more pronounced.

In [None]:
manager.clear_cache()

In [None]:
# @test {"output": "ignore"}

%%time

data_parallel = get_data_parallel(manager)

CPU times: user 4.3 ms, sys: 2.63 ms, total: 6.93 ms
Wall time: 5.48 ms


In [None]:
assert convert_to_pure_python(data_sequential) == convert_to_pure_python(
    data_parallel
)

## Restrictions

PersistedIncrementalDataSliceManager indexes the data by considering only its schema. Roughly speaking, it is a column-based storage abstraction.

The use of kd.OBJECT schema is banned in incremental data, because the schema
(kd.OBJECT) does not contain the information about the structure of the data that is needed for proper indexing.

The next example shows how OBJECTs can hide information about their true character: based on the schema alone, it is impossible to tell that the update adds not only the attribute `root.naughty_obj`, but also the attribute `root.foo`. For this reason, updates that use kd.OBJECT anywhere are always rejected outright.

In [None]:
try:
  root.naughty_obj = (
      kd.obj(root.get_data_slice().with_attrs(foo=123)),
      'Trying to add an OBJECT',
  )
except ValueError as e:
  print(e)

OBJECT schemas are not supported. Please use a structured schema, or remove the data, or serialize it and attach it as BYTES data instead


A somewhat unfortunate consequence of banning OBJECT is that Koda functors can only be stored in serialized form in incremental DataSlices.

Additionally, the manager enforces a complete separation between data and schema. That is needed to make its reasoning about updates, which is based only on the schema of the data, consistent with the behavior of analogous updates in standard Koda DataSlices (i.e. non-incremental DataSlices).

In particular, schemas cannot be stored inside data, i.e. kd.SCHEMA is banned. The next example shows why: not only does it add an attribute `root.my_naughty_schema` with schema `kd.SCHEMA`, but it also covertly adds an attribute `root.foo`:

In [None]:
try:
  root.my_naughty_schema = root.get_schema().with_attr('foo', kd.INT32)
except ValueError as e:
  print(e)

SCHEMA schemas are not supported. Please remove it and try again. If you truly want to include data with schema SCHEMA, then consider serializing it and attaching the resulting BYTES data instead


The separation between data and schema is complete. Not only can data not include SCHEMAs, but SCHEMAs cannot include data.

In particular, non-primitive data cannot be piggy-backed on schemas (schema metadata, to be precise). The next example shows why: not only does the update add an attribute `root.my_naughty_data`, but it also covertly adds an attribute `foo` to the first query.

In [None]:
first_query = query.get_data_slice().S[0]

innocent_looking_schema = kd.named_schema('i_am_innocent')
innocent_looking_schema = kd.with_metadata(
    innocent_looking_schema, covert_data=first_query.with_attrs(foo='bar')
)

try:
  root.my_naughty_data = (
      innocent_looking_schema.new(a=123),
      'Covert data updates are banned',
  )
except ValueError as e:
  print(e)

schema i_am_innocent(a=INT32, __schema_metadata__=Obj(covert_data=Entity(foo='bar'))) has metadata attributes that are not primitives


## Aliases and graph data

Although persisted incremental DataSlices are rooted, they do not have to be trees. Aliases and graph data are fully supported.

In [None]:
def aliasing_example():
  """Shows that an update made through one alias is visible through all."""
  shared_entity = kd.new(x=kd.new(z=1))

  manager = PersistedIncrementalDataSliceManager(tempfile.mkdtemp())
  root = DataSliceManagerView(manager)

  # Four attributes of the root that all refer to the same entity.
  root.e1 = (shared_entity, 'Added data for e1')
  root.e2 = (shared_entity, 'Made e2 an alias for e1')
  root.e3 = (root.e2.get_data_slice(), 'Made e3 an alias for e2')
  root.e4 = (shared_entity.stub(), 'Yet another alias')

  e1_view = root.e1
  alias_views = [e1_view, root.e2, root.e3, root.e4]

  def assert_y_is_visible_through_all_aliases():
    for alias_view in alias_views:
      assert alias_view.y.get_data_slice().to_py() == 2

  # A new feature is added to e1 only. Because of the aliasing, it is
  # implicitly added to e2, e3 and e4 as well.
  e1_view.y = kd.item(2)
  assert_y_is_visible_through_all_aliases()

  # Even when starting from a state where nothing is loaded, the manager still
  # knows that "y" is a descendant of e1, e2, e3, and e4.
  manager.clear_cache()
  assert_y_is_visible_through_all_aliases()

  # The same holds for a completely new manager instance that reads the state
  # from the persistence directory.
  reopened_manager = PersistedIncrementalDataSliceManager(
      manager.get_persistence_directory()
  )
  reopened_root = DataSliceManagerView(reopened_manager)
  assert reopened_root.e4.y.get_data_slice().to_py() == 2


aliasing_example()

In [None]:
def graph_data_example():
  """Shows that cyclic graph data can be stored, traversed and updated."""
  # The schema is recursive: the outgoing edges of a GraphNode are a list of
  # GraphNodes.
  graph_node_schema = kd.named_schema(
      'GraphNode',
      label=kd.STRING,
      outgoing_edges=kd.list_schema(kd.named_schema('GraphNode')),
  )
  # Build a cycle of two nodes: node2 -> node1 and node1 -> node2. node1 is
  # created without edges first, because node2 does not exist yet at that
  # point; the edge to node2 is added afterwards.
  node1 = graph_node_schema.new(label='node1')
  node2 = graph_node_schema.new(label='node2', outgoing_edges=kd.list([node1]))
  node1 = node1.updated(kd.attrs(node1, outgoing_edges=kd.list([node2])))

  # Sanity check on the in-memory data: following two edges from node1 leads
  # back to node1.
  kd.testing.assert_deep_equivalent(
      node1.outgoing_edges[:].outgoing_edges[:].flatten(),
      kd.slice([node1]),
  )

  manager = PersistedIncrementalDataSliceManager(tempfile.mkdtemp())
  root = DataSliceManagerView(manager)

  root.graph_nodes = (
      kd.list([node1, node2]),
      'Populated graph_nodes with node1 and node2',
  )
  # Following two edges from each node leads back to the node itself ...
  kd.testing.assert_deep_equivalent(
      root.graph_nodes[:]
      .outgoing_edges[:]
      .outgoing_edges[:]
      .label.get_data_slice()
      .flatten(),
      kd.slice(['node1', 'node2']),
  )
  # ... and following one edge leads to the other node.
  kd.testing.assert_deep_equivalent(
      root.graph_nodes[:].outgoing_edges[:].label.get_data_slice().flatten(),
      kd.slice(['node2', 'node1']),
  )

  # Update the graph through the views: reversing the DataSlice that holds the
  # two edge lists gives each node the edge list of the other node.
  graph_nodes = root.graph_nodes[:]
  outgoing_edges_lists = graph_nodes.outgoing_edges
  graph_nodes.outgoing_edges = (
      kd.reverse(outgoing_edges_lists.get_data_slice()),
      'Changed both outgoing edges into self-edges by swapping the lists',
  )
  # Now following one edge from each node leads to the node itself.
  kd.testing.assert_deep_equivalent(
      root.graph_nodes[:].outgoing_edges[:].label.get_data_slice().flatten(),
      kd.slice(['node1', 'node2']),
  )


graph_data_example()