In [None]:
# First, let's explore the data. By now you should have downloaded some pdf files, so let's look at one of them.
# This also tests that you have poppler installed.
from error_messages import *
from IPython.display import display
from pathlib import Path
import os

# Resolve everything relative to the directory the notebook was launched from.
repo_root = Path.cwd()
pdf_dir = repo_root / "files" / "actual_files"

# Directory iteration order is filesystem-dependent and the folder may contain
# stray non-PDF entries (e.g. .DS_Store). Filter to PDFs and sort so the sample
# document is the same on every machine and every run.
pdf_paths = sorted(p for p in pdf_dir.iterdir() if p.is_file() and p.suffix.lower() == ".pdf")
if not pdf_paths:
    raise FileNotFoundError(f"No PDF files found in {pdf_dir}. Did you download the workshop data?")
one_pdf = pdf_paths[0]
one_pdf

In [None]:
from pdf2image import convert_from_path

# Smoke test for the PDF-to-image toolchain: render the sample PDF and show
# the first image returned. The intro cell notes this doubles as a check that
# poppler is installed.
try:
    ims = convert_from_path(one_pdf)
    display(ims[0])
except Exception as e:
    # Show the workshop's troubleshooting message (presumably printed by the
    # helper from error_messages), then the underlying exception text.
    poppler_failed_error()
    print(e)

## Partitioning

We've found that in order to process documents well, it is important to break the document up into cohesive _elements_. There are a number of strategies for doing this, but we've found success with visual models. For this workshop we'll use Aryn DocParse for this first step. Get started with aryn_sdk. This will also make sure your credentials are set correctly.

In [None]:
from aryn_sdk.partition import partition_file

# Partition the sample PDF into elements. Per the workshop text this also
# verifies that your Aryn credentials are picked up correctly.
try:
    data = partition_file(one_pdf)
    elements = data['elements']
except Exception as e:
    aryn_no_api_key_error()
    # Surface the real exception as well (same pattern as the poppler check):
    # the failure is not necessarily a missing API key, and without this the
    # actual cause is silently lost.
    print(e)

In [None]:
# We can visualize the elements by drawing the bounding boxes onto the pdf. aryn_sdk has a function for that.
from aryn_sdk.partition import draw_with_boxes

# Draw the partitioner output (`data`) on top of the source PDF, then display
# the first returned image (presumably page 1 — confirm against aryn_sdk docs).
images = draw_with_boxes(one_pdf, data)
images[0]

Each element contains a bunch of information. Core information includes `type`, `bbox`, and `text_representation`. Additional information is stored in a `properties` dict, such as the page number the element is on.

Let's have a quick quiz to introduce elements. I've created a bunch of functions that operate on the list of elements returned by the partitioner. Your job is to implement them.

In [None]:
def number_of_page_headers(elts: list[dict]) -> int:
    """Return the number of elements of type 'Page-header'.

    Args:
        elts: Element dicts as returned by the partitioner.

    Returns:
        How many elements have ``type`` exactly equal to ``'Page-header'``
        (the comparison is case-sensitive). Elements without a ``type`` key
        are not counted.
    """
    return sum(1 for e in elts if e.get('type') == 'Page-header')
    
def number_of_elements_after_page_4(elts: list[dict]) -> int:
    """Return the number of elements that fall after page 4. Note that page numbers are 1-indexed.

    "After page 4" is strict: elements on page 4 itself are not counted.

    Args:
        elts: Element dicts as returned by the partitioner. The page number is
            read from ``element['properties']['page_number']``.

    Returns:
        How many elements have a page number greater than 4. Elements with no
        page number recorded are not counted.
    """
    count = 0
    for e in elts:
        page = (e.get('properties') or {}).get('page_number')
        if page is not None and page > 4:
            count += 1
    return count

