In [None]:
import boto3
import openai
import os
import json
import psutil
import ray
import shutil

import pandas as pd

from bs4 import BeautifulSoup
from ebooklib import epub
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

2024-10-30 14:34:52,961	INFO worker.py:1601 -- Connecting to existing Ray cluster at address: 10.0.62.176:6379...


RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.

## Basic LLM example

In [None]:
# Run this interactively in your terminal to generate a config
rayllm gen-config

In [None]:
# Deploy the serve app to production with a given service name.
# Reference the serve file created in step 1
!anyscale service deploy -f serve_mistral_7b.yaml

In [None]:
def prompt_llm(service_url: str, prompt: str, model: str ="mistralai/Mistral-7B-Instruct-v0.1", temperature: float = 0, **kwargs):

    # Ensure URL has a trailing backslash
    if not service_url.endswith("/"):
        service_url += "/"
    
    if "/routes" in service_url:
        raise ValueError("service_url must end with '.com'")

    # Initialize a client to perform API requests
    client = openai.OpenAI(
        base_url=ANYSCALE_SERVICE_BASE_URL + "v1",
        api_key=ANYSCALE_API_KEY,
    )
    
    # Call the chat completions endpoint
    chat_completion = client.chat.completions.create(
        model=model,
        messages=[
            # Prime the system with a system message - a common best practice
            {"role": "system", "content": "You are a helpful assistant."},
            # Send the user message with the proper "user" role and "content"
            {"role": "user", "content": prompt},
        ],
        temperature=temperature,
        **kwargs,
    )

    return chat_completion

In [None]:
service_url = "https://wheel-of-time-rag-3136s.cld-8lqvbtr41isy21zu.s.anyscaleuserdata.com/"
prompt = "Tell me something about Wheel of Time"
response = prompt_llm("service_url", prompt)
print(response.choices[0].message.content)

## Create RAG

### Download epubs from S3

In [2]:
import boto3
import os

def download_s3_bucket(bucket_name, local_dir='/data'):
    # Create an S3 client
    s3 = boto3.client('s3')
    
    # Ensure local directory exists
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)
    
    # List all objects in the S3 bucket
    for obj in s3.list_objects_v2(Bucket=bucket_name)['Contents']:
        s3_key = obj['Key']
        local_path = os.path.join(local_dir, s3_key)
        
        # Create local directory structure if needed
        if not os.path.exists(os.path.dirname(local_path)):
            os.makedirs(os.path.dirname(local_path))
        
        # Download the file
        s3.download_file(bucket_name, s3_key, local_path)
        print(f"Downloaded {s3_key} to {local_path}")

# Usage
BUCKET_NAME = 'rag-wheel-of-time'
LOCAL_DATA_DIR = Path("./data/")

download_s3_bucket(BUCKET_NAME, LOCAL_DATA_DIR)

Downloaded library/00 - New Spring.epub to data/library/00 - New Spring.epub
Downloaded library/01 - The Eye of the World.epub to data/library/01 - The Eye of the World.epub
Downloaded library/02 - The Great Hunt.epub to data/library/02 - The Great Hunt.epub
Downloaded library/03 - The Dragon Reborn.epub to data/library/03 - The Dragon Reborn.epub
Downloaded library/04 - The Shadow Rising.epub to data/library/04 - The Shadow Rising.epub
Downloaded library/05 - The Fires of Heaven.epub to data/library/05 - The Fires of Heaven.epub
Downloaded library/06 - Lord of Chaos.epub to data/library/06 - Lord of Chaos.epub
Downloaded library/07 - A Crown of Swords.epub to data/library/07 - A Crown of Swords.epub
Downloaded library/08 - The Path of Daggers.epub to data/library/08 - The Path of Daggers.epub
Downloaded library/09 - Winter's Heart.epub to data/library/09 - Winter's Heart.epub
Downloaded library/10 - Crossroads of Twilight.epub to data/library/10 - Crossroads of Twilight.epub
Downloade

### Parse epubs to text

In [5]:
def parse_epub_content(epub_file: Path) -> list:
    """Extracts and returns text content from an EPUB file."""
    book = epub.read_epub(epub_file)
    content = []

    for item in book.get_items():
        # Check if the media type is HTML/XHTML
        if item.media_type == 'application/xhtml+xml':
            soup = BeautifulSoup(item.get_body_content(), 'html.parser')
            # Extract text from each HTML section
            content.append(soup.get_text())

    # Join all sections to return as a single string
    return '\n'.join(content)

def parse_epub_directory(directory_path: Path) -> dict:
    """Parses all EPUB files in a directory and returns a dictionary with file names and content."""
    epub_texts = {}
    
    for filename in os.listdir(directory_path):
        if filename.endswith('.epub'):
            epub_path = os.path.join(directory_path, filename)
            text = parse_epub_content(epub_path)
            epub_texts[filename] = text
            
    return epub_texts

