## Prerequisites

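* Install Docker to run the [SingleStoreDB Dev Container](https://github.com/singlestore-labs/singlestoredb-dev-image).
* Have a SQL schema file (`schema.sql`) ready; it is mounted into the container as `/init.sql`.
* Create a `.env` file that defines the variables the ingest pipeline reads:
  * `AWS_S3_NAME` — the `s3://` URL of the source bucket
  * `AWS_KEY`
  * `AWS_SECRET`
  * `UNSTRUCTURED_API_KEY`
  * `UNSTRUCTURED_URL`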

In [4]:
# !pip install -q -U "unstructured[s3, pdf, singlestore, embed-huggingface]" 

[0mCollecting unstructured[embed-huggingface,pdf,s3]
  Obtaining dependency information for unstructured[embed-huggingface,pdf,s3] from https://files.pythonhosted.org/packages/62/e2/4356f12efd277fac39e80dfe1e00c9e9798ea9ebb6159acb0ec6f5af938b/unstructured-0.14.9-py3-none-any.whl.metadata
  Downloading unstructured-0.14.9-py3-none-any.whl.metadata (28 kB)
Collecting chardet (from unstructured[embed-huggingface,pdf,s3])
  Obtaining dependency information for chardet from https://files.pythonhosted.org/packages/38/6f/f5fbc992a329ee4e0f288c1fe0e2ad9485ed064cac731ed2fe47dcc38cbf/chardet-5.2.0-py3-none-any.whl.metadata
  Using cached chardet-5.2.0-py3-none-any.whl.metadata (3.4 kB)
Collecting filetype (from unstructured[embed-huggingface,pdf,s3])
  Obtaining dependency information for filetype from https://files.pythonhosted.org/packages/18/79/1b8fa1bb3568781e84c9200f951c735f3f157429f44be0495da55894d620/filetype-1.2.0-py2.py3-none-any.whl.metadata
  Using cached filetype-1.2.0-py2.p

In [27]:
import subprocess

# Host path of the SQL file that is mounted into the container as /init.sql.
# NOTE(review): absolute, machine-specific path — adjust it for your checkout.
schema_path = "/Users/mk/PycharmProjects/singlestore-demo/schema.sql"
# Root password for the dev container. The SingleStore destination config uses this
# variable, and the validation/retrieval cells connect with the same value.
password = "pwd"

# Start the SingleStore dev container in the background (-d).
# - Ports 3306, 8080 and 9000 are published.
# - The schema file is mounted as /init.sql.
# Note: `docker run --name` fails if a container with that name already exists,
# so remove the old container before re-running this cell.
command = [
    "docker", "run", "-d", "--name", "singlestoredb-dev",
    "-e", f'ROOT_PASSWORD={password}',
    "--platform", "linux/amd64",
    "-p", "3306:3306", "-p", "8080:8080", "-p", "9000:9000",
    "-v", f"{schema_path}:/init.sql",
    "ghcr.io/singlestore-labs/singlestoredb-dev:latest",
]

# Capture stderr as well as stdout. Without stderr=PIPE, communicate() returns
# None for `error`, and the failure branch would crash on `error.decode()`
# instead of showing Docker's message.
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()

if process.returncode == 0:
    print('Command executed successfully. Output:')
    print(output.decode())
else:
    print('Command failed. Error:')
    print(error.decode())

Command executed successfully. Output:
5a3518036f1d1936efe9145165ff52d2478ac9c49c1c471adecb3726d5adbac2



In [29]:
# List running containers; singlestoredb-dev should report "(healthy)" before the pipeline connects to it.
!docker ps

CONTAINER ID   IMAGE                                               COMMAND               CREATED          STATUS                    PORTS                                                                    NAMES
5a3518036f1d   ghcr.io/singlestore-labs/singlestoredb-dev:latest   "/scripts/start.sh"   18 seconds ago   Up 17 seconds (healthy)   0.0.0.0:3306->3306/tcp, 0.0.0.0:8080->8080/tcp, 0.0.0.0:9000->9000/tcp   singlestoredb-dev


In [25]:
# Alternative 
# !docker compose -f docker-compose.yml up -d --wait-timeout 60

[1A[1B[0G[?25l[+] Running 0/0
 [33m⠋[0m wait Pulling                                                            [34m0.1s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠙[0m wait Pulling                                                            [34m0.2s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠹[0m wait Pulling                                                            [34m0.3s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠸[0m wait Pulling                                                            [34m0.4s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠼[0m wait Pulling                                                            [34m0.5s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠴[0m wait Pulling                                                            [34m0.6s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠦[0m wait Pulling                                                            [34m0.7s [0m
[?25h[1A[1A[0G

In [8]:
import os

import dotenv

# Load KEY=VALUE pairs from the local .env file into the process environment.
# Variables that are already set are left untouched (override=False).
# NOTE(review): presumably this supplies the AWS_* / UNSTRUCTURED_* values that
# later cells read via os.getenv — confirm against your .env.
dotenv.load_dotenv(dotenv_path=".env", override=False)

True

In [9]:
from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.fsspec.s3 import (
    S3ConnectionConfig,
    S3DownloaderConfig,
    S3IndexerConfig,
    S3AccessConfig,
)
from unstructured.ingest.v2.processes.connectors.singlestore import (
    SingleStoreAccessConfig,
    SingleStoreConnectionConfig,
    SingleStoreUploaderConfig,
    SingleStoreUploadStagerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig


In [30]:
# Build each stage's configuration separately, then assemble and run the pipeline.
# `password` is the dev-container root password defined in the `docker run` cell,
# so that cell must have been executed first.

# Runtime options for the pipeline (verbose logging, tqdm progress bars, 20 processes).
processor_config = ProcessorConfig(
    verbose=True,
    tqdm=True,
    num_processes=20,
)

# Source: the bucket URL and credentials come from the environment.
s3_indexer_config = S3IndexerConfig(remote_url=os.getenv("AWS_S3_NAME"))
s3_downloader_config = S3DownloaderConfig()
s3_connection_config = S3ConnectionConfig(
    access_config=S3AccessConfig(
        key=os.getenv("AWS_KEY"),
        secret=os.getenv("AWS_SECRET"),
    )
)

# Partition documents through the Unstructured API rather than locally.
partitioner_config = PartitionerConfig(
    partition_by_api=True,
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    partition_endpoint=os.getenv("UNSTRUCTURED_URL"),
)

# Chunk by title.
# - Chunks are capped at 512 characters.
# - Text under 200 characters is combined, per the option name.
chunker_config = ChunkerConfig(
    chunking_strategy="by_title",
    chunk_max_characters=512,
    chunk_combine_text_under_n_chars=200,
)

# Embed chunks with BAAI/bge-base-en-v1.5, the model the retrieval cell uses for queries.
embedder_config = EmbedderConfig(
    embedding_provider="langchain-huggingface",
    embedding_model_name="BAAI/bge-base-en-v1.5",
)

# Destination: the local SingleStore dev container, table ingest_test.elements.
singlestore_connection_config = SingleStoreConnectionConfig(
    access_config=SingleStoreAccessConfig(password=password),
    host="localhost",
    port=3306,
    database="ingest_test",
    user="root",
)
stager_config = SingleStoreUploadStagerConfig()
uploader_config = SingleStoreUploaderConfig(table_name="elements")

pipeline = Pipeline.from_configs(
    context=processor_config,
    indexer_config=s3_indexer_config,
    downloader_config=s3_downloader_config,
    source_connection_config=s3_connection_config,
    partitioner_config=partitioner_config,
    chunker_config=chunker_config,
    embedder_config=embedder_config,
    destination_connection_config=singlestore_connection_config,
    stager_config=stager_config,
    uploader_config=uploader_config,
)

pipeline.run()

2024-07-03 10:43:27,694 MainProcess INFO     Created index with configs: {"remote_url": "s3://marias-rag-demo/", "protocol": "s3", "path_without_protocol": "marias-rag-demo/", "supported_protocols": ["s3", "s3a", "abfs", "az", "gs", "gcs", "box", "dropbox", "sftp"], "recursive": false, "file_glob": null}, connection configs: {"access_config": "***REDACTED***", "connector_type": "s3", "supported_protocols": ["s3", "s3a"], "endpoint_url": null, "anonymous": false}
2024-07-03 10:43:27,696 MainProcess INFO     Created download with configs: {"download_dir": null}, connection configs: {"access_config": "***REDACTED***", "connector_type": "s3", "supported_protocols": ["s3", "s3a"], "endpoint_url": null, "anonymous": false}
2024-07-03 10:43:27,696 MainProcess INFO     Created partition with configs: {"strategy": "auto", "ocr_languages": null, "encoding": null, "additional_partition_args": null, "skip_infer_table_types": null, "fields_include": ["element_id", "text", "type", "metadata", "embed

## Validate the ingested output

In [3]:
import singlestoredb as s2
from singlestoredb.connection import Connection

def get_connection(
    host: str = None, port: int = None, database: str = None, user: str = None, password: str = None
) -> Connection:
    """Open and return a SingleStore connection.

    Every argument is passed straight through to ``singlestoredb.connect``.
    Arguments left as ``None`` are forwarded as ``None``; how the driver treats
    them is up to singlestoredb.
    """
    connect_kwargs = dict(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
    )
    return s2.connect(**connect_kwargs)


def validate(table_name: str, conn: "Connection", num_elements: int):
    """Check that ``table_name`` contains exactly ``num_elements`` rows.

    Args:
        table_name: Table to count. It is interpolated into the SQL text (identifiers
            cannot be bound as query parameters), so it must be a trusted name.
        conn: An open SingleStore connection.
        num_elements: Expected row count.

    Raises:
        AssertionError: if the table's row count differs from ``num_elements``.
    """
    with conn.cursor() as cur:
        # Let the server count instead of transferring every row (including the
        # embedding vectors) to the client. This also avoids relying on the
        # driver's execute() return value.
        cur.execute(f"SELECT COUNT(*) FROM {table_name}")
        count = cur.fetchone()[0]
    # Raise explicitly instead of using `assert`, so the check still runs under `python -O`.
    if count != num_elements:
        raise AssertionError(
            f"found count ({count}) doesn't match expected value: {num_elements}"
        )
    print("validation successful")


def run_validation(
    host: str,
    port: int,
    user: str,
    database: str,
    password: str,
    table_name: str,
    num_elements: int,
):
    """Connect to SingleStore and check that a table holds the expected row count.

    Args:
        host, port, user, database, password: Connection parameters for ``get_connection``.
        table_name: Table whose rows are counted.
        num_elements: Expected number of rows.

    Raises:
        AssertionError: propagated from ``validate`` when the count does not match.
    """
    print(f"Validating that table {table_name} in database {database} has {num_elements} entries")
    conn = get_connection(host=host, port=port, database=database, user=user, password=password)
    try:
        validate(table_name=table_name, conn=conn, num_elements=num_elements)
    finally:
        # Release the connection even when validation fails.
        conn.close()


# 345 is the element count observed for this demo bucket; update it if the source documents change.
run_validation(
    host="localhost",
    port=3306,
    user="root",
    database="ingest_test",
    password="pwd",
    table_name="elements",
    num_elements=345,
)

Validating that table elements in database ingest_test has 345 entries
validation successful


## Retrieval

In [86]:
from sentence_transformers import SentenceTransformer
import json

# SentenceTransformer models that are already loaded, keyed by model name. Repeated
# queries reuse a model instead of reloading its weights on every call.
_EMBEDDING_MODELS = {}


def get_embedding(query, model_name: str = "BAAI/bge-base-en-v1.5"):
    """Embed ``query`` with a SentenceTransformer model, with normalization enabled.

    Args:
        query: Text to embed; it is passed directly to ``model.encode``.
        model_name: SentenceTransformer model to use. The default matches the model
            configured in the ingest pipeline's embedder.

    Returns:
        Whatever ``model.encode(query, normalize_embeddings=True)`` returns.
        Callers in this notebook call ``.tolist()`` on it.

    NOTE(review): with unit-length query vectors, a DOT_PRODUCT score equals cosine
    similarity only if the stored embeddings are normalized too — not verified here.
    """
    model = _EMBEDDING_MODELS.get(model_name)
    if model is None:
        model = SentenceTransformer(model_name)
        _EMBEDDING_MODELS[model_name] = model
    return model.encode(query, normalize_embeddings=True)

def retrieve_documents(conn: Connection, query: str, num_results: int = 5):
    """Return the ``num_results`` rows of ``elements`` most similar to ``query``.

    How it works:
    - The query is embedded with ``get_embedding``.
    - The embedding is compared against the ``embeddings`` column using ``DOT_PRODUCT``.

    Each returned row is ``(text, filename, score)``, ordered by descending score.
    """
    # JSON_ARRAY_PACK_F32 takes a JSON array string, so serialize the query vector.
    query_vector_json = json.dumps(get_embedding(query).tolist())

    with conn.cursor() as cur:
        sql = """
            SELECT
                text,
                filename,
                DOT_PRODUCT(embeddings, JSON_ARRAY_PACK_F32(%s)) AS score
            FROM elements
            ORDER BY score DESC 
            LIMIT %s
        """
        cur.execute(sql, [query_vector_json, num_results])
        return cur.fetchall()

In [87]:
# Connect to the local dev container and run a sample semantic-search query.
conn = get_connection(
    host="localhost",
    port=3306,
    database="ingest_test",
    user="root",
    password="pwd",
)
retrieve_documents(conn, query="pest control through mating disruption pheromones")

[('Controlling pest insects is a challenge of main importance to preserve crop pro- duction. In the context of Integrated Pest Management (IPM) programs, we develop a generic model to study the impact of mating disruption control using an artiﬁcial female pheromone to confuse males and adversely aﬀect their mating opportunities. Consequently the reproduction rate is diminished leading to a decline in the population size. For more eﬃcient control, trapping is used to capture the males attracted to the artiﬁcial',
  '1608.04880v1.pdf',
  0.8843122720718384),
 ('In order to maintain the pest population to a low level, we consider a control using female- pheromone-traps to disrupt male mating behaviour. More precisely, we take into account two aspects for the control. The ﬁrst aspect consists of disturbing the mating between males and females to reduce the fertilisation opportunities, which in turn, reduces the number of oﬀspring. This is done using traps that are releasing a female pherom