In [1]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pandas as pd
from PyPDF2 import PdfFileReader
from io import BytesIO
from snowflake.snowpark import types as T
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType
from snowflake.snowpark import Session
from snowflake.snowpark.types import StringType, StructField, StructType

In [2]:
# Build a Snowpark session, or reuse an existing one; no connection options are set here.
# NOTE(review): presumably the connection settings come from the local Snowflake config
# file mentioned in the warning output below — confirm.
session = Session.builder.getOrCreate()

 * To change owner, run `chown $USER "/Users/cromano/.snowflake/config.toml"`.
 * To restrict permissions, run `chmod 0600 "/Users/cromano/.snowflake/config.toml"`.

  warn(f"Bad owner or permissions on {str(filep)}{chmod_message}")


In [4]:
# Point the session at the database and schema used by this notebook; the stage,
# tables, and functions in the following cells are referenced by unqualified names.
session.use_database('cromano')
session.use_schema('demo')

In [3]:
# Create the RAG stage if it does not exist yet, with its directory table enabled
# (a later cell queries directory(@RAG)).
session.sql("create stage if not exists RAG DIRECTORY = ( ENABLE = TRUE)").collect()

[Row(status='RAG already exists, statement succeeded.')]

In [4]:
# Upload every PDF under documents/ to the RAG stage, uncompressed, overwriting
# any staged file with the same name.
session.file.put("documents/*.pdf", "@RAG", auto_compress=False, overwrite=True)

