# Amazon Bedrock - API Gateway invocation with LangChain

## Install requirements

In [None]:
%pip install -q langchain==0.0.340

### Setup Environment

We are going to invoke Amazon API Gateway through `langchain`

In [None]:
from typing import Optional

from langchain.chains import LLMChain
from langchain.llms.amazon_api_gateway import AmazonAPIGateway
from langchain.prompts import PromptTemplate

### Setting up API URL

In [None]:
# Replace the placeholders below with the values of your own deployment.
# Base URL of the API Gateway; "/invoke_model?model_id=..." is appended to it in later cells.
api_url = "<API_URL>"
# Key sent in the "x-api-key" request header.
api_key = "<API_KEY>"
# Identifier sent in the "team_id" request header.
team_id = "<TEAM_ID>"

### Define Default Prompt

In [None]:
# Pass-through template: the "question" input becomes the entire prompt text.
PROMPT_DEFAULT = PromptTemplate(input_variables=["question"], template="{question}")

### Amazon Titan Text Express

In [None]:
# Model to invoke through the gateway, and the inference settings sent with it.
model_id = "amazon.titan-text-express-v1"
model_kwargs = dict(maxTokenCount=4096, temperature=0.2)

# Question used for the demo call.
prompt = "What is Amazon Bedrock?"

