Jina-serve is a framework for building and deploying AI services that communicate via gRPC, HTTP and WebSockets. Scale your services from local development to production while focusing on your core logic.
- Native support for all major ML frameworks and data types
- High-performance service design with scaling, streaming, and dynamic batching
- LLM serving with streaming output
- Built-in Docker integration and Executor Hub
- One-click deployment to Jina AI Cloud
- Enterprise-ready with Kubernetes and Docker Compose support
Comparison with FastAPI
Key advantages over FastAPI:
- DocArray-based data handling with native gRPC support
- Built-in containerization and service orchestration
- Seamless scaling of microservices
- One-command cloud deployment
pip install jina
See guides for Apple Silicon and Windows.
Three main layers:
- Data: BaseDoc and DocList for input/output
- Serving: Executors process Documents, Gateway connects services
- Orchestration: Deployments serve Executors, Flows create pipelines
Let's create a gRPC-based AI service using StableLM:
from jina import Executor, requests
from docarray import DocList, BaseDoc
from transformers import pipeline
class Prompt(BaseDoc):
text: str
class Generation(BaseDoc):
prompt: str
text: str
class StableLM(Executor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.generator = pipeline(
'text-generation', model='stabilityai/stablelm-base-alpha-3b'
)
@requests
def generate(self, docs: DocList[Prompt], **kwargs) -> DocList[Generation]:
generations = DocList[Generation]()
prompts = docs.text
llm_outputs = self.generator(prompts)
for prompt, output in zip(prompts, llm_outputs):
generations.append(Generation(prompt=prompt, text=output))
return generations
Deploy with Python or YAML:
from jina import Deployment
from executor import StableLM
dep = Deployment(uses=StableLM, timeout_ready=-1, port=12345)
with dep:
dep.block()
jtype: Deployment
with:
uses: StableLM
py_modules:
- executor.py
timeout_ready: -1
port: 12345
Use the client:
from jina import Client
from docarray import DocList
from executor import Prompt, Generation
prompt = Prompt(text='suggest an interesting image generation prompt')
client = Client(port=12345)
response = client.post('/', inputs=[prompt], return_type=DocList[Generation])
Chain services into a Flow:
from jina import Flow
flow = Flow(port=12345).add(uses=StableLM).add(uses=TextToImage)
with flow:
flow.block()
Boost throughput with built-in features:
- Replicas for parallel processing
- Shards for data partitioning
- Dynamic batching for efficient model inference
Example scaling a Stable Diffusion deployment:
jtype: Deployment
with:
uses: TextToImage
timeout_ready: -1
py_modules:
- text_to_image.py
env:
CUDA_VISIBLE_DEVICES: RR
replicas: 2
uses_dynamic_batching:
/default:
preferred_batch_size: 10
timeout: 200
- Structure your Executor:
TextToImage/
├── executor.py
├── config.yml
├── requirements.txt
- Configure:
# config.yml
jtype: TextToImage
py_modules:
- executor.py
metas:
name: TextToImage
description: Text to Image generation Executor
- Push to Hub:
jina hub push TextToImage
jina export kubernetes flow.yml ./my-k8s
kubectl apply -R -f my-k8s
jina export docker-compose flow.yml docker-compose.yml
docker-compose up
Deploy with a single command:
jina cloud deploy jcloud-flow.yml
Enable token-by-token streaming for responsive LLM applications:
- Define schemas:
from docarray import BaseDoc
class PromptDocument(BaseDoc):
prompt: str
max_tokens: int
class ModelOutputDocument(BaseDoc):
token_id: int
generated_text: str
- Initialize service:
from transformers import GPT2Tokenizer, GPT2LMHeadModel
class TokenStreamingExecutor(Executor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.model = GPT2LMHeadModel.from_pretrained('gpt2')
- Implement streaming:
@requests(on='/stream')
async def task(self, doc: PromptDocument, **kwargs) -> ModelOutputDocument:
input = tokenizer(doc.prompt, return_tensors='pt')
input_len = input['input_ids'].shape[1]
for _ in range(doc.max_tokens):
output = self.model.generate(**input, max_new_tokens=1)
if output[0][-1] == tokenizer.eos_token_id:
break
yield ModelOutputDocument(
token_id=output[0][-1],
generated_text=tokenizer.decode(
output[0][input_len:], skip_special_tokens=True
),
)
input = {
'input_ids': output,
'attention_mask': torch.ones(1, len(output[0])),
}
- Serve and use:
# Server
with Deployment(uses=TokenStreamingExecutor, port=12345, protocol='grpc') as dep:
dep.block()
# Client
async def main():
client = Client(port=12345, protocol='grpc', asyncio=True)
async for doc in client.stream_doc(
on='/stream',
inputs=PromptDocument(prompt='what is the capital of France ?', max_tokens=10),
return_type=ModelOutputDocument,
):
print(doc.generated_text)
Jina-serve is backed by Jina AI and licensed under Apache-2.0.