### Get credentials from the environment

In [None]:
import os
from typing import Iterator

from pydantic import BaseModel, SecretStr


class EnvData(BaseModel):
    """Databricks SQL connection settings, populated from environment variables."""

    server_hostname: str
    http_path: str
    # Stored as SecretStr so the token is masked when the model is printed;
    # read the raw value with get_secret_value().
    access_token: SecretStr

def get_env_data() -> EnvData:
    """Build the connection settings from the ``DATABRICKS_*`` environment variables.

    Raises:
        KeyError: if any of the three required variables is unset (checked in
            the order hostname, HTTP path, access token).
    """
    env_var_for_field = {
        "server_hostname": "DATABRICKS_SERVER_HOSTNAME",
        "http_path": "DATABRICKS_HTTP_PATH",
        "access_token": "DATABRICKS_ACCESS_TOKEN",
    }
    settings = {field: os.environ[var] for field, var in env_var_for_field.items()}
    return EnvData(**settings)

In [None]:
# Smoke-test the environment: raises KeyError if a required DATABRICKS_* variable
# is missing. access_token is a SecretStr, so the displayed value is masked.
get_env_data()

### Set any reusable constants

In [None]:
# Destination catalog and table; each can be overridden through an environment variable.
CATALOG = os.environ.get("DATABRICKS_CATALOG", "utic-dev-tech-fixtures")
TABLE = os.environ.get("DATABRICKS_TABLE", "elements")

### Create a reusable connection to Databricks

In [None]:
from contextlib import contextmanager

from databricks.sql import connect
from databricks.sql.client import Connection as DeltaTableConnection
from databricks.sql.client import Cursor as DeltaTableCursor


@contextmanager
def get_connection() -> Iterator[DeltaTableConnection]:
    """Open a Databricks SQL connection using credentials from the environment.

    This is a generator-based context manager, so the return annotation is
    ``Iterator[...]`` (the type of what is yielded), not the connection itself.

    Yields:
        An open connection; cleanup is delegated to the context manager
        returned by ``connect`` when the ``with`` block exits.

    Raises:
        KeyError: if a required ``DATABRICKS_*`` environment variable is missing.
    """
    env_data = get_env_data()
    with connect(
        server_hostname=env_data.server_hostname,
        http_path=env_data.http_path,
        access_token=env_data.access_token.get_secret_value(),
    ) as connection:
        yield connection

@contextmanager
def get_cursor() -> Iterator[DeltaTableCursor]:
    """Yield a cursor on a fresh connection obtained from ``get_connection``.

    Both the cursor and its connection are released by their own context
    managers when the caller's ``with`` block exits.
    """
    # Managers are entered left to right, so ``connection`` is already bound when
    # ``connection.cursor()`` is evaluated; they are exited in reverse order.
    with get_connection() as connection, connection.cursor() as cursor:
        yield cursor

### Reset the destination table
This ensures we start from a fresh table: `reset()` drops the table if it already exists and then recreates it from the schema file.

In [None]:
from pathlib import Path


# Location of the destination table DDL, relative to the repository root.
_SCHEMA_RELATIVE_PATH = (
    Path("test/integration/connectors/env_setup/sql")
    / "databricks_delta_tables/destination/schema.sql"
)


def _find_schema_file() -> Path:
    """Return the schema file under the cwd or the nearest ancestor directory holding it."""
    cwd = Path.cwd()
    for root in (cwd, *cwd.parents):
        candidate = root / _SCHEMA_RELATIVE_PATH
        if candidate.is_file():
            return candidate
    # Not found anywhere: fall back to the cwd-relative path so the resulting
    # FileNotFoundError names the expected location.
    return _SCHEMA_RELATIVE_PATH


def get_schema_init_string(schema_path: "str | Path | None" = None) -> str:
    """Return the table-creation SQL stored in the schema file.

    By default the file is looked up relative to the current working directory
    and then each of its ancestors. The notebook therefore works both from the
    repository root and from a subdirectory such as the notebook's own folder.

    Each line is stripped and blank lines are dropped, but line breaks are kept.
    Joining everything onto a single line would let a ``--`` comment swallow the
    rest of the statement and could fuse tokens that were split across lines.

    Args:
        schema_path: Explicit path to the schema file; overrides the search.

    Returns:
        The SQL text of the schema file.

    Raises:
        FileNotFoundError: if the schema file does not exist.
    """
    path = Path(schema_path) if schema_path is not None else _find_schema_file()
    with path.open(encoding="utf-8") as f:
        stripped = (line.strip() for line in f)
        return "\n".join(line for line in stripped if line)

