In [35]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from collections import deque
import logging
import os

In [36]:
class Crawler:
    def __init__(self, max_depth=2, max_pages=1):
        self.max_depth = max_depth
        self.max_pages = max_pages
        self.seen = set()
        self.pages = []
        self.queue = deque()

    def add_to_queue(self, url, depth):
        self.queue.append({'url': url, 'depth': depth})

    def should_continue_crawling(self):
        return self.queue and len(self.pages) < self.max_pages

    def is_too_deep(self, depth):
        return depth > self.max_depth

    def is_already_seen(self, url):
        return url in self.seen

    def fetch_page(self, url):
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            print(f"Failed to fetch {url}: {e}")
            return ""

    def parse_html(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        for a in soup.find_all('a'):
            a.attrs = {}
        text = soup.get_text(separator=' ', strip=True)
        return text

    def extract_urls(self, html, base_url):
        urls = list()
        soup = BeautifulSoup(html, 'html.parser')
        relative_urls = [a.get('href') for a in soup.find_all('a') if a.get('href')]
        for relative_url in relative_urls:
            if "zh" in relative_url.split("/"):
                continue
            if "#" in relative_url:
                continue
            if  relative_url.startswith("//nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/"):
                urls.append(urljoin("https:", relative_url))
                # logging.error(f" -- {relative_url}, {urls[-1]}")
            elif relative_url.startswith("https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/"):
                urls.append(relative_url)
                # logging.error(f" -- {relative_url}, {urls[-1]}")
            elif "https://nightlies.apache.org/flink/flink-docs-release-1.16/" not in relative_url:
                #urls.append(urljoin("https://nightlies.apache.org/flink/flink-docs-release-1.16/", relative_url))
                # logging.error(f"{base_url} -- {relative_url}")
                pass
        return urls

    def crawl(self, start_url):
        self.add_to_queue(start_url, 0)

        while self.should_continue_crawling():
            current = self.queue.popleft()
            url, depth = current['url'], current['depth']

            if self.is_too_deep(depth) or self.is_already_seen(url):
                continue

            self.seen.add(url)
            html = self.fetch_page(url)
            # print(f"Fetched {url}")
            if html:
                print(f"Parsing {url}")
                self.pages.append({'url': url, 'content': self.parse_html(html)})
                new_urls = self.extract_urls(html, url)
                for new_url in new_urls:
                    self.add_to_queue(new_url, depth + 1)

        return self.pages




In [37]:
crawler = Crawler(max_depth=5, max_pages=2500)
new_pages = crawler.crawl('https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/')
for page in new_pages:
    print(page['url'])

Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpoints/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpoints_vs_savepoints/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/state_backends/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/large_state_tuning/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/task_failure_recovery/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/metrics/
Parsing https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/
Parsing https://nightlies.apache.

In [38]:
len(new_pages)

19

In [39]:
len(new_pages[0]['content'].split(" "))

772

In [40]:
from langchain_text_splitters import CharacterTextSplitter, TokenTextSplitter, RecursiveCharacterTextSplitter


In [41]:
from langchain_core.documents import Document
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)

In [42]:
documents = list()
for page in new_pages:
    chunks = splitter.split_text(page['content'])
    for chunk in chunks:
        documents.append(Document(
            page_content=chunk,
            metadata={'url': page['url'], 'type':"document"}
        ))


In [43]:
len(documents)

514

In [44]:
documents[0].to_json()

