<a href="https://colab.research.google.com/github/daigo38/create-pinecone-index/blob/main/create_pinecone_index.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 初期設定でやること

1. 「環境変数とPineconeの定義」で環境変数の設定
1. 「IDの読み込み」でids_pathとnamespaceの設定

# データアップロード
1. 「データ登録」より前のセルを全て実行
1. 「データ登録」でURLを設定して実行
1. 「データ登録」実行中にファイルアップロード
1. 「データ追加」を実行



In [None]:
#@title パッケージインストール（編集不要）
%%capture captured
# Requirement specifiers, one per line; the text is written verbatim to
# requirements.txt and installed with pip at the end of this cell.
# NOTE(review): only urllib3 carries a version constraint, so a fresh runtime
# may resolve newer releases of the other packages than this notebook was
# written against — consider pinning them.
packages = """
langchain
openai
tiktoken
urllib3<2
pinecone-client

unstructured
selenium
pypdf
pypdfium2
"""

# Write the list to ./requirements.txt so pip can read it with -r.
with open(f"./requirements.txt", "w") as f:
    f.write(packages)

!pip install -r requirements.txt

In [None]:
#@title Webドライバーの設定（編集不要）
%%capture captured
%%shell
# Installs chromium and chromium-driver from the Debian buster repositories.
#
# Ubuntu no longer distributes chromium-browser outside of snap
#
# Proposed solution: https://askubuntu.com/questions/1204571/how-to-install-chromium-without-snap

# Add debian buster
cat > /etc/apt/sources.list.d/debian.list <<'EOF'
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-buster.gpg] http://deb.debian.org/debian buster main
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-buster-updates.gpg] http://deb.debian.org/debian buster-updates main
deb [arch=amd64 signed-by=/usr/share/keyrings/debian-security-buster.gpg] http://deb.debian.org/debian-security buster/updates main
EOF

# Add keys
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys DCC9EFBF77E11517
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 648ACFD622F3D138
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 112695A0E562B32A

# Export each key into the keyring file referenced by signed-by= above
apt-key export 77E11517 | gpg --dearmour -o /usr/share/keyrings/debian-buster.gpg
apt-key export 22F3D138 | gpg --dearmour -o /usr/share/keyrings/debian-buster-updates.gpg
apt-key export E562B32A | gpg --dearmour -o /usr/share/keyrings/debian-security-buster.gpg

# Prefer debian repo for chromium* packages only
# Note the double-blank lines between entries
cat > /etc/apt/preferences.d/chromium.pref << 'EOF'
Package: *
Pin: release a=eoan
Pin-Priority: 500


Package: *
Pin: origin "deb.debian.org"
Pin-Priority: 300


Package: chromium*
Pin: origin "deb.debian.org"
Pin-Priority: 700
EOF

# Install chromium and chromium-driver
# -y answers any confirmation prompt automatically: this cell's output is
# captured, so a prompt would not be visible to the user.
apt-get update
apt-get install -y chromium chromium-driver

In [None]:
#@title ManageIndexクラス（編集不要）
import os
import pinecone
import traceback
import uuid
import pandas as pd

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone
from langchain.document_loaders import (
    SeleniumURLLoader,
    PyPDFLoader,
    PDFMinerLoader,
    PyPDFium2Loader,
    CSVLoader,
    UnstructuredFileLoader,
)

from langchain.text_splitter import (
    MarkdownTextSplitter,
    CharacterTextSplitter
)


# Maps a file extension (including the leading dot) to the document type name
# that is recorded for files with that extension.
extension_dict = dict([
    (".txt", "text"),
    (".csv", "csv"),
    (".json", "json"),
    (".pdf", "pdf"),
    (".md", "markdown"),
])

# Directories with one of these names are registered as a single document of
# the mapped type instead of file by file.
special_dirs_dict = dict(NotionDB="notion")


