# Imbed DOG

In [2]:
from imbed.imbed_project import get_local_mall
from i2 import AttributeMapping
# TODO: Wrap in Iterable SimpleNamespace (from types import SimpleNamespace)
mall = AttributeMapping(**get_local_mall('test'))
list(mall)

['misc', 'segments', 'embeddings', 'clusters', 'planar_embeddings']

In [3]:
mall['segments']['hi'] = ['hello', 'world']
assert mall['segments']['hi'] == ['hello', 'world']

In [None]:
from vd.dog import DOG

DOG

In [4]:
from imbed import imbed_project

mall = imbed_project.get_mall(
        'dog_tests', get_project_mall=imbed_project.get_local_mall
    )

In [5]:
list(mall)

['misc',
 'segments',
 'embeddings',
 'clusters',
 'planar_embeddings',
 'segmenters',
 'embedders',
 'clusterers',
 'planarizers',
 'segmenters_signatures',
 'embedders_signatures',
 'clusterers_signatures',
 'planarizers_signatures']

In [7]:
mall.segments.rootdir

'/Users/thorwhalen/.config/imbed/projects/spaces/dog_tests/stores/segments/'

In [9]:
mall.segments['test'] = ['hello world', 'how are you?']
list(mall.segments)

['segments_2', 'test', 'segments_1']

# Dispatching a mesh

A mesh is a DAG (acyclidc directed graph) that relates functions to other functions via their inputs and outputs. Dispatching a mesh has to do with wrapping it or transforming it into an object that will use it to operate. 

Here, we will consider a simple, yet real life, DAG, and transform it iteratively to enable the simple DAG to operate differently.

In [None]:
import tonal

In [None]:
from typing import Sequence, NewType, KT, Tuple, MutableMapping, Callable, Iterable

Segment = NewType("Segment", str)
Embedding = NewType("Embedding", Sequence[float])
PlanarVector = Tuple[float, float]
ClusterIndex = NewType("ClusterIndex", int)

Segments = Iterable[Segment]
Embeddings = Iterable[Embedding]
PlanarVectors = Iterable[PlanarVector]
ClusterIndices = Iterable[ClusterIndex]

Embedder = Callable[[Segments], Embeddings]
Planarizer = Callable[[Embeddings], PlanarVectors]
Clusterer = Callable[[Embeddings], ClusterIndices]




In [None]:
class Null:
    def __getattr__(self, name): return self
    def __setattr__(self, name, value): pass
    def __getitem__(self, key): return self
    def __setitem__(self, key, value): pass
    def __call__(self, *args, **kwargs): return self
    def __repr__(self): return "Null()"

null = Null()

mk_mesh_for_funcs = null


from collections.abc import MutableMapping

class AttrMapping(MutableMapping):
    def __init__(self, mapping):
        self._mapping = mapping  # no dict() copy

    def __getitem__(self, key): return self._mapping[key]
    def __setitem__(self, key, value): self._mapping[key] = value
    def __delitem__(self, key): del self._mapping[key]
    def __iter__(self): return iter(self._mapping)
    def __len__(self): return len(self._mapping)

    def __getattr__(self, key):
        if key in self._mapping and key.isidentifier() and not hasattr(MutableMapping, key):
            return self._mapping[key]
        raise AttributeError(f"No such attribute: {key}")

    def __setattr__(self, key, value):
        if key.startswith("_") or not key.isidentifier() or hasattr(MutableMapping, key):
            super().__setattr__(key, value)
        else:
            self._mapping[key] = value

    def __dir__(self):
        return list(super().__dir__()) + [
            k for k in self._mapping
            if k.isidentifier() and not hasattr(MutableMapping, k)
        ]
    


In [None]:
from typing import Callable
from functools import partial 
\
from dol import Pipe 

vectorize = lambda func: Pipe(partial(map, func), list)
# f = vectorize(lambda x: 2 * x)
# f([1, 2, 3])  # [2, 4, 6]


