In [None]:
! pip install openai langchain pinecone-client

In [None]:
! pip install unstructured

In [None]:
! pip install git+https://github.com/openai/CLIP.git

In [None]:
! pip install filestack-python

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
import pinecone
import torch
import os
from pkg_resources import packaging
from PIL import Image
from tqdm.notebook import tqdm
import hashlib
import matplotlib.pyplot as plt
from langchain.document_loaders import UnstructuredHTMLLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
import openai
from openai import OpenAI
import uuid
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
from langchain.llms import OpenAI

# Show the torch version available in this runtime.
print("Torch version:", torch.__version__)

### Import CLIP (Contrastive Language–Image Pre-training)

In [None]:
# Load the CLIP package and display the model names it reports as available.
import clip

clip.available_models()

In [None]:
# Use the GPU when one is present and fall back to the CPU otherwise, so the
# notebook also runs on CPU-only runtimes instead of failing on `.to('cuda')`.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# we will be using ViT-B/32 model
# `clip.load` returns the model together with the image preprocessing transform
# that must be applied before `encode_image`; passing `device` places the
# weights on DEVICE directly.
model, preprocess = clip.load("ViT-B/32", device=DEVICE)
input_resolution = model.visual.input_resolution
context_length = model.context_length
vocab_size = model.vocab_size

# Total number of scalar weights across all parameter tensors.
parameter_count = sum(p.numel() for p in model.parameters())

print("Model parameters:", f"{parameter_count:,}")
print("Input resolution:", input_resolution)
print("Context length:", context_length)
print("Vocab size:", vocab_size)

In [None]:
# main image path
MAIN_PATH = "<PATH TO IMAGES"

# Map every file name found directly inside MAIN_PATH to its name and full path.
image_dict = {}

# os.listdir returns entries in arbitrary order; sorting keeps runs reproducible.
for image_file in sorted(os.listdir(MAIN_PATH)):
    # get the image path
    image_path = os.path.join(MAIN_PATH, image_file)
    # Sub-directories can be neither uploaded nor opened as images, so skip them.
    if not os.path.isfile(image_path):
        continue
    image_dict[image_file] = {
        "filename": image_file,
        "path": image_path
    }

### Upload files to filestack to get the URL

In [None]:
from filestack import Client

# Prefer a key exported as FILESTACK_API_KEY so the real credential does not
# have to be pasted into the notebook; the literal placeholder stays as the
# fallback for anyone who edits it in place.
client = Client(os.environ.get("FILESTACK_API_KEY", "YOUR FILESTACK API KEY"))


def upload_to_filestack(path):
    """Upload a local file through the module-level Filestack ``client``.

    Args:
        path: Filesystem path of the file to upload.

    Returns:
        The ``url`` attribute of the file link returned by ``client.upload``.
        The URL is also printed.
    """
    # Function-scope import keeps this cell self-contained.
    import mimetypes

    # Derive the MIME type from the file extension instead of labelling every
    # upload as PNG. Unknown or non-image extensions keep the previous
    # "image/png" value.
    guessed_type, _ = mimetypes.guess_type(path)
    if not guessed_type or not guessed_type.startswith("image/"):
        guessed_type = "image/png"

    store_params = {
        "mimetype": guessed_type
    }
    new_filelink = client.upload(filepath=path, store_params=store_params)
    print(new_filelink.url)
    return new_filelink.url

### Create Embeddings

In [None]:
# Encode every image with CLIP and keep the embedding next to its hosted URL.
embedding_dictionary = {}
for file_name in tqdm(image_dict):
    file_path = image_dict[file_name]["path"]
    try:
        # The context manager closes the file handle once the pixels are read.
        with Image.open(file_path) as raw_image:
            # preprocessing is compulsory here: the transform returned by
            # clip.load produces the tensor that encode_image is given.
            image_tensor = preprocess(raw_image.convert("RGB")).unsqueeze(0).to(DEVICE)
        # Inference only, so skip building an autograd graph for every image.
        with torch.no_grad():
            image_embedding = model.encode_image(image_tensor).tolist()[0]
    except Exception as error:
        message = "Cannot encode the image, err: {}".format(str(error))
        print(message)
        continue
    # Upload only after encoding succeeded, so files that cannot be encoded are
    # no longer pushed to Filestack.
    upload_url = upload_to_filestack(file_path)
    # set the encoding dictionary
    embedding_dictionary[file_name] = {
        'name': file_name,
        'path': upload_url,
        'embeddings': image_embedding
    }

### Upload Image embeddings to pinecone