class ManageIndex:
    """Load files and URLs, embed them with OpenAI, and upsert them into Pinecone.

    The target index name is read from the ``INDEX_NAME`` environment variable.

    Args:
        load_dir: Directory that is walked for files to ingest.
        urls: Optional list of URLs to ingest in addition to the files.
        specify_loader: Optional collection of loader names that override the
            defaults; ``"PDFMinerLoader"`` switches PDF parsing from
            PyPDFLoader to PDFMinerLoader.
    """

    def __init__(self, load_dir, urls=None, specify_loader=None):
        self.embeddings = OpenAIEmbeddings()
        self.index_name = os.environ["INDEX_NAME"]
        self.docs = []
        self.load_dir = load_dir
        self.urls = urls
        # None instead of a mutable [] default; callers that pass nothing
        # still get an empty list.
        self.specify_loader = specify_loader if specify_loader is not None else []

    @staticmethod
    def _split_by_line(documents):
        """Split documents on newlines into chunks of at most 1000 characters."""
        text_splitter = CharacterTextSplitter(separator="\n", chunk_size=1000, chunk_overlap=0)
        return text_splitter.split_documents(documents)

    @staticmethod
    def _split_markdown(documents):
        """Split documents along Markdown structure into chunks of at most 1000 characters."""
        text_splitter = MarkdownTextSplitter(chunk_size=1000, chunk_overlap=0)
        return text_splitter.split_documents(documents)

    def _load_pdf(self, path):
        """Load one PDF and return its split documents.

        Uses PDFMinerLoader when requested via ``specify_loader``, otherwise
        PyPDFLoader. If that loader raises, PyPDFium2Loader is tried once; an
        error from the fallback propagates to the caller.
        """
        try:
            if "PDFMinerLoader" in self.specify_loader:
                print(f"Try PDFMinerLoader: {path}")
                loader = PDFMinerLoader(path)
            else:
                print(f"Try PyPDFLoader: {path}")
                loader = PyPDFLoader(path)
            documents = loader.load()
        except Exception:
            print(traceback.format_exc())
            print(f"Try PyPDFium2Loader: {path}")
            loader = PyPDFium2Loader(path)
            documents = loader.load()

        return self._split_by_line(documents)

    def _load_url(self, urls):
        """Fetch the given list of URLs with Selenium and return split documents."""
        loader = SeleniumURLLoader(urls)
        return self._split_by_line(loader.load())

    def _load_csv(self, path):
        """Load a CSV file with CSVLoader; the result is returned unsplit."""
        print("try CSVLoader")
        loader = CSVLoader(path)
        return loader.load()

    def _load_text(self, path):
        """Load a plain-text file with UnstructuredFileLoader and split it."""
        print("try UnstructuredFileLoader")
        loader = UnstructuredFileLoader(path)
        return self._split_by_line(loader.load())

    def _load_markdown(self, path):
        """Read a Markdown file as UTF-8 text and split it with MarkdownTextSplitter."""
        print("try MarkdownTextSplitter")
        with open(path, encoding="utf-8") as f:
            text = f.read()
        text_splitter = MarkdownTextSplitter(chunk_size=1000, chunk_overlap=0)
        # "source" is set so the metadata clean-up in from_documents applies.
        return text_splitter.create_documents([text], metadatas=[{"source": path}])

    def _load_notion(self, path):
        """Load a Notion export directory and split its pages as Markdown."""
        # Local import: the notebook's import cell does not bring this loader in.
        from langchain.document_loaders import NotionDirectoryLoader

        print("try NotionDirectoryLoader")
        loader = NotionDirectoryLoader(path)
        return self._split_markdown(loader.load())

    def _entry_documents(self, doc_type, doc_value):
        """Queue one source (type name plus path or URL list) for loading."""
        self.docs.append(
            {"type": doc_type, "value": doc_value}
        )

    def _load_documents(self):
        """Rebuild ``self.docs`` from ``self.urls`` and the files under ``self.load_dir``."""
        # Start clean so that calling this twice does not queue sources twice.
        self.docs = []

        # entry website
        if self.urls:
            self._entry_documents("url", self.urls)

        # entry files
        for root, dirs, files in os.walk(self.load_dir):
            dir_name = os.path.basename(root)
            if dir_name in special_dirs_dict:
                self._entry_documents(special_dirs_dict[dir_name], root)
                # The directory is registered as a whole; prune the walk so
                # files in its subdirectories are not queued again one by one.
                dirs[:] = []
                continue
            for file in files:
                file_path = os.path.join(root, file)
                _, ext = os.path.splitext(file_path)
                # Match extensions case-insensitively (e.g. ".PDF").
                file_type = extension_dict.get(ext.lower(), ext)

                self._entry_documents(file_type, file_path)

    def from_documents(self, namespace=None):
        """Load every queued source and upsert the resulting chunks.

        Args:
            namespace: Pinecone namespace to write to.

        Returns:
            A dict mapping each generated UUID string to the metadata of the
            chunk stored under it (empty when nothing was loaded), or a
            ``str`` naming the source whose loading failed.
        """
        self._load_documents()

        loaders = {
            "pdf": self._load_pdf,
            "markdown": self._load_markdown,
            "csv": self._load_csv,
            "url": self._load_url,
            "text": self._load_text,
            "notion": self._load_notion,
        }

        documents = []
        for doc in self.docs:
            loader = loaders.get(doc["type"])
            if loader is None:
                print(f"Skip unsupported type '{doc['type']}': {doc['value']}")
                continue
            try:
                documents.extend(loader(doc["value"]) or [])
            except Exception:
                print(traceback.format_exc())
                # Always a str (a URL source carries a list) so callers can
                # detect the failure with isinstance(..., str).
                return str(doc["value"])

        ids_dict = {}
        if documents:
            # fix metadata: store sources relative to load_dir
            prefix = self.load_dir + "/"
            for document in documents:
                source = document.metadata.get("source")
                if source:
                    document.metadata["source"] = source.replace(prefix, "")

                ids_dict[str(uuid.uuid4())] = document.metadata

            self.pinecone = Pinecone.from_documents(
                documents=documents,
                embedding=self.embeddings,
                ids=list(ids_dict.keys()),
                index_name=self.index_name,
                namespace=namespace,
            )
            print(documents)
        return ids_dict

    def from_items(self, namespace=None):
        """Upsert the rows of ``<load_dir>/source_documents.csv``.

        Each row needs an ``id`` and a ``page_content`` column; every other
        non-empty column is stored as metadata.

        Args:
            namespace: Pinecone namespace to write to.

        Returns:
            A dict mapping each id (as ``str``) to the row's non-empty columns
            including ``page_content``, or a ``str`` error message when the
            file cannot be read or has invalid content.
        """
        try:
            df = pd.read_csv(os.path.join(self.load_dir, "source_documents.csv"))

            ids_dict = {}
            texts = []
            metadatas = []
            ids = []
            for _, row in df.iterrows():
                # Vector ids are handled as strings everywhere else (UUIDs,
                # JSON keys), so normalise numeric CSV ids too.
                item_id = str(row["id"])
                if pd.isna(row["page_content"]):
                    raise ValueError(f"page_content is empty for id {item_id}")

                # Everything except id and page_content is stored as metadata
                metadata = {}
                metadata_include_content = {}
                for column in df.columns:
                    if column != "id" and pd.notna(row[column]):
                        metadata_include_content[column] = row[column]
                        if column != "page_content":
                            metadata[column] = row[column]

                ids_dict[item_id] = metadata_include_content
                texts.append(row["page_content"])
                metadatas.append(metadata)
                ids.append(item_id)
        except Exception as e:
            print(traceback.format_exc())
            return f"Invalid file content: {e}"

        self.pinecone = Pinecone.from_texts(
            texts=texts,
            embedding=self.embeddings,
            metadatas=metadatas,
            ids=ids,
            index_name=self.index_name,
            namespace=namespace,
        )
        return ids_dict