[PutResult(source='SnowflakeEarningsCall1123.pdf', target='SnowflakeEarningsCall1123.pdf', source_size=313773, target_size=313776, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [5]:
def readpdf(file_path):
    """Return the concatenated text of every page of a PDF.

    Args:
        file_path: Location passed to ``SnowflakeFile.open`` (the SQL in this
            notebook supplies a scoped file URL for a staged file).

    Returns:
        The text extracted from each page, joined in page order with no
        separator. A PDF with no pages yields an empty string.
    """
    with SnowflakeFile.open(file_path, "rb") as file:
        # The whole file is read into memory and wrapped in BytesIO before
        # being handed to the PDF reader.
        pdf_reader = PdfFileReader(BytesIO(file.readall()))
        # Join once instead of growing a string with += per page; `or ""`
        # keeps the join safe if a page produces no text object.
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)

In [6]:
# Publish `readpdf` as the permanent UDF SNOWPARK_PDF (one string in, one string out).
session.udf.register(
    # What is being registered and under which name.
    func=readpdf,
    name="SNOWPARK_PDF",
    # Signature.
    input_types=[StringType()],
    return_type=StringType(),
    # Packages requested for the function.
    packages=["snowflake-snowpark-python", "pypdf2", "PyCryptodome"],
    # Persistence: permanent function, replaced if it already exists,
    # registered with the RAG stage as its stage location.
    stage_location="RAG",
    is_permanent=True,
    replace=True,
)

<snowflake.snowpark.udf.UserDefinedFunction at 0x7fa579c11ba0>

In [7]:
# Build RAW_TEXT: one row per staged PDF with its path, file URL, and the text
# extracted by the SNOWPARK_PDF UDF.
# Only *.pdf paths are processed: the RAG stage is also passed as stage_location
# for the permanent UDF/UDTF registrations in this notebook, so its directory
# listing can contain non-PDF files that the PDF reader cannot parse.
session.sql(
    """
CREATE OR REPLACE TABLE RAW_TEXT AS
SELECT
    relative_path
    , file_url
    , snowpark_pdf(build_scoped_file_url(@RAG, relative_path)) as raw_text
from directory(@RAG)
where relative_path ilike '%.pdf'
            """
).collect()

[Row(status='Table RAW_TEXT successfully created.')]

In [8]:
# Preview the RAW_TEXT table built in the previous cell.
session.table("RAW_TEXT").show()

-------------------------------------------------------------------------------------------------------------------------------------------
|"RELATIVE_PATH"                |"FILE_URL"                                          |"RAW_TEXT"                                          |
-------------------------------------------------------------------------------------------------------------------------------------------
|SnowflakeEarningsCall1123.pdf  |https://xwb80589.snowflakecomputing.com/api/fil...  | Corrected Transcript                               |
|                               |                                                    |                                                    |
|                               |                                                    |                                                    |
|                               |                                                    |1-877-FACTSET   www.callstreet.com  Total Pages...  |
|                   

In [9]:
class text_chunker:
    """Table-function handler that splits a text into overlapping chunks."""

    def process(self, text):
        """Yield one row per chunk of ``text``.

        Args:
            text: The full text to split. ``None`` or an empty string
                produces no rows.

        Yields:
            One 2-item tuple per chunk returned by the splitter, in order.
        """
        # A NULL or empty input has nothing to split; emit no rows rather
        # than handing None to the splitter.
        if not text:
            return

        text_splitter = RecursiveCharacterTextSplitter(
            separators=[
                "\n"
            ],  # Define an appropriate separator. New line is good typically!
            chunk_size=500,  # Adjust this as you see fit
            chunk_overlap=100,  # This lets text have some form of overlap. Useful for keeping chunks contextual
            length_function=len,
            add_start_index=True,  # Optional but useful if you'd like to feed the chunk before/after
        )

        chunks = text_splitter.create_documents([text])
        # NOTE(review): each returned document is handed to pandas as one row
        # of two cells — confirm the resulting cell values are the form the
        # downstream SQL expects.
        df = pd.DataFrame(chunks, columns=["chunks", "meta"])

        yield from df.itertuples(index=False, name=None)

In [10]:
# Publish `text_chunker` as the permanent table function CHUNK_TEXT.

# Output rows have two string columns: "chunk" and "meta".
schema = StructType(
    [StructField(column_name, StringType()) for column_name in ("chunk", "meta")]
)

session.udtf.register(
    # What is being registered and under which name.
    handler=text_chunker,
    name="CHUNK_TEXT",
    # Signature: one string argument in, rows shaped by `schema` out.
    input_types=[StringType()],
    output_schema=schema,
    # Packages requested for the handler.
    packages=["pandas", "langchain"],
    # Persistence: permanent function, replaced if it already exists,
    # registered with the RAG stage as its stage location.
    stage_location="RAG",
    is_permanent=True,
    replace=True,
)

<snowflake.snowpark.udtf.UserDefinedTableFunction at 0x7fa579bb60b0>

In [11]:
# Build CHUNK_TEXT by applying the CHUNK_TEXT table function to each RAW_TEXT row,
# keeping the source file's relative_path next to every output row.
chunk_table_ddl = """
CREATE OR REPLACE TABLE CHUNK_TEXT AS
SELECT
        relative_path,
        func.*
    FROM raw_text AS raw,
         TABLE(chunk_text(raw_text)) as func;
            """
session.sql(chunk_table_ddl).collect()

[Row(status='Table CHUNK_TEXT successfully created.')]

Next, move on to the .sql file; alternatively, you can run its statements in a Snowflake Notebook.

In [6]:
# Upload everything under images/ to the rag_images stage.
# NOTE(review): unlike the PDF upload, auto_compress is not disabled here, and the
# output below shows the PNGs staged as .png.gz — confirm compression is intended.
# NOTE(review): no cell visible here creates the rag_images stage — presumably it
# already exists; confirm.
session.file.put("images/*", "@rag_images")

[PutResult(source='Chunk.text.png', target='Chunk.text.png.gz', source_size=227474, target_size=202880, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message=''),
 PutResult(source='Embed_text.png', target='Embed_text.png.gz', source_size=243717, target_size=220240, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message=''),
 PutResult(source='Extract_Text.png', target='Extract_Text.png.gz', source_size=127844, target_size=104336, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message=''),
 PutResult(source='RAG_Arch.png', target='RAG_Arch.png.gz', source_size=83036, target_size=77968, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message=''),
 PutResult(source='embedding_functions.png', target='embedding_functions.png.gz', source_size=88266, target_size=81152, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message=''),
 PutResult(source='rag_query.png', t