In [None]:
# book00 = parse_epub_content('data/library/00 - New Spring.epub')
book01 = parse_epub_content('data/library/01 - The Eye of the World.epub')
# book02 = parse_epub_content('data/library/02 - The Great Hunt.epub')
# book03 = parse_epub_content('data/library/03 - The Dragon Reborn.epub')
# book04 = parse_epub_content('data/library/04 - The Shadow Rising.epub')
# book05 = parse_epub_content('data/library/05 - The Fires of Heaven.epub')
# book06 = parse_epub_content('data/library/06 - Lord of Chaos.epub')
# book07 = parse_epub_content('data/library/07 - A Crown of Swords.epub')
# book08 = parse_epub_content('data/library/08 - The Path of Daggers.epub')
# book09 = parse_epub_content('data/library/09 - Winter\'s Heart.epub')
# book10 = parse_epub_content('data/library/10 - Crossroads of Twilight.epub')
# book11 = parse_epub_content('data/library/11 - Knife of Dreams.epub')
# book12 = parse_epub_content('data/library/12 - The Gathering Storm.epub')
# book13 = parse_epub_content('data/library/13 - Towers of Midnight.epub')
# book14 = parse_epub_content('data/library/14 - A Memory of Light.epub')

print(book01)

In [6]:
directory_path = LOCAL_DATA_DIR / 'library'
epub_data = parse_epub_directory(directory_path)

  for root_file in tree.findall('//xmlns:rootfile[@media-type]', namespaces={'xmlns': NAMESPACES['CONTAINERNS']}):


### Split books into chunks

In [8]:
chunk_size = 128  #  Chunk size is usually specified in tokens
words_to_tokens = 1.2  # Heuristic for converting tokens to words
chunk_size_in_words = int(chunk_size // words_to_tokens)

splitter = RecursiveCharacterTextSplitter(
    chunk_size=chunk_size_in_words,
    length_function=lambda x: len(x.split()),
    chunk_overlap=0,
)

chunks = []

for title, text in epub_data.items():
    for chunk in splitter.split_text(text):
        chunks.append(
            {
                "text": chunk,
                "book_title": title,
            }
        )

In [10]:
# Print sample
print(chunks[0:10])

[{'text': 'Praise for THE WHEEL OF TIME®\n“Unlike some of the authors of mega-sagas, Jordan chooses his words with care, creating people and events that have earned him an enormous readership. For sheer imagination and storytelling skill . . . The Wheel of Time now rivals Tolkien’s The Lord of the Rings.”\n—Publishers Weekly (starred review)\n“Jordan succeeds in carrying forward his stunning world-building in this detailed story of a struggle between good and evil. The story continues with its myriad threads and subplots, carrying the reader inexorably toward an unpredictable conclusion.”\n—SF Site', 'book_title': '08 - The Path of Daggers.epub'}, {'text': '“The battle scenes have the breathless urgency of firsthand experience, and the . . . evil laced into the forces of good, the dangers latent in any promised salvation, the sense of the unavoidable onslaught of unpredictable events bear the marks of American national experience during the last three de cades.”\n—The New York Times', 

### Generate embeddings from chunks

In [19]:
model = SentenceTransformer('thenlper/gte-large', device='cpu')

In [21]:
# library_path = "." / LOCAL_DATA_DIR / "library"
# file_paths = [str(f"/{file}") for file in library_path.rglob('*') if file.is_file()]

# print(file_paths)

books = ray.data.read_binary_files(
    paths = "s3://rag-wheel-of-time/",
    )
books.count()

2024-10-30 14:24:08,852	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-30_13-15-29_786163_2344/logs/ray-data
2024-10-30 14:24:08,852	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ExpandPaths] -> TaskPoolMapOperator[ReadFiles]


Running 0: 0.00 row [00:00, ? row/s]

- ExpandPaths 1: 0.00 row [00:00, ? row/s]

- ReadFiles 2: 0.00 row [00:00, ? row/s]

15

In [27]:
def parse_epub_content(epub_file) -> str:
    """Extracts and returns text content from an EPUB file."""
    book = epub.read_epub(epub_file)
    content = []

    for item in book.get_items():
        # Check if the media type is HTML/XHTML
        if item.media_type == 'application/xhtml+xml':
            soup = BeautifulSoup(item.get_body_content(), 'html.parser')
            # Extract text from each HTML section
            content.append(soup.get_text())

    # Join all sections to return as a single string
    return '\n'.join(content)

books_parsed = books.flat_map(parse_epub_content)
books_parsed.take(2)

2024-10-30 14:37:18,593	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-30_13-15-29_786163_2344/logs/ray-data
2024-10-30 14:37:18,593	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ExpandPaths] -> TaskPoolMapOperator[ReadFiles] -> TaskPoolMapOperator[FlatMap(parse_epub_content)] -> LimitOperator[limit=2]


Running 0: 0.00 row [00:00, ? row/s]

- ExpandPaths 1: 0.00 row [00:00, ? row/s]

- ReadFiles 2: 0.00 row [00:00, ? row/s]

- FlatMap(parse_epub_content) 3: 0.00 row [00:00, ? row/s]

- limit=2 4: 0.00 row [00:00, ? row/s]

2024-10-30 14:37:18,690	ERROR streaming_executor_state.py:469 -- An exception was raised from a task of operator "ExpandPaths". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
2024-10-30 14:37:18,699	ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2024-10-30 14:37:18,700	ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/_internal/plan.py", line 433, in execute_to_iterator
    bundle_iter = itertools.chain([next(gen)], gen)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/data/_intern

RayTaskError(RaySystemError): [36mray::ExpandPaths()[39m (pid=7795, ip=10.0.13.245)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RaySystemError: System error: No module named 'ebooklib'
traceback: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/cloudpickle/cloudpickle.py", line 457, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'ebooklib'