funcs = {
    'embedder': Callable[[Segments], Embeddings],
    'planarizer': Callable[[Embeddings], PlanarVectors],
    'clusterer': Callable[[Embeddings, int], ClusterIndices],
}
val_stores = {
    'segments': {
        'type': Segments,
        'store': {'segments_1': ['segment1', 'segment2', 'segment3'],
                  'segments_2': ['segment4', 'segment5']
        },
    },
    'embeddings': {
        'type': Embeddings,
        'store': dict(),
    },
    'planar_vectors': {
        'type': PlanarVectors,
        'store': dict(),
    },
    'cluster_indices': {
        'type': ClusterIndices,
        'store': dict(),
    },
}
func_stores = {
    'embedder': {
        'name': 'embedders',
        'store': {
            'constant': lambda segments: vectorize(lambda s: [1, 2, 3])(segments),
            'segment_based': lambda segments: vectorize(lambda s: [len(s), 0.5, 0.5])(segments),
        },
    },
    'planarizer': {
        'name': 'planarizers',
        'store': {
            'constant': lambda embeddings: vectorize(lambda e: (e[0], e[1]))(embeddings),
            'embedding_based': lambda embeddings: vectorize(lambda e: (e[0] * 0.5, e[1] * 0.5))(embeddings),
        },
    },
    ...
}



mesh = mk_mesh_for_funcs(
    funcs=funcs, val_stores=val_stores, func_stores=func_stores
)


list(mesh.stores)
# ['segments', 'embeddings', 'planar_vectors', 'cluster_indices']

mesh.stores['segments'] == mesh.stores['segments']  

sorted(mesh.stores['segments'])  # list the segments (keys)
# ['segments_1', 'segments_2']

# You also have a store of functions:
list(mesh.func)
# ['embedder', 'planarizer', 'clusterer']

list(mesh.func['embedder'])  # list the embedders
# ['constant', 'segment_based', ...]


# write to the segments store (save some segments)
mesh.stores['segments']['segments_3'] = ['segment6', 'segment7']

# call an embedder function on a value of segments
output_store_key, output_val_key = mesh.call(
    mesh.func['embedder']['constant'],
    mesh.stores['segments']['segments_3']
)

# You don't get the output: You get the reference where you can find it
output_store = mesh.stores[output_store_key]
output_val = output_store[output_val_key]
assert output_val == [[1, 2, 3], [1, 2, 3]]  # because segments_3 has two segments





KeyboardInterrupt: 

In [24]:
[[1,2,3]] * 4

[[1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3]]

In [22]:
import dol

t = AttrDict(dol.Files('~/tmp'))


Get:



In [None]:
def embedder(segments: Segments) -> Embeddings:
    """
    Embed the given segments into a sequence of embeddings.
    """

def planarizer(embeddings: Embeddings) -> PlanarVectors:
    """
    Convert the given embeddings into planar vectors.
    """

def clusterer(embeddings: Embeddings, n_clusters: int) -> ClusterIndices:
    """
    Cluster the given embeddings into a sequence of cluster indices.
    """





# DOG

In [2]:
import pytest
from typing import Callable, Any, Dict, List
from functools import partial
from collections.abc import MutableMapping
from dol import Pipe

# --- Mocks for demonstration ---
# In a real scenario, these would be proper classes or enums
class Segments: pass
class Embeddings: pass
class PlanarVectors: pass
class ClusterIndices: pass
class AnalysisReports: pass
class Summaries: pass


# The `vectorize` utility function
vectorize = lambda func: Pipe(partial(map, func), list)