In [None]:
# set up pinecone environment
# Fill in the values below, or leave them empty and export the variables before
# starting the notebook: an empty placeholder never overwrites a value that is
# already present in the environment.
# NOTE(review): pinecone.init / pinecone.Index look like the pinecone-client 2.x
# interface — confirm the installed version still provides them.
_image_index_settings = {
    'PINECONE_API_KEY': "",
    'PINECONE_API_ENV': "",
    'PINECONE_INDEX_NAME': "",
}
for _setting_name, _setting_value in _image_index_settings.items():
    if _setting_value or _setting_name not in os.environ:
        os.environ[_setting_name] = _setting_value
# set index
pinecone.init(api_key=os.environ['PINECONE_API_KEY'], environment=os.environ['PINECONE_API_ENV'])
pinecone_index = pinecone.Index(os.environ['PINECONE_INDEX_NAME'])

In [None]:
# upload to pinecone
# One upsert call per image; the vector id is the MD5 hex digest of the stored path.
for image_record in embedding_dictionary.values():
    image_name = image_record['name']
    stored_path = image_record['path']

    vector_id = hashlib.md5(stored_path.encode("utf-8")).hexdigest()
    vector_metadata = {"image_name": image_name, "file_path": stored_path}
    pinecone_index.upsert([(vector_id, image_record['embeddings'], vector_metadata)])
    print("{}===>Added".format(image_name))


### Upload Text Embeddings to Pinecone

#### This is a different Pinecone setup

- If you have the paid version of Pinecone, you can simply create another index, which makes the process much easier.

In [None]:
# set up pinecone environment
# Fill in the values below, or leave them empty and export the variables before
# starting the notebook: an empty placeholder never overwrites a value that is
# already present in the environment.
# NOTE(review): pinecone.init / pinecone.Index look like the pinecone-client 2.x
# interface — confirm the installed version still provides them.
_text_index_settings = {
    'MY_PINECONE_API_KEY': "",
    'MY_PINECONE_API_ENV': "",
    'MY_PINECONE_INDEX_NAME': "",
}
for _setting_name, _setting_value in _text_index_settings.items():
    if _setting_value or _setting_name not in os.environ:
        os.environ[_setting_name] = _setting_value
# set index
pinecone.init(api_key=os.environ['MY_PINECONE_API_KEY'], environment=os.environ['MY_PINECONE_API_ENV'])
pinecone_index_text = pinecone.Index(os.environ['MY_PINECONE_INDEX_NAME'])

In [None]:
# Path of the HTML file that the next cell loads with UnstructuredHTMLLoader.
TEXT_PATH = "backyard-birds.html"

In [None]:
# Read the html document
text_info = UnstructuredHTMLLoader(TEXT_PATH).load()
# Only the first loaded document's text is carried forward to the splitter.
first_document = text_info[0]
text_file = first_document.page_content
print("Number of documents: {}".format(len(text_info)))

Number of documents: 1


In [None]:
# split the texts
splitter_settings = dict(
    chunk_size=1000,         # chunk length limit, counted with length_function
    chunk_overlap=20,        # overlap between neighbouring chunks
    length_function=len,
    is_separator_regex=False,
)
text_splitter = RecursiveCharacterTextSplitter(**splitter_settings)
# One input text in, a list of chunk documents out.
texts = text_splitter.create_documents([text_file])

In [None]:
# set the openai key
# Prefer a key exported as OPENAI_API_KEY so the credential does not have to be
# written into the notebook; the original literal remains the fallback.
openai.api_key = os.environ.get("OPENAI_API_KEY", "OPEANAI_API_KEY")

In [None]:
# embeddings model
MODEL = "text-embedding-ada-002"

# Embed each chunk and upsert it individually; the vector id is the MD5 hex
# digest of the chunk text.
for index, sub_docs in enumerate(texts):
    chunk_text = sub_docs.page_content
    vector_id = hashlib.md5(chunk_text.encode("utf-8")).hexdigest()
    embedding_response = openai.embeddings.create(model=MODEL, input=chunk_text)
    chunk_vector = embedding_response.data[0].embedding
    chunk_metadata = {
        "chunk": str(uuid.uuid4()),
        "text": chunk_text,
        "doc_index": index,
    }
    pinecone_index_text.upsert([(vector_id, chunk_vector, chunk_metadata)])
    print("{} ==> Done".format(index))

### NOTE

- If you use two separate Pinecone setups, you cannot run both queries together. Run each one independently and it will work.
- Alternatively, create separate scripts. The cells below are for reference.

### Text Query

