In [1]:
from langchain.agents.agent_types import AgentType
from langchain_experimental.agents.agent_toolkits import create_pandas_dataframe_agent
from langchain_openai import ChatOpenAI

from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import LanceDB
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.prompts import PromptTemplate

import lancedb

from IPython.display import Markdown, display

## Source data

In [33]:
import pandas as pd
from langchain_openai import OpenAI

df = pd.read_parquet('./data/parquet/sources-camel-kafka.parquet', engine='pyarrow')

# add some keywords to improve the lookup
keywords = [
    "custom check, pause, resume",
    "pause consumer, curcuit breaker, kafka",
    "resume adapter, resume API",
    "resume API"
]

df.insert(4, "keywords", keywords, True)

df.head()

Unnamed: 0,project,path,language,tags,keywords,description,source
0,camel,/Users/opiske/code/java/camel/components/camel...,java,,"custom check, pause, resume","""Shows how to create a custom check that can d...","if (count.intValue() <= 1) {\n LOG.info(""Coun..."
1,camel,/Users/opiske/code/java/camel/components/camel...,java,,"pause consumer, curcuit breaker, kafka","""Shows how to build a Camel route can pause a ...",return new RouteBuilder(){\n @Override public...
2,camel,/Users/opiske/code/java/camel/components/camel...,java,,"resume adapter, resume API","""Shows to create an adapter that is run by Cam...",if (count.intValue() <= 1) {\n return true;\n...
3,camel,/Users/opiske/code/java/camel/components/camel...,java,,resume API,"""Shows to create a route that uses the resume ...",return new RouteBuilder(){\n @Override public...


In [34]:
from langchain_community.document_loaders import DataFrameLoader
loader = DataFrameLoader(df, page_content_column="source")
docs = loader.load()
print("Num docs: ", len(docs))

[print(d) for d in list(df["description"])]


Num docs:  4
"Shows how to create a custom check that can determine whether to pause or continue"
"Shows how to build a Camel route can pause a Kafka consumer when using the circuit breaker pattern"
"Shows to create an adapter that is run by Camel when the resume happens "
"Shows to create a route that uses the resume API"


[None, None, None, None]

## Retriever setup

In [35]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
print(f"Created {len(splits)} splits")

# Use `description` as lancedb.embedding and `page_content` as lancedb.text
# this way the lookups will be performed against the description values, but still return the actual sources into the LLM context
embeddings = OpenAIEmbeddings()
db = lancedb.connect("/tmp/lancedb")
table = db.create_table(
    "camel",
    data=[
        {
            "vector": embeddings.embed_query("Hello World"),
            "text": "Hello World",
            "id": "1",
        }
    ],
    mode="overwrite",
)

# manually create the store so we get the embeddings right
lancedb_data = []
for i,doc in enumerate(splits):
    lancedb_data.append(
        {
            "vector": embeddings.embed_query(doc.metadata["keywords"] + " "+ doc.metadata["description"]),
            "text": doc.metadata["description"] +"\n"+ doc.page_content,
            "id": i,
        }
    )
table.add(lancedb_data)

# create a vectorstore from the pre-populated table
vectorstore = LanceDB(table,embeddings)

# Retrieve and generate using the relevant snippets of the sources found in the parquet files.
retriever = vectorstore.as_retriever()
print(type(vectorstore))

Created 6 splits
<class 'langchain_community.vectorstores.lancedb.LanceDB'>


In [None]:
# let's take a look at how the retriever actually works
matches = retriever.get_relevant_documents("resume API")
[print(f"{i}:\n {m.page_content}") for i,m in enumerate(matches)]

## The actual Q&A involving the LLM

In [45]:
#llm = ChatOpenAI(model_name="gpt-3.5-turbo-1106", temperature=0)
llm = ChatOpenAI(model_name="gpt-4-1106-preview", temperature=0)

prompt = PromptTemplate.from_template("""
    You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. 
    If you don't know the answer, just say that you don't know. 
    Please provide end-to-end examples in Java when applicable. 

    Question: {question} 

    Context: {context} 

    Answer:
""")