In [None]:
# LLM wrapper that posts to the gateway's invoke_model route for the chosen model.
llm = AmazonAPIGateway(
    model_kwargs=model_kwargs,
    headers={"x-api-key": api_key, "team_id": team_id},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

# Chain that renders PROMPT_DEFAULT and sends the result to the LLM.
chain = LLMChain(prompt=PROMPT_DEFAULT, llm=llm, verbose=True)

In [None]:
# Send the question through the chain and keep the model's answer.
response = chain.predict(question=prompt)

# Print response
print(response)

### Amazon Titan Text Express - Streaming

In [None]:
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.amazon_api_gateway import AmazonAPIGateway
from langchain.llms.utils import enforce_stop_tokens
import requests
import time
from typing import Any, List, Optional

class AmazonAPIGatewayExtended(AmazonAPIGateway):
    """``AmazonAPIGateway`` variant that can poll for a deferred answer.

    With ``streaming`` disabled the first response is parsed directly. With it
    enabled, the first response is expected to carry a ``request_id``; the same
    URL is then polled with ``&requestId=<id>`` appended until a response
    contains ``generated_text`` or ``max_polling_time`` seconds have elapsed.
    """

    # When True, poll for the result instead of parsing the first response.
    streaming: bool = False
    # Seconds to sleep between two polling requests.
    polling_wait: int = 2
    # Upper bound, in seconds, on the total time spent polling.
    max_polling_time: int = 120

    def _poll_for_result(self, request_id: str) -> Any:
        """Poll the endpoint until the response for ``request_id`` is ready.

        Returns:
            The first polling response whose JSON contains ``generated_text``.

        Raises:
            TimeoutError: If no such response arrives within
                ``max_polling_time`` seconds.
        """
        # api_url is expected to already contain a query string, hence "&".
        poll_url = self.api_url + f"&requestId={request_id}"
        # Monotonic clock: elapsed time is unaffected by wall-clock changes.
        deadline = time.monotonic() + self.max_polling_time

        while time.monotonic() < deadline:
            response = requests.post(poll_url, headers=self.headers, json={})
            if "generated_text" in response.json()[0]:
                return response
            time.sleep(self.polling_wait)

        raise TimeoutError(
            f"no generated_text for request {request_id} "
            f"within {self.max_polling_time} seconds"
        )

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to the model behind Amazon API Gateway.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The string generated by the model.

        Raises:
            ValueError: If a request, the polling (including a polling
                timeout) or the parsing of the response fails.
            Exception: If the final response has a non-200 status code; the
                parsed text is used as the message.
        """
        _model_kwargs = self.model_kwargs or {}
        payload = self.content_handler.transform_input(prompt, _model_kwargs)

        try:
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
            )
            if self.streaming:
                # The first response only carries the id to poll with.
                response = self._poll_for_result(response.json()[0]["request_id"])

            text = self.content_handler.transform_output(response)

        except Exception as error:
            # Keep the original exception attached for easier debugging.
            raise ValueError(f"Error raised by the service: {error}") from error

        if stop is not None:
            text = enforce_stop_tokens(text, stop)

        if response.status_code != 200:
            raise Exception(text)

        return text

In [None]:
# Model to invoke through the gateway, and the inference settings sent with it.
model_id = "amazon.titan-text-express-v1"
model_kwargs = dict(maxTokenCount=4096, temperature=0.2)

# Question used for the demo call.
prompt = "What is Amazon Bedrock?"

In [None]:
# Polling-capable LLM wrapper; the "streaming" header is sent with every request
# (presumably it tells the gateway to answer with a request id -- confirm against the backend).
llm = AmazonAPIGatewayExtended(
    streaming=True,
    model_kwargs=model_kwargs,
    headers={"x-api-key": api_key, "team_id": team_id, "streaming": "true"},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

# Chain that renders PROMPT_DEFAULT and sends the result to the LLM.
chain = LLMChain(prompt=PROMPT_DEFAULT, llm=llm, verbose=True)

In [None]:
# Send the question through the chain and keep the model's answer.
response = chain.predict(question=prompt)

# Print response
print(response)

### Amazon Titan Embeddings

In [None]:
from langchain.embeddings.bedrock import Embeddings
import requests
from typing import List

class AmazonAPIGatewayEmbeddings(Embeddings):
    """Embeddings client that calls an API Gateway endpoint over HTTP.

    Every text is sent in its own POST request whose JSON body is
    ``{"inputs": <text>, "parameters": <parameters>}``; the embedding is read
    from ``response.json()[0]["embedding"]``.
    """

    def __init__(self, api_url, headers):
        """Store the endpoint URL and the headers sent with every request."""
        self.api_url = api_url
        self.headers = headers

    def _embed(self, text: str, parameters: Optional[dict] = None) -> List[float]:
        """POST a single text to the endpoint and return its embedding."""
        response = requests.post(
            self.api_url,
            json={
                "inputs": text,
                # None means "no extra parameters"; the body always carries a dict.
                "parameters": {} if parameters is None else parameters,
            },
            headers=self.headers,
        )
        return response.json()[0]["embedding"]

    def embed_documents(
        self, texts: List[str], parameters: Optional[dict] = None
    ) -> List[List[float]]:
        """Embed each text in ``texts``, one request per text.

        Args:
            texts: The texts to embed.
            parameters: Optional extra parameters sent with every request.

        Returns:
            One embedding per input text, in input order.
        """
        return [self._embed(text, parameters) for text in texts]

    def embed_query(self, text: str, parameters: Optional[dict] = None) -> List[float]:
        """Embed a single query text.

        Args:
            text: The text to embed.
            parameters: Optional extra parameters sent with the request.

        Returns:
            The embedding returned by the endpoint.
        """
        return self._embed(text, parameters)

In [None]:
# Embedding model to invoke through the gateway.
model_id = "amazon.titan-embed-text-v1"

# Plain one-line question, without the stray quote and surrounding newlines.
prompt = "What is Amazon Bedrock?"

In [None]:
# Embeddings client for the selected model; the "type" header is sent with every
# request (presumably it routes the call to the embeddings path -- confirm against the backend).
embeddings = AmazonAPIGatewayEmbeddings(
    headers={"x-api-key": api_key, "team_id": team_id, "type": "embeddings"},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

In [None]:
# Embed the prompt; as the last expression of the cell, the vector is shown as the cell output.
embeddings.embed_query(prompt)

### Anthropic Claude

In [None]:
# Model to invoke through the gateway, and the inference settings sent with it.
model_id = "anthropic.claude-v2"

model_kwargs = {
    "max_tokens_to_sample": 4096,
    "temperature": 0.2,
}

# Plain one-line question, without the stray quote and surrounding newlines.
prompt = "What is Amazon Bedrock?"

In [None]:
# LLM wrapper that posts to the gateway's invoke_model route for the chosen model.
llm = AmazonAPIGateway(
    model_kwargs=model_kwargs,
    headers={"x-api-key": api_key, "team_id": team_id},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

# Chain that renders PROMPT_DEFAULT and sends the result to the LLM.
chain = LLMChain(prompt=PROMPT_DEFAULT, llm=llm, verbose=True)

In [None]:
# Send the question through the chain and keep the model's answer.
response = chain.predict(question=prompt)

# Print response
print(response)

### Anthropic Claude - Streaming

In [None]:
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.amazon_api_gateway import AmazonAPIGateway
from langchain.llms.utils import enforce_stop_tokens
import requests
import time
from typing import Any, List, Optional

class AmazonAPIGatewayExtended(AmazonAPIGateway):
    """``AmazonAPIGateway`` variant that can poll for a deferred answer.

    With ``streaming`` disabled the first response is parsed directly. With it
    enabled, the first response is expected to carry a ``request_id``; the same
    URL is then polled with ``&requestId=<id>`` appended until a response
    contains ``generated_text`` or ``max_polling_time`` seconds have elapsed.
    """

    # When True, poll for the result instead of parsing the first response.
    streaming: bool = False
    # Seconds to sleep between two polling requests.
    polling_wait: int = 2
    # Upper bound, in seconds, on the total time spent polling.
    max_polling_time: int = 120

    def _poll_for_result(self, request_id: str) -> Any:
        """Poll the endpoint until the response for ``request_id`` is ready.

        Returns:
            The first polling response whose JSON contains ``generated_text``.

        Raises:
            TimeoutError: If no such response arrives within
                ``max_polling_time`` seconds.
        """
        # api_url is expected to already contain a query string, hence "&".
        poll_url = self.api_url + f"&requestId={request_id}"
        # Monotonic clock: elapsed time is unaffected by wall-clock changes.
        deadline = time.monotonic() + self.max_polling_time

        while time.monotonic() < deadline:
            response = requests.post(poll_url, headers=self.headers, json={})
            if "generated_text" in response.json()[0]:
                return response
            time.sleep(self.polling_wait)

        raise TimeoutError(
            f"no generated_text for request {request_id} "
            f"within {self.max_polling_time} seconds"
        )

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to the model behind Amazon API Gateway.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The string generated by the model.

        Raises:
            ValueError: If a request, the polling (including a polling
                timeout) or the parsing of the response fails.
            Exception: If the final response has a non-200 status code; the
                parsed text is used as the message.
        """
        _model_kwargs = self.model_kwargs or {}
        payload = self.content_handler.transform_input(prompt, _model_kwargs)

        try:
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
            )
            if self.streaming:
                # The first response only carries the id to poll with.
                response = self._poll_for_result(response.json()[0]["request_id"])

            text = self.content_handler.transform_output(response)

        except Exception as error:
            # Keep the original exception attached for easier debugging.
            raise ValueError(f"Error raised by the service: {error}") from error

        if stop is not None:
            text = enforce_stop_tokens(text, stop)

        if response.status_code != 200:
            raise Exception(text)

        return text

