In [1]:
# Load IPython's autoreload extension and switch it to mode 2, so edits to
# imported modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

# Proposed API: Defining a Data Pipeline

In [57]:
from ai_cookbook.pipeline import DataSource, ProcessingStep, Output
from ai_cookbook.functions.chunking import chunk_text

# Define a mock data source.
# Every value here is a placeholder (test catalog/schema/volume and a dummy
# path). The parsing function used by step_1 ignores its inputs and returns
# hard-coded text, so presumably nothing is actually read from this location
# in the demo — confirm against the pipeline implementation.
source_1 = DataSource(
    name="source1",
    catalog="test_catalog",
    schema="test_schema",
    type="volume",
    volume_name="test_volume",
    path="/path/to/data",
    format="csv",
)

# Define a local processing function
def mock_parse_docs(inputs):
    """Stand-in parser that ignores ``inputs`` and returns five fixed sentences.

    Args:
        inputs: Accepted so the function fits the step-function call shape;
            it is never read.

    Returns:
        A new list of five hard-coded strings on every call.
    """
    canned_docs = (
        "It was the best of times, it was the worst of times.",
        "Call me Ishmael.",
        "All happy families are alike.",
        "It is a truth universally acknowledged.",
        "In a hole in the ground there lived a hobbit.",
    )
    # Build a fresh list per call, matching the original's new-list-each-time behavior.
    return list(canned_docs)

# Define processing steps
# step_1 is the first stage: its only input is source_1 and it runs the local
# mock_parse_docs function.
step_1 = ProcessingStep(
    name="step1",
    function=mock_parse_docs, # per the proposed API, accepts a Python function or a UC function
    inputs=[source_1],
    output_table="output_table1",
)

# Define a processing step that will fail
def mock_failing_step(inputs):
    """Step function that always fails, used to exercise the pipeline's error path.

    Replaces the previous ``lambda x: (_ for _ in ()).throw(...)`` trick with a
    plain ``raise``; the exception type and message are unchanged.

    Args:
        inputs: Accepted so the function fits the step-function call shape;
            it is never read.

    Raises:
        Exception: Always, with the message "Step 2 failed".
    """
    raise Exception("Step 2 failed")


step_2 = ProcessingStep(
    name="step2",
    function=mock_failing_step,
    inputs=[step_1], # inputs is a list of data sources or previous steps
    output_table="output_table2",
)

# Define the pipeline output: a vector index whose only input is step_2.
# NOTE(review): "openai-embedding-model" looks like a placeholder model name —
# confirm the real embedding endpoint before running against a workspace.
output_index = Output(
    name="output_index",
    type="vector_index",
    embedding_model="openai-embedding-model",
    inputs=[step_2],
    output_table="output_index",
)

In [58]:
from ai_cookbook.pipeline import Pipeline

# Assemble the pipeline from the data source, the two processing steps and the
# output declared in the previous cell.
pipeline = Pipeline(
    data_sources=[source_1],
    processing_steps=[step_1, step_2],
    outputs=[output_index],
)


In [62]:
# Execute the pipeline and keep the returned value for the metadata lookup below.
# NOTE(review): the saved output of this cell is a pydantic ValidationError
# (a DataSource and a ProcessingStep passed where strings are expected for
# source_volume / destination_table), so as recorded this call does not
# complete — confirm against the library code.
run = pipeline.run()

Output()

ValidationError: 2 validation errors for Result[IngestionData, IngestionError]
source_volume
  Input should be a valid string [type=string_type, input_value=DataSource(name='source1'...ne, workspace_link=None), input_type=DataSource]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type
destination_table
  Input should be a valid string [type=string_type, input_value=ProcessingStep(name='step..._table1', parameters={}), input_type=ProcessingStep]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type

In [52]:
# Look up the metadata recorded for the run.
# NOTE(review): this cell's execution count (52) is lower than the run cell's
# (62), so the saved result likely predates the failing run shown above —
# re-execute the notebook top to bottom to confirm.
step_runs = pipeline.metadata_manager.get_metadata(run)

In [53]:
# Display the lookup result.
# NOTE(review): the saved output lists step2 as 'completed' even though
# step_2's function is written to always raise — presumably stale output;
# confirm after a fresh run.
step_runs

{'step1': ['running', 'completed'],
 'step2': ['running', 'completed'],
 'output_index': ['running', 'completed']}

# Proposed API: Adding a Model

In [None]:
from ai_cookbook.models.rag import MultiTurnRAG
from ai_cookbook.models.sql_gen import SQLGenModel

# Instantiate the SQL-generation model with no arguments.
sql_gen_model = SQLGenModel()

# Build the multi-turn RAG model on top of the pipeline defined earlier, with
# the SQL-generation model as its only step.
# NOTE(review): `inputs` receives the Pipeline object itself here, whereas the
# pipeline components take lists for `inputs` — confirm this is the intended
# shape of the proposed API.
model = MultiTurnRAG(
    name="rag_model",
    inputs=pipeline,
    steps=[sql_gen_model],
)