def format_docs(docs):      
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

questions = [
    "Can you show me an example of using the resume API in Camel?",    
    "How do I determine whether to pause or continue a route? Do you have an example?",        
]

from langchain.globals import set_debug
set_debug(False)

display(Markdown("### Example Questions&Answers"))
for i,q in enumerate(questions):
    display(Markdown("#### "+q))
    result = rag_chain.invoke(q)    
    display(Markdown(result))    
    



### Example Questions&Answers

#### Can you show me an example of using the resume API in Camel?

Based on the provided context, it seems you are looking for examples of how to use Apache Camel's resume API, particularly with Kafka consumers and the circuit breaker pattern. The context snippets suggest creating Camel routes that can pause and resume based on certain conditions. Below are the examples based on the context provided:

Example 1: Using the resume API with a Kafka consumer
```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class KafkaConsumerRoute extends RouteBuilder {

    private static final String SOURCE_TOPIC = "yourTopic";
    private static final int RETRY_COUNT = 3;

    @Override
    public void configure() {
        from("kafka:" + SOURCE_TOPIC + "?groupId=KafkaPausableConsumerIT&autoOffsetReset=earliest" +
                "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" +
                "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" +
                "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true" +
                "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
                .pausable(testConsumerListener, o -> canContinue())
                .routeId("pausable-it")
                .process(exchange -> LOG.info("Got record from Kafka: {}", exchange.getMessage().getBody()))
                .to("direct:intermediate");

        from("direct:intermediate")
                .process(exchange -> {
                    LOG.info("Got record on the intermediate processor: {}", exchange.getMessage().getBody());
                    if (getCount() <= RETRY_COUNT) {
                        throw new RuntimeCamelException("Error");
                    }
                })
                .to(KafkaTestUtil.MOCK_RESULT);
    }

    private boolean canContinue() {
        // Implement your logic to determine if the route can continue
        // For example, based on a retry count
        return getCount() <= RETRY_COUNT;
    }

    private int getCount() {
        // Implement your logic to retrieve the current count
        // This is just a placeholder
        return 0;
    }

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new KafkaConsumerRoute());
        context.start();
        // Keep the application running until you decide to stop it
        Thread.sleep(Long.MAX_VALUE);
        context.stop();
    }
}
```

Example 2: Using the resume API with a Kafka consumer and circuit breaker pattern
```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;

public class KafkaConsumerCircuitBreakerRoute extends RouteBuilder {

    private static final String SOURCE_TOPIC = "yourTopic";
    private ScheduledExecutorService executorService;

    @Override
    public void configure() {
        CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("pausable");
        circuitBreaker.getEventPublisher()
                .onSuccess(event -> {
                    LOG.info("Downstream call succeeded");
                    if (executorService != null) {
                        executorService.shutdownNow();
                        executorService = null;
                    }
                })
                .onError(event -> {
                    LOG.info("Downstream call error. Starting a thread to simulate checking for the downstream availability");
                    if (executorService == null) {
                        executorService = Executors.newSingleThreadScheduledExecutor();
                        executorService.scheduleAtFixedRate(() -> increment(), 1, 1, TimeUnit.SECONDS);
                    }
                });

        getCamelContext().getRegistry().bind("pausableCircuit", circuitBreaker);

        from("kafka:" + SOURCE_TOPIC + "?groupId=KafkaPausableConsumerCircuitBreakerIT&autoOffsetReset=earliest" +
                "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" +
                "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" +
                "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true" +
                "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
                .pausable(new KafkaConsumerListener(), o -> canContinue())
                .routeId("pausable-it")
                .process(exchange -> LOG.info("Got record from Kafka: {}", exchange.getMessage().getBody()))
                .circuitBreaker()
                .resilience4jConfiguration()
                .circuitBreaker("pausableCircuit")
                .end()
                .to("direct:intermediate");

        from("direct:intermediate")
                .process(exchange -> {
                    // Process the exchange
                });
    }

    private boolean canContinue() {
        // Implement your logic to determine if the route can continue
        // For example, based on a retry count
        return true; // Placeholder
    }

    private void increment() {
        // Implement your logic to increment the retry count or check downstream availability
    }

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new KafkaConsumerCircuitBreakerRoute());
        context.start();
        // Keep the application running until you decide to stop it
        Thread.sleep(Long.MAX_VALUE);
        context.stop();
    }
}
```