def number_of_jassy_mentions(elts: list[dict]) -> int:
    """Return the number of elements that contain the string 'Jassy' (Andy Jassy is the CEO of AWS).

    Note: some elements do not have a 'text_representation' key; those (and
    elements whose text is None) are treated as having empty text.

    Args:
        elts: Element dicts as returned by the partitioner.

    Returns:
        How many elements have 'Jassy' somewhere in their text (case-sensitive).
        An element mentioning the name several times still counts once.
    """
    return sum(1 for e in elts if 'Jassy' in (e.get('text_representation') or ''))

def number_of_elements_that_cover_a_third_of_the_page(elts: list[dict]) -> int:
    """Return the number of elements that cover more than a third of the page.

    For this you'll need the bbox property. bboxes are represented as 4 floats, [x1, y1, x2, y2]. Each
    coordinate ranges from 0 to 1, representing the fraction of the page (left-to-right for x, top-to-bottom for y)
    where the point lies. So [0, 0, 1, 1] is the whole page, and [0, 0.5, 0.5, 1] is the lower-left quadrant.

    An element covers a third of the page if its area, (x2 - x1) * (y2 - y1), is greater than 1/3.

    Args:
        elts: Element dicts as returned by the partitioner.

    Returns:
        How many elements have a bbox area strictly greater than 1/3.
        Elements without a bbox are not counted.
    """
    count = 0
    for e in elts:
        bbox = e.get('bbox')
        if not bbox:
            continue
        x1, y1, x2, y2 = bbox
        if (x2 - x1) * (y2 - y1) > 1/3:
            count += 1
    return count


# Check the quiz answers against the elements partitioned above. The expected
# counts are hard-coded, so they presumably only hold for the specific sample
# PDF the workshop was written against.
assert number_of_page_headers(elements) == 64, f"Got {number_of_page_headers(elements)}. Make sure your capitalization is correct."

# "After page 4" is strict: page 4 itself is excluded.
assert number_of_elements_after_page_4(elements) == 285, f"Got {number_of_elements_after_page_4(elements)}. If you got 295, 'after page 4' does not include page 4, and page numbers are 1-indexed."

# Counts elements, not occurrences of the name.
assert number_of_jassy_mentions(elements) == 11, f"Got {number_of_jassy_mentions(elements)}. A 'jassy mention' is defined as an element whose text contains the string 'Jassy'."

assert number_of_elements_that_cover_a_third_of_the_page(elements) == 4, f"Got {number_of_elements_that_cover_a_third_of_the_page(elements)}"

## Sycamore basics

By now you have a basic sense of the data model - a Document is made up of Elements which represent logical chunks of the Document, and contain additional metadata about themselves.
The next step is to scale this past one document to many, and this is where Sycamore comes in. Sycamore adds a data structure called a DocSet, which is a set of Documents.
Each Document in the DocSet contains the list of Elements that it comprises, and a bunch of metadata as well (for instance, the name of the file the document came from).

Now -- and this is the tricky bit -- a DocSet is not really a container type like you might expect in most systems. It is better to think about a DocSet as a program. Because it is
intended to operate at very large scale, we can't necessarily hold all of the documents in memory. This is a tool for writing to databases, and so if you could just have everything
in memory at once then you wouldn't need a database in the first place! This is where the program view of a DocSet comes in. Instead of being a materialized set of Document objects
(potentially hundreds of thousands of them, each rather large), a DocSet contains a list of processing instructions to generate the Documents in a pipelined, streaming fashion. This
means you don't have things you might expect like random access.

So how do you build this program? This is covered in more depth in the next segment of the workshop, but the 10,000-foot view is that DocSet has a bunch of methods that you can call
to add steps to the program. DocSet programs are technically immutable, so you write them like so
```python
docset = docset.partition(...).map(...)
docset = docset.map_elements(...)
```
instead of something like
```python
# This doesn't do what you think it does
docset.partition(...)
docset.map(...)
docset.map_elements(...)
```

A DocSet does nothing until it is executed. The following methods execute it:

- `docset.execute()` executes the docset and does nothing with the resulting Documents. Most production pipelines use this to run.
- `docset.take_all()` (and its friend `docset.take(n)`) executes the docset and returns the Documents in a plain python list. This is useful for debugging and development, when datasets are still small.
- `docset.show()` executes and prints the first couple Documents - good for development
- `docset.write.<some_target>()` executes the docset and writes the documents out to some target - could be a database like opensearch, or just the filesystem. Most of these writers have an `execute` flag that determines whether to execute the write (and return nothing) or just return a DocSet with the write in the plan.

Each docset is bound to a Sycamore Context, which is the execution engine that actually runs the program. We've implemented 2 execution modes, `LOCAL` and `RAY`. `RAY` mode executes the DocSet on a [ray](https://www.ray.io/) cluster,
creating one locally if it does not find an existing ray cluster. This mode scales well, running transforms on Documents in parallel across processes (and nodes if you've set it up), but it can be tricky to debug - distributed 
processing jobs can be like that. `LOCAL` mode runs in single-threaded python in the process and is generally better for debugging, but you lose the distributed/parallel aspect. For the beginning of the workshop, we will run in 
`LOCAL` mode, and then transition to `RAY` when we have ironed out the DocSet plan.

In [None]:
import sycamore
from sycamore import ExecMode

# Create the Sycamore Context (the execution engine each DocSet is bound to).
# We start in LOCAL mode, which the text above describes as single-threaded
# and easier to debug than RAY.
context = sycamore.init(exec_mode = ExecMode.LOCAL)
# Guard for anyone who edited the line above while experimenting.
assert context.exec_mode == ExecMode.LOCAL, "Change the exec mode in the init to LOCAL to use local mode"

To create the DocSet from nothing, we need to tell sycamore how to read in the initial data.

In [None]:
# Start a DocSet from the raw bytes of the sample PDF. The path is passed as a
# string rather than a Path object.
pdf_docset = context.read.binary(paths=str(one_pdf), binary_format="pdf")

# Let's see what that gave us (show() executes the DocSet and prints the first few Documents)
pdf_docset.show()

Our docset has a single Document in it, with a 'properties' dict containing some metadata, an 'elements' list containing an empty list of elements, a doc_id, lineage_id, type, and binary_representation, which contains the binary of the original PDF.
To get the elements as before, we'll want to run the `partition` transform.

In [None]:
from sycamore.transforms.partition import ArynPartitioner

# If you did not see the error message about API keys earlier, ignore this comment.
# Otherwise you might need to pass aryn_api_key="<YOUR KEY>" to ArynPartitioner
# if the environment didn't pick it up correctly.
# Note that this only adds a partition step to the plan; nothing runs until
# the DocSet is executed (here, by show() below).
partitioned_docset = pdf_docset.partition(ArynPartitioner())

# We'll limit the number of elements to show because otherwise this produces an obnoxiously large output cell
partitioned_docset.show(num_elements=5)

We can explore the Document and Elements in much the same way as before with the raw outputs. I won't make you write a bunch of code this time though. Now that we're in sycamore, the Document and Elements are actual classes with attributes, which makes them a little easier to work with and also explains why I've been capitalizing them the whole time.

In [None]:
# take_all() executes the DocSet and returns the Documents as a plain list.
# We read in a single PDF, so the first Document is the one we want.
docs = partitioned_docset.take_all()
my_doc = docs[0]

# Documents and Elements are objects with attributes, not raw dicts.
print(f"doc id: {my_doc.doc_id}")
print(f"properties: {my_doc.properties}")
print(f"number of elements: {len(my_doc.elements)}")
# Same question as the earlier quiz, but using the bbox object's `area`
# attribute instead of computing it from the four coordinates by hand.
print(f"number of elements that take up more than 1/3 of the page: {len([e for e in my_doc.elements if e.bbox.area > 1/3])}")
print(f"final element text: {my_doc.elements[-1].text_representation}")

Wait a second.

Running `show` and running `take_all` ran the whole program all over again! This could get really cumbersome to work with. I have a solution for you: `materialize`.

Doing everything all over again is by design. As explained previously, we can't always hold everything in memory, so we have to re-execute stuff when we want to reference it again.
However, we can effectively use disk as a cache with the `materialize` method. When sycamore compiles a DocSet into an execution plan, it starts from the end of the DocSet and works
toward the beginning. When it sees a `materialize`, it looks at the location where the `materialize` thinks its cache lives, and if it finds data, it finishes compiling and reads the
data in, essentially truncating the program to only what comes after it. If it doesn't find data, it continues compiling, and adds a step to write to the location. 

TL;DR: `docset.materialize(path="filesystem/directory", source_mode=MaterializeSourceMode.USE_STORED)` makes everything in the docset up to the materialize execute only once, and then caches the data 
at that point in the execution for any future runs.

In [None]:
from sycamore.materialize import MaterializeSourceMode

# All materialize caches for this notebook live under <repo_root>/materialize.
materialize_dir = repo_root / "materialize"

# USE_STORED: if cached data already exists at `path`, read it instead of
# re-running the steps before this point; otherwise run them and write the
# cache (see the explanation in the text above).
materialized_ds = partitioned_docset.materialize(path = materialize_dir / "onedoc-partitioned", source_mode = MaterializeSourceMode.USE_STORED)

# First execution: populates the cache if it does not exist yet.
materialized_ds.execute()
print("Finished executing the first time")
print("=" * 80)
# Note that the second time this is fast, because it reads from the cache
materialized_ds.execute()
print("Finished executing the second time")

Sometimes, you'll want to redo a step that's been materialized. The simplest option is to remove the directory with all the cached data, e.g. `rm -rd materialize/onedoc-partitioned`

Now that we have the docset materialized, let's use a utility function to visualize the bounding boxes like we did with `aryn_sdk`

In [None]:
from sycamore.utils.pdf_utils import show_pages

# Sycamore-side counterpart of draw_with_boxes: render pages of the (cached)
# partitioned DocSet with the element bounding boxes drawn on.
show_pages(materialized_ds)

I don't know who Lou and Travis are but they're getting some good exposure here. Anyway.

## Property Extraction

One of sycamore's biggest benefits is its ability to interact with LLMs in this kind of data-flow-y way. LLMs are good at understanding unstructured data, so for processing unstructured
documents, they're a very useful tool. They make it easy to extract common metadata properties from documents, and with sycamore we can very easily apply this to all documents in a docset.

First, we'll want to clean up some of the elements that are not going to be useful. For example, we have all these page headers and footers and captions. Let's get rid of them using `docset.filter_elements`

In [None]:
from sycamore.data import Element

def filter_out_annoying_elements(elt: Element) -> bool:
    """Predicate for `filter_elements`: keep everything except page furniture.

    Returns False for page headers, page footers and captions, and True for
    every other element type.
    """
    is_noise = elt.type in {"Page-header", "Page-footer", "Caption"}
    return not is_noise

# docset.filter_elements takes a predicate function that maps Elements to bools.
# For each element in a document, keep the element only if predicate(element) is True.
# Documents themselves are never dropped here, only their elements.
filtered_ds = materialized_ds.filter_elements(filter_out_annoying_elements)

In [None]:
from sycamore.llms.openai import OpenAI, OpenAIModels
from sycamore.transforms.extract_schema import LLMPropertyExtractor

# You might need to explicitly set an api key here if it's not picked up from the environment variables
# Add api_key = "<key>"
gpt4o = OpenAI(OpenAIModels.GPT_4O)

# JSON-schema-style description of the fields we want pulled out of each
# document. Each field carries a free-text `description` (presumably passed to
# the LLM as guidance — confirm against the LLMPropertyExtractor docs).
schema = {
    "type": "object",
    "properties": {
        "quarter": {
            "type": "string",
            "description": "Quarter of the earnings call, it should be in the format of Q1, Q2, Q3, Q4",
        },
        "date":{"type": "string", "description": "The date of the earnings call"}
    },
}

# Quiz: As is, this property extraction will never run, even if I do something like `filtered_ds.execute()`. Why?
# (schema_name="earnings_call" is the key the later test cell looks up in doc.properties.)
filtered_ds.extract_properties(LLMPropertyExtractor(llm=gpt4o, schema=schema, schema_name="earnings_call"))

Now see if you can add a `company_name` and `company_ticker` property to this schema and extract properties into a docset named `extracted_ds`:

In [None]:
# Same schema as before, extended with the company name and ticker symbol.
schema = {
    "type": "object",
    "properties": {
        "quarter": {
            "type": "string",
            "description": "Quarter of the earnings call, it should be in the format of Q1, Q2, Q3, Q4",
        },
        "date": {"type": "string", "description": "The date of the earnings call"},
        "company_name": {
            "type": "string",
            "description": "The name of the company holding the earnings call",
        },
        "company_ticker": {
            "type": "string",
            "description": "The stock ticker symbol of the company holding the earnings call",
        },
    },
}

# Bind the result to `extracted_ds`; the test cell below and later cells build
# on it. The results land under doc.properties['earnings_call'].
extracted_ds = filtered_ds.extract_properties(
    LLMPropertyExtractor(llm=gpt4o, schema=schema, schema_name="earnings_call")
)

In [None]:
# Test that the schema is right. We'll reference these properties later.
# Only the first document is checked; take(1) executes the DocSet.
for doc in extracted_ds.take(1):
    assert 'earnings_call' in doc.properties
    ec = doc.properties['earnings_call']
    # Every field named in the schema must have been extracted.
    for required_key in ('date', 'quarter', 'company_name', 'company_ticker'):
        assert required_key in ec

Great! Now is there anything you can do to prevent yourself from making this LLM call over and over?

## Chunking

Another operation you'll probably want to run a lot when doing unstructured data preparation is chunking. That is, we've broken the text up into a whole lot of tiny little bits, but when we actually 
embed it and index it for search we'll likely want slightly larger chunks. Sycamore implements a number of chunking strategies (documentation [here](https://sycamore.readthedocs.io/en/stable/sycamore/APIs/low_level_transforms/merge_elements.html)). 
For this workshop we will use the `MarkedMerger` as it is the most customizable.

Our data is an earnings call, so there is a very natural way to chunk that - for each speaker 'block' we should get a chunk. In our partitioning we have split the text into paragraphs, but we'd like 
to squish all those paragraphs together, breaking the blocks wherever there's a new speaker. With a little bit of effort we can detect the lines that introduce speakers with regexes - one for external
speakers and one for internal speakers, as the formatting is very consistent (this applies across all the documents in the dataset, don't worry):

```python
external_re = '([^ ]*[^\S\n\t]){1,4}--[^\S\n\t].*--' # A name (1-4 words long) followed by -- followed by anything followed by --
internal_re = '([^ ]*[^\S\n\t]){1,4}--.*'            # A name (1-4 words long) followed by -- followed by anything
```

The `MarkedMerger` is set up perfectly to work with this. It will step through the elements, merging them together one by one, unless it sees one of two 'marks' in the data:

- on a "_drop" mark it drops the element and continues merging
- on a "_break" mark it finalizes the merged element and uses this one to start merging up a new element

In [None]:
import re
from sycamore.transforms.merge_elements import MarkedMerger

# Raw strings so that `\S` reaches the regex engine verbatim instead of being
# an invalid string escape (a warning on recent Pythons). The patterns match
# exactly what the non-raw versions matched.
# A name (1-4 words long) followed by -- followed by anything followed by --
_EXTERNAL_SPEAKER_RE = re.compile(r'([^ ]*[^\S\n\t]){1,4}--[^\S\n\t].*--')
# A name (1-4 words long) followed by -- followed by anything
_INTERNAL_SPEAKER_RE = re.compile(r'([^ ]*[^\S\n\t]){1,4}--.*')


def markSpeakers(elt: Element) -> Element:
    """Tag elements that introduce a new speaker so MarkedMerger starts a new chunk there.

    Three kinds of speaker lines are recognised at the start of the element text:

    - the bare line ``Operator``;
    - external speakers, ``<name> -- <organisation> -- <role>``;
    - internal speakers, ``<name> -- <role>``.

    For a speaker line the element gets ``properties['speaker'] = True``,
    ``speaker_name`` and ``speaker_role`` (plus ``speaker_external_org`` for
    external speakers), and ``data['_break'] = True``, the mark that tells
    MarkedMerger to finalize the current merged element and start a new one
    here. For the operator, name and role are both ``'Operator'``.

    Args:
        elt: The element to inspect; it is modified in place.

    Returns:
        The same element. Elements with no text, or whose text is not a
        speaker line, are returned untouched.
    """
    text = elt.text_representation
    if not text:
        return elt

    if text.strip() == 'Operator':
        # The operator is also a speaker, with no separate role or organisation.
        elt.properties['speaker_name'] = 'Operator'
        elt.properties['speaker_role'] = 'Operator'
        elt.properties['speaker'] = True
        elt.data["_break"] = True
    elif _EXTERNAL_SPEAKER_RE.match(text):
        # The pattern guarantees two separate '--' delimiters, so both finds succeed.
        first = text.find('--')
        second = text.find('--', first + 2)
        elt.properties['speaker_name'] = text[:first].strip()
        # Slice exactly between the delimiters; strip() removes the padding.
        elt.properties['speaker_external_org'] = text[first + 2:second].strip()
        elt.properties['speaker_role'] = text[second + 2:].strip()
        elt.properties['speaker'] = True
        elt.data["_break"] = True
    elif _INTERNAL_SPEAKER_RE.match(text):
        first = text.find('--')
        elt.properties['speaker_name'] = text[:first].strip()
        elt.properties['speaker_role'] = text[first + 2:].strip()
        elt.properties['speaker'] = True
        elt.data["_break"] = True
    return elt

# Start from `extracted_ds` (not the undefined `filtered_docset`) so the
# extracted `earnings_call` properties are still on the documents when the
# ingestion step moves them under `entity`.
speakers_marked_ds = extracted_ds.map_elements(markSpeakers)
# Merge the marked docset defined on the line above; MarkedMerger starts a new
# chunk at every element carrying the "_break" mark.
merged_ds = speakers_marked_ds.merge_elements(MarkedMerger())

## Embedding

Embedding data with sycamore is fairly simple, so I'm going to give you all the information you need to do it and let you write it out.
There is a method on DocSets called `embed()`. It takes an `Embedder` as its parameter. We'll use the `OpenAIEmbedder`, which you can
import from `sycamore.transforms.embed`. It takes a `model_name` parameter which we'll set to `'text-embedding-3-small'`.

In [None]:
from sycamore.transforms.embed import OpenAIEmbedder

# Embed the merged (per-speaker) chunks. As with the LLM above, you may need
# to pass an API key explicitly if it is not picked up from the environment.
embedded_ds = merged_ds.embed(OpenAIEmbedder(model_name='text-embedding-3-small'))

## Ingestion

We'll be writing our data to Aryn (because what kind of workshop would this be if we didn't stand behind our own data warehouse). Sycamore can
also write to a number of other systems, such as OpenSearch, ElasticSearch, Weaviate, etc. Each one has some small idiosyncrasies in what's 
needed to write successfully, and Aryn is no different. I'm going to fix a lot of these issues when I get home but for now just bear with me.

The unit of storage in Aryn equivalent to an index in OpenSearch or a table in a SQL DB is a 'DocSet.' While a Sycamore DocSet is usually best 
understood as a program, an Aryn DocSet is actually a container. We can create one using aryn_sdk, and then write our (sycamore) docset to it.

However, first we need to move around some properties. This is one of those kinda ugly things I intend to fix. This is a literary technique 
known as lampshading. That said, there is some learning to be had here.

Probably the most useful docset method is `map` (and its counterpart `map_elements`). `map` accepts a function with a `Document` input and `Document` 
output, and calls that function on every Document in the DocSet. (`map_elements` does the same but for all the Elements of every Document). Basically
every transform is a map. 

Here, I'll create a higher-order function that captures a list of properties, and returns a `map` function (`Document`->`Document`) that moves all of
those properties into a nested dictionary `properties['entity']`. In fact I've been a little sneaky - since python is dynamically typed and `Document`s 
and `Element`s both have a `properties` dict, I can use this in a `map_elements` as well... just as long as I don't run mypy on it!

I'll also add in a `spread_properties` transform, which copies properties from every Document to each of its Elements, so that all the elements have
the `earnings_call` and `path` metadata associated with the document.

In [None]:
# This transform moves a bunch of properties to a nested dict properties.entity. 
# Useful for making sure some downstream processing works correctly
def prop_to_entity(props: list[str]):
    """Build a map function that moves the named properties under ``properties['entity']``.

    The returned function works on anything with a ``properties`` dict, so it
    can be passed to both ``map`` (Documents) and ``map_elements`` (Elements).

    Args:
        props: Names of the top-level properties to move.

    Returns:
        A function that mutates its argument in place and returns it. If
        ``properties['entity']`` already holds a non-dict value, that value is
        kept under ``entity['original_entity']``. Properties listed in
        ``props`` that are absent are skipped.
    """
    def prop_to_entity_inner(doc_or_elt):
        bag = doc_or_elt.properties
        if "entity" not in bag:
            bag["entity"] = {}
        elif not isinstance(bag["entity"], dict):
            # Keep whatever was there instead of clobbering it.
            previous = bag.pop("entity")
            bag["entity"] = {"original_entity": previous}
        for name in props:
            if name not in bag:
                continue
            bag["entity"][name] = bag.pop(name)
        return doc_or_elt
    return prop_to_entity_inner

# Also here's a nice way of writing chained pipelines: wrap the chain in
# parentheses and put one transform per line.
rejiggered_ds = (
    embedded_ds
    # Document level: move the extracted earnings-call fields under properties['entity'].
    .map(prop_to_entity(['earnings_call']))
    # Copy the document's 'path' and 'entity' properties onto each of its elements.
    .spread_properties(['path', 'entity'])
    # Element level: move the speaker fields set by markSpeakers under 'entity' too.
    .map_elements(prop_to_entity(['speaker', 'speaker_name', 'speaker_role', 'speaker_external_org']))
)

Now let's create our docset target (give it a name) and write to it.

In [None]:
from aryn_sdk.client.client import Client

# You may need to specify aryn_api_key="<YOUR KEY>" here
aryn_client = Client()

# Name of the Aryn DocSet (the storage container) to create. Pick anything
# you like; this default just keeps the cell runnable.
docset_name = "sycamore-workshop-earnings-calls"
aryn_docset = aryn_client.create_docset(name = docset_name)

# The id of the new Aryn DocSet, used as the target of the write below.
print(aryn_docset.value.docset_id)

In [None]:
# You may need to specify aryn_api_key="<YOUR KEY>" here too.
# Write to the Aryn DocSet created in the previous cell (`aryn_docset`); the
# bare name `docset` is not defined anywhere in this notebook.
rejiggered_ds.write.aryn(docset_id=aryn_docset.value.docset_id, autoschema=True)