# Intermediate ETL with Sycamore

This tutorial shows how to build an ETL pipeline with Sycamore that loads a Pinecone vector database. It walks through an intermediate ETL flow: partitioning, extraction, cleaning, chunking, embedding, and loading.

To start our Sycamore pipeline, we initialize the Sycamore context.

In [None]:
import sycamore
context = sycamore.init()

## Reading data

Next, let's get some data to process. We will use a dataset of 73 NTSB airline report files in PDF format from a public bucket in Amazon S3. To read these PDF files, we can use the `read.binary` method.

Sycamore reads these documents into a [DocSet](https://sycamore.readthedocs.io/en/stable/sycamore/get_started/concepts.html#docsets), which is a collection of documents and their associated elements. DocSets maintain this lineage as you break documents apart, and let you transform the entire dataset at once rather than iterating document by document.

In [None]:
paths = ["s3://aryn-public/ntsb/"]

initial_docset = context.read.binary(paths = paths, binary_format = "pdf")
initial_docset

Hold on, hold on, did that do anything? Well, not really. Sycamore uses "lazy execution," meaning that it won't actually run the processing steps in a pipeline until you call a method that requires their results. This enables Sycamore to create efficient pipeline execution plans.
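
To make the idea concrete, here is a minimal pure-Python sketch of lazy evaluation. `LazyPipeline`, `take`, and `read_source` are hypothetical names invented for this illustration; this is not how Sycamore's DocSet is implemented, only the general pattern of recording steps and running them when a result is requested.

```python
class LazyPipeline:
    """Toy stand-in for a lazily evaluated pipeline (not Sycamore's DocSet)."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def map(self, fn):
        # Record the step and return a new pipeline; nothing runs yet.
        return LazyPipeline(self._source, self._steps + (fn,))

    def take(self, limit=10):
        # Forcing method: only now is the source read and each step applied.
        results = []
        for item in self._source():
            for fn in self._steps:
                item = fn(item)
            results.append(item)
            if len(results) >= limit:
                break
        return results


calls = []


def read_source():
    # Track reads so we can see when the source is actually touched.
    calls.append("read")
    return iter(["a.pdf", "b.pdf", "c.pdf"])


pipeline = LazyPipeline(read_source).map(str.upper)
print(calls)      # still empty: building the plan did not read anything
first_two = pipeline.take(limit=2)
print(first_two)  # the read and the map step ran only when results were requested
```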

In order to make Sycamore actually read the files, we need to use a DocSet method that forces execution. For example, `show`:

In [None]:
initial_docset.show()

`show` gets the first 10 documents from the DocSet (configurable with the `limit` parameter) and prints them. So we can see 10 document objects, of type PDF, with some metadata in the `properties` object, and bytes in the `binary_representation` key.

## Partitioning data

We can now continue with our ETL pipeline. The next step is to partition the DocSet, which chunks each document into smaller elements using labeled bounding boxes. Each element could be labeled as a title, table, image, text, and more. This also extracts the actual information from the element (e.g. the information from the table), or does OCR. We use the [Aryn Partitioning Service (APS)](https://sycamore.readthedocs.io/en/stable/aryn_cloud/aryn_partitioning_service.html) to do this, and you can [get a free API key here](https://www.aryn.ai/get-started).

In this example, we've set APS to extract table structures using a table structure recognition model, and to use OCR to get text from the document (but not to extract image contents). Using OCR takes a bit more time, but it can yield higher quality text extraction in some cases.

In [None]:
from sycamore.transforms.partition import ArynPartitioner
# Set your Aryn API key. See https://sycamore.readthedocs.io/en/stable/aryn_cloud/accessing_the_partitioning_service.html#using-sycamore-s-partition-transform

partitioned_docset = (
        initial_docset.partition(partitioner=ArynPartitioner(extract_images=False,  extract_table_structure=True, use_ocr=True))
        .materialize(path="./pc-tutorial/partitioned", source_mode=sycamore.materialize_config.MaterializeSourceMode.IF_PRESENT)
)
partitioned_docset.execute()

We make a remote call to APS for each document. APS will take a few minutes to partition these documents, as there are a few hundred pages across the PDF documents.

Notice that we include a `materialize` operation and force execution with `execute`. `materialize` writes the DocSet out to disk, and it is configured to be used as a checkpoint in the pipeline. Subsequent operations will read from this materialized DocSet. In this notebook, we will use commands like `show_pages` to visualize the DocSet for educational purposes (a production pipeline wouldn't do this), and these commands force recomputation of the DocSet. If a materialized DocSet is present, Sycamore uses it instead of recomputing it, which avoids running the APS step again.
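
The checkpoint idea can be sketched in plain Python. This is a toy under stated assumptions: `materialize_if_present` and `expensive_partition` are hypothetical helpers, and a JSON file stands in for whatever on-disk format Sycamore actually uses.

```python
import json
import tempfile
from pathlib import Path


def materialize_if_present(path, compute):
    """Return cached results from `path` if they exist, else compute and save them."""
    path = Path(path)
    if path.exists():
        return json.loads(path.read_text())
    result = compute()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(result))
    return result


expensive_calls = []


def expensive_partition():
    # Stand-in for a slow remote call; records each invocation.
    expensive_calls.append(1)
    return [{"type": "Title", "text": "Example report title"}]


with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp) / "partitioned.json"
    first = materialize_if_present(checkpoint, expensive_partition)   # computes and writes
    second = materialize_if_present(checkpoint, expensive_partition)  # reads the checkpoint

print(len(expensive_calls))  # the expensive step ran once, not twice
```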

We can visualize the pages of the PDF using the `show_pages` function to see how APS chunked each page into elements.

Note that this requires the `poppler` library to be installed on your system. You can install it on macOS with `brew install poppler`, or on Debian/Ubuntu Linux with `apt-get install poppler-utils`.

In [None]:
from sycamore.utils.pdf_utils import show_pages

show_pages(partitioned_docset)

## Cleaning data

We often find that there's a little excess whitespace in the textual representation of documents, which can be bad for our subsequent LLM transforms and embeddings. So, we use the `regex_replace` transform to apply a regex to the text representation of each element that coalesces runs of whitespace.

In [None]:
from sycamore.transforms.regex_replace import COALESCE_WHITESPACE

regex_docset = partitioned_docset.regex_replace(COALESCE_WHITESPACE)
print(COALESCE_WHITESPACE)
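
As a rough illustration of what whitespace coalescing does to a single string, here is a standalone sketch using Python's `re` module. The pattern and replacement below are an assumption about what such a rule typically looks like; compare them against the value printed by the cell above for your Sycamore version. The sample text is made up.

```python
import re

# Assumed (pattern, replacement) pair: collapse any run of whitespace to one space.
pattern, replacement = r"\s+", " "

raw_text = "Aircraft   damage:\n\n  Substantial\t(left wing)"
clean_text = re.sub(pattern, replacement, raw_text)
print(clean_text)  # Aircraft damage: Substantial (left wing)
```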

## Metadata extraction

For many use cases, extracting additional metadata from each document (and storing it as document metadata/properties) is important. To do this, we can apply the `extract_properties` transform, which sends the documents to an LLM to extract properties determined by the schema we provide in the prompt. (The LLM could also create the schema with the `extract_batch_schema` transform.)

We create the schema using the `with_property` transform and attach it to each document as a property. Then, we `extract_properties` using OpenAI's GPT-4o. You'll need to set your OpenAI key in your environment or in the OpenAI constructor. The `num_of_elements` parameter determines how many elements from each document will be sent to the LLM for extraction.

In [None]:
from sycamore.transforms.extract_schema import OpenAIPropertyExtractor
from sycamore.llms import OpenAI, OpenAIModels

llm = OpenAI(OpenAIModels.GPT_4O.value)

enriched_docset = (regex_docset
    .with_property('_schema_class', lambda d: 'FlightAccidentReport')
    .with_property('_schema', lambda d: {
            'type': 'object',
            'properties': {'accidentNumber': {'type': 'string'},
                           'dateAndTime': {'type': 'string'},
                           'location': {'type': 'string'},
                           'aircraft': {'type': 'string'},
                           'aircraftDamage': {'type': 'string'},
                           'injuries': {'type': 'string'},
                           'definingEvent': {'type': 'string'}},
            'required': ['accidentNumber',
                         'dateAndTime',
                         'location',
                         'aircraft',
                         'aircraftDamage',
                         'injuries',
                         'definingEvent']})
    .extract_properties(property_extractor=OpenAIPropertyExtractor(llm=llm, num_of_elements=35,))
)

enriched_docset.show(limit=1)
# Show the additional properties extracted and included as metadata.
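
LLM output does not always honor a schema, so it can be useful to check which required fields came back empty. The sketch below is a hypothetical helper, not part of Sycamore: `missing_required` and the sample `extracted` dict are invented for illustration, and the schema is a trimmed version of the one above.

```python
schema = {
    "type": "object",
    "properties": {
        "accidentNumber": {"type": "string"},
        "location": {"type": "string"},
    },
    "required": ["accidentNumber", "location"],
}


def missing_required(entity, schema):
    """List required keys that are absent or empty in the extracted entity."""
    return [key for key in schema["required"] if entity.get(key) in (None, "")]


# Hypothetical extraction result with an empty location.
extracted = {"accidentNumber": "EXAMPLE-001", "location": ""}
missing = missing_required(extracted, schema)
print(missing)  # ['location']
```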

## Chunking

Chunking is the process of combining (or splitting) the elements of your documents into larger (or smaller) elements, referred to as chunks. This is an important step before creating embeddings, which are generated for each chunk.

Sycamore supports a few different chunking strategies. In this example, we'll use the `MarkedMerger`, which relies on "marks" placed on the elements to decide which elements to merge into larger chunks and which elements to drop completely. We'll use the `mark_bbox_preset` transform to add these marks to the elements in our DocSet. We'll also need to incorporate some information about the embedding model we're planning on using in order to limit context lengths appropriately.

After merging, we'll also split any elements that are too big using `split_elements`.

In [None]:
from sycamore.transforms.merge_elements import MarkedMerger
from sycamore.functions.tokenizer import OpenAITokenizer

embedding_model = "text-embedding-3-small"
embedding_dim = 1536
max_tokens = 8192
tokenizer = OpenAITokenizer(embedding_model)

chunked_docset = (enriched_docset
     .mark_bbox_preset(tokenizer=tokenizer, token_limit=max_tokens)
     .merge(merger=MarkedMerger())
     .split_elements(tokenizer=tokenizer, max_tokens=max_tokens)
)
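
To build intuition for merge-then-split chunking, here is a simplified standalone sketch. It is not the `MarkedMerger` algorithm: it greedily merges adjacent elements while they fit under a token limit, then splits anything still too large. `count_tokens`, `merge_elements`, and `split_chunk` are hypothetical helpers, and whitespace-separated words stand in for real tokenizer tokens.

```python
def count_tokens(text):
    # Whitespace-separated words stand in for a real tokenizer.
    return len(text.split())


def merge_elements(elements, token_limit):
    """Greedily merge adjacent elements while the merged text fits the limit."""
    chunks, current = [], []
    for text in elements:
        candidate = current + [text]
        if current and count_tokens(" ".join(candidate)) > token_limit:
            chunks.append(" ".join(current))
            current = [text]
        else:
            current = candidate
    if current:
        chunks.append(" ".join(current))
    return chunks


def split_chunk(text, max_tokens):
    """Split an oversized chunk into pieces of at most max_tokens words."""
    words = text.split()
    return [" ".join(words[i:i + max_tokens]) for i in range(0, len(words), max_tokens)]


elements = [
    "Analysis",
    "The pilot reported a loss of engine power.",
    "Probable cause",
    "one two three four five six seven eight nine ten eleven twelve",
]
merged = merge_elements(elements, token_limit=10)
chunks = [piece for chunk in merged for piece in split_chunk(chunk, max_tokens=10)]
for chunk in chunks:
    print(count_tokens(chunk), chunk)
```

With a limit of 10, the first two elements merge, the short heading stays on its own, and the 12-word element is split into two pieces.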

## Data formatting

Sycamore enables you to apply user-defined functions over every document using the `map` and `map_batches` transforms. As an example, we might want to parse the date/time strings extracted earlier into proper date information, and we can use this code to do it:

In [None]:
from sycamore.data.document import Document
from dateutil import parser
def convert_timestamp(doc: Document) -> Document:
    """Parse the extracted date/time string into day, month, year, and ISO 8601 fields."""
    try:
        entity = doc.properties['entity']
        if "dateAndTime" not in entity and "dateTime" not in entity:
            return doc
        raw_date: str = entity.get('dateAndTime') or entity.get('dateTime')
        # Drop the "Local" marker so the fuzzy parser sees only the date and time.
        raw_date = raw_date.replace("Local", "")
        parsed_date = parser.parse(raw_date, fuzzy=True)
        extracted_date = parsed_date.date()
        entity['day'] = extracted_date.day
        entity['month'] = extracted_date.month
        entity['year'] = extracted_date.year
        # Compare against None: a zero offset (UTC) is falsy but still zone-aware.
        if parsed_date.utcoffset() is not None:
            entity['dateTime'] = parsed_date.isoformat()
        else:
            entity['dateTime'] = parsed_date.isoformat() + "Z"
    except Exception:
        # Best effort: leave the document unchanged if the date is missing or unparseable.
        pass
    return doc

formatted_docset = chunked_docset.map(convert_timestamp)
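
The time zone handling is the subtle part of this function, so here is a standalone sketch of just that step using only the standard library. `to_iso_with_zone` is a hypothetical helper and the sample datetimes are made up. The sketch tests `utcoffset()` against `None` so that a zero offset (UTC) still counts as zone-aware.

```python
from datetime import datetime, timedelta, timezone


def to_iso_with_zone(dt):
    """Keep a parsed UTC offset if there is one; otherwise append "Z"."""
    if dt.utcoffset() is not None:
        return dt.isoformat()
    return dt.isoformat() + "Z"


naive = datetime(2023, 1, 29, 14, 30)
central = datetime(2023, 1, 29, 14, 30, tzinfo=timezone(timedelta(hours=-6)))
utc = datetime(2023, 1, 29, 14, 30, tzinfo=timezone.utc)

print(to_iso_with_zone(naive))    # 2023-01-29T14:30:00Z
print(to_iso_with_zone(central))  # 2023-01-29T14:30:00-06:00
print(to_iso_with_zone(utc))      # 2023-01-29T14:30:00+00:00
```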

Let's pause and look at what we've done and what's yet to do. So far, we've read a set of PDF documents from S3, partitioned them with the Aryn Partitioning Service, cleaned up their whitespace, extracted some metadata from them, chunked them into appropriately sized elements, and reformatted the extracted dates.

What's left is to embed our documents and write them to Pinecone. First, we'll copy the properties from each document to each element of that document using the `spread_properties` transform. Next, we will promote these elements to top-level document objects using the `explode` transform. We do this because Sycamore's `embed` transform only operates on documents, not elements.

In [None]:
exploded_docset = formatted_docset.spread_properties(["path", "entity"]).explode()
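
Here is a toy sketch of what spreading and exploding mean, using plain dictionaries rather than Sycamore's `Document` class. The functions below are hypothetical re-implementations for illustration only; in particular, this toy `explode` returns just the promoted elements, and the real transform's output may differ. The path and property values are made up.

```python
def spread_properties(doc, keys):
    """Copy the selected document-level properties onto every element."""
    for element in doc["elements"]:
        for key in keys:
            if key in doc["properties"]:
                element["properties"][key] = doc["properties"][key]
    return doc


def explode(doc):
    """Promote each element to a standalone document (children only in this toy)."""
    return [
        {"text": element["text"], "properties": dict(element["properties"])}
        for element in doc["elements"]
    ]


report = {
    "properties": {
        "path": "s3://example-bucket/report.pdf",
        "entity": {"location": "Example City"},
    },
    "elements": [
        {"text": "Analysis ...", "properties": {"page_number": 1}},
        {"text": "Probable cause ...", "properties": {"page_number": 2}},
    ],
}

documents = explode(spread_properties(report, ["path", "entity"]))
for document in documents:
    print(document["properties"]["path"], document["properties"]["page_number"])
```

Each promoted chunk now carries the parent's `path` and `entity`, so that metadata travels with the chunk once it is stored on its own.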

Now we'll embed each document with the `embed` transform, and generate term frequency tables for sparse vector search with the `term_frequency` transform. Pinecone requires this latter table if you want to do hybrid search (you do not need it if you are just running vector search).

In [None]:
from sycamore.transforms.embed import OpenAIEmbedder

embedded_docset = (exploded_docset
    .embed(OpenAIEmbedder(model_name=embedding_model))
    .term_frequency(tokenizer=tokenizer, with_token_ids=True)
)
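
A term frequency table is just a count of how often each token appears in a chunk. The sketch below uses lowercase whitespace-separated words as tokens, which is a simplification: the transform above uses the tokenizer you pass in and, with `with_token_ids=True`, presumably keys the table on token ids instead of strings. The `term_frequency` function here is a hypothetical standalone helper, not Sycamore's.

```python
from collections import Counter


def term_frequency(text):
    """Count occurrences of each lowercase whitespace-separated token."""
    return dict(Counter(text.lower().split()))


table = term_frequency("Engine power loss engine")
print(table)  # {'engine': 2, 'power': 1, 'loss': 1}
```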

Finally, we're ready to write our DocSet to Pinecone using Sycamore's Pinecone connector. Because of lazy execution, this will execute the code from all of the cells since `materialize`. If you haven't set your Pinecone API key in your env, you will need to specify it in the options in the `write` transform. Also, if your Pinecone database isn't in `us-east-1`, update the region as well.

In [None]:
import pinecone
import time

start = time.time()
embedded_docset.write.pinecone(
    index_name="aryn-etl-tutorial",
    index_spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1"),
    dimensions=embedding_dim,
    distance_metric="dotproduct",
    namespace="ntsbdocs",
)
print(f"Wrote DocSet to Pinecone in {time.time() - start:.1f} seconds")

## Conclusion and full pipeline 

Congratulations! You just executed an intermediate ETL pipeline with Sycamore and loaded a Pinecone vector database. You partitioned, materialized, extracted, cleaned, chunked, embedded, and loaded your data. 

For reference, without the tutorial parts, here's what the pipeline looks like:

In [None]:
context.read.binary(paths = paths, binary_format = "pdf")\
    .partition(partitioner=ArynPartitioner(extract_images=False,  extract_table_structure=True, use_ocr=True))\
    .regex_replace(COALESCE_WHITESPACE)\
    .with_property('_schema_class', lambda d: 'FlightAccidentReport')\
    .with_property('_schema', lambda d: {
            'type': 'object',
            'properties': {'accidentNumber': {'type': 'string'},
                           'dateAndTime': {'type': 'string'},
                           'location': {'type': 'string'},
                           'aircraft': {'type': 'string'},
                           'aircraftDamage': {'type': 'string'},
                           'injuries': {'type': 'string'},
                           'definingEvent': {'type': 'string'}},
            'required': ['accidentNumber',
                         'dateAndTime',
                         'location',
                         'aircraft',
                         'aircraftDamage',
                         'injuries',
                         'definingEvent']})\
    .extract_properties(property_extractor=OpenAIPropertyExtractor(llm=llm, num_of_elements=35,))\
    .mark_bbox_preset(tokenizer=tokenizer, token_limit=max_tokens)\
    .merge(merger=MarkedMerger())\
    .split_elements(tokenizer=tokenizer, max_tokens=max_tokens)\
    .map(convert_timestamp)\
    .spread_properties(["path", "entity"])\
    .explode()\
    .embed(embedder=OpenAIEmbedder(model_name=embedding_model))\
    .term_frequency(tokenizer=tokenizer, with_token_ids=True)\
    .write.pinecone(
        index_name="aryn-etl-tutorial",
        index_spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1"),
        dimensions=embedding_dim,
        distance_metric="dotproduct",
        namespace="ntsbdocs",
    )