# DocArray @ PyCon US 2023!

The goal of this notebook is to concisely showcase the **three pillars of DocArray**:

- _Represent_
- _Send_
- _Store_

Multimodal data for ML!

To do that, the notebook builds a dummy **podcast transcription service** that represents and loads podcast data, performs semantic similarity search between podcast episodes, and is served over FastAPI.

Keep in mind that this is a _dummy_ application, but it should make clear how you can take the same ideas to a serious project!

# Represent

Let's represent a Podcast file using DocArray:

In [1]:
# imports
from docarray import BaseDoc
from docarray.typing import TorchTensor, AudioNdArray, AudioUrl, TextUrl, AnyTensor, AudioTorchTensor
from typing import Optional

In [2]:
# define schema == Document
class Podcast(BaseDoc):
    # audio
    audio_url: AudioUrl
    audio_tensor: Optional[AudioTorchTensor]
    # text
    transcript_url: Optional[TextUrl]
    transcript: Optional[str]
    # embedding
    embedding: Optional[TorchTensor[512]]

In [3]:
# create a Podcast object

# URLs to our data
# could be on the web or local
audio_url = './docarray/star-spangled-banner.oga'
transcript_url = './docarray/star-spangle-banner-lyrics.txt'

# instantiate the object
podcast = Podcast(audio_url=audio_url, transcript_url=transcript_url)

In [4]:
# load the data

podcast.audio_tensor, frame_rate = podcast.audio_url.load()
podcast.transcript = podcast.transcript_url.load()

In [5]:
podcast.summary()

In [6]:
print(podcast.audio_tensor.shape)
podcast.summary()

torch.Size([7841920])


## Represent batches of data (yay, ML!)

You can also represent _batches_ of data, using the `DocVec` class!

In [7]:
from docarray import BaseDoc, DocVec
from docarray.typing import TorchTensor, AudioNdArray, AudioUrl, TextUrl, AnyTensor
from typing import Optional

In [8]:
# create DocVec
pod_vec = DocVec[Podcast]([podcast, podcast, podcast], tensor_type=TorchTensor)
print(pod_vec.audio_tensor.shape)

torch.Size([3, 7841920])
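
To build intuition for what `DocVec` does with the three podcasts, here is a minimal plain-Python sketch of column-wise batching. It is not DocArray's implementation: `ToyDoc` and `stack_columns` are hypothetical names invented for this illustration, and nested lists stand in for tensors.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, List


@dataclass
class ToyDoc:
    """Hypothetical stand-in for a document with one tensor-like field."""

    audio_url: str
    audio_tensor: List[float]


def stack_columns(docs: List[ToyDoc]) -> Dict[str, List[Any]]:
    """Turn a list of documents (rows) into one list per field (columns)."""
    return {f.name: [getattr(doc, f.name) for doc in docs] for f in fields(ToyDoc)}


# three toy documents, each with a 4-sample "audio tensor"
docs = [ToyDoc(audio_url=f"episode-{i}.oga", audio_tensor=[0.1 * i] * 4) for i in range(3)]
columns = stack_columns(docs)

# the batched audio column has shape (n_docs, len_audio),
# analogous to the torch.Size([3, 7841920]) printed above
batch_shape = (len(columns["audio_tensor"]), len(columns["audio_tensor"][0]))
print(batch_shape)
```

Because every field becomes one column, a model can consume the whole `audio_tensor` column in a single call instead of looping over documents.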


In [9]:
# process using an ML model
import torch
from torch import nn


# define the model
class MyAudioModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(512, 512, dtype=torch.float64)

    def forward(self, audio_tensor: TorchTensor['n_batch', 'len_audio']) -> TorchTensor['n_batch', 512]:
        return self.linear(audio_tensor[:, :512])


model = MyAudioModel()

# create and assign embeddings
pod_vec.embedding = model(pod_vec.audio_tensor)

# Send

Let's see how you can send Documents over FastAPI!

In [10]:
# imports
import numpy as np
from fastapi import FastAPI
from httpx import AsyncClient

from docarray import BaseDoc
from docarray.typing import NdArray
from docarray.base_doc import DocArrayResponse

In [11]:
# define the output model
# we _could_ re-use our `Podcast` class, but we want to return something else!

class OutputDoc(BaseDoc):
    audio_embedding: NdArray[512]
    transcript: str

In [12]:
# create FastAPI app

app = FastAPI()


def my_whisper_model(audio_tensor: AnyTensor) -> str:
    return 'this is the transcript of your podcast'


@app.post("/transcribe", response_model=OutputDoc, response_class=DocArrayResponse)
async def transcribe(pod: Podcast) -> OutputDoc:
    # compare against None: the truth value of a multi-element tensor is ambiguous
    if pod.audio_tensor is None:
        pod.audio_tensor, _ = pod.audio_url.load()
    embedding = model(pod.audio_tensor.unsqueeze(0))
    transcript = pod.transcript if pod.transcript else my_whisper_model(pod.audio_tensor)
    return OutputDoc(audio_embedding=embedding.detach().numpy(), transcript=transcript)

In [13]:
import asyncio
import uvicorn

if __name__ == "__main__":
    config = uvicorn.Config(app)
    server = uvicorn.Server(config)
    await server.serve()

INFO:     Started server process [33423]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:56026 - "POST /transcribe HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [33423]
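
The `POST /transcribe` line in the log comes from a client call that the notebook does not show. As a rough sketch, a client could build such a request with the standard library alone. The helper name `build_transcribe_request` is hypothetical, the base URL is taken from the server log above, and the payload assumes that every optional `Podcast` field may be omitted so that only `audio_url` is sent.

```python
import json
import urllib.request

# address reported by uvicorn in the log above
BASE_URL = "http://127.0.0.1:8000"


def build_transcribe_request(audio_url: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for the /transcribe endpoint."""
    # assumption: the remaining Podcast fields are optional and can be left out
    payload = json.dumps({"audio_url": audio_url}).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/transcribe",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_transcribe_request("./docarray/star-spangled-banner.oga")
print(request.get_method(), request.full_url)

# With the server running in another process, the request could be sent like this:
# with urllib.request.urlopen(request) as response:
#     result = json.loads(response.read())
#     print(result["transcript"])
```

Note that a relative path such as the one above would be resolved by the server process, not by the client, so both need to see the same file.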


# Store

Finally, let's store and retrieve some data using a vector database!

In [14]:
# imports
from docarray.index import HnswDocumentIndex

In [16]:
# create a Document Index
# this is our API to access a vector database
# in this case, HNSWLib as a local option
doc_index = HnswDocumentIndex[Podcast](work_dir='./test11')
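
Conceptually, a vector index answers the question "which stored embedding is closest to this query embedding?". The sketch below shows the exact, brute-force version of that search in plain Python. It is only an illustration: HNSW is an approximate method that avoids comparing against every stored vector, the function names are hypothetical, and cosine similarity is assumed as the metric.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def brute_force_find(
    query: Sequence[float], indexed: List[Sequence[float]], limit: int = 1
) -> List[Tuple[int, float]]:
    """Return (position, score) pairs of the `limit` most similar stored vectors."""
    scored = [(i, cosine_similarity(query, vec)) for i, vec in enumerate(indexed)]
    scored.sort(key=lambda pair: pair[1], reverse=True)  # best match first
    return scored[:limit]


# toy 3-dimensional "embeddings" standing in for the 512-dimensional ones
indexed_embeddings = [
    [1.0, 0.0, 0.0],
    [0.0, 1.0, 0.0],
    [0.7, 0.7, 0.0],
]
query_embedding = [0.9, 0.1, 0.0]

best = brute_force_find(query_embedding, indexed_embeddings, limit=1)
print(best)
```

The brute-force search costs one comparison per stored vector, which is why approximate indexes become attractive as the collection grows.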

In [17]:
# put together our transcription app!
@app.post("/transcribe_and_index", response_model=OutputDoc, response_class=DocArrayResponse)
async def transcribe_and_index(pod: Podcast) -> OutputDoc:
    """Creates a transcription and indexes the pod in the Document Index"""
    # compare against None: the truth value of a multi-element tensor is ambiguous
    if pod.audio_tensor is None:
        pod.audio_tensor, _ = pod.audio_url.load()
    pod.embedding = model(pod.audio_tensor.unsqueeze(0))
    pod.transcript = pod.transcript if pod.transcript else my_whisper_model(pod.audio_tensor)
    doc_index.index([pod])  # add to vector database (Document Index)
    return OutputDoc(audio_embedding=pod.embedding.detach().numpy(), transcript=pod.transcript)


@app.post("/find", response_model=Podcast, response_class=DocArrayResponse)
async def find(pod: Podcast) -> Podcast:
    """Finds a previously indexed pod based on semantic/vector search"""
    # compare against None: the truth value of a multi-element tensor is ambiguous
    if pod.embedding is None:
        if pod.audio_tensor is None:
            pod.audio_tensor, _ = pod.audio_url.load()
        pod.embedding = model(pod.audio_tensor.unsqueeze(0))

    similar_pods, scores = doc_index.find(pod, search_field='embedding', limit=1)
    pod = similar_pods[0]
    pod.audio_tensor = None
    return pod

In [None]:
import asyncio
import uvicorn

if __name__ == "__main__":
    config = uvicorn.Config(app)
    server = uvicorn.Server(config)
    await server.serve()

INFO:     Started server process [33423]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:39596 - "POST /transcribe HTTP/1.1" 200 OK




INFO:     127.0.0.1:56778 - "POST /find HTTP/1.1" 500 Internal Server Error


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/johannes/.cache/pypoetry/virtualenvs/docarray-EljsZLuq-py3.8/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py", line 419, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/home/johannes/.cache/pypoetry/virtualenvs/docarray-EljsZLuq-py3.8/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "/home/johannes/.cache/pypoetry/virtualenvs/docarray-EljsZLuq-py3.8/lib/python3.8/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/home/johannes/.cache/pypoetry/virtualenvs/docarray-EljsZLuq-py3.8/lib/python3.8/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/johannes/.cache/pypoetry/virtualenvs/docarray-EljsZLuq-py3.8/lib/p