def reset():
    """Drop ``TABLE`` in ``CATALOG`` if it exists and recreate it from the schema SQL.

    NOTE(review): the CREATE statement comes from the schema file, which
    presumably hard-codes the table name; confirm it matches ``TABLE`` when
    DATABRICKS_TABLE is overridden.
    """
    create_table_sql = get_schema_init_string()
    with get_cursor() as cur:
        for statement in (
            f"USE CATALOG '{CATALOG}'",
            f"DROP TABLE IF EXISTS {TABLE}",
            create_table_sql,
        ):
            cur.execute(statement)

### Reading utilities

In [None]:
def get_count() -> int:
    """Return the number of rows currently stored in ``TABLE`` within ``CATALOG``."""
    with get_cursor() as cur:
        cur.execute(f"USE CATALOG '{CATALOG}'")
        cur.execute(f"SELECT COUNT(*) FROM {TABLE}")
        row = cur.fetchone()
    # COUNT(*) produces a single-column row.
    return row[0]

def get_content(limit: int = 10) -> list:
    """Fetch up to ``limit`` rows from ``TABLE`` within ``CATALOG``.

    Returns whatever the cursor's ``fetchall()`` produces.
    """
    with get_cursor() as cur:
        cur.execute(f"USE CATALOG '{CATALOG}'")
        # NOTE(review): ``limit`` is interpolated directly into the SQL text;
        # callers should pass an int.
        cur.execute(f"SELECT * FROM {TABLE} LIMIT {limit}")
        rows = cur.fetchall()
    return rows

### Run pipeline

In [None]:
# Make sure PYTHONPATH is set
import sys

base_path = Path(os.getcwd())
project_dir = base_path.parents[3].as_posix()
if project_dir not in sys.path:
    sys.path.append(project_dir)
sys.path

In [None]:
import os
from pathlib import Path

from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.connectors.databricks.volumes_table import (
    CONNECTOR_TYPE,
    DatabricksDeltaTablesConnectionConfig,
    DatabricksDeltaTablesUploadStagerConfig,
    DatabricksVolumeDeltaTableUploaderConfig,
)
from unstructured_ingest.v2.processes.connectors.local import (
    LocalConnectionConfig,
    LocalDownloaderConfig,
    LocalIndexerConfig,
)
from unstructured_ingest.v2.processes.connectors.sql.databricks_delta_tables import (
    DatabricksDeltaTablesAccessConfig,
)
from unstructured_ingest.v2.processes.embedder import EmbedderConfig
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig

# Repository root: the notebook is expected to run four directories below it.
base_path = Path.cwd().parents[3]
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest" / CONNECTOR_TYPE
output_path = work_dir / "output"
download_path = work_dir / "download"

# Staging location for the uploader. Like CATALOG/TABLE, these can be overridden
# through environment variables; the defaults are the previously hard-coded values.
DATABASE = os.getenv("DATABRICKS_DATABASE", "default")
VOLUME = os.getenv("DATABRICKS_VOLUME", "test-platform")
VOLUME_PATH = os.getenv("DATABRICKS_VOLUME_PATH", "test-roman")

env_data = get_env_data()

# NOTE: this cell does not call reset(); run it first if the table must start empty.
Pipeline.from_configs(
    context=ProcessorConfig(work_dir=str(work_dir.resolve())),
    indexer_config=LocalIndexerConfig(
        # Join with the path operator rather than string concatenation so the
        # separator matches the platform.
        input_path=str(docs_path.resolve() / "book-war-and-peace-1p.txt")
    ),
    downloader_config=LocalDownloaderConfig(download_dir=download_path),
    source_connection_config=LocalConnectionConfig(),
    partitioner_config=PartitionerConfig(strategy="fast"),
    chunker_config=ChunkerConfig(chunking_strategy="by_title"),
    embedder_config=EmbedderConfig(embedding_provider="huggingface"),
    destination_connection_config=DatabricksDeltaTablesConnectionConfig(
        access_config=DatabricksDeltaTablesAccessConfig(
            token=env_data.access_token.get_secret_value()
        ),
        http_path=env_data.http_path,
        server_hostname=env_data.server_hostname,
    ),
    stager_config=DatabricksDeltaTablesUploadStagerConfig(),
    uploader_config=DatabricksVolumeDeltaTableUploaderConfig(
        catalog=CATALOG,
        database=DATABASE,
        table_name=TABLE,
        volume=VOLUME,
        volume_path=VOLUME_PATH,
    ),
).run()

### Validate upload

**Count:**

In [None]:
# Row count of CATALOG/TABLE after the pipeline run.
get_count()

**Content:**