Skip to content
This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

add rem loader for processing rem ingested texts #423

Merged
merged 1 commit into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions RAGchain/preprocess/loader/rem_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from datetime import datetime
from typing import List, Iterator, Optional

import pytz
from langchain.document_loaders.base import BaseLoader
from langchain.schema import Document

from RAGchain.utils.util import FileChecker


class RemLoader(BaseLoader):
    """
    Load rem storage file from rem sqlite database.
    You can set time range to load.

    Every row of ``allText`` joined to its ``frames`` row whose timestamp falls inside
    the time range becomes one Document. The Document's ``page_content`` is the text and
    its metadata holds the database path (``source``) and the parsed frame timestamp
    (``content_datetime``).
    """

    def __init__(self, path: str, time_range: Optional[List[datetime]] = None):
        """
        :param path: rem sqlite database file path
        :param time_range: time range to load, given as ``[start, end]``. If None, load all data
            from 1970-01-01 up to the moment this loader is created. We recommend setting a time range,
            because loading all data at once will be slow. Default is None.
            Both bounds are converted to UTC before querying; naive datetimes are interpreted as
            system local time by ``datetime.astimezone``.
            The passed sequence itself is never modified.
        :raises ValueError: if path is not an existing sqlite3 file, or if time_range does not
            contain exactly two items.
        """
        self.path = path
        if not FileChecker(self.path).check_type(file_type='.sqlite3').is_exist():
            raise ValueError(f"{self.path} is not sqlite3 file or do not exist.")
        if time_range is None:
            time_range = [datetime(1970, 1, 1), datetime.now()]
        # Validate before opening the connection so a bad argument does not leak it.
        # An explicit raise (rather than assert) keeps the check alive under ``python -O``.
        if len(time_range) != 2:
            raise ValueError("time_range must be list of datetime with length 2")
        import sqlite3
        self.conn = sqlite3.connect(path)
        self.time_range = time_range
        self.__preprocess_time_range()

    def lazy_load(self) -> Iterator[Document]:
        """
        Yield one Document per text row whose frame timestamp lies inside the time range.

        Rows are streamed from the cursor instead of being fetched all at once.
        Timestamps are expected in ``%Y-%m-%dT%H:%M:%S.%f`` format; a row in any other
        format makes ``datetime.strptime`` raise ValueError.
        """
        # Bounds are bound as parameters instead of being formatted into the SQL text.
        query = """
            SELECT allText.text, frames.timestamp
            FROM allText
            JOIN frames ON allText.frameId = frames.id
            WHERE frames.timestamp BETWEEN ? AND ?
        """
        cur = self.conn.cursor()
        try:
            cur.execute(query, (self.time_range[0], self.time_range[1]))
            for text, timestamp in cur:
                yield Document(page_content=text,
                               metadata={
                                   "source": self.path,
                                   "content_datetime": datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%f'),
                               })
        finally:
            # Runs on exhaustion, on error, and when the generator is closed early.
            cur.close()

    def load(self) -> List[Document]:
        """Load every matching row eagerly and return the Documents as a list."""
        return list(self.lazy_load())

    def close(self) -> None:
        """Close the underlying sqlite connection. The loader cannot be used afterwards."""
        self.conn.close()

    def __preprocess_time_range(self):
        """
        Replace ``self.time_range`` with UTC timestamp strings in ``%Y-%m-%dT%H:%M:%S.%f`` format.

        A new list is built so the sequence supplied by the caller is left untouched.
        """
        self.time_range = [
            bound.astimezone(pytz.UTC).strftime('%Y-%m-%dT%H:%M:%S.%f')
            for bound in self.time_range
        ]
28 changes: 28 additions & 0 deletions tests/RAGchain/preprocess/loader/test_rem_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os
import pathlib
from datetime import datetime

import pytest

from RAGchain.preprocess.loader.rem_loader import RemLoader

# Four levels up from this file (its directory, then three more parents) is the
# directory that holds the shared ``resources`` folder with the sample database.
root_dir = pathlib.PurePath(os.path.realpath(__file__)).parent.parent.parent.parent
rem_path = os.path.join(root_dir, "resources", "rem_sample.sqlite3")


@pytest.fixture
def rem_loader():
    """
    Provide two loaders over the sample database as a ``(unbounded, bounded)`` tuple.

    The first loader uses the default time range; the second only covers
    2023-12-31 15:09:00 up to the moment the fixture runs.
    """
    # NOTE(review): both bounds are naive datetimes, and RemLoader converts them with
    # ``astimezone``, so the selected window presumably shifts with the machine's
    # local timezone - confirm the expected row counts hold on CI.
    window_start = datetime(2023, 12, 31, 15, 9, 0)
    unbounded = RemLoader(rem_path)
    bounded = RemLoader(rem_path, time_range=[window_start, datetime.now()])
    yield unbounded, bounded


def test_rem_loader(rem_loader):
    """Check document counts for both loaders and the shape of a loaded Document."""
    unbounded, bounded = rem_loader

    # Default time range: every row of the sample database.
    all_docs = unbounded.load()
    assert len(all_docs) == 39

    # Restricted time range: only the rows inside the window.
    recent_docs = bounded.load()
    assert len(recent_docs) == 5

    first_doc = recent_docs[0]
    assert bool(first_doc.page_content) is True
    assert isinstance(first_doc.metadata['content_datetime'], datetime)
Binary file added tests/resources/rem_sample.sqlite3
Binary file not shown.