Please note that these examples are based on the context provided and may require additional details to be fully functional. You will need to implement the `canContinue`, `getCount`, and `increment` methods according to your specific use case. Additionally, you will need to replace `yourTopic` with the actual Kafka topic you are using and ensure that the Kafka consumer configuration matches your Kafka setup.

#### How do I determine whether to pause or continue a route? Do you have an example?

To determine whether to pause or continue a route, you can use a custom check within your Camel route that evaluates certain conditions to decide the flow of processing. In the provided context, the custom check is based on a count value that is compared against a threshold (SIMULATED_FAILURES). If the count is less than or equal to 1, or greater than or equal to SIMULATED_FAILURES, processing is allowed to proceed. Otherwise, the processing cannot proceed.

Here is an example in Java that demonstrates how to create a Camel route with a custom check to determine whether to pause or continue processing, using the circuit breaker pattern:

```java
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.CircuitBreaker;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.RuntimeCamelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CamelRouteExample {

    private static final Logger LOG = LoggerFactory.getLogger(CamelRouteExample.class);
    private static final int SIMULATED_FAILURES = 5;
    private static AtomicInteger count = new AtomicInteger(0);
    private static ScheduledExecutorService executorService;

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("pausable");
                circuitBreaker.getEventPublisher().onSuccess(event -> {
                    LOG.info("Downstream call succeeded");
                    if (executorService != null) {
                        executorService.shutdownNow();
                        executorService = null;
                    }
                }).onError(event -> {
                    LOG.info("Downstream call error. Starting a thread to simulate checking for the downstream availability");
                    if (executorService == null) {
                        executorService = Executors.newSingleThreadScheduledExecutor();
                        executorService.scheduleAtFixedRate(() -> increment(), 1, 1, TimeUnit.SECONDS);
                    }
                });
                getCamelContext().getRegistry().bind("pausableCircuit", circuitBreaker);

                from("kafka:sourceTopic?groupId=group&autoOffsetReset=earliest")
                    .routeId("pausable-route")
                    .process(exchange -> LOG.info("Got record from Kafka: {}", exchange.getMessage().getBody()))
                    .circuitBreaker()
                        .resilience4jConfiguration()
                        .circuitBreaker("pausableCircuit")
                    .end()
                    .to("direct:intermediate");

                from("direct:intermediate")
                    .process(exchange -> {
                        LOG.info("Got record on the intermediate processor: {}", exchange.getMessage().getBody());
                        if (!canContinue()) {
                            throw new RuntimeCamelException("Cannot proceed at the moment");
                        }
                    })
                    .to("mock:result");
            }
        });

        context.start();
        Thread.sleep(10000);
        context.stop();
    }

    private static boolean canContinue() {
        if (count.intValue() <= 1) {
            LOG.info("Count is 1, allowing processing to proceed");
            return true;
        }
        if (count.intValue() >= SIMULATED_FAILURES) {
            LOG.info("Count is {}, allowing processing to proceed because it's greater than retry count {}", count.intValue(), SIMULATED_FAILURES);
            return true;
        }
        LOG.info("Cannot proceed at the moment ... count is {}", count.intValue());
        return false;
    }

    private static void increment() {
        count.incrementAndGet();
    }
}
```

In this example, the `canContinue()` method is used to determine whether to proceed with processing or not. The `increment()` method is used to simulate an external factor that affects the decision (e.g., checking for downstream system availability). The Camel route listens to a Kafka topic and processes messages, using a circuit breaker to manage the flow based on the custom check.

Please note that you need to replace `"kafka:sourceTopic?groupId=group&autoOffsetReset=earliest"` with the actual Kafka topic and configuration you are using. Also, the `mock:result` endpoint is used for demonstration purposes and should be replaced with your actual destination endpoint.