In [None]:
# Model to invoke through the gateway, and the inference settings sent with it.
model_id = "anthropic.claude-v2"

model_kwargs = {
    "max_tokens_to_sample": 4096,
    "temperature": 0.2,
}

# Plain one-line question, without the stray quote and surrounding newlines.
prompt = "What is Amazon Bedrock?"

In [None]:
# Polling-capable LLM wrapper; the "streaming" header is sent with every request
# (presumably it tells the gateway to answer with a request id -- confirm against the backend).
llm = AmazonAPIGatewayExtended(
    streaming=True,
    model_kwargs=model_kwargs,
    headers={"x-api-key": api_key, "team_id": team_id, "streaming": "true"},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

# Chain that renders PROMPT_DEFAULT and sends the result to the LLM.
chain = LLMChain(prompt=PROMPT_DEFAULT, llm=llm, verbose=True)

In [None]:
# Send the question through the chain and keep the model's answer.
response = chain.predict(question=prompt)

# Print response
print(response)

### AI21 Jurassic

In [None]:
# Model to invoke through the gateway, and the inference settings sent with it.
model_id = "ai21.j2-ultra"

model_kwargs = {
    "maxTokens": 4096,
    "temperature": 0.2,
}

# Plain one-line question, without the stray quote and surrounding newlines.
prompt = "What is Amazon Bedrock?"

In [None]:
# LLM wrapper that posts to the gateway's invoke_model route for the chosen model.
llm = AmazonAPIGateway(
    model_kwargs=model_kwargs,
    headers={"x-api-key": api_key, "team_id": team_id},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

# Chain that renders PROMPT_DEFAULT and sends the result to the LLM.
chain = LLMChain(prompt=PROMPT_DEFAULT, llm=llm, verbose=True)

In [None]:
# Send the question through the chain and keep the model's answer.
response = chain.predict(question=prompt)

# Print response
print(response)

### Cohere Command

In [None]:
# Model to invoke through the gateway, and the inference settings sent with it.
model_id = "cohere.command-text-v14"

model_kwargs = {
    "max_tokens": 4000,
    "temperature": 0.2,
}

# Plain one-line question, without the stray quote and surrounding newlines.
prompt = "What is Amazon Bedrock?"

In [None]:
# LLM wrapper that posts to the gateway's invoke_model route for the chosen model.
llm = AmazonAPIGateway(
    model_kwargs=model_kwargs,
    headers={"x-api-key": api_key, "team_id": team_id},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

# Chain that renders PROMPT_DEFAULT and sends the result to the LLM.
chain = LLMChain(prompt=PROMPT_DEFAULT, llm=llm, verbose=True)

In [None]:
# Send the question through the chain and keep the model's answer.
response = chain.predict(question=prompt)

# Print response
print(response)

### Cohere Embed Multilingual

In [None]:
from langchain.embeddings.bedrock import Embeddings
import requests
from typing import List

class AmazonAPIGatewayEmbeddings(Embeddings):
    """Embeddings client that calls an API Gateway endpoint over HTTP.

    Every text is sent in its own POST request whose JSON body is
    ``{"inputs": <text>, "parameters": <parameters>}``; the embedding is read
    from ``response.json()[0]["embedding"]``.
    """

    def __init__(self, api_url, headers):
        """Store the endpoint URL and the headers sent with every request."""
        self.api_url = api_url
        self.headers = headers

    def _embed(self, text: str, parameters: Optional[dict] = None) -> List[float]:
        """POST a single text to the endpoint and return its embedding."""
        response = requests.post(
            self.api_url,
            json={
                "inputs": text,
                # None means "no extra parameters"; the body always carries a dict.
                "parameters": {} if parameters is None else parameters,
            },
            headers=self.headers,
        )
        return response.json()[0]["embedding"]

    def embed_documents(
        self, texts: List[str], parameters: Optional[dict] = None
    ) -> List[List[float]]:
        """Embed each text in ``texts``, one request per text.

        Args:
            texts: The texts to embed.
            parameters: Optional extra parameters sent with every request.

        Returns:
            One embedding per input text, in input order.
        """
        return [self._embed(text, parameters) for text in texts]

    def embed_query(self, text: str, parameters: Optional[dict] = None) -> List[float]:
        """Embed a single query text.

        Args:
            text: The text to embed.
            parameters: Optional extra parameters sent with the request.

        Returns:
            The embedding returned by the endpoint.
        """
        return self._embed(text, parameters)

In [None]:
# Embedding model to invoke through the gateway.
model_id = "cohere.embed-multilingual-v3"

# Request parameters intended for the Cohere embedding model.
model_kwargs = {
    "input_type": "search_document",
}

# Plain one-line question, without the stray quote and surrounding newlines.
prompt = "What is Amazon Bedrock?"

In [None]:
# Embeddings client for the selected model; the "type" header is sent with every
# request (presumably it routes the call to the embeddings path -- confirm against the backend).
embeddings = AmazonAPIGatewayEmbeddings(
    headers={"x-api-key": api_key, "team_id": team_id, "type": "embeddings"},
    api_url=f"{api_url}/invoke_model?model_id={model_id}",
)

In [None]:
# Forward the model parameters defined for this model (input_type); without them
# the request body would carry an empty "parameters" object.
# NOTE(review): confirm the backend passes "parameters" through to the Cohere model.
embeddings.embed_query(prompt, parameters=model_kwargs)