In [None]:
def qa_engine(question):
    """Answer ``question`` from the best-matching text chunk stored in Pinecone.

    The question is embedded with the OpenAI model named by ``MODEL``, the
    single nearest chunk is fetched from the index named by the
    ``MY_PINECONE_INDEX_NAME`` environment variable, and a LangChain
    QA-with-sources chain produces the answer from that chunk.

    Args:
        question: Natural-language question to answer.

    Returns:
        None. The chain response (or a notice that nothing matched) is printed.
    """
    # Import under an explicit alias: the notebook imports ``OpenAI`` from both
    # ``openai`` and ``langchain.llms``, so the bare name depends on import order.
    from langchain.llms import OpenAI as LangChainOpenAI

    # pinecone env
    index = pinecone.Index(os.environ['MY_PINECONE_INDEX_NAME'])

    question_embed_call = openai.embeddings.create(input=question, model=MODEL)
    query_embeds = question_embed_call.data[0].embedding
    response = index.query(query_embeds, top_k=1, include_metadata=True)

    matches = response["matches"]
    if not matches:
        # Without this guard an empty result raises IndexError on matches[0].
        print("No matching text chunk found for: {}".format(question))
        return

    # get the response text and metadata
    match_metadata = matches[0]["metadata"]
    text = match_metadata.get("text", "UNKNOWN")
    chunk = match_metadata.get("chunk", "UNKNOWN")
    doc_index = match_metadata.get("doc_index", "UNKNOWN")

    # create metadata for q&a chain; "source" is what the chain reports back
    # in its SOURCES line.
    metadata = {
        "id": chunk,
        "filename": doc_index,
        "source": str(doc_index) + ", OFFSET=" + str(chunk)
    }
    query_doc = [Document(page_content=text, metadata=metadata)]

    # query the answer from llm
    llm = LangChainOpenAI(temperature=0, openai_api_key=openai.api_key)
    chain = load_qa_with_sources_chain(llm, verbose=False)
    # get the chain response
    chain_response = chain.run(input_documents=query_doc, question=question)
    print(chain_response)


# Example question; qa_engine prints the answer followed by its SOURCES line.
qa_engine("Where does American Goldfinch's yellow hue come from?")

 The American Goldfinch's yellow hue comes from pigments called carotenoids.
SOURCES: 6.0, OFFSET=363f5aaa-524e-4a53-acdd-d28c6e96bea4


In [None]:
# A looser, incomplete question; the recorded output cites the same chunk id as the previous example.
qa_engine("What is American Goldfinch's")

 The American Goldfinch is a small bird with a yellow plumage. It primarily eats seeds from plants in the aster family, such as thistles and sunflowers.
SOURCES: 6.0, OFFSET=363f5aaa-524e-4a53-acdd-d28c6e96bea4


### Image Query

In [None]:
def query_images(query):
    """Display the indexed image whose CLIP embedding best matches ``query``.

    The text is tokenized and encoded with the CLIP ``model``, the single
    nearest vector is fetched from ``pinecone_index``, and the matching image
    is shown with matplotlib together with its similarity score.

    Args:
        query: Text prompt to search the image index with.

    Returns:
        None. The image name is printed and the image is displayed, or a
        notice is printed when the index returns no match.
    """
    # Function-scope stdlib imports keep this cell self-contained.
    import io
    import urllib.request

    # get the tokenizers
    tokens = clip.tokenize(query).to(DEVICE)
    # Inference only, so skip autograd bookkeeping.
    with torch.no_grad():
        query_embeds = model.encode_text(tokens).tolist()[0]
    # `pinecone_index` is the image index the upload loop writes to; the name
    # used here before, `pinecone_index_image`, is not defined in this notebook.
    response = pinecone_index.query(query_embeds, top_k=1, include_metadata=True)

    matches = response['matches']
    if not matches:
        # Without this guard an empty result raises IndexError on matches[0].
        print("No matching image found for: {}".format(query))
        return

    best_match = matches[0]
    file_path = best_match['metadata']['file_path']
    image_name = best_match['metadata']['image_name']
    score = best_match['score']
    print("Image Name: {}".format(image_name))

    # The upload loop stores the Filestack URL under `file_path`, and PIL cannot
    # open a URL directly, so download the bytes first. Local paths still work.
    if file_path.startswith(("http://", "https://")):
        with urllib.request.urlopen(file_path) as remote_file:
            image_source = io.BytesIO(remote_file.read())
    else:
        image_source = file_path
    with Image.open(image_source) as matched_image:
        rgb_image = matched_image.convert("RGB")

    # open the image
    plt.figure(figsize=(5, 5))
    plt.imshow(rgb_image)
    plt.title("confidence Score: {}".format(score))
    plt.axis('off')
    plt.show()

In [None]:
# Use the same question as a text prompt for the image search.
QUERY = "Where does American Goldfinch's yellow hue come from?"
query_images(QUERY)