feat: Document Loader for Datastore. (#7)
* feat: Document Loader for Datastore.

* style: format fix.

* fix: lint issues.

* docs: typo in doc fixed.

* test: add another test.

* style: addressed typos.

* feat: add datastore type to conversion.

* chore: refactor converter.

* chore: refactor.

* chore: refactor user_agent in client.

* chore: refactor document_converter.

* chore: client user_agent refactor.

* chore: refactor type in converter.

* chore: simplify logic.

* fix: raise error instead of continue.

* chore: refactor json.
JU-2094 committed Feb 16, 2024
1 parent f169dd1 commit 28a9c01
Showing 11 changed files with 1,064 additions and 22 deletions.
108 changes: 87 additions & 21 deletions docs/document_loader.ipynb
@@ -4,11 +4,11 @@
"cell_type": "markdown",
"metadata": {},
"source": [
-"# Google DATABASE\n",
+"# Cloud Firestore in Datastore Mode\n",
"\n",
-"[Google DATABASE](https://cloud.google.com/DATABASE).\n",
+"[Cloud Firestore in Datastore Mode](https://cloud.google.com/datastore) is a NoSQL document database built for automatic scaling, high performance and ease of application development. \n",
"\n",
-"Load documents from `DATABASE`."
+"Load and store documents from `Firestore` in Datastore Mode."
]
},
{
@@ -26,7 +26,7 @@
},
"outputs": [],
"source": [
-"%pip install PACKAGE_NAME"
+"%pip install langchain-google-datastore"
]
},
{
@@ -37,7 +37,7 @@
},
"outputs": [],
"source": [
-"from PACKAGE import LOADER"
+"from langchain_google_datastore import DatastoreLoader, DatastoreSaver"
]
},
{
@@ -51,7 +51,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
-"### Load from table"
+"### Load from Kind"
]
},
{
@@ -60,7 +60,7 @@
"metadata": {},
"outputs": [],
"source": [
-"loader = LOADER()\n",
+"loader = DatastoreLoader(\"MyKind\")\n",
"\n",
"data = loader.load()"
]
@@ -69,7 +69,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
-"### Load from query"
+"### Load from queries"
]
},
{
@@ -78,16 +78,24 @@
"metadata": {},
"outputs": [],
"source": [
-"loader = LOADER()\n",
+"from google.cloud import datastore\n",
"\n",
-"data = loader.load()"
+"client = datastore.Client(database=\"non-default-db\", namespace=\"custom_namespace\")\n",
+"query_load = client.query(kind=\"MyKind\")\n",
+"query_load.add_filter(\"region\", \"=\", \"west_coast\")\n",
+"\n",
+"loader_document = DatastoreLoader(query_load)\n",
+"\n",
+"data = loader_document.load()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
-"## Customize Document Page Content & Metadata"
+"## Customize Document Page Content & Metadata\n",
+"\n",
+"The arguments of `page_content_properties` and `metadata_properties` will specify the Entity properties to be written into LangChain Document `page_content` and `metadata`."
]
},
{
@@ -96,7 +104,11 @@
"metadata": {},
"outputs": [],
"source": [
-"loader = LOADER()\n",
+"loader = DatastoreLoader(\n",
+"    source=\"MyKind\",\n",
+"    page_content_fields=[\"data_field\"],\n",
+"    metadata_fields=[\"metadata_field\"],\n",
+")\n",
"\n",
"data = loader.load()"
]
@@ -105,14 +117,18 @@
"cell_type": "markdown",
"metadata": {},
"source": [
-"### Customize Page Content Format"
+"### Customize Page Content Format\n",
+"\n",
+"When the `page_content` contains only one field the information will be the field value only. Otherwise the `page_content` will be in JSON format."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
-"## Save Documents to table"
+"## Save Documents\n",
+"\n",
+"`DatastoreSaver` can store LangChain Documents into Firestore in Datastore Mode. By default it will try to extract the entity key from the `key` in the Document metadata."
]
},
{
@@ -121,8 +137,55 @@
"metadata": {},
"outputs": [],
"source": [
-"saver = SAVER()\n",
-"saver.add_documents(docs)"
+"saver = DatastoreSaver()\n",
+"\n",
+"saver.upsert_documents(data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Save Documents without key\n",
"\n",
"If a `kind` is specified the documents will be stored with an auto generated id."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"saver = DatastoreSaver(\"MyKind\")\n",
"\n",
"saver.upsert_documents(data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Delete Documents"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"saver = DatastoreSaver()\n",
"\n",
"saver.delete_documents(data)\n",
"\n",
"keys_to_delete = [\n",
" [\"Kind1\", \"identifier\"],\n",
" [\"Kind2\", 123],\n",
" [\"Kind3\", \"identifier\", \"NestedKind\", 456],\n",
"]\n",
"# The Documents will be ignored and only the document ids will be used.\n",
"saver.delete_documents(data, keys_to_delete)"
]
},
{
@@ -138,11 +201,14 @@
"metadata": {},
"outputs": [],
"source": [
-"from google.cloud.DATABASE import Client\n",
+"from google.auth import compute_engine\n",
+"from google.cloud.datastore import Client\n",
"\n",
-"creds = \"\"\n",
-"client = Client(creds=creds)\n",
-"loader = LOADER(\n",
+"client = Client(\n",
+"    database=\"non-default-db\",\n",
+"    credentials=compute_engine.Credentials(),\n",
+")\n",
+"loader = DatastoreLoader(\n",
+"    source=\"foo\",\n",
"    client=client,\n",
")"
]
@@ -169,4 +235,4 @@
},
"nbformat": 4,
"nbformat_minor": 4
}
}
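The `keys_to_delete` example in the notebook above passes flat Datastore key paths. As a dependency-free sketch of their shape (the `is_valid_flat_path` helper is hypothetical, not part of the package), a complete path alternates kind and identifier:

```python
from typing import List, Union

def is_valid_flat_path(path: List[Union[str, int]]) -> bool:
    """Check that a flat Datastore key path alternates kind/identifier.

    Hypothetical helper: a complete path has even length, kinds are
    strings, and identifiers are string names or integer ids.
    """
    if not path or len(path) % 2 != 0:
        return False
    kinds, ids = path[0::2], path[1::2]
    return all(isinstance(k, str) for k in kinds) and all(
        isinstance(i, (str, int)) for i in ids
    )
```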
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -8,6 +8,7 @@ requires-python = ">=3.8"
dependencies = [
"langchain==0.1.1",
"google-cloud-datastore==2.19.0",
+"more_itertools==10.2.0",
]

[project.urls]
@@ -38,4 +39,4 @@ python_version = "3.11"
warn_unused_configs = true
exclude = [
"owlbot.py"
]
]
4 changes: 4 additions & 0 deletions src/langchain_google_datastore/__init__.py
@@ -11,3 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from langchain_google_datastore.document_loader import DatastoreLoader, DatastoreSaver

__all__ = ["DatastoreLoader", "DatastoreSaver"]
150 changes: 150 additions & 0 deletions src/langchain_google_datastore/document_converter.py
@@ -0,0 +1,150 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any, List

from google.cloud.datastore import Entity, Key
from google.cloud.datastore.helpers import GeoPoint
from langchain_core.documents import Document

if TYPE_CHECKING:
    from google.cloud.datastore import Client


DATASTORE_TYPE = "datastore_type"
KEY = "key"
ENTITY = "entity"
GEOPOINT = "geopoint"


def convert_firestore_entity(
    entity: Entity,
    page_content_properties: List[str] = [],
    metadata_properties: List[str] = [],
) -> Document:
    data_entity = dict(entity.items())
    metadata = {
        "key": {
            "path": entity.key.flat_path,
            DATASTORE_TYPE: KEY,
        }
    }

    set_page_properties = set(
        page_content_properties or (data_entity.keys() - set(metadata_properties))
    )
    set_metadata_properties = set(
        metadata_properties or (data_entity.keys() - set_page_properties)
    )

    page_content = {}

    for k in sorted(set_metadata_properties):
        if k in data_entity:
            metadata[k] = _convert_from_firestore(data_entity[k])
    for k in sorted(set_page_properties):
        if k in data_entity:
            page_content[k] = _convert_from_firestore(data_entity[k])

    if len(page_content) == 1:
        page_content = str(page_content.popitem()[1])  # type: ignore
    else:
        page_content = json.dumps(page_content)  # type: ignore

    return Document(page_content=page_content, metadata=metadata)  # type: ignore


def convert_langchain_document(document: Document, client: Client) -> dict:
    metadata = document.metadata.copy()
    path = None
    data = {}

    if metadata.get("key", {}).get(DATASTORE_TYPE) == KEY:
        path = metadata.pop("key")

    if metadata:
        data.update(_convert_from_langchain(metadata, client))

    if document.page_content:
        try:
            content_dict = json.loads(document.page_content)
        except ValueError:
            # json.loads raises ValueError (JSONDecodeError) on invalid JSON.
            content_dict = {"page_content": document.page_content}
        if not isinstance(content_dict, dict):
            # Valid JSON that is not an object (e.g. "123") is kept as raw text.
            content_dict = {"page_content": document.page_content}
        data.update(_convert_from_langchain(content_dict, client))

    return {"key": path, "properties": data}

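The JSON fallback in `convert_langchain_document` is easy to probe in isolation. A standalone sketch (the `parse_page_content` helper is hypothetical) of the intended behavior, including valid JSON that is not an object, such as a bare number:

```python
import json

def parse_page_content(page_content: str) -> dict:
    """Mirror the page_content fallback: text that is not a JSON object
    becomes a single "page_content" property."""
    try:
        content = json.loads(page_content)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        return {"page_content": page_content}
    if not isinstance(content, dict):
        # Valid JSON but not an object (e.g. "123" or "[1, 2]").
        return {"page_content": page_content}
    return content
```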

def _convert_from_firestore(val: Any) -> Any:
    val_converted = val
    # Entity subclasses dict, so it must be matched before the dict branch.
    if isinstance(val, Entity):
        val_converted = {
            "key": val.key.flat_path,
            "properties": _convert_from_firestore(dict(val.items())),
            DATASTORE_TYPE: ENTITY,
        }
    elif isinstance(val, dict):
        val_converted = {k: _convert_from_firestore(v) for k, v in val.items()}
    elif isinstance(val, list):
        val_converted = [_convert_from_firestore(v) for v in val]
    elif isinstance(val, Key):
        val_converted = {
            "key": val.flat_path,
            DATASTORE_TYPE: KEY,
        }
    elif isinstance(val, GeoPoint):
        val_converted = {
            "latitude": val.latitude,
            "longitude": val.longitude,
            DATASTORE_TYPE: GEOPOINT,
        }

    return val_converted


def _convert_from_langchain(val: Any, client: Client) -> Any:
    val_converted = val
    if isinstance(val, list):
        val_converted = [_convert_from_langchain(v, client) for v in val]
    elif isinstance(val, dict):
        if val.get(DATASTORE_TYPE) == KEY:
            val_converted = client.key(*val["key"])
        elif val.get(DATASTORE_TYPE) == GEOPOINT:
            val_converted = GeoPoint(val["latitude"], val["longitude"])
        elif val.get(DATASTORE_TYPE) == ENTITY:
            key = client.key(*val["key"])
            entity = client.entity(key)
            entity.update(val["properties"])
            val_converted = entity
        else:
            val_converted = {
                k: _convert_from_langchain(v, client) for k, v in val.items()
            }
    return val_converted
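The tagged-dict dispatch in `_convert_from_langchain` can be sketched without google-cloud-datastore installed; here plain tuples stand in for `client.key(...)` and `GeoPoint` (assumptions for illustration, not the real types):

```python
from typing import Any

DATASTORE_TYPE = "datastore_type"

def convert_tagged(val: Any) -> Any:
    """Dependency-free stand-in for _convert_from_langchain's dispatch."""
    if isinstance(val, list):
        return [convert_tagged(v) for v in val]
    if isinstance(val, dict):
        tag = val.get(DATASTORE_TYPE)
        if tag == "key":
            return tuple(val["key"])  # stand-in for client.key(*flat_path)
        if tag == "geopoint":
            return (val["latitude"], val["longitude"])  # stand-in for GeoPoint
        # Plain mapping: convert values recursively.
        return {k: convert_tagged(v) for k, v in val.items()}
    return val
```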