In [None]:
#@title 環境変数とPineconeの定義（初期設定 1）
# Replace each placeholder value below with your own setting before running.
os.environ.update({
    "INDEX_NAME": "INDEX_NAME",
    "PINECONE_API_KEY": "PINECONE_API_KEY",
    "PINECONE_ENV": "PINECONE_ENV",
    "OPENAI_API_KEY": "OPENAI_API_KEY",
})

# Initialise the Pinecone client from the values configured above.
pinecone.init(
    environment=os.environ["PINECONE_ENV"],  # shown next to the API key in the console
    api_key=os.environ["PINECONE_API_KEY"],  # found at app.pinecone.io
)

In [None]:
#@title IDの読み込み（初期設定 2）
import json
import os

from google.colab import drive
drive.mount('/content/drive')

# Directory in which the ID bookkeeping file is stored
ids_path = "/content/drive/MyDrive/db/pinecone"

# Namespace (a partition of the index) to operate on
namespace = "knowledge-ai" # default is empty

ids_file = f"{ids_path}/ids_dict_{namespace}.json"

# Make sure the directory exists; otherwise creating the file below would
# raise FileNotFoundError a second time.
os.makedirs(ids_path, exist_ok=True)

try:
    with open(ids_file, "r", encoding="utf-8") as f:
        current_ids_dict = json.load(f)
