

### Databricks Documentation RAG Ingestion Pipeline

This notebook implements a high-quality RAG ingestion pipeline with:

- Data reading from Databricks tables
- Semantic chunking for context-aware splitting
- Metadata extraction for enhanced retrieval

#### Package Installation and Setup

In [0]:
# Install required packages
%pip install sentence-transformers scikit-learn nltk

# Download NLTK data
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')

# Restart Python to use updated packages
dbutils.library.restartPython()

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


[nltk_data] Downloading package punkt to /home/spark-
[nltk_data]     aef36c0d-db65-497b-829a-20/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /home/spark-
[nltk_data]     aef36c0d-db65-497b-829a-20/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


In [0]:
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [0]:
# STEP 1: Pre-download the model (run this ONCE first)
from ingestion_pipeline import download_model
download_model()  # Downloads ~90MB, takes about 30 seconds

Downloading model: all-MiniLM-L6-v2
This may take a minute on first run (~90MB download)...


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

✓ Model 'all-MiniLM-L6-v2' downloaded and cached successfully!
  Max sequence length: 256
  Embedding dimension: 384


True

In [0]:
%pip install -r requirements.txt

Collecting nltk>=3.8.0 (from -r requirements.txt (line 5))
  Downloading nltk-3.9.2-py3-none-any.whl.metadata (3.2 kB)
Collecting sentence-transformers>=2.2.0 (from -r requirements.txt (line 6))
  Downloading sentence_transformers-5.2.0-py3-none-any.whl.metadata (16 kB)
Collecting transformers>=4.30.0 (from -r requirements.txt (line 13))
  Downloading transformers-4.57.3-py3-none-any.whl.metadata (43 kB)
Collecting torch>=2.0.0 (from -r requirements.txt (line 14))
  Downloading torch-2.9.1-cp312-cp312-manylinux_2_28_aarch64.whl.metadata (30 kB)
Collecting regex>=2021.8.3 (from nltk>=3.8.0->-r requirements.txt (line 5))
  Downloading regex-2025.11.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl.metadata (40 kB)
Collecting tqdm (from nltk>=3.8.0->-r requirements.txt (line 5))
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting huggingface-hub>=0.20.0 (from sentence-transformers>=2.2.0->-r requirements.txt (line 6))
  Downloading hu

In [0]:
from ingestion_pipeline import (
    read_databricks_docs,
    SemanticChunker,
    MetadataExtractor,
    process_document,
    process_all_documents,
    process_all_documents_v2
)

#### Step 1: Read Data from Databricks Table

In [0]:
# Read documentation data
docs_df = read_databricks_docs("databricks_databricks_documentation_dataset.v01.docs")
display(docs_df.limit(5))

Loaded 5169 documents from databricks_databricks_documentation_dataset.v01.docs


id,url,content
25277,https://docs.databricks.com/en/ingestion/bad-records.html,"Handle bad records and files Databricks provides a number of options for dealing with files that contain bad records. Examples of bad data include: Incomplete or corrupt records: Mainly observed in text based file formats like JSON and CSV. For example, a JSON record that doesn’t have a closing brace or a CSV record that doesn’t have as many columns as the header or first record of the CSV file. Mismatched data types: When the value for a column doesn’t have the specified or inferred data type. Bad field names: Can happen in all file formats, when the column name specified in the file or record has a different casing than the specified or inferred schema. Corrupted files: When a file cannot be read, which might be due to metadata or data corruption in binary file types such as Avro, Parquet, and ORC. On rare occasion, might be caused by long-lasting transient failures in the underlying storage system. Missing files: A file that was discovered during query analysis time and no longer exists at processing time. Use badRecordsPath Use badRecordsPath When you set badRecordsPath, the specified path records exceptions for bad records or files encountered during data loading. In addition to corrupt records and files, errors indicating deleted files, network connection exception, IO exception, and so on are ignored and recorded under the badRecordsPath. Note Using the badRecordsPath option in a file-based data source has a few important limitations: It is non-transactional and can lead to inconsistent results. Transient errors are treated as failures. 
Unable to find input file Unable to find input file val df = spark.read .option(""badRecordsPath"", ""/tmp/badRecordsPath"") .format(""parquet"").load(""/input/parquetFile"") // Delete the input parquet file '/input/parquetFile' dbutils.fs.rm(""/input/parquetFile"") df.show() In the above example, since df.show() is unable to find the input file, Spark creates an exception file in JSON format to record the error. For example, /tmp/badRecordsPath/20170724T101153/bad_files/xyz is the path of the exception file. This file is under the specified badRecordsPath directory, /tmp/badRecordsPath. 20170724T101153 is the creation time of this DataFrameReader. bad_files is the exception type. xyz is a file that contains a JSON record, which has the path of the bad file and the exception/reason message. Input file contains bad record Input file contains bad record // Creates a json file containing both parsable and corrupted records Seq(""""""{""a"": 1, ""b"": 2}"""""", """"""{bad-record"""""").toDF().write.format(""text"").save(""/tmp/input/jsonFile"") val df = spark.read .option(""badRecordsPath"", ""/tmp/badRecordsPath"") .schema(""a int, b int"") .format(""json"") .load(""/tmp/input/jsonFile"") df.show() In this example, the DataFrame contains only the first parsable record ({""a"": 1, ""b"": 2}). The second bad record ({bad-record) is recorded in the exception file, which is a JSON file located in /tmp/badRecordsPath/20170724T114715/bad_records/xyz. The exception file contains the bad record, the path of the file containing the record, and the exception/reason message. After you locate the exception files, you can use a JSON reader to process them."
25278,https://docs.databricks.com/en/ingestion/copy-into/configure-data-access.html,"Configure data access for ingestion This article describes how admin users can configure access to data in a bucket in Amazon S3 (S3) so that Databricks users can load data from S3 into a table in Databricks. This article describes the following ways to configure secure access to source data: (Recommended) Create a Unity Catalog volume. Create a Unity Catalog external location with a storage credential. Launch a compute resource that uses an AWS instance profile. Generate temporary credentials (an AWS access key ID, a secret key, and a session token). Before you begin Before you begin Before you configure access to data in S3, make sure you have the following: Data in an S3 bucket in your AWS account. To create a bucket, see Creating a bucket in the AWS documentation. To access data using a Unity Catalog volume (recommended), the READ VOLUME privilege on the volume. For more information, see Create and work with volumes and Unity Catalog privileges and securable objects. To access data using a Unity Catalog external location, the READ FILES privilege on the external location. For more information, see Create an external location to connect cloud storage to Databricks. To access data using a compute resource with an AWS instance profile, Databricks workspace admin permissions. A Databricks SQL warehouse. To create a SQL warehouse, see Create a SQL warehouse. Familiarity with the Databricks SQL user interface. Configure access to cloud storage Configure access to cloud storage Use one of the following methods to configure access to S3: (Recommended) Create a Unity Catalog volume. For more information, see Create and work with volumes. Configure a Unity Catalog external location with a storage credential. For more information about external locations, see Create an external location to connect cloud storage to Databricks. Configure a compute resource to use an AWS instance profile. 
For more information, see Configure a SQL warehouse to use an instance profile. Generate temporary credentials (an AWS access key ID, a secret key, and a session token) to share with other Databricks users. For more information, see Generate temporary credentials for ingestion. Clean up Clean up You can clean up the associated resources in your cloud account and Databricks if you no longer want to keep them. Delete the AWS CLI named profile In your ~/.aws/credentials file for Unix, Linux, and macOS, or in your %USERPROFILE%\.aws\credentials file for Windows, remove the following portion of the file, and then save the file: [] aws_access_key_id = aws_secret_access_key = Delete the IAM user Open the IAM console in your AWS account, typically at https://console.aws.amazon.com/iam. In the sidebar, click Users. Select the box next to the user, and then click Delete. Enter the name of the user, and then click Delete. Delete the IAM policy Open the IAM console in your AWS account, if it is not already open, typically at https://console.aws.amazon.com/iam. In the sidebar, click Policies. Select the option next to the policy, and then click Actions > Delete. Enter the name of the policy, and then click Delete. Delete the S3 bucket Open the Amazon S3 console in your AWS account, typically at https://console.aws.amazon.com/s3. Select the option next to the bucket, and then click Empty. Enter permanently delete, and then click Empty. In the sidebar, click Buckets. Select the option next to the bucket, and then click Delete. Enter the name of the bucket, and then click Delete bucket. Stop the SQL warehouse If you are not using the SQL warehouse for any other tasks, you should stop the SQL warehouse to avoid additional costs. In the SQL persona, on the sidebar, click SQL Warehouses. Next to the name of the SQL warehouse, click Stop. When prompted, click Stop again. 
Next steps Next steps After you complete the steps in this article, users can run the COPY INTO command to load the data from the S3 bucket into your Databricks workspace. To load data using a Unity Catalog volume or external location, see Load data using COPY INTO with Unity Catalog volumes or external locations. To load data using a SQL warehouse with an AWS instance profile, see Load data using COPY INTO with an instance profile. To load data using temporary credentials (an AWS access key ID, a secret key, and a session token), see Load data using COPY INTO with temporary credentials."
25279,https://docs.databricks.com/en/ingestion/copy-into/examples.html,"Common data loading patterns using COPY INTO Learn common patterns for using COPY INTO to load data from file sources into Delta Lake. There are many options for using COPY INTO. You can also use temporary credentials with COPY INTO in combination with these patterns. See COPY INTO for a full reference of all options. Create target tables for COPY INTO Create target tables for COPY INTO COPY INTO must target an existing Delta table. In Databricks Runtime 11.3 LTS and above, setting the schema for these tables is optional for formats that support schema evolution: CREATE TABLE IF NOT EXISTS my_table [(col_1 col_1_type, col_2 col_2_type, ...)] [COMMENT ] [TBLPROPERTIES ()]; Note that to infer the schema with COPY INTO, you must pass additional options: COPY INTO my_table FROM '/path/to/files' FILEFORMAT = FORMAT_OPTIONS ('inferSchema' = 'true') COPY_OPTIONS ('mergeSchema' = 'true'); The following example creates a schemaless Delta table called my_pipe_data and loads a pipe-delimited CSV with a header: CREATE TABLE IF NOT EXISTS my_pipe_data; COPY INTO my_pipe_data FROM 's3://my-bucket/pipeData' FILEFORMAT = CSV FORMAT_OPTIONS ('mergeSchema' = 'true', 'delimiter' = '|', 'header' = 'true') COPY_OPTIONS ('mergeSchema' = 'true'); Load JSON data with COPY INTO Load JSON data with COPY INTO The following example loads JSON data from five files in Amazon S3 (S3) into the Delta table called my_json_data. This table must be created before COPY INTO can be executed. If any data was already loaded from one of the files, the data isn’t reloaded for that file. 
COPY INTO my_json_data FROM 's3://my-bucket/jsonData' FILEFORMAT = JSON FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json') -- The second execution will not copy any data since the first command already loaded the data COPY INTO my_json_data FROM 's3://my-bucket/jsonData' FILEFORMAT = JSON FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json') Load Avro data with COPY INTO Load Avro data with COPY INTO The following example loads Avro data in S3 using additional SQL expressions as part of the SELECT statement. COPY INTO my_delta_table FROM (SELECT to_date(dt) dt, event as measurement, quantity::double FROM 's3://my-bucket/avroData') FILEFORMAT = AVRO Load CSV files with COPY INTO Load CSV files with COPY INTO The following example loads CSV files from S3 under s3://bucket/base/path/folder1 into a Delta table at s3://bucket/deltaTables/target. COPY INTO delta.`s3://bucket/deltaTables/target` FROM (SELECT key, index, textData, 'constant_value' FROM 's3://bucket/base/path') FILEFORMAT = CSV PATTERN = 'folder1/file_[a-g].csv' FORMAT_OPTIONS('header' = 'true') -- The example below loads CSV files without headers in S3 using COPY INTO. -- By casting the data and renaming the columns, you can put the data in the schema you want COPY INTO delta.`s3://bucket/deltaTables/target` FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData FROM 's3://bucket/base/path') FILEFORMAT = CSV PATTERN = 'folder1/file_[a-g].csv' Ignore corrupt files while loading data Ignore corrupt files while loading data If the data you’re loading can’t be read due to some corruption issue, those files can be skipped by setting ignoreCorruptFiles to true in the FORMAT_OPTIONS. The result of the COPY INTO command returns how many files were skipped due to corruption in the num_skipped_corrupt_files column. This metric also shows up in the operationMetrics column under numSkippedCorruptFiles after running DESCRIBE HISTORY on the Delta table. 
Corrupt files aren’t tracked by COPY INTO, so they can be reloaded in a subsequent run if the corruption is fixed. You can see which files are corrupt by running COPY INTO in VALIDATE mode. COPY INTO my_table FROM '/path/to/files' FILEFORMAT = [VALIDATE ALL] FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true') Note ignoreCorruptFiles is available in Databricks Runtime 11.3 LTS and above."
25280,https://docs.databricks.com/en/ingestion/copy-into/generate-temporary-credentials.html,"Generate temporary credentials for ingestion This article describes how to create an IAM user in your AWS account that has just enough access to read data in an Amazon S3 (S3) bucket. Create an IAM policy Create an IAM policy Open the AWS IAM console in your AWS account, typically at https://console.aws.amazon.com/iam. Click Policies. Click Create Policy. Click the JSON tab. Replace the existing JSON code with the following code. In the code, replace: with the name of your S3 bucket. with the name of the folder within your S3 bucket. { ""Version"": ""2012-10-17"", ""Statement"": [ { ""Sid"": ""ReadOnlyAccessToTrips"", ""Effect"": ""Allow"", ""Action"": [ ""s3:GetObject"", ""s3:ListBucket"" ], ""Resource"": [ ""arn:aws:s3:::"", ""arn:aws:s3::://*"" ] } ] } Click Next: Tags. Click Next: Review. Enter a name for the policy and click Create policy. Create an IAM user Create an IAM user In the sidebar, click Users. Click Add users. Enter a name for the user. Select the Access key - Programmatic access box, and then click Next: Permissions. Click Attach existing policies directly. Select the box next to the policy, and then click Next: Tags. Click Next: Review. Click Create user. Copy the Access key ID and Secret access key values that appear to a secure location, as you need them to get the AWS STS session token. Create a named profile Create a named profile On your local development machine, use the AWS CLI to create a named profile with the AWS credentials that you copied in the previous step. See Named profiles for the AWS CLI on the AWS website. Test your AWS credentials. To do this, use the AWS CLI to run the following command, which displays the contents of the folder that contains your data. In the command, replace: with the name of your S3 bucket. with the name of the folder within your S3 bucket. with the name of your named profile. 
aws s3 ls s3://// --profile To get the session token, run the following command: aws sts get-session-token --profile Replace with the name of your named profile. Copy the AccessKeyId, SecretAccessKey, and SessionToken values that appear to a secure location."
25281,https://docs.databricks.com/en/ingestion/copy-into/index.html,"Get started using COPY INTO to load data The COPY INTO SQL command lets you load data from a file location into a Delta table. This is a re-triable and idempotent operation; files in the source location that have already been loaded are skipped. COPY INTO offers the following capabilities: Easily configurable file or directory filters from cloud storage, including S3, ADLS Gen2, ABFS, GCS, and Unity Catalog volumes. Support for multiple source file formats: CSV, JSON, XML, Avro, ORC, Parquet, text, and binary files Exactly-once (idempotent) file processing by default Target table schema inference, mapping, merging, and evolution Note For a more scalable and robust file ingestion experience, Databricks recommends that SQL users leverage streaming tables. See Load data using streaming tables in Databricks SQL. Warning COPY INTO respects the workspace setting for deletion vectors. If enabled, deletion vectors are enabled on the target table when COPY INTO runs on a SQL warehouse or compute running Databricks Runtime 14.0 or above. Once enabled, deletion vectors block queries against a table in Databricks Runtime 11.3 LTS and below. See What are deletion vectors? and Auto-enable deletion vectors. Requirements Requirements An account admin must follow the steps in Configure data access for ingestion to configure access to data in cloud object storage before users can load data using COPY INTO. Example: Load data into a schemaless Delta Lake table Example: Load data into a schemaless Delta Lake table Note This feature is available in Databricks Runtime 11.3 LTS and above. 
You can create empty placeholder Delta tables so that the schema is later inferred during a COPY INTO command by setting mergeSchema to true in COPY_OPTIONS: CREATE TABLE IF NOT EXISTS my_table [COMMENT ] [TBLPROPERTIES ()]; COPY INTO my_table FROM '/path/to/files' FILEFORMAT = FORMAT_OPTIONS ('mergeSchema' = 'true') COPY_OPTIONS ('mergeSchema' = 'true'); The SQL statement above is idempotent and can be scheduled to run to ingest data exactly-once into a Delta table. Note The empty Delta table is not usable outside of COPY INTO. INSERT INTO and MERGE INTO are not supported to write data into schemaless Delta tables. After data is inserted into the table with COPY INTO, the table becomes queryable. See Create target tables for COPY INTO. Example: Set schema and load data into a Delta Lake table"


#### Step 2: Test Semantic Chunking on a Sample Document

In [0]:
# Initialize semantic chunker
chunker = SemanticChunker(
    model_name='all-MiniLM-L6-v2',
    similarity_threshold=0.5,
    min_chunk_size=200,
    max_chunk_size=1000,
    overlap_sentences=2
)


In [0]:
# Test on a sample document
sample_doc = docs_df.first()
chunks = chunker.chunk_text(sample_doc['content'])


print(f"Created {len(chunks)} semantic chunks from sample document")
print("\nChunk Statistics:")
for i, chunk in enumerate(chunks[:3]):
    print(f"\nChunk {i+1}:")
    print(f"  - Characters: {chunk.char_count}")
    print(f"  - Sentences: {chunk.sentence_count}")
    print(f"  - Preview: {chunk.text[:200]}...")

Created 11 semantic chunks from sample document

Chunk Statistics:

Chunk 1:
  - Characters: 403
  - Sentences: 3
  - Preview: Handle bad records and files  
Databricks provides a number of options for dealing with files that contain bad records. Examples of bad data include:  
Incomplete or corrupt records: Mainly observed i...

Chunk 2:
  - Characters: 799
  - Sentences: 6
  - Preview: Examples of bad data include:  
Incomplete or corrupt records: Mainly observed in text based file formats like JSON and CSV. For example, a JSON record that doesn’t have a closing brace or a CSV recor...

Chunk 3:
  - Characters: 361
  - Sentences: 3
  - Preview: Corrupted files: When a file cannot be read, which might be due to metadata or data corruption in binary file types such as Avro, Parquet, and ORC. On rare occasion, might be caused by long-lasting tr...
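
In the output above, Chunk 2 begins with "Examples of bad data include:", a sentence that also appears in Chunk 1, which is what `overlap_sentences=2` would suggest. The internals of `SemanticChunker` are not shown in this notebook, so the following is only a minimal sketch of how threshold-based grouping with sentence overlap might work. The function names are hypothetical, and `toy_embed` is a bag-of-words stand-in for the `all-MiniLM-L6-v2` embeddings, used so the example runs without downloading a model.

```python
import math
import re
from collections import Counter


def toy_embed(sentence):
    """Stand-in for a sentence-transformers embedding: a bag-of-words count vector."""
    return Counter(re.findall(r"[a-z0-9]+", sentence.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def sketch_semantic_chunks(sentences, similarity_threshold=0.5,
                           min_chunk_size=200, max_chunk_size=1000,
                           overlap_sentences=2, embed=toy_embed):
    """Greedily group sentences. Hypothetical logic, not the pipeline's actual code."""
    chunks, current = [], []
    for sentence in sentences:
        if current:
            size = len(" ".join(current))
            # A drop in similarity to the previous sentence is treated as a topic shift.
            topic_shift = cosine(embed(current[-1]), embed(sentence)) < similarity_threshold
            too_big = size + 1 + len(sentence) > max_chunk_size
            if too_big or (topic_shift and size >= min_chunk_size):
                chunks.append(" ".join(current))
                # Carry the trailing sentences forward so adjacent chunks overlap.
                current = current[-overlap_sentences:] if overlap_sentences else []
        current.append(sentence)
    if current:
        chunks.append(" ".join(current))
    return chunks


demo_sentences = [
    "Spark reads parquet files.",
    "Spark reads json files.",
    "Cats sleep all day.",
    "Cats sleep all night.",
]
demo_chunks = sketch_semantic_chunks(
    demo_sentences, min_chunk_size=10, overlap_sentences=1
)
```

With these toy inputs the split lands between the Spark sentences and the cat sentences, and the second chunk starts with the last sentence of the first. A real embedding model would score similarity on meaning rather than shared words, so its split points would differ.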


In [0]:
chunks

[SemanticChunk(text='Handle bad records and files  \nDatabricks provides a number of options for dealing with files that contain bad records. Examples of bad data include:  \nIncomplete or corrupt records: Mainly observed in text based file formats like JSON and CSV. For example, a JSON record that doesn’t have a closing brace or a CSV record that doesn’t have as many columns as the header or first record of the CSV file.', start_idx=0, end_idx=403, sentence_count=3, char_count=403),
 SemanticChunk(text='Examples of bad data include:  \nIncomplete or corrupt records: Mainly observed in text based file formats like JSON and CSV. For example, a JSON record that doesn’t have a closing brace or a CSV record that doesn’t have as many columns as the header or first record of the CSV file. Mismatched data types: When the value for a column doesn’t have the specified or inferred data type. Bad field names: Can happen in all file formats, when the column name specified in the file or record has

#### Step 3: Test Metadata Extraction

In [0]:
# Initialize metadata extractor
extractor = MetadataExtractor()

# Extract metadata from sample document
metadata = extractor.extract_metadata(
    doc_id=sample_doc['id'],
    url=sample_doc['url'],
    content=sample_doc['content']
)

print("Extracted Metadata:")
print(f"  Title: {metadata['title']}")
print(f"  Document Type: {metadata['document_type']}")
print(f"  URL Category: {metadata['url_category']}")
print(f"  URL Path: {metadata['url_path']}")
print(f"\nHeaders ({len(metadata['headers'])}):")
for header in metadata['headers'][:5]:
    print(f"  {'#' * header['level']} {header['text']}")

print(f"\nCode Blocks: {len(metadata['code_blocks'])}")
for i, cb in enumerate(metadata['code_blocks'][:3]):
    print(f"  Block {i+1}: {cb['language']} ({cb['length']} chars)")

print(f"\nLinks: {len(metadata['links'])}")
for link in metadata['links'][:5]:
    print(f"  - {link['text']}: {link['url']}")

print("\nKeywords:")
for key, values in metadata['keywords'].items():
    print(f"  {key}: {values[:5]}")

print("\nStatistics:")
for key, value in metadata['statistics'].items():
    print(f"  {key}: {value}")



Extracted Metadata:
  Title: None
  Document Type: general
  URL Category: ingestion
  URL Path: bad-records.html

Headers (0):

Code Blocks: 0

Links: 0

Keywords:
  sql_keywords: ['Delete']
  databricks_objects: ['schema']
  api_endpoints: []

Statistics:
  char_count: 3224
  word_count: 465
  line_count: 25
  header_count: 0
  code_block_count: 0
  link_count: 0
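
The `URL Category` and `URL Path` values above are consistent with taking the last two segments of the URL path. `MetadataExtractor` itself is not shown here, so this is a minimal sketch under that assumption; `sketch_url_metadata` is a hypothetical helper, not part of `ingestion_pipeline`.

```python
from urllib.parse import urlparse


def sketch_url_metadata(url):
    """Derive a category and page name from a docs URL (hypothetical helper)."""
    segments = [s for s in urlparse(url).path.split("/") if s]
    return {
        # The page file name is the final path segment.
        "url_path": segments[-1] if segments else None,
        # Assumption: the category is the directory that directly contains the page.
        "url_category": segments[-2] if len(segments) >= 2 else None,
    }


url_meta = sketch_url_metadata(
    "https://docs.databricks.com/en/ingestion/bad-records.html"
)
```

For the sample document this yields `ingestion` and `bad-records.html`, matching the extractor's output above.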


#### Step 4: Process Single Document with Both Functions

In [0]:
processed_chunks = process_document(
    doc_id=sample_doc['id'],
    url=sample_doc['url'],
    content=sample_doc['content'],
    chunker=chunker,
    extractor=extractor
)

print(f"Processed document into {len(processed_chunks)} chunks with metadata")
print("\nFirst chunk details:")
chunk = processed_chunks[0]
for key, value in chunk.items():
    if key != 'text':  # Skip full text for brevity
        print(f"  {key}: {value}")
print(f"\nText preview: {chunk['text'][:300]}...")

Processed document into 11 chunks with metadata

First chunk details:
  chunk_id: 25277_chunk_0
  doc_id: 25277
  url: https://docs.databricks.com/en/ingestion/bad-records.html
  chunk_index: 0
  total_chunks: 11
  char_count: 403
  sentence_count: 3
  start_idx: 0
  end_idx: 403
  doc_title: None
  doc_type: general
  url_category: ingestion
  url_path: bad-records.html
  has_code: False
  keywords: {'sql_keywords': ['Delete'], 'databricks_objects': ['schema'], 'api_endpoints': []}

Text preview: Handle bad records and files  
Databricks provides a number of options for dealing with files that contain bad records. Examples of bad data include:  
Incomplete or corrupt records: Mainly observed in text based file formats like JSON and CSV. For example, a JSON record that doesn’t have a closing ...
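
The record printed above combines per-chunk fields (`chunk_id`, `chunk_index`, `char_count`) with document-level metadata (`url_category`, `doc_type`, and so on). As a rough illustration of that layout, and not the actual body of `process_document`, the hypothetical helper below attaches shared metadata to each chunk and builds the `{doc_id}_chunk_{index}` identifier seen in the output.

```python
def sketch_chunk_records(doc_id, url, chunk_texts, doc_metadata):
    """Attach document-level metadata to each chunk (hypothetical layout)."""
    total = len(chunk_texts)
    records = []
    for index, text in enumerate(chunk_texts):
        records.append({
            "chunk_id": f"{doc_id}_chunk_{index}",
            "doc_id": doc_id,
            "url": url,
            "chunk_index": index,
            "total_chunks": total,
            "text": text,
            "char_count": len(text),
            # Document-level fields are repeated on every chunk so each record
            # can be filtered on its own at retrieval time.
            **doc_metadata,
        })
    return records


demo_records = sketch_chunk_records(
    25277,
    "https://docs.databricks.com/en/ingestion/bad-records.html",
    ["first chunk", "second"],
    {"url_category": "ingestion", "doc_type": "general"},
)
```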


#### Step 5: Process All Documents (Batch Processing)

In [0]:
from ingestion_pipeline import process_all_documents

docs_df = spark.table("databricks_databricks_documentation_dataset.v01.docs")

chunks_df = process_all_documents(
    docs_df=docs_df,
    output_table="default.databricks_docs_processed_chunks",
    batch_size=100
)

Input: 5169 documents
Initializing models in driver...
Loading sentence-transformers model: all-MiniLM-L6-v2
✓ Model loaded successfully
✓ Models initialized successfully

Processing 5169 documents in batches of 100...

Batch 1/52: Processing 100 documents...
  Doc 22693: Processing 2872 chars...
  Doc 22693: Generated 11 chunks
  Doc 22694: Processing 4557 chars...
  Doc 22694: Generated 16 chunks
  Doc 22695: Processing 3147 chars...
  Doc 22695: Generated 12 chunks
  Doc 22696: Processing 3171 chars...
  Doc 22696: Generated 16 chunks
  Doc 22697: Processing 2485 chars...
  Doc 22697: Generated 11 chunks
  Doc 22698: Processing 2645 chars...
  Doc 22698: Generated 10 chunks
  Doc 22699: Processing 1816 chars...
  Doc 22699: Generated 8 chunks
  Doc 22700: Processing 2252 chars...
  Doc 22700: Generated 14 chunks
  Doc 22701: Processing 4250 chars...
  Doc 22701: Generated 16 chunks
  Doc 22702: Processing 3825 chars...
  Doc 22702: Generated 15 chunks
  Doc 22703: Processing 3556 ch
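
The log reports 52 batches for 5169 documents at `batch_size=100`, which matches ceil(5169 / 100). The batching loop inside `process_all_documents` is not shown in this notebook; the sketch below, with the hypothetical helper `iter_batches`, illustrates one way to produce the same batch numbering over an in-memory list.

```python
import math


def iter_batches(items, batch_size):
    """Yield (batch_number, total_batches, batch) tuples over a list."""
    total = math.ceil(len(items) / batch_size)
    for n in range(total):
        yield n + 1, total, items[n * batch_size:(n + 1) * batch_size]


# Stand-in for the 5169 document rows; plain integers keep the example self-contained.
demo_batches = list(iter_batches(list(range(5169)), 100))
```

Every batch holds 100 items except the last, which holds the remaining 69.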

#### Step 6: Analyze Results

In [0]:
# Display processing statistics
print("Document Type Distribution:")
chunks_df.groupBy('doc_type').count().orderBy('count', ascending=False).show()


print("Chunk Size Statistics:")
chunks_df.select('char_count', 'sentence_count').describe().show()


# Display sample chunks
print("Sample Processed Chunks:")
display(chunks_df.limit(10))

Document Type Distribution:


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-5650384099983296>, line 3
      1 # Display processing statistics
      2 print("Document Type Distribution:")
----> 3 chunks_df.groupBy('doc_type').count().orderBy('count', ascending=False).show()
      6 print("Chunk Size Statistics:")
      7 chunks_df.select('char_count', 'sentence_count'

#### Testing scripts

In [0]:
"""
Simple test to verify chunking works without all the complexity.
Run this in your Databricks notebook to test if the basic chunking logic works.
"""

# Test 1: Can we create chunks with simple text splitting?
print("=" * 70)
print("TEST 1: Simple text splitting (no ML model)")
print("=" * 70)

def simple_chunk(text, chunk_size=500):
    """Simple chunking - split every N characters."""
    chunks = []
    for i in range(0, len(text), chunk_size):
        piece = text[i:i+chunk_size]  # slice once and reuse it below
        chunks.append({
            'chunk_id': f'chunk_{i}',  # i is the character offset where the chunk starts
            'text': piece,
            'char_count': len(piece)
        })
    return chunks

# Get a sample document
sample = spark.table("databricks_databricks_documentation_dataset.v01.docs").limit(1).collect()[0]
print(f"Sample doc ID: {sample['id']}")
print(f"Content length: {len(sample['content'])} chars")

simple_chunks = simple_chunk(sample['content'])
print(f"✓ Simple chunking created {len(simple_chunks)} chunks")

# Test 2: Can we use the SemanticChunker?
print("\n" + "=" * 70)
print("TEST 2: Semantic chunking with sentence-transformers")
print("=" * 70)

try:
    from ingestion_pipeline import SemanticChunker
    
    print("Creating SemanticChunker...")
    chunker = SemanticChunker(
        similarity_threshold=0.5,
        min_chunk_size=200,
        max_chunk_size=1000,
        overlap_sentences=2
    )
    print("✓ SemanticChunker created")
    
    print("Chunking sample document...")
    semantic_chunks = chunker.chunk_text(sample['content'])
    print(f"✓ Semantic chunking created {len(semantic_chunks)} chunks")
    
    if len(semantic_chunks) > 0:
        print("\nFirst chunk preview:")
        print(f"  Text length: {semantic_chunks[0].char_count} chars")
        print(f"  Sentences: {semantic_chunks[0].sentence_count}")
        print(f"  Preview: {semantic_chunks[0].text[:150]}...")
    else:
        print("⚠️ WARNING: Semantic chunking returned 0 chunks!")
        print("This is the problem - let's debug why...")
        
        # Debug: Check sentence splitting
        sentences = chunker._split_into_sentences(sample['content'])
        print("\nDebug info:")
        print(f"  Total sentences: {len(sentences)}")
        if len(sentences) > 0:
            print(f"  First sentence: {sentences[0][:100]}...")
        
except Exception as e:
    print(f"✗ Error: {e}")
    import traceback
    traceback.print_exc()

print("\n" + "=" * 70)
print("TEST COMPLETE")
print("=" * 70)


TEST 1: Simple text splitting (no ML model)
Sample doc ID: 22693
Content length: 2872 chars
✓ Simple chunking created 6 chunks

TEST 2: Semantic chunking with sentence-transformers
Creating SemanticChunker...
Loading sentence-transformers model: all-MiniLM-L6-v2
✓ Model loaded successfully
✓ SemanticChunker created
Chunking sample document...
✓ Semantic chunking created 11 chunks

First chunk preview:
  Text length: 244 chars
  Sentences: 1
  Preview: Enable authentication to external Databricks services  
Databricks administrators can enable users to authenticate directly to external Databricks ser...

TEST COMPLETE


In [0]:
"""
Test the full process_document function to see if it works end-to-end.
"""

print("=" * 70)
print("TEST: Full process_document function")
print("=" * 70)

from ingestion_pipeline import SemanticChunker, MetadataExtractor, process_document

# Get sample document
sample = spark.table("databricks_databricks_documentation_dataset.v01.docs").limit(1).collect()[0]

print(f"Sample doc ID: {sample['id']}")
print(f"Content length: {len(sample['content'])} chars")

# Initialize chunker and extractor
print("\nInitializing models...")
chunker = SemanticChunker(
    similarity_threshold=0.5,
    min_chunk_size=200,
    max_chunk_size=1000,
    overlap_sentences=2
)
extractor = MetadataExtractor()
print("✓ Models initialized")

# Process the document
print("\nProcessing document...")
chunks = process_document(
    str(sample['id']),
    str(sample['url']),
    str(sample['content']),
    chunker,
    extractor
)

print(f"✓ Generated {len(chunks)} chunks")

if len(chunks) > 0:
    print("\nFirst chunk structure:")
    first_chunk = chunks[0]
    for key, value in first_chunk.items():
        if isinstance(value, str) and len(value) > 100:
            print(f"  {key}: {value[:100]}... ({len(value)} chars)")
        else:
            print(f"  {key}: {value}")
    
    # Try to create a DataFrame from chunks
    print("\nTrying to create Spark DataFrame...")
    try:
        chunks_df = spark.createDataFrame(chunks)
        print(f"✓ DataFrame created with {chunks_df.count()} rows")
        print("\nDataFrame schema:")
        chunks_df.printSchema()
        print("\nSample row:")
        chunks_df.show(1, truncate=50, vertical=True)
    except Exception as e:
        print(f"✗ Error creating DataFrame: {e}")
        import traceback
        traceback.print_exc()
else:
    print("⚠️ No chunks generated!")

print("\n" + "=" * 70)
print("TEST COMPLETE")
print("=" * 70)


TEST: Full process_document function
Sample doc ID: 22693
Content length: 2872 chars

Initializing models...
Loading sentence-transformers model: all-MiniLM-L6-v2
✓ Model loaded successfully
✓ Models initialized

Processing document...
✓ Generated 11 chunks

First chunk structure:
  chunk_id: 22693_chunk_0
  doc_id: 22693
  url: https://docs.databricks.com/en/admin/access-control/auth-external.html
  chunk_index: 0
  total_chunks: 11
  text: Enable authentication to external Databricks services  
Databricks administrators can enable users t... (243 chars)
  char_count: 244
  sentence_count: 1
  doc_title: 
  doc_type: general
  url_category: access-control
  url_path: auth-external.html
  has_code: false
  keywords: {"sql_keywords": ["from"], "databricks_objects": ["workspace"], "api_endpoints": []}

Trying to create Spark DataFrame...
✓ DataFrame created with 11 rows

DataFrame schema:
root
 |-- char_count: long (nullable = true)
 |-- chunk_id: string (nullable = true)
 |-- chunk_inde