# Data Ingestion

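This notebook downloads a dataset, embeds it with OpenAI's `text-embedding-ada-002` model, and stores the embeddings in two vector databases, Pinecone and Weaviate, for similarity search.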

## Setup

In [6]:
import openai

import tiktoken
from tenacity import retry, wait_random_exponential, stop_after_attempt
from typing import List
import concurrent.futures
from tqdm import tqdm
import pandas as pd

import numpy as np

MODEL = "text-embedding-ada-002"

## Data Ingestion

Thanks very much to Ryan Greene for the template used here to ingest the data for this analysis: 50,000 articles from Simple Wikipedia.

In [7]:
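# Retry with randomized exponential backoff (1-20 s between attempts), giving up after 6 attempts
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
def get_embeddings(input: List[List[int]]) -> List[List[float]]:
    # `input` is a batch of token-ID lists; the embeddings endpoint accepts token arrays as well as text
    response = openai.Embedding.create(
        input=input,
        model=MODEL,
    )["data"]
    return [data["embedding"] for data in response]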


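def embed_corpus(
    corpus: List[str],
    batch_size=64,
    num_workers=8,
    max_context_len=8191,
):
    """Tokenize, truncate and embed a list of strings in parallel batches.

    Returns one embedding per input string, in corpus order."""
    def batchify(iterable, n=1):
        # Yield consecutive slices of length n (the last slice may be shorter)
        length = len(iterable)
        for ndx in range(0, length, n):
            yield iterable[ndx : min(ndx + n, length)]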

    # Encode the corpus, truncating to max_context_len
    encoding = tiktoken.get_encoding("cl100k_base")
    encoded_corpus = [
        encoded_article[:max_context_len] for encoded_article in encoding.encode_batch(corpus)
    ]

    # Calculate corpus statistics: the number of inputs, the total number of tokens, and the estimated cost to embed
    num_tokens = sum(len(article) for article in encoded_corpus)
    cost_to_embed_tokens = num_tokens / 1_000 * 0.0004
    print(
        f"num_articles={len(encoded_corpus)}, num_tokens={num_tokens}, est_embedding_cost={cost_to_embed_tokens:.2f} USD"
    )

    # Embed the corpus
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(get_embeddings, text_batch)
            for text_batch in batchify(encoded_corpus, batch_size)
        ]

        with tqdm(total=len(encoded_corpus)) as pbar:
            for _ in concurrent.futures.as_completed(futures):
                pbar.update(batch_size)

        # Collect results in submission order (not completion order) so that
        # embeddings[i] corresponds to corpus[i]
        embeddings = []
        for future in futures:
            embeddings.extend(future.result())
        return embeddings

In [8]:
from datasets import load_dataset

dataset = list(load_dataset("wikipedia", "20220301.simple")["train"])
dataset = dataset[:50_000]  # Limit to 50k articles for demo purposes

Found cached dataset wikipedia (/Users/colin.jarvis/.cache/huggingface/datasets/wikipedia/20220301.simple/2.0.0/aa542ed919df55cc5d3347f42dd4521d05ca68751f50dbc32bae2a7f1e167559)


In [9]:
%%time
dataset_embeddings = embed_corpus([article["text"] for article in dataset])

num_articles=50000, num_tokens=18272526, est_embedding_cost=7.31 USD


50048it [02:39, 314.01it/s]                                                                                                                                                      


CPU times: user 25.7 s, sys: 4.6 s, total: 30.3 s
Wall time: 2min 42s
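The progress bar above reports `50048it` rather than 50,000 because it is advanced by `batch_size` for every completed batch, including the final, shorter one. The arithmetic can be reproduced with a standalone copy of the `batchify` helper nested inside `embed_corpus` (copied here only so the snippet runs on its own):

```python
def batchify(iterable, n=1):
    """Standalone copy of the helper nested in embed_corpus: yields slices of length n."""
    length = len(iterable)
    for ndx in range(0, length, n):
        yield iterable[ndx : min(ndx + n, length)]


corpus_size, batch_size = 50_000, 64
batches = list(batchify(list(range(corpus_size)), batch_size))

print(len(batches))               # 782 batches, i.e. ceil(50_000 / 64)
print(len(batches[-1]))           # 16 items in the last batch
print(len(batches) * batch_size)  # 50048, what pbar.update(batch_size) adds up to
```

Advancing the bar by the length of each completed batch instead would make it stop at exactly 50,000.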


In [10]:
title_embeddings = embed_corpus([article["title"] for article in dataset])

num_articles=50000, num_tokens=202363, est_embedding_cost=0.08 USD


50048it [00:53, 930.73it/s]                                                                                                                                                      
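The `est_embedding_cost` figures follow from the formula in `embed_corpus`: tokens / 1,000 × $0.0004, the per-1K-token rate hard-coded there (current pricing may differ). A quick sketch reproducing both estimates from the token counts printed above; `estimate_embedding_cost` is a hypothetical helper, not part of the notebook:

```python
# Rate assumed from the value hard-coded in embed_corpus; check current pricing before relying on it
PRICE_PER_1K_TOKENS_USD = 0.0004


def estimate_embedding_cost(num_tokens: int, price_per_1k: float = PRICE_PER_1K_TOKENS_USD) -> float:
    """Return the estimated cost in USD to embed num_tokens tokens."""
    return num_tokens / 1_000 * price_per_1k


# Token counts reported by the two embed_corpus runs above
for label, tokens in [("article text", 18_272_526), ("titles", 202_363)]:
    print(f"{label}: {tokens} tokens -> {estimate_embedding_cost(tokens):.2f} USD")
```

Titles are about 1% of the token volume, which is why embedding them costs only a few cents.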


In [11]:
article_df = pd.DataFrame(dataset)
# Attach the embeddings; the *_vector columns are list-valued copies used when upserting
article_df['title_embedding'] = title_embeddings
article_df['title_vector'] = article_df['title_embedding'].tolist()
article_df['content_embedding'] = dataset_embeddings
article_df['content_vector'] = article_df['content_embedding'].tolist()
# Vector IDs must be strings, so use the stringified row index
article_df['vector_id'] = article_df.index.astype(str)
article_df.head()

| | id | url | title | text | title_embedding | title_vector | content_embedding | content_vector | vector_id |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | https://simple.wikipedia.org/wiki/April | April | April is the fourth month of the year in the J... | [0.0010547508718445897, -0.020757636055350304,... | [0.0010547508718445897, -0.020757636055350304,... | [-0.011253940872848034, -0.013491976074874401,... | [-0.011253940872848034, -0.013491976074874401,... | 0 |
| 1 | 2 | https://simple.wikipedia.org/wiki/August | August | August (Aug.) is the eighth month of the year ... | [0.0009623901569284499, 0.0008108559413813055,... | [0.0009623901569284499, 0.0008108559413813055,... | [0.0003609954728744924, 0.007262262050062418, ... | [0.0003609954728744924, 0.007262262050062418, ... | 1 |
| 2 | 6 | https://simple.wikipedia.org/wiki/Art | Art | Art is a creative activity that expresses imag... | [0.0033528385683894157, 0.006173426751047373, ... | [0.0033528385683894157, 0.006173426751047373, ... | [-0.004959689453244209, 0.015772193670272827, ... | [-0.004959689453244209, 0.015772193670272827, ... | 2 |
| 3 | 8 | https://simple.wikipedia.org/wiki/A | A | A or a is the first letter of the English alph... | [0.015449387952685356, -0.013746200129389763, ... | [0.015449387952685356, -0.013746200129389763, ... | [0.024894846603274345, -0.022186409682035446, ... | [0.024894846603274345, -0.022186409682035446, ... | 3 |
| 4 | 9 | https://simple.wikipedia.org/wiki/Air | Air | Air refers to the Earth's atmosphere. Air is a... | [0.0222249086946249, -0.020463958382606506, -0... | [0.0222249086946249, -0.020463958382606506, -0... | [0.021524671465158463, 0.018522677943110466, -... | [0.021524671465158463, 0.018522677943110466, -... | 4 |


## Pinecone

Next we'll index these embedded documents in a vector database and search them. The first option we'll look at is **Pinecone** (pinecone.io), a managed, cloud-native vector database.

For this we will:
- Create indices in Pinecone
- Store our data there
- Fire some similarity search queries
- Try a real use case

In [8]:
import pinecone
import os

In [9]:
api_key = os.getenv("PINECONE_API_KEY")
pinecone.init(api_key=api_key)

### Create Index

In [30]:
from typing import Iterator

class BatchGenerator:
    """ Models a simple batch generator that make chunks out of an input DataFrame. """
    
    def __init__(self, batch_size: int = 10) -> None:
        self.batch_size = batch_size
    
    def to_batches(self, df: pd.DataFrame) -> Iterator[pd.DataFrame]:
        """ Makes chunks out of an input DataFrame. """
        splits = self.splits_num(df.shape[0])
        if splits <= 1:
            yield df
        else:
            for chunk in np.array_split(df, splits):
                yield chunk
    
    def splits_num(self, elements: int) -> int:
        """ Determines how many chunks DataFrame contians. """
        return round(elements / self.batch_size)
    
    __call__ = to_batches

df_batcher = BatchGenerator(300)
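Note that `BatchGenerator` does not produce fixed-size batches of 300: `splits_num` rounds `rows / batch_size`, and `np.array_split` then divides the rows into that many near-equal chunks, so a chunk can end up larger than `batch_size`. The pure-Python sketch below mirrors that sizing rule (following NumPy's documented `array_split` behavior) without needing a DataFrame; the helper names are hypothetical:

```python
from typing import List


def splits_num(n_rows: int, batch_size: int) -> int:
    """Same rule as BatchGenerator.splits_num: round rows / batch_size."""
    return round(n_rows / batch_size)


def array_split_sizes(n_rows: int, splits: int) -> List[int]:
    """Chunk sizes np.array_split produces: the first n_rows % splits chunks get one extra row."""
    base, extra = divmod(n_rows, splits)
    return [base + 1] * extra + [base] * (splits - extra)


def batch_sizes(n_rows: int, batch_size: int) -> List[int]:
    """Sizes of the DataFrame chunks BatchGenerator would yield for n_rows rows."""
    splits = splits_num(n_rows, batch_size)
    # BatchGenerator yields the whole DataFrame when there is at most one split
    return [n_rows] if splits <= 1 else array_split_sizes(n_rows, splits)


for n_rows in (50_000, 449):
    sizes = batch_sizes(n_rows, 300)
    print(n_rows, len(sizes), min(sizes), max(sizes))
```

For the 50,000 articles this gives 167 upserts of 299 or 300 vectors each, while a 449-row frame would be sent as a single 449-vector chunk.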

In [19]:
# Pick a name for the new index
index_name = 'wikipedia-articles'

In [20]:
# Delete any existing index with the same name so we start from scratch
if index_name in pinecone.list_indexes():
    pinecone.delete_index(index_name)

In [21]:
pinecone.create_index(name=index_name, dimension=len(article_df['content_vector'][0]))

In [22]:
index = pinecone.Index(index_name=index_name)

In [23]:
pinecone.list_indexes()

['wikipedia-articles']

In [24]:
# Upsert content vectors in content namespace
print("Uploading vectors to content namespace..")
for batch_df in df_batcher(article_df):
    # Each vector is an (id, values) tuple; materialize the zip as a list before sending
    index.upsert(vectors=list(zip(batch_df.vector_id, batch_df.content_vector)), namespace='content')

Uploading vectors to content namespace..


In [25]:
# Upsert title vectors in title namespace
print("Uploading vectors to title namespace..")
for batch_df in df_batcher(article_df):
    # Each vector is an (id, values) tuple; materialize the zip as a list before sending
    index.upsert(vectors=list(zip(batch_df.vector_id, batch_df.title_vector)), namespace='title')

Uploading vectors to title namespace..


In [26]:
# Check index size for each namespace
index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.2,
 'namespaces': {'content': {'vector_count': 50000},
                'title': {'vector_count': 50000}},
 'total_vector_count': 100000}

### Perform search

Now we'll run a few test searches and check that the results look sensible.

In [60]:
# Pinecone stores only IDs and vectors here, so map vector IDs back to article titles and text
titles_mapped = dict(zip(article_df.vector_id, article_df.title))
content_mapped = dict(zip(article_df.vector_id, article_df.text))

In [106]:
def query_article(query, namespace, top_k=5):
    '''Embeds a free-text query, searches the given Pinecone namespace
    and prints the titles of the top_k matches.'''

    # Embed the query with the same model used for the articles
    embedded_query = openai.Embedding.create(
        input=query,
        model=MODEL,
    )["data"][0]['embedding']

    # Search the namespace passed as a parameter using the query vector
    query_result = index.query(embedded_query,
                               namespace=namespace,
                               top_k=top_k)

    # Print query results
    print(f'\nMost similar results querying {query} in "{namespace}" namespace:\n')
    matches = query_result.matches
    if not matches:
        print('no query result')

    # Map the returned vector IDs back to article titles and text
    ids = [res.id for res in matches]
    scores = [res.score for res in matches]
    df = pd.DataFrame({'id': ids,
                       'score': scores,
                       'title': [titles_mapped[_id] for _id in ids],
                       'content': [content_mapped[_id] for _id in ids],
                       })

    for counter, (_, v) in enumerate(df.iterrows(), start=1):
        print(f'Result {counter} with a score of {v.score} is {v.title}')

    print('\n')

    return df

In [107]:
query_output = query_article('modern art in Europe','title')
query_output


Most similar results querying modern art in Europe in "title" namespace:

Result 1 with a score of 0.890994787 is Early modern Europe
Result 2 with a score of 0.875113845 is Museum of Modern Art
Result 3 with a score of 0.867352843 is Western Europe
Result 4 with a score of 0.86422205 is Renaissance art
Result 5 with a score of 0.860472 is Pop art




| | id | score | title | content |
|---|---|---|---|---|
| 0 | 34185 | 0.890995 | Early modern Europe | The early modern period is a term used by hist... |
| 1 | 23266 | 0.875114 | Museum of Modern Art | The Museum of Modern Art (MoMA) is a modern ar... |
| 2 | 12249 | 0.867353 | Western Europe | Western Europe is a geographic region of Europ... |
| 3 | 15436 | 0.864222 | Renaissance art | Many of the most famous and best-loved works o... |
| 4 | 23265 | 0.860472 | Pop art | Pop art is a modern art movement that develope... |
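The scores above are similarities between the query embedding and each stored title vector. `pinecone.create_index` was called without a `metric` argument, and assuming the client's default of cosine similarity, a useful mental model is an exact cosine ranking over every stored vector. Pinecone itself uses an approximate index rather than this brute-force loop; `brute_force_top_k` and the toy vectors are illustrative only:

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors: dot(a, b) / (|a| * |b|)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def brute_force_top_k(query: Sequence[float],
                      vectors: Dict[str, Sequence[float]],
                      k: int = 5) -> List[Tuple[str, float]]:
    """Score every stored vector against the query and return the k best (id, score) pairs."""
    scored = [(vector_id, cosine_similarity(query, vec)) for vector_id, vec in vectors.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 2-D vectors standing in for the 1536-dimensional embeddings
toy_vectors = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
print(brute_force_top_k([1.0, 0.0], toy_vectors, k=2))  # 'a' ranks first (score 1.0), then 'c'
```

Exhaustive scoring is fine for a handful of vectors but scales linearly with the corpus, which is the cost a vector database's index is designed to avoid.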


## Weaviate

The other vector database we'll explore here is **Weaviate**, which offers both a managed SaaS option, like Pinecone, and a self-hosted option. Since we've already looked at a cloud vector database, we'll try the self-hosted option here.

For this we will:
- Set up a local deployment of Weaviate
- Create indices in Weaviate
- Store our data there
- Fire some similarity search queries
- Try a real use case

### Setup

To get Weaviate running locally, we used Docker and followed the instructions in this article: https://weaviate.io/developers/weaviate/current/installation/docker-compose.html

For an example docker-compose.yaml file, see `./weaviate/docker-compose.yaml` in this repo.

You can start Weaviate locally by navigating to that directory and running `docker-compose up -d`.
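The container can take a little while to start accepting requests after `docker-compose up -d`. A minimal standard-library sketch for waiting on it, assuming Weaviate listens on `http://localhost:8080` (the URL used below) and exposes its readiness endpoint at `/v1/.well-known/ready`; `wait_until_ready` is a hypothetical helper:

```python
import time
import urllib.request


def wait_until_ready(base_url: str = "http://localhost:8080",
                     timeout_s: float = 60.0,
                     interval_s: float = 2.0) -> bool:
    """Poll Weaviate's readiness endpoint until it returns 200 or timeout_s elapses."""
    ready_url = base_url.rstrip("/") + "/v1/.well-known/ready"
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(ready_url, timeout=interval_s) as response:
                if response.status == 200:
                    return True
        except OSError:
            # Connection refused, timeout, or an HTTP error status: the server isn't ready yet
            pass
        time.sleep(interval_s)
    return False
```

Calling `wait_until_ready()` before creating the client avoids connection errors on a freshly started container; once connected, `client.is_ready()` performs a similar check.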

In [1]:
import weaviate

In [3]:
client = weaviate.Client("http://localhost:8080/")

In [20]:
# Start from a clean slate: this deletes every class (and its data) in the instance
client.schema.delete_all()
client.schema.get()

{'classes': []}

In [21]:
client.is_ready()

True

### Create Schema

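In Weaviate you define a __schema__ made up of classes, one for each kind of entity you will be searching.

In this case we'll create a class called **Article** with two text properties, `title` and `content`, and no vectorizer, since we supply the vectors ourselves.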

In [22]:
class_obj = {
    "class": "Article",
    # Explicitly tell Weaviate not to vectorize anything: we supply our own OpenAI embeddings
    "vectorizer": "none",
    "properties": [
        {
            "name": "title",
            "description": "Title of the article",
            "dataType": ["text"],
        },
        {
            "name": "content",
            "description": "Contents of the article",
            "dataType": ["text"],
        },
    ],
}

In [23]:
client.schema.create_class(class_obj)

In [24]:
client.schema.get()

{'classes': [{'class': 'Article',
   'invertedIndexConfig': {'bm25': {'b': 0.75, 'k1': 1.2},
    'cleanupIntervalSeconds': 60,
    'stopwords': {'additions': None, 'preset': 'en', 'removals': None}},
   'properties': [{'dataType': ['text'],
     'description': 'Title of the article',
     'name': 'title',
     'tokenization': 'word'},
    {'dataType': ['text'],
     'description': 'Contents of the article',
     'name': 'content',
     'tokenization': 'word'}],
   'shardingConfig': {'virtualPerPhysical': 128,
    'desiredCount': 1,
    'actualCount': 1,
    'desiredVirtualCount': 128,
    'actualVirtualCount': 128,
    'key': '_id',
    'strategy': 'hash',
    'function': 'murmur3'},
   'vectorIndexConfig': {'skip': False,
    'cleanupIntervalSeconds': 300,
    'maxConnections': 64,
    'efConstruction': 128,
    'ef': -1,
    'dynamicEfMin': 100,
    'dynamicEfMax': 500,
    'dynamicEfFactor': 8,
    'vectorCacheMaxObjects': 2000000,
    'flatSearchCutoff': 40000,
    'distance': 'cos

### Import Data

Now we batch up the data and import it into the `Article` class we created.

In [25]:
client.batch.configure(
  # `batch_size` takes an `int` value to enable auto-batching
  # (`None` is used for manual batching)
  batch_size=100, 
  # dynamically update the `batch_size` based on import speed
  dynamic=False,
  # `timeout_retries` takes an `int` value to retry on time outs
  timeout_retries=3,
  # checks for batch-item creation errors
  # this is the default in weaviate-client >= 3.6.0
  callback=weaviate.util.check_batch_result,
)

<weaviate.batch.crud_batch.Batch at 0x107885960>
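With `batch_size=100`, the client buffers objects and sends them in groups instead of making one request per object. The class below is a hypothetical, simplified model of that auto-batching idea, not Weaviate's implementation: buffer objects, flush when the buffer is full, and flush the remainder on exit.

```python
from typing import Any, Callable, Dict, List


class AutoBatcher:
    """Hypothetical auto-batcher: buffers objects and hands them to `send` batch_size at a time."""

    def __init__(self, batch_size: int, send: Callable[[List[Dict[str, Any]]], None]) -> None:
        self.batch_size = batch_size
        self.send = send
        self._buffer: List[Dict[str, Any]] = []

    def add(self, obj: Dict[str, Any]) -> None:
        self._buffer.append(obj)
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Send whatever is buffered (possibly a partial batch) and start a fresh buffer
        if self._buffer:
            self.send(self._buffer)
            self._buffer = []

    def __enter__(self) -> "AutoBatcher":
        return self

    def __exit__(self, *exc_info) -> bool:
        self.flush()  # send the final partial batch
        return False  # don't swallow exceptions


# Record each "request" instead of sending it over the network
requests: List[List[Dict[str, Any]]] = []
with AutoBatcher(batch_size=100, send=requests.append) as batcher:
    for i in range(250):
        batcher.add({"title": f"article {i}"})

print([len(r) for r in requests])  # [100, 100, 50]
```

At this batch size, importing the 50,000 articles takes 500 requests rather than 50,000.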

In [27]:
# Build (title, text, title_embedding, vector_id) tuples for the upload step
data_objects = list(zip(article_df['title'], article_df['text'],
                        article_df['title_embedding'], article_df['vector_id']))

In [34]:
# Upsert into the Article class, letting the batch configured above group the requests
print("Uploading vectors to article_schema..")
with client.batch as batch:
    for title, content, title_vector, _vector_id in data_objects:
        batch.add_data_object(
            {"title": title, "content": content},
            "Article",
            vector=title_vector,
        )

Uploading vectors to article_schema..


In [36]:
# Fetch some stored objects to spot-check the import
client.data_object.get()

{'deprecations': None,
 'objects': [{'class': 'Article',
   'creationTimeUnix': 1671632308225,
   'id': '0000a04d-ba96-4c16-9e00-d1a1d4bf0f71',
   'lastUpdateTimeUnix': 1671632308225,
   'properties': {'content': 'Ophthalmosaurus was an ichthyosaur of the Upper Jurassic (165 to 150 million years ago), named after its large eyes. Well-preserved skeletons, ranging in age from juveniles to adults, have been found in Europe, North America and Argentina.\n\nOphthalmosaurus had the largest eyes of any vertebrate relative to its body size. Its eyes, 4 inches in diameter, occupied most of the space in the skull. They were protected by bony plates (sclerotic rings), which most likely assisted to maintain the shape of the eyeballs against water pressure at depth.\nThe size of the eyes and the sclerotic rings suggests that Ophthalmosaurus hunted at a depth where there is not much light or that it may have hunted at night when a prey species was more active. Its snout was long and thin, perfect fo

### Query Data

In [45]:
def query_weaviate(query, schema, top_k=20):
    '''Embeds a free-text query and returns the top_k nearest objects
    of the given Weaviate class, with their title, content and certainty.'''

    # Embed the query with the same model used for the articles
    embedded_query = openai.Embedding.create(
        input=query,
        model=MODEL,
    )["data"][0]['embedding']

    near_vector = {"vector": embedded_query}

    # Vector search over the class passed as a parameter, limited to top_k results
    query_result = (
        client.query
        .get(schema, ["title", "content", "_additional {certainty}"])
        .with_near_vector(near_vector)
        .with_limit(top_k)
        .do()
    )

    return query_result

In [46]:
query_result = query_weaviate('modern art in Europe','Article')
query_result

{'data': {'Get': {'Article': [{'_additional': {'certainty': 0.9455017447471619},
     'content': 'The early modern period is a term used by historians for the period in Western Europe and its first colonies which spans the three centuries between the Middle Ages and the Industrial Revolution.\n\nThe early modern period is characterized by the rise to importance of science and technological progress, civic politics and the nation state.  Capitalism began its rise, beginning in northern Italian republics such as Genoa. The early modern period also saw the rise and dominance of the economic theory of mercantilism. As such, the early modern period represents the decline and eventual disappearance, in much of the European sphere, of feudalism, serfdom and the power of the Catholic Church.\n\nThe period includes the Protestant Reformation, the Thirty Years\' War, the European colonization of the Americas and the peak of the European hunt of witches.\n\nThe beginning of the early modern perio

In [53]:
# Print the title and certainty of (at most) the first 20 results
for counter, article in enumerate(query_result['data']['Get']['Article'][:20], start=1):
    print(f"{counter}. Title: {article['title']} Certainty: {article['_additional']['certainty']}")

1. Title: Early modern Europe Certainty: 0.9455017447471619
2. Title: Museum of Modern Art Certainty: 0.9376430511474609
3. Title: Western Europe Certainty: 0.9336977899074554
4. Title: Renaissance art Certainty: 0.9321110248565674
5. Title: Pop art Certainty: 0.9302527010440826
6. Title: Art exhibition Certainty: 0.9281987249851227
7. Title: History of Europe Certainty: 0.927833616733551
8. Title: Northern Europe Certainty: 0.9273118078708649
9. Title: Concert of Europe Certainty: 0.9268544018268585
10. Title: Hellenistic art Certainty: 0.9264450967311859
11. Title: Modernist literature Certainty: 0.9235998094081879
12. Title: Piet Mondrian Certainty: 0.9235771298408508
13. Title: European Capital of Culture Certainty: 0.9227772951126099
14. Title: Art film Certainty: 0.9217151403427124
15. Title: Europa Certainty: 0.9216940104961395
16. Title: Art rock Certainty: 0.9213176369667053
17. Title: Central Europe Certainty: 0.9212862849235535
18. Title: Art Certainty: 0.9207752346992493
19
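Weaviate's certainty and Pinecone's score come from the same title embeddings, so they should agree up to a change of scale. Assuming the Article class uses Weaviate's cosine distance (the schema output above is cut off at the `distance` field), certainty is defined as 1 - distance / 2 with distance = 1 - cosine similarity, i.e. (1 + similarity) / 2. The check below applies that conversion to the five Pinecone scores from earlier; the values are copied from the two outputs, and small gaps are plausibly due to the query being embedded separately for each run.

```python
# Scores for the same five titles, copied from the Pinecone and Weaviate outputs above
pinecone_scores = {
    "Early modern Europe": 0.890994787,
    "Museum of Modern Art": 0.875113845,
    "Western Europe": 0.867352843,
    "Renaissance art": 0.86422205,
    "Pop art": 0.860472,
}
weaviate_certainties = {
    "Early modern Europe": 0.9455017447471619,
    "Museum of Modern Art": 0.9376430511474609,
    "Western Europe": 0.9336977899074554,
    "Renaissance art": 0.9321110248565674,
    "Pop art": 0.9302527010440826,
}


def certainty_from_cosine(similarity: float) -> float:
    """Certainty = 1 - distance / 2, where cosine distance = 1 - cosine similarity."""
    distance = 1.0 - similarity
    return 1.0 - distance / 2.0


for title, score in pinecone_scores.items():
    predicted = certainty_from_cosine(score)
    observed = weaviate_certainties[title]
    print(f"{title}: predicted {predicted:.5f}, Weaviate {observed:.5f}, diff {observed - predicted:+.1e}")
```

Both databases return the same five titles in the same order, and the converted scores agree to within 1e-4.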