except FileNotFoundError:
    # First run: create an empty ID file
    with open(ids_file, "w", encoding="utf-8") as f:
        f.write("{}")
    current_ids_dict = {}

# Working directory for uploaded files. Created through its absolute path so
# the result does not depend on the current directory, and re-running the
# cell does not fail when it already exists.
load_dir = "/content/load_files"
os.makedirs(load_dir, exist_ok=True)

In [None]:
#@title インデックス操作関数定義（編集不要）
def update_ids_json(upsert_ids_dict=None, delete_ids_list=None):
    """Apply changes to the in-memory ID dict and write it back to its JSON file.

    Args:
        upsert_ids_dict: Mapping of id -> metadata to add or overwrite in
            ``current_ids_dict``. ``None`` means no additions.
        delete_ids_list: Ids to remove from ``current_ids_dict``. Ids that are
            not present are reported and skipped. ``None`` means no removals.

    The file ``{ids_path}/ids_dict_{namespace}.json`` is rewritten on every call.
    """
    current_ids_dict.update(upsert_ids_dict or {})

    for item_id in delete_ids_list or []:
        try:
            current_ids_dict.pop(item_id)
        except KeyError as e:
            print(f"Not found in IDs dict: {e}")

    # ensure_ascii=False emits non-ASCII characters as-is, so fix the file
    # encoding instead of relying on the platform default.
    with open(f"{ids_path}/ids_dict_{namespace}.json", "w", encoding="utf-8") as f:
        json.dump(current_ids_dict, f, indent=4, ensure_ascii=False)

def upsert_from_documents(load_dir, urls, specify_loader):
    """Ingest the files under ``load_dir`` plus ``urls`` and record the new ids.

    A string result from ``ManageIndex.from_documents`` signals a failure and
    is only printed; otherwise the returned id mapping is merged into the IDs
    JSON file and the number of upserted items is reported.
    """
    result = ManageIndex(
        load_dir=load_dir,
        urls=urls,
        specify_loader=specify_loader,
    ).from_documents(namespace)

    if not isinstance(result, str):
        update_ids_json(upsert_ids_dict=result)
        print(f"Upsert {len(result)} vector items.")
        return

    print(result)

def upsert_from_items(load_dir):
    """Upsert the items described by the CSV in ``load_dir`` and record their ids.

    A string result from ``ManageIndex.from_items`` signals a failure and is
    only printed; otherwise the returned id mapping is merged into the IDs
    JSON file and the number of upserted items is reported.
    """
    result = ManageIndex(load_dir=load_dir).from_items(namespace)

    if not isinstance(result, str):
        update_ids_json(upsert_ids_dict=result)
        print(f"Upsert {len(result)} vector items.")
        return

    print(result)

