# Experiment Architecture
## RAG (Retrieval-Augmented Generation)
#### Differences from the original Databricks Dolly demo
- Delta is not used for data operations in this experiment
- The input data are PDF files, not XML files
#### Problem Statement
- Make Dolly knowledgeable about Amadeus' Data I/O framework
#### Flow
<img src="https://raw.githubusercontent.com/databricks-demos/dbdemos-resources/main/images/product/llm-dolly/llm-dolly-full.png" width="1000" />
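The retrieval-augmented flow used in this notebook (embed the documentation pages, store them in a vector database, retrieve the pages closest to a question, and stuff them into the LLM prompt) can be sketched end to end in plain Python. This is a toy illustration under stated assumptions: a bag-of-words vector stands in for the sentence-transformer embeddings, a sorted list stands in for ChromaDB, and the page texts and helper names are hypothetical.

```python
import math
import re
from collections import Counter

# Hypothetical page texts standing in for the extracted documentation
PAGES = [
    "Pipes handle the reading and writing of data within your pipeline.",
    "Distributors transfer files to their designated destinations, for example by email.",
    "Processors contain the transformation logic of a Data I/O application.",
]

def embed(text):
    """Toy 'embedding': a bag-of-words term-frequency vector (stands in for a real encoder)."""
    return Counter(re.findall(r"[a-z]+", text.lower()))

def cosine(a, b):
    """Cosine similarity between two sparse term-frequency vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def retrieve(question, pages, k=1):
    """Rank pages by similarity to the question and keep the top k (the vector DB's job)."""
    q = embed(question)
    return sorted(pages, key=lambda p: cosine(q, embed(p)), reverse=True)[:k]

def build_prompt(question, context_docs):
    """'Stuff' the retrieved pages into the prompt that is sent to the LLM."""
    context = "\n\n".join(context_docs)
    return f"{context}\n\nQuestion: {question}\n\nResponse:"

question = "What do distributors do?"
prompt = build_prompt(question, retrieve(question, PAGES))
print(prompt)
```

The real pipeline below swaps each toy piece for a proper component: MiniLM embeddings, a ChromaDB collection, and Dolly behind a LangChain "stuff" chain.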


# Converting the Official Data I/O Documentation to PDF

In [None]:
from xhtml2pdf import pisa
import requests


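def convert_url_to_pdf(url, pdf_path):
    """Fetch an HTML page and render it to a PDF file; return True on success."""
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print(f"Failed to fetch URL: {url} (HTTP {response.status_code})")
        return False

    html_content = response.text

    with open(pdf_path, "wb") as pdf_file:
        # xhtml2pdf renders the HTML into the open file handle
        pisa_status = pisa.CreatePDF(html_content, dest=pdf_file)

    # pisa_status.err counts conversion errors, so 0 means success
    return not pisa_status.err

# Pages of the official Data I/O documentation to convert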
urls_to_fetch = [
    "https://amadeusitgroup.github.io/dataio-framework/",
    "https://amadeusitgroup.github.io/dataio-framework/getting-started.html",
    "https://amadeusitgroup.github.io/dataio-framework/main-concepts.html",
    "https://amadeusitgroup.github.io/dataio-framework/writing-processors.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/configuration.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/processors.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/schema-definitions.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/paths.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/date-filters.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/pipes/pipes.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/pipes/storage.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/pipes/elk.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/pipes/kafka.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/distributors/distributors.html",
    "https://amadeusitgroup.github.io/dataio-framework/configuration/distributors/email.html",
    "https://amadeusitgroup.github.io/dataio-framework/advanced/custom-processor-traits.html"
]


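# Save each page as dataio_<index>.pdf in the working directory
for i, url in enumerate(urls_to_fetch):
    if not convert_url_to_pdf(url, f"dataio_{i}.pdf"):
        print(f"PDF conversion failed for {url}")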
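Numbered file names such as `dataio_3.pdf` lose the link to the page they came from. A hypothetical helper like `pdf_name_for` below derives the file name from the URL path instead, which would also make it easy to attach the source page as metadata later. It assumes every page lives under the `/dataio-framework/` path of the documentation site.

```python
from urllib.parse import urlparse

BASE_PATH = "/dataio-framework/"  # assumption: all documentation pages live under this path

def pdf_name_for(url):
    """Derive a readable PDF file name from a documentation URL (hypothetical helper)."""
    path = urlparse(url).path
    # Drop the site prefix; anything else just loses its leading slash
    relative = path[len(BASE_PATH):] if path.startswith(BASE_PATH) else path.lstrip("/")
    # Drop the .html extension; the site root becomes "index"
    relative = relative.removesuffix(".html") or "index"
    return relative.replace("/", "_") + ".pdf"
```

For example, the Kafka pipe page would be saved as `configuration_pipes_kafka.pdf` and the home page as `index.pdf` (`str.removesuffix` requires Python 3.9+).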

In [None]:
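import os

import PyPDF2
from tqdm import tqdm


# bodies holds, for each PDF, the list of its page texts
bodies = []

for i in range(len(urls_to_fetch)):
    pdf_path = f"dataio_{i}.pdf"  # same relative path used when writing the PDFs
    if not os.path.exists(pdf_path):
        print(f"Skipping missing file: {pdf_path}")
        continue
    with open(pdf_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        # extract_text() returns the text of a single page
        bodies.append([page.extract_text() for page in tqdm(reader.pages)])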
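The logged prompts further down show that every extracted page starts with the same site navigation menu ("Data I/O, Home, Getting started, ... GitHub"), with `\x7f` characters where the menu bullets were. That shared text adds noise to every embedding. A minimal cleaning sketch, assuming the menu always ends with the "GitHub" entry followed by a newline (if a page lacked the menu but mentioned GitHub on its own line, this heuristic would cut real content):

```python
NAV_END_MARKER = "GitHub\n"  # assumption: the site menu ends with the "GitHub" link

def clean_page(text):
    """Remove DEL bullet characters and the navigation menu repeated at the top of each page."""
    text = text.replace("\x7f", "")
    # The menu is identical on every page, so drop everything up to and including its last entry
    idx = text.find(NAV_END_MARKER)
    if idx != -1:
        text = text[idx + len(NAV_END_MARKER):]
    return text.strip()
```

Applied to the notebook data, this would be `bodies = [[clean_page(p) for p in body] for body in bodies]` before computing the embeddings.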

# Embeddings
##### Representing Semantic Meaning as Multidimensional Vectors
<img src="https://corpling.hypotheses.org/files/2018/04/Screen-Shot-2018-04-25-at-13.21.44.png" width="400" />

In [80]:
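from sentence_transformers import SentenceTransformer
import itertools
import torch


model_name = "sentence-transformers/all-MiniLM-L12-v2"

# Fall back to CPU when no GPU is available
device = "cuda" if torch.cuda.is_available() else "cpu"
model = SentenceTransformer(model_name, device=device)

# Flatten the per-PDF lists: each "sentence" is actually the full text of one PDF page
sentences = list(itertools.chain(*bodies))

# Encode all pages in one batched call; .tolist() yields plain Python floats for ChromaDB
embeddings = model.encode(sentences).tolist()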
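Each "sentence" here is a whole PDF page, while sentence-transformer models truncate any input longer than their maximum sequence length (`model.max_seq_length`), so the end of a long page never reaches the embedding. A common mitigation is to split pages into overlapping word windows before encoding. The sketch below is one way to do it; the window and overlap sizes are arbitrary assumptions, and words are only a rough proxy for model tokens.

```python
def chunk_words(text, size=150, overlap=30):
    """Split text into windows of `size` words, each sharing `overlap` words with the previous one."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than size")
    words = text.split()
    step = size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + size]))
        if start + size >= len(words):
            break  # this window already reaches the end of the text
    return chunks
```

In the notebook this would replace the flattening step with something like `sentences = [c for page in itertools.chain(*bodies) for c in chunk_words(page)]`, so each stored document is a chunk rather than a full page.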

# Database
##### Creating a Vector Database of Embeddings using the open-source ChromaDB
<img src="https://www.mlq.ai/content/images/2023/08/1_admwyPyR6v_IZI0EYE--eA-1.webp" width="250" />

In [81]:
import chromadb


# In-memory client: the collection is lost when the runtime restarts
chroma_client = chromadb.Client()

# The page embeddings are computed explicitly with the MiniLM-L12 model above, so queries
# must pass query_embeddings produced by the same model rather than raw query texts
collection = chroma_client.get_or_create_collection(name="dataio_official_documentation")

collection.add(
    documents=sentences,
    embeddings=embeddings,
    ids=[str(x) for x in range(len(embeddings))]
)
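ChromaDB collections rank neighbours by squared L2 distance by default (the `hnsw:space` collection metadata selects `l2`, `ip` or `cosine`). For unit-length vectors, ||a - b||^2 = 2 - 2 cos(a, b), so ranking by L2 distance gives the same order as ranking by cosine similarity. The sketch below checks this numerically with made-up vectors; whether the stored MiniLM embeddings are unit-normalized is an assumption worth verifying, for example by checking their norms.

```python
import math

def normalize(v):
    """Scale a vector to unit length."""
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

def squared_l2(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))

def cosine(a, b):
    # Plain dot product: equals cosine similarity because inputs are unit-normalized
    return sum(x * y for x, y in zip(a, b))

# Made-up vectors standing in for a query embedding and three stored page embeddings
query = normalize([0.2, 0.9, 0.1])
pages = [normalize(v) for v in ([0.1, 0.8, 0.3], [0.9, 0.1, 0.2], [0.3, 0.7, 0.0])]

by_l2 = sorted(range(len(pages)), key=lambda i: squared_l2(query, pages[i]))
by_cos = sorted(range(len(pages)), key=lambda i: -cosine(query, pages[i]))
print(by_l2, by_cos)  # both rankings agree for unit vectors
```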

# Fine-Tuned Model
##### Based on Databricks' open-source Dolly
<img src="https://www.databricks.com/sites/default/files/2023-04/Dolly-logo.png" width="300" />

In [82]:
from transformers import pipeline
import torch
from langchain import PromptTemplate
from langchain.llms import HuggingFacePipeline
from langchain.chains.question_answering import load_qa_chain


def build_qa_chain():

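    model_name = "databricks/dolly-v2-3b"  # smallest Dolly v2 model (about 3 billion parameters)

    # The Dolly model card asks for return_full_text=True when the pipeline is used with LangChain.
    # Dolly v2 has a 2048-token context window shared by the prompt and the answer,
    # so max_new_tokens has to leave room for the instruction and the retrieved context.
    instruct_pipeline = pipeline(model=model_name, torch_dtype=torch.bfloat16, trust_remote_code=True,
                                 return_full_text=True, max_new_tokens=256, top_p=0.95, top_k=50,
                                 device='cuda')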

    template = """Below is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' Data I/O framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    {context}

    Question: {question}

    Response:
    """

    prompt = PromptTemplate(input_variables=['context', 'question'], template=template)

    hf_pipe = HuggingFacePipeline(pipeline=instruct_pipeline)

    return load_qa_chain(llm=hf_pipe, chain_type="stuff", prompt=prompt, verbose=True)
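Dolly v2 is fine-tuned from EleutherAI's Pythia models, whose context window is 2048 tokens, and the prompt (instruction, retrieved context, question) and the generated answer have to share it. A back-of-the-envelope helper like the hypothetical one below shows how much room is left for `max_new_tokens` once the prompt length is known; in practice that length would come from the model's tokenizer, for example `len(tokenizer(prompt)["input_ids"])`.

```python
CONTEXT_WINDOW = 2048  # assumption: Dolly v2 / Pythia maximum sequence length

def max_new_tokens_for(prompt_tokens, requested, context_window=CONTEXT_WINDOW):
    """Cap the requested number of generated tokens so prompt + answer fit in the context window."""
    available = context_window - prompt_tokens
    if available <= 0:
        raise ValueError(
            f"prompt uses {prompt_tokens} tokens, which exceeds the {context_window}-token window"
        )
    return min(requested, available)

# A 1500-token prompt (e.g. a long documentation page) leaves at most 548 tokens for the answer
print(max_new_tokens_for(1500, requested=4096))
```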

In [83]:
# Building the chain will load Dolly and can take several minutes depending on the model size
qa_chain = build_qa_chain()

In [84]:
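class Document():
    """Minimal stand-in for a LangChain Document: exposes page_content and metadata."""
    def __init__(self, content):
        self.page_content = content
        self.metadata = {"source": "Data I/O documentation"}

def get_similar_docs(question, n_results=1):
    results = collection.query(
        # Embed the question with the same model used to index the pages
        query_embeddings=[model.encode(question).tolist()],
        n_results=n_results
    )
    # results["documents"] holds one list per query embedding; take the list for our single query
    return results["documents"][0]

def answer_question(question):
    similar_docs = [Document(x) for x in get_similar_docs(question)]
    result = qa_chain({"input_documents": similar_docs, "question": question})
    return result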
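With `chain_type="stuff"`, LangChain renders the `page_content` of every input document, joins them, and substitutes the result for `{context}` in the prompt template. A rough plain-Python approximation of that formatting step follows; the shortened template and the `"\n\n"` separator are assumptions standing in for the notebook's template and LangChain's defaults. It also shows why the logged prompts below begin with `['` and contain escaped `\n`: `collection.query` returns one inner list per query, so wrapping `results["documents"]` directly gives each Document a whole list as its `page_content`, and the list's repr ends up in the prompt.

```python
from dataclasses import dataclass, field

@dataclass
class Doc:
    """Stand-in for the notebook's Document class."""
    page_content: str
    metadata: dict = field(default_factory=dict)

# Shortened version of the notebook's template (assumption: instruction text trimmed)
TEMPLATE = (
    "Instruction:\nAnswer using only the Data I/O documentation.\n\n"
    "{context}\n\nQuestion: {question}\n\nResponse:\n"
)

def stuff_prompt(docs, question, separator="\n\n"):
    """Join all retrieved documents into one context string and fill the template."""
    # str() mirrors how a prompt template renders a non-string page_content
    context = separator.join(str(d.page_content) for d in docs)
    return TEMPLATE.format(context=context, question=question)

docs = [Doc("Pipes handle reading and writing data."), Doc("Distributors transfer files.")]
print(stuff_prompt(docs, "What are Pipes in Data I/O?"))
```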

In [85]:
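import os
# Presumably a workaround for SSL certificate errors in this environment.
# In some versions of `requests`, an empty CURL_CA_BUNDLE disables certificate verification,
# so this should not be used outside of experiments.
os.environ['CURL_CA_BUNDLE'] = ''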


question = "Can you shortly describe me the Data I/O framework?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['Data I/O\n\x7fHome\n\x7fGetting started\n\x7fMain concepts\n\x7fWriting your processors\nConfiguration\n\x7fProcessors\n\x7fSchema definitions\n\x7fPaths\n\x7fDate filters\nPipes\n\x7fFile system storage\n\x7fElasticsearch\n\x7fKafka\nDistributors\n\x7fEmail\nAdvanced\n\x7fCustom processor traits\nAmadeus IT Group\n\x7fGitHub\nMain Concepts\nTable of contents\nMain Concepts\nBuilding Blocks\n1.Pipeline\n2.Pipes\n3.Distributors\n4.HandlerAccessor\n5.Processors\n2.Lifecycle of a Data I/O application\nThis page provides an

In [104]:
question = "In which programming language is the Data I/O framework written?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['Data I/O\n\x7fHome\n\x7fGetting started\n\x7fMain concepts\n\x7fWriting your processors\nConfiguration\n\x7fProcessors\n\x7fSchema definitions\n\x7fPaths\n\x7fDate filters\nPipes\n\x7fFile system storage\n\x7fElasticsearch\n\x7fKafka\nDistributors\n\x7fEmail\nAdvanced\n\x7fCustom processor traits\nAmadeus IT Group\n\x7fGitHub\nData I/O\nAutomated handling of inputs, outputs and files distribution through a unifed interface.\nGet started View it on GitHub\nWelcome to the Data I/O documentation!\nData I/O is a Spark SQL S




[1m> Finished chain.[0m

[1m> Finished chain.[0m

The Data I/O framework is written in the language of Spark SQL Scala.


In [114]:
question = "Can Data I/O read data from Elasticsearch?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['No streaming input is currently available for Elasticsearch in Data I/O.\nOutput\nType: com.amadeus.dataio.pipes.elk.streaming.ElkOutput\n Name\n Mandatory\n Description\n Example\n Default\n \nDuration\nYes\nSets the trigger for the stream\nquery. Controls the trigger()\nSpark function.\nDuration = "60 seconds"\nTimeout\nYes\nControls the amount of time\nbefore returning from the\nstreaming query, in hours. It can\nbe a String or an Int.\nTimeout = 24\nMode\nYes\nThe Spark Structured Streaming\noutput mode.\nMode = "co




[1m> Finished chain.[0m

[1m> Finished chain.[0m

No
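With `n_results=1` only the single closest page reaches the prompt. For this question the retrieved page was the streaming section ("No streaming input is currently available for Elasticsearch in Data I/O"), while the Elasticsearch overview page states that the connector reads and writes data in batch and streaming, so the model answered "No". Retrieving a few neighbours and keeping those within a distance cut-off is one way to give the model more context. The sketch below applies that idea to a ChromaDB-style query result (`documents` and `distances`, one inner list per query); the documents are shortened and the distances and `max_distance` value are made up for illustration.

```python
def select_documents(results, max_distance):
    """Keep the first query's retrieved documents whose distance is within max_distance."""
    docs = results["documents"][0]
    distances = results["distances"][0]
    return [doc for doc, dist in zip(docs, distances) if dist <= max_distance]

# Hypothetical result of collection.query(..., n_results=3) with made-up distances
results = {
    "ids": [["12", "10", "3"]],
    "documents": [[
        "No streaming input is currently available for Elasticsearch in Data I/O.",
        "Allows the connection to Elasticsearch nodes to automatically read and write data ...",
        "Pipes in the Data I/O framework are responsible for handling the reading and writing of data ...",
    ]],
    "distances": [[0.61, 0.74, 1.32]],
}

selected = select_documents(results, max_distance=1.0)
print(len(selected))
```

In the notebook this would mean passing a larger `n_results` to `collection.query`; ChromaDB returns distances by default, so no extra option is needed to read them.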


In [113]:
question = "Can Data I/O write data to Elasticsearch?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['Data I/O\n\x7fHome\n\x7fGetting started\n\x7fMain concepts\n\x7fWriting your processors\nConfiguration\n\x7fProcessors\n\x7fSchema definitions\n\x7fPaths\n\x7fDate filters\nPipes\n\x7fFile system storage\n\x7fElasticsearch\n\x7fKafka\nDistributors\n\x7fEmail\nAdvanced\n\x7fCustom processor traits\nAmadeus IT Group\n\x7fGitHub\n1.Configuration\n2.Pipes\n3.Elasticsearch\nElasticsearch\nAllows the connection to Elasticsearch nodes to automatically read and write data to an index in batch and streaming.\nCode repository: ht




[1m> Finished chain.[0m

[1m> Finished chain.[0m

Yes. Data I/O provides the possibility to write data to Elasticsearch.


In [92]:
question = "How to add Amadeus Data I/O Maven dependency in my pom.xml?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['}\nA same schema can be used by several inputs, but only one schema can be defined for a given input.\nRegistering the schema\nThe schemas must be registered via the SchemaRegistry, which centralizes all available schemas for the Data I/O inputs. To do so, call the\nregisterSchema method within the main function of your application, as shown below:\npackage myproject  \nimport com.amadeus.dataio.{Pipeline, SchemaRegistry}\nimport myproject.models.{ComplexQuery, Query}\nobject MyApplicationPipeline extends App {\n   val 

In [111]:
question = "List me the names of the mandatory field needed to send an Email with Data I/O?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['Data I/O\n\x7fHome\n\x7fGetting started\n\x7fMain concepts\n\x7fWriting your processors\nConfiguration\n\x7fProcessors\n\x7fSchema definitions\n\x7fPaths\n\x7fDate filters\nPipes\n\x7fFile system storage\n\x7fElasticsearch\n\x7fKafka\nDistributors\n\x7fEmail\nAdvanced\n\x7fCustom processor traits\nAmadeus IT Group\n\x7fGitHub\n1.Configuration\n2.Distributors\n3.Email\nEmail\nAllows to send files by email.\nCode repository: https://github.com/AmadeusITGroup/dataio-framework/tree/main/src/main/scala/com/amadeus/dataio/dis




[1m> Finished chain.[0m

[1m> Finished chain.[0m

| Name | Mandatory | Description | Example | Default |
|---|---|---|---|---|
| SmtpHost | Yes | The SMTP host to use to send emails. | | |
| SmtpUser | Yes | The SMTP username to use to authenticate. | | |
| SmtpPassword | Yes | The SMTP password to use to authenticate. | | |
| From | Yes | Specifies the sender's email address. | me@mycompany.com | |
| To | Yes | Specifies the recipient's email address. | ["john@smith.com"] | |
| Cc | No | Allows you to include additional recipients in the "Cc" (carbon copy) field of the email. | ["john@smith.com"] | |
| Bcc | No | Enables the inclusion of recipients in the "Bcc" (blind carbon copy) field of the email. | ["john@smith.com"] | |
| Subject | Yes | Sets the subject line of the email. | | |
| Body | No | Provides the body content of the email. | | |
| Compressed | No | A boolean flag indicating whether the email attachments are compressed. | true | false |
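The mandatory fields listed in the answer above lend themselves to a simple pre-flight check. The sketch below assumes the email distributor configuration has been loaded into a Python dict keyed by those field names; Data I/O itself reads its own configuration files, so the dict form, the helper, and the example values are hypothetical, and the field list comes from the model's answer and should be checked against the official Email page.

```python
# Mandatory fields as listed in the model's answer (to be verified against the official docs)
MANDATORY_EMAIL_FIELDS = ["SmtpHost", "SmtpUser", "SmtpPassword", "From", "To", "Subject"]

def missing_email_fields(config):
    """Return the mandatory fields that are absent or empty in a distributor config dict."""
    return [name for name in MANDATORY_EMAIL_FIELDS if not config.get(name)]

config = {
    "SmtpHost": "smtp.mycompany.com",  # hypothetical values
    "SmtpUser": "me",
    "SmtpPassword": "secret",
    "From": "me@mycompany.com",
    "To": ["john@smith.com"],
}
print(missing_email_fields(config))
```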


In [109]:
question = "List me the steps to create my own Amadeus Data I/O processor?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['Data I/O\n\x7fHome\n\x7fGetting started\n\x7fMain concepts\n\x7fWriting your processors\nConfiguration\n\x7fProcessors\n\x7fSchema definitions\n\x7fPaths\n\x7fDate filters\nPipes\n\x7fFile system storage\n\x7fElasticsearch\n\x7fKafka\nDistributors\n\x7fEmail\nAdvanced\n\x7fCustom processor traits\nAmadeus IT Group\n\x7fGitHub\n1.Configuration\n2.Processors\nConfiguring Your Processors\nThe Data I/O framework allows you to configure and define your data processors in the application configuration file. This provides flex




[1m> Finished chain.[0m

[1m> Finished chain.[0m

Step 1: Gather all the needed concepts and definitions

Step 2: Create a processor configuration file

Step 3: Implement your processor with your own custom code

Step 4: Write a proper doc in the official documentation for your new processor


In [110]:
question = "What are Pipes in Data I/O?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['Data I/O\n\x7fHome\n\x7fGetting started\n\x7fMain concepts\n\x7fWriting your processors\nConfiguration\n\x7fProcessors\n\x7fSchema definitions\n\x7fPaths\n\x7fDate filters\nPipes\n\x7fFile system storage\n\x7fElasticsearch\n\x7fKafka\nDistributors\n\x7fEmail\nAdvanced\n\x7fCustom processor traits\nAmadeus IT Group\n\x7fGitHub\n1.Configuration\n2.Pipes\nPipes\nPipes in the Data I/O framework are responsible for handling the reading and writing of data within your pipeline. They provide a convenient and configurable way\n




[1m> Finished chain.[0m

[1m> Finished chain.[0m

Pipes in the Data I/O framework are responsible for handling the reading and writing of data within your pipeline. They provide a convenient and configurable way to interact with different data sources and destinations.


In [112]:
question = "What are Distributors in Data I/O?"

answer = answer_question(question)

print(answer["output_text"])



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3mBelow is an instruction that describes a task. Write a response that appropriately completes the request.

    Instruction:
    You are an expert about the Amadeus' DataIO framework.
    You use a simple language to explain concepts.
    You reply using only textual descriptions and code examples taken from Data I/O documentation.

    ['Data I/O\n\x7fHome\n\x7fGetting started\n\x7fMain concepts\n\x7fWriting your processors\nConfiguration\n\x7fProcessors\n\x7fSchema definitions\n\x7fPaths\n\x7fDate filters\nPipes\n\x7fFile system storage\n\x7fElasticsearch\n\x7fKafka\nDistributors\n\x7fEmail\nAdvanced\n\x7fCustom processor traits\nAmadeus IT Group\n\x7fGitHub\n1.Configuration\n2.Distributors\nDistributors\nData I/O introduces a feature known as Distributors. Distributors play a crucial role in the framework by facilitating the efficient transfer of file




[1m> Finished chain.[0m

[1m> Finished chain.[0m

Distributors are a key component of Data I/O framework that facilitates the efficient transfer of files to their designated destinations.