# --- Core DOG Abstraction (Simplified for example) ---
# This class represents the 'mk_mesh_for_funcs' output, renamed to DOG
class DOG:
    def __init__(self, funcs: Dict[str, Any], val_stores: Dict[str, Any], func_stores: Dict[str, Any]):
        self.func_signatures = funcs
        self._val_stores_config = val_stores
        self._func_stores_config = func_stores
        
        # Initialize actual value stores
        self.stores = {name: config['store'] for name, config in val_stores.items()}
        
        # Initialize actual function stores
        self.func = {name: config['store'] for name, config in func_stores.items()}

        # Simple counter for unique output keys
        self._output_counter = 0

    def call(self, func_impl: Callable, *inputs: Any) -> tuple[str, str]:
        """
        Calls a function implementation and stores its output, returning a reference.
        Determines the output store based on the function's expected return type.
        """
        # A simple mechanism to map function outputs to specific stores based on types.
        # In a real system, this would be more sophisticated (e.g., based on func_signatures)
        output_type = None
        for func_name, signature in self.func_signatures.items():
            if func_impl in self.func[func_name].values(): # Naive lookup
                # This needs a more robust way to map func_impl to its signature's return type
                # For simplicity, we'll infer based on function name conventions or mock it.
                if func_name == 'embedder':
                    output_type = 'embeddings'
                elif func_name == 'planarizer':
                    output_type = 'planar_vectors'
                elif func_name == 'clusterer':
                    output_type = 'cluster_indices'
                elif func_name == 'analyzer': # For new function type
                    output_type = 'analysis_reports'
                elif func_name == 'summarizer': # For new function type
                    output_type = 'summaries'
                break
        
        if not output_type or output_type not in self.stores:
            raise ValueError(f"Could not determine output store for function: {func_impl}")

        output_data = func_impl(*inputs)
        
        self._output_counter += 1
        output_key = f"output_{output_type}_{self._output_counter}"
        self.stores[output_type][output_key] = output_data
        
        return output_type, output_key

# --- Test Data & Configuration ---

# Extend function signatures for new capabilities
funcs = {
    'embedder': Callable[[Segments], Embeddings],
    'planarizer': Callable[[Embeddings], PlanarVectors],
    'clusterer': Callable[[Embeddings, int], ClusterIndices],
    'analyzer': Callable[[Embeddings, PlanarVectors], AnalysisReports], # New function type
    'summarizer': Callable[[AnalysisReports], Summaries], # Another new function type
}

# Extend value stores for new data types and initial data
val_stores = {
    'segments': {
        'type': Segments,
        'store': {
            'segments_1': ['segment1', 'segment2', 'segment3'],
            'segments_2': ['segment4', 'segment5']
        },
    },
    'embeddings': {
        'type': Embeddings,
        'store': dict(), # Will store Embedding objects
    },
    'planar_vectors': {
        'type': PlanarVectors,
        'store': dict(), # Will store PlanarVector objects
    },
    'cluster_indices': {
        'type': ClusterIndices,
        'store': dict(), # Will store ClusterIndex objects
    },
    'analysis_reports': { # New store for analysis outputs
        'type': AnalysisReports,
        'store': dict(),
    },
    'summaries': { # New store for summary outputs
        'type': Summaries,
        'store': dict(),
    },
}