def delete_items(delete_ids_list=None, delete_source_list=None):
    """Delete vectors from the index and from the IDs JSON file.

    Args:
        delete_ids_list: Ids to delete. The list is not modified.
        delete_source_list: Source names; every entry of ``current_ids_dict``
            whose metadata ``"source"`` is in this list is deleted as well.
            Entries without a ``"source"`` key never match.

    Nothing is sent to the index when no id is selected.
    """
    # Work on a private, de-duplicated copy so neither the caller's list nor
    # a shared default accumulates ids between calls.
    target_ids = list(dict.fromkeys(delete_ids_list or []))

    if delete_source_list:
        selected = set(target_ids)
        for item_id, metadata in current_ids_dict.items():
            if item_id not in selected and metadata.get("source") in delete_source_list:
                selected.add(item_id)
                target_ids.append(item_id)

    if not target_ids:
        # Nothing selected: skip the remote call and the file rewrite.
        print("Delete 0 vector items.")
        return

    index = pinecone.Index(os.environ["INDEX_NAME"])
    index.delete(ids=target_ids, namespace=namespace)
    update_ids_json(delete_ids_list=target_ids)
    print(f"Delete {len(target_ids)} vector items.")

In [None]:
#@title データ登録 { vertical-output: true }
print("読み込ませるURLまたはファイルを指定してください。")
print("URLのみ読み込ませる場合はCancel uploadをクリックしてください。")
print("フォルダを登録する場合は、圧縮してzipファイルをアップロードしてください。")
print("アイテム修正の場合は、決まったフォーマットのsource_documents.csvをアップロードしてください。\n")

# URLを指定
"""Example:
urls = [
  "https://www.google.com/",
  "https://openai.com/"
]
"""
urls = [

]
print(f"URL list: {urls}\n")

from google.colab import files
import os

os.chdir(load_dir)
uploaded = files.upload()

for fn in uploaded.keys():
    if fn.endswith('.zip'):
        !unzip -o "$fn" -d .
        print(f'Extracted all contents from {fn}')
        !rm "$fn"
        !rm -rf "$load_dir"/__MACOSX
    else:
        print(f'Uploaded {fn}')

os.chdir('/content')


読み込ませるURLまたはファイルを指定してください。
URLのみ読み込ませる場合はCancel uploadをクリックしてください。
フォルダを登録する場合は、圧縮してzipファイルをアップロードしてください。

URL list: []



In [None]:
#@title データ追加 { vertical-output: true }

# Loader overrides passed to upsert_from_documents; an empty list keeps the
# default PDF loader.
specify_loader = []
# specify_loader = ["PDFMinerLoader"] # uncomment this line and re-run if the extracted text is garbled

# Ingest everything under load_dir together with the `urls` list.
upsert_from_documents(load_dir, urls, specify_loader)
# The uploaded files are removed once the step above has run.
!rm -rf /content/load_files/* # 追加後、アップロードしたファイルは削除される

In [None]:
#@title アイテムの修正 { vertical-output: true }
# Upsert the items listed in the uploaded source_documents.csv under load_dir.
upsert_from_items(load_dir)
# The uploaded files are removed once the step above has run.
!rm -rf /content/load_files/* # 追加後、アップロードしたファイルは削除される

In [None]:
#@title アイテム削除 { vertical-output: true }
print("削除するアイテムのIDのリストを入力してください。")
# Ids of the items to delete.
delete_ids_list = [

]
# Source names; items whose metadata "source" is listed here are deleted too.
delete_source_list=[

]
delete_items(delete_ids_list=delete_ids_list, delete_source_list=delete_source_list)

# To delete every item recorded in the IDs JSON file:
# delete_items(delete_ids_list=list(current_ids_dict.keys()))

削除するデータのIDのリストを入力してください。


In [None]:
#@title Pinecone上の全アイテム削除 { vertical-output: true }
# index = pinecone.Index(os.environ["INDEX_NAME"])
# result = index.query(
#     vector=[i for i in range(1536)],
#     top_k=10000,
#     include_values=True,
#     namespace=namespace
# )

# delete_ids = []
# for vector in result["matches"]:
#     delete_ids.append(vector["id"])

# print(len(delete_ids))
# delete_items(delete_ids_list=delete_ids)