{'lc': 1,
 'type': 'constructor',
 'id': ['langchain', 'schema', 'document', 'Document'],
 'kwargs': {'page_content': '<div style="font-weight:450;margin-bottom:0.5em"><i class="fa fa-cogs title maindish" aria-hidden="true"></i>\xa0\xa0Operations</div> | Apache Flink v1.16.2 Try Flink â\x96¾ First steps Fraud Detection with the DataStream API Real Time Reporting with the Table API Flink Operations Playground Learn Flink â\x96¾ Overview Intro to the DataStream API Data Pipelines & ETL Streaming Analytics Event-driven Applications Fault Tolerance Concepts â\x96¾ Overview Stateful Stream Processing Timely Stream Processing Flink Architecture Glossary Application Development â\x96¾ Project Configuration â\x96¾ Overview Using Maven Using Gradle Connectors and Formats Test Dependencies Advanced Configuration DataStream API â\x96¾ Overview Execution Mode (Batch/Streaming) Event Time â\x96¾ Generating Watermarks Builtin Watermark Generators State & Fault Tolerance â\x96¾ Working with State The

In [45]:
MILVUS_URL = os.environ['MILVUS_URL']
MILVUS_KEY = os.environ['MILVUS_URL']
DIMS = 1024
EMBEDDING_MODEL = "embed-english-v3.0"
COHERE_KEY=os.environ['COHERE_KEY']

In [46]:
# from langchain_cohere.embeddings import CohereEmbeddings

In [47]:
# import cohere

# co = cohere.Client(COHERE_KEY)

# response = co.tokenize(text=new_pages[0]['content'], model=EMBEDDING_MODEL)  # optional
# print(response)

In [48]:
splitter.split_text(new_pages[0]['content'])

['<div style="font-weight:450;margin-bottom:0.5em"><i class="fa fa-cogs title maindish" aria-hidden="true"></i>\xa0\xa0Operations</div> | Apache Flink v1.16.2 Try Flink â\x96¾ First steps Fraud Detection with the DataStream API Real Time Reporting with the Table API Flink Operations Playground Learn Flink â\x96¾ Overview Intro to the DataStream API Data Pipelines & ETL Streaming Analytics Event-driven Applications Fault Tolerance Concepts â\x96¾ Overview Stateful Stream Processing Timely Stream Processing Flink Architecture Glossary Application Development â\x96¾ Project Configuration â\x96¾ Overview Using Maven Using Gradle Connectors and Formats Test Dependencies Advanced Configuration DataStream API â\x96¾ Overview Execution Mode (Batch/Streaming) Event Time â\x96¾ Generating Watermarks Builtin Watermark Generators State & Fault Tolerance â\x96¾ Working with State The Broadcast State Pattern Checkpointing Queryable State State Backends Data Types & Serialization â\x96¾ Overview Stat

In [49]:
len(new_pages[0]['content'].split(" "))

772

In [50]:
# embedding_fn = CohereEmbeddings(model=EMBEDDING_MODEL, cohere_api_key=COHERE_KEY)

In [51]:
from langchain_huggingface import HuggingFaceEmbeddings

model_name = "Alibaba-NLP/gte-large-en-v1.5"
model_kwargs = {'device': 'mps', "trust_remote_code": True}
encode_kwargs = {'normalize_embeddings': False}
hf = HuggingFaceEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs)



In [52]:
from langchain_community.vectorstores.zilliz import Zilliz

zilliz = Zilliz(
    embedding_function = hf,
    collection_name="Flink",
    connection_args={"uri": MILVUS_URL, "token": MILVUS_KEY},
    auto_id=True
)

In [53]:
indexes = list(range(len(documents)))

In [54]:
len(indexes)

514

In [55]:
start = 1

In [56]:
for index, doc in zip(indexes[start:], documents[start:]):
    print(index)
    zilliz.add_documents([doc], batch_size=1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277


In [57]:
retriever = zilliz.as_retriever(search_kwargs={"k": 10})

In [58]:
retriever.invoke("WHat is flink")

[Document(page_content='is that Flink might immediately build an incremental checkpoint on top of the restored one. Therefore,', metadata={'url': 'https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/', 'type': 'document', 'pk': 450143955092273016}),
 Document(page_content='memory usage of RocksDB instance(s), Flink leverages a shared cache and write buffer manager among all instances in a single slot.', metadata={'url': 'https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/state_backends/', 'type': 'document', 'pk': 450143955092273094}),
 Document(page_content='This change does not affect the runtime implementation or characteristics of Flink’s state backend or checkpointing process; it is simply to communicate intent better.', metadata={'url': 'https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/state_backends/', 'type': 'document', 'pk': 450143955092273130}),
 Document(page_content='}\n          }\n        },

In [59]:
import pathlib
import textwrap

import google.generativeai as genai

from IPython.display import display
from IPython.display import Markdown


def to_markdown(text):
  text = text.replace('•', '  *')
  return Markdown(textwrap.indent(text, '> ', predicate=lambda _: True))

In [60]:
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank

In [61]:
compressor = CohereRerank(top_n=5, cohere_api_key=COHERE_KEY)

In [62]:
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)

In [63]:
compression_retriever.invoke("What is flink")

[Document(page_content='memory usage of RocksDB instance(s), Flink leverages a shared cache and write buffer manager among all instances in a single slot.', metadata={'url': 'https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/state_backends/', 'type': 'document', 'pk': 450143955092273094, 'relevance_score': 0.94966936}),
 Document(page_content='parallelism when re-scaling the program (via a savepoint). Flink’s internal bookkeeping tracks parallel state in the granularity of max-parallelism-many key groups .\nFlink’s design strives to make it efficient to have a very high value for the maximum parallelism, even if\nexecuting the program with a low parallelism. Compression # Flink offers optional compression (default: off) for all checkpoints and savepoints. Currently, compression always uses\nthe snappy compression algorithm (version 1.1.10.x) but we are planning to support\ncustom compression algorithms in the future. Compression works on the granularity of key-g

In [64]:
from langchain import hub
prompt = hub.pull("rlm/rag-prompt")


In [65]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

from langchain_cohere import ChatCohere

In [66]:
llm = ChatCohere(model="command-r-plus", temperature=0.0, cohere_api_key=COHERE_KEY)

In [67]:
def format_docs(docs: list[Document]):
    
    text = ""

    for doc in docs:
        xml_tag_start = f"<{doc.metadata['url'].lower()}>"
        xml_tag_end = f"</{doc.metadata['url'].lower()}>"
        content = doc.page_content
        text += f"{xml_tag_start}\n{content}\n{xml_tag_end}\n\n"

    return text

In [68]:
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)



In [69]:
response = rag_chain.invoke("List down all the commands used in the flink documenatation along with explanation of the command")

In [70]:
print(response)

I apologize, but I cannot find the necessary information in the provided context to answer your question.


In [71]:
GEMINI_KEY = "AIzaSyBVI2jAHepUzLwWoK6qwXCOYxD0NFzZIns"

In [72]:
from langchain_google_genai import ChatGoogleGenerativeAI

In [73]:
from langchain_google_genai import ChatGoogleGenerativeAI

In [74]:
google_llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", google_api_key=GEMINI_KEY, temperature=0.0)

In [75]:
from langchain_core.prompts import PromptTemplate
example_prompt = PromptTemplate.from_template("""You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.Keep the answer concise and to the point. Write down the citation at the end of the answer that you have taken reference from. The citation names are in form of urls, that are provided in the xml tags.
Follow below mention format for citation
Citation:
        (1) Source URL 1
        (2) Source URL 2
Question: {question} \nContext: {context} \nAnswer""")

In [76]:
google_rag = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | example_prompt
    | google_llm
    | StrOutputParser()
)

In [77]:
google_rag.get_graph().print_ascii()

              +---------------------------------+           
              | Parallel<context,question>Input |           
              +---------------------------------+           
                    ****                ****                
                 ***                        ***             
               **                              ***          
+----------------------+                          **        
| VectorStoreRetriever |                           *        
+----------------------+                           *        
            *                                      *        
            *                                      *        
            *                                      *        
+---------------------+                     +-------------+ 
| Lambda(format_docs) |                     | Passthrough | 
+---------------------+                     +-------------+ 
                    ****                ****                
                        

In [78]:
response = google_rag.invoke("List down all the commands used in the flink documenatation along with explanation of the command.")

In [79]:
to_markdown(response)

> This document does not contain the answer to this question. It provides information about the structure of JSON objects related to Flink's REST API, not commands and their explanations. Citation: (1) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ (2) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ (3) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ (4) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ (5) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/state_backends/ (6) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ (7) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ (8) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ (9) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/ 


> Here are some of the commands mentioned in the provided Flink documentation:
> 
> * `tar -xzf flink-*.tgz`: This command is used to extract the contents of a tar.gz file named "flink-*.tgz". 
> 
> * `cd flink-* && ls -l`: This command navigates to the directory named "flink-*" and lists the contents of the directory in long format. 
> 
> * `./bin/start-cluster.sh`: This command starts a local Flink cluster in the background. 
> 
> * `ps aux | grep flink`: This command is used to check the status of the Flink cluster. It lists all the processes running on the system and filters the output to show only the processes related to Flink. 
> 
> * `./bin/stop-cluster.sh`: This command stops the local Flink cluster and all its running components. 
> 
> * `./bin/flink run examples/streaming/WordCount.jar`: This command submits a Flink job to the running cluster. In this case, it deploys an example word count job located in the "examples/streaming/" directory. 
> 
> * `tail log/flink-*-taskexecutor-*.out`: This command displays the last few lines of the log file for the Flink task executor. This is useful for verifying the output of the Flink job.
> 
> * `docker-compose build`: This command builds the Docker image for the Flink playground.
> 
> * `mkdir -p /tmp/flink-checkpoints-directory`: This command creates a directory for Flink checkpoints.
> 
> * `mkdir -p /tmp/flink-savepoints-directory`: This command creates a directory for Flink savepoints.
> 
> * `docker-compose up -d`: This command starts the Flink playground in detached mode.
> 
> * `docker-compose ps`: This command lists the running Docker containers for the Flink playground.
> 
> * `docker-compose run --no-deps client flink list`: This command lists the running Flink jobs.
> 
> * `curl localhost:8081/jobs`: This command retrieves information about running jobs from the Flink REST API.
> 
> Citation:
>  (1) <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/try-flink/local_installation/>
>  (2) <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/try-flink/flink-operations-playground/> 


In [50]:
prompt

ChatPromptTemplate(input_variables=['context', 'question'], metadata={'lc_hub_owner': 'rlm', 'lc_hub_repo': 'rag-prompt', 'lc_hub_commit_hash': '50442af133e61576e74536c6556cefe1fac147cad032f4377b60c436e6cdcb6e'}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['context', 'question'], template="You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.\nQuestion: {question} \nContext: {context} \nAnswer:"))])