# Extend function implementations
func_stores = {
    'embedder': {
        'name': 'embedders',
        'store': {
            'constant': lambda segments: vectorize(lambda s: [1, 2, 3])(segments),
            'segment_based': lambda segments: vectorize(lambda s: [len(s), 0.5, 0.5])(segments),
        },
    },
    'planarizer': {
        'name': 'planarizers',
        'store': {
            'constant': lambda embeddings: vectorize(lambda e: (e[0], e[1]))(embeddings),
            'embedding_based': lambda embeddings: vectorize(lambda e: (e[0] * 0.5, e[1] * 0.5))(embeddings),
        },
    },
    'clusterer': {
        'name': 'clusterers',
        'store': {
            'kmeans': lambda embeddings, num_clusters: ['cluster_a', 'cluster_b'] * (len(embeddings) // 2 + len(embeddings) % 2),
            'dbscan': lambda embeddings, min_points: ['noise'] * len(embeddings),
        },
    },
    'analyzer': { # New analyzer functions
        'name': 'analyzers',
        'store': {
            'similarity_scorer': lambda embeddings, planar_vectors: [{'score': (e[0] + p[0]) / 2} for e, p in zip(embeddings, planar_vectors)],
            'complex_report_generator': lambda embeddings, planar_vectors: {'report_id': 'complex-123', 'summary': 'Detailed analysis'},
        },
    },
    'summarizer': { # New summarizer functions
        'name': 'summarizers',
        'store': {
            'text_summary': lambda reports: "Overall summary from reports",
            'key_metric_extractor': lambda reports: {'total_score': sum(r.get('score', 0) for r in reports)},
        },
    },
}


# --- The User Story Test ---

def test_dog_operations():
    # Instantiate the DOG
    # We initialize our Data Operation Graph (DOG) with the defined function signatures,
    # value stores, and function implementations.
    dog_instance = DOG(funcs=funcs, val_stores=val_stores, func_stores=func_stores)

    # --- Store Inspection ---
    # We want to check if all expected value stores are accessible.
    print("\n--- Store Inspection ---")
    assert sorted(list(dog_instance.stores.keys())) == sorted(['segments', 'embeddings', 'planar_vectors', 'cluster_indices', 'analysis_reports', 'summaries'])
    print("All value stores are present.")

    # We expect the 'segments' store to have its initial data.
    assert 'segments_1' in dog_instance.stores['segments']
    assert dog_instance.stores['segments']['segments_1'] == ['segment1', 'segment2', 'segment3']
    print("Initial segment data verified.")

    # We want to verify that all expected function types are registered.
    assert sorted(list(dog_instance.func.keys())) == sorted(['embedder', 'planarizer', 'clusterer', 'analyzer', 'summarizer'])
    print("All function types are registered.")

    # We want to check if specific function implementations for 'embedder' are available.
    assert 'constant' in dog_instance.func['embedder']
    assert 'segment_based' in dog_instance.func['embedder']
    print("Embedder function implementations verified.")

    # --- CRUD Operations on Stores ---

    # We want to add new data to an existing store.
    dog_instance.stores['segments']['segments_3'] = ['segment6', 'segment7']
    assert 'segments_3' in dog_instance.stores['segments']
    assert dog_instance.stores['segments']['segments_3'] == ['segment6', 'segment7']
    print("New segments_3 data added successfully.")

    # We want to update existing data in a store.
    dog_instance.stores['segments']['segments_1'] = ['updated_segment_A', 'updated_segment_B']
    assert dog_instance.stores['segments']['segments_1'] == ['updated_segment_A', 'updated_segment_B']
    print("segments_1 data updated successfully.")

    # We want to read data from a store.
    retrieved_segments = dog_instance.stores['segments']['segments_2']
    assert retrieved_segments == ['segment4', 'segment5']
    print("segments_2 data retrieved successfully.")

    # We want to delete data from a store.
    del dog_instance.stores['segments']['segments_3']
    assert 'segments_3' not in dog_instance.stores['segments']
    print("segments_3 data deleted successfully.")

    # --- Function Call and Output Management ---

    # We want to call an 'embedder' function ('constant') on existing segment data
    # and ensure its output is correctly stored and referenced.
    print("\n--- Function Call and Output Management ---")
    segments_to_embed = dog_instance.stores['segments']['segments_1'] # Using updated segments_1
    output_store_key_embed, output_val_key_embed = dog_instance.call(
        dog_instance.func['embedder']['constant'],
        segments_to_embed
    )

    # We expect the output to be in the 'embeddings' store.
    assert output_store_key_embed == 'embeddings'
    output_store_embed = dog_instance.stores[output_store_key_embed]
    output_val_embed = output_store_embed[output_val_key_embed]
    # segments_1 has 2 items, so constant embedder should produce 2 outputs
    assert output_val_embed == [[1, 2, 3], [1, 2, 3]]
    print(f"Embedder function 'constant' called. Output stored at '{output_store_key_embed}' with key '{output_val_key_embed}'.")
    print(f"Retrieved embeddings: {output_val_embed}")

    # We want to call a 'planarizer' function ('embedding_based') on the newly generated embeddings.
    # This demonstrates chaining operations using store references.
    output_store_key_planar, output_val_key_planar = dog_instance.call(
        dog_instance.func['planarizer']['embedding_based'],
        output_val_embed # Using the actual value from the previous step
    )

    # We expect the output to be in the 'planar_vectors' store.
    assert output_store_key_planar == 'planar_vectors'
    output_store_planar = dog_instance.stores[output_store_key_planar]
    output_val_planar = output_store_planar[output_val_key_planar]
    # Expected: (1*0.5, 2*0.5) = (0.5, 1.0) and (3*0.5, unknown*0.5) (second and third values are 2,3 for [1,2,3])
    assert output_val_planar == [[0.5, 1.0], [0.5, 1.0]] # Assumes constant embedder gave [1,2,3] for each segment
    print(f"Planarizer function 'embedding_based' called. Output stored at '{output_store_key_planar}' with key '{output_val_key_planar}'.")
    print(f"Retrieved planar vectors: {output_val_planar}")

    # We want to call a 'clusterer' function ('kmeans') using the generated embeddings
    # and a direct integer input.
    num_clusters = 2
    output_store_key_cluster, output_val_key_cluster = dog_instance.call(
        dog_instance.func['clusterer']['kmeans'],
        output_val_embed, # Embeddings from previous step
        num_clusters      # Direct integer input
    )

    # We expect the output to be in the 'cluster_indices' store.
    assert output_store_key_cluster == 'cluster_indices'
    output_store_cluster = dog_instance.stores[output_store_key_cluster]
    output_val_cluster = output_store_cluster[output_val_key_cluster]
    # Expected: 2 segments -> ['cluster_a', 'cluster_b']
    assert output_val_cluster == ['cluster_a', 'cluster_b']
    print(f"Clusterer function 'kmeans' called. Output stored at '{output_store_key_cluster}' with key '{output_val_key_cluster}'.")
    print(f"Retrieved cluster indices: {output_val_cluster}")

    # --- Demonstrating New Function Calls and Chaining ---

    # We want to call the new 'analyzer' function ('similarity_scorer') using two different data stores as input.
    print("\n--- Demonstrating Advanced Chaining ---")
    output_store_key_analyze, output_val_key_analyze = dog_instance.call(
        dog_instance.func['analyzer']['similarity_scorer'],
        output_val_embed, # Embeddings
        output_val_planar # Planar vectors
    )

    assert output_store_key_analyze == 'analysis_reports'
    output_store_analyze = dog_instance.stores[output_store_key_analyze]
    output_val_analyze = output_store_analyze[output_val_key_analyze]
    # Based on embeddings [[1,2,3],[1,2,3]] and planar_vectors [[0.5,1.0],[0.5,1.0]]
    # Expecting [ (1+0.5)/2, (1+0.5)/2 ] -> [0.75, 0.75]
    expected_scores = [{'score': 0.75}, {'score': 0.75}]
    # The current mock 'planarizer' constant returns (e[0], e[1]), meaning [1,2,3] -> (1,2)
    # Then planar_vectors are [0.5, 1.0] from [1,2]. So the scores are (1+0.5)/2 = 0.75
    assert output_val_analyze == expected_scores
    print(f"Analyzer function 'similarity_scorer' called. Output stored at '{output_store_key_analyze}' with key '{output_val_key_analyze}'.")
    print(f"Retrieved analysis reports: {output_val_analyze}")


    # We want to call the new 'summarizer' function ('text_summary') on the analysis reports.
    output_store_key_summary, output_val_key_summary = dog_instance.call(
        dog_instance.func['summarizer']['text_summary'],
        output_val_analyze
    )

    assert output_store_key_summary == 'summaries'
    output_store_summary = dog_instance.stores[output_store_key_summary]
    output_val_summary = output_store_summary[output_val_key_summary]
    assert output_val_summary == "Overall summary from reports"
    print(f"Summarizer function 'text_summary' called. Output stored at '{output_store_key_summary}' with key '{output_val_key_summary}'.")
    print(f"Retrieved summary: '{output_val_summary}'")

    print("\nAll DOG operations tested successfully!")


test_dog_operations()


--- Store Inspection ---
All value stores are present.
Initial segment data verified.
All function types are registered.
Embedder function implementations verified.
New segments_3 data added successfully.
segments_1 data updated successfully.
segments_2 data retrieved successfully.
segments_3 data deleted successfully.

--- Function Call and Output Management ---
Embedder function 'constant' called. Output stored at 'embeddings' with key 'output_embeddings_1'.
Retrieved embeddings: [[1, 2, 3], [1, 2, 3]]


AssertionError: 

In [None]:
import strand

In [6]:
dog_instance.stores['segments']['segments_1']

['updated_segment_A', 'updated_segment_B']

In [None]:
import au