# Les Misér-AI-bles: Batching Through the Barricades
## Ray Profiling

One concern with parallel execution is that large tables can quickly exhaust cluster memory. To evaluate this, we multiply our LesMis table by a factor of 30 or 300. We also choose the smallest possible cluster in terms of memory per core, using drivers and workers that have only 2 GB per core. If we distribute the LesMis table to each worker directly, it will quickly exhaust the memory. We set up a simple task in Ray - to count 

We are going to hijack the Spark cluster immediately here to ensure we are in full Ray mode.

In [0]:
import ray
import time

In [0]:
# call `ray.util.spark.shutdown_ray_cluster()` before initiating a new Ray cluster on spark
# ray.util.spark.shutdown_ray_cluster()

In [0]:
from ray.util.spark import setup_ray_cluster
import ray

setup_ray_cluster(
  min_worker_nodes=3,
  max_worker_nodes=3,
  num_cpus_head_node=0,
  num_gpus_worker_node=0,
  num_cpus_worker_node=4,
  num_gpus_head_node=0
)
ray.init(ignore_reinit_error=True)

The Les Mis dataframe is tiny (13 MB in memory), so we make it much, much larger: we concatenate 30 or 300 copies of it for performance and load testing.

In [0]:
import pandas as pd
les_mis = pd.read_parquet('./les_mis_w_prompt.parquet')
print(les_mis.shape[0])
les_mis.memory_usage(deep=True).sum() / (1024 * 1024)

In [0]:
mid_mis = pd.concat([les_mis] * 30, ignore_index=True)
print(mid_mis.shape[0])
mid_mis.memory_usage(deep=True).sum() / (1024 * 1024)

In [0]:
more_mis = pd.concat([les_mis] * 300, ignore_index=True)
print(more_mis.shape[0])
more_mis.memory_usage(deep=True).sum() / (1024 * 1024)
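
As a rough sanity check on those scale factors, the arithmetic can be sketched as below. It uses only the figures quoted in the text (about 13 MB for the base table, 2 GB per core) and assumes memory grows linearly with the number of concatenated copies, which is only an approximation.

```python
# Rough footprint estimate for the scaled tables, using figures quoted in the text.
BASE_MB = 13                 # approximate in-memory size of les_mis (from the text)
MEM_PER_CORE_MB = 2 * 1024   # 2 GB per core (from the text)


def scaled_mb(base_mb: float, factor: int) -> float:
    """Approximate size of a table built from `factor` copies of the base table."""
    return base_mb * factor


for name, factor in [("mid_mis", 30), ("more_mis", 300)]:
    size = scaled_mb(BASE_MB, factor)
    verdict = "fits within" if size <= MEM_PER_CORE_MB else "exceeds"
    print(f"{name}: ~{size:.0f} MB, {verdict} the 2 GB per-core budget")
```

Under these assumptions `mid_mis` stays well below the per-core budget, while a single copy of `more_mis` is already larger than it.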

## Sentiment Analysis Many Tasks

To test the raw performance of many small tasks, we use a small function that runs sentiment analysis with the NLTK library. It is a very fast function that we want to run on every row of the dataframe, similar to an external model call. In a single thread, it takes around 5 seconds for the original `Les Mis` dataset and 146 seconds for the `Mid Mis` dataset. We don't even try `More Mis`.

In [0]:
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
nltk.download('vader_lexicon')
sia = SentimentIntensityAnalyzer()
sia.polarity_scores(les_mis.iloc[0]['page_content'])

In [0]:
import timeit
def analyze_sentiment(row):
    return sia.polarity_scores(row['page_content'])

In [0]:
# les_mis_exec_time
timeit.timeit(lambda: les_mis.apply(analyze_sentiment, axis=1), number=1)

In [0]:
# mid_mis_exec_time
timeit.timeit(lambda: mid_mis.apply(analyze_sentiment, axis=1), number=1)
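
To see why `More Mis` is not worth running in a single thread, here is a naive linear extrapolation from the two timings quoted above. The 12-core figure assumes perfect scaling across the 3 workers x 4 CPUs configured earlier, so treat it as a lower bound rather than a prediction.

```python
# Naive linear extrapolation of runtime; the two timings are the ones quoted in the text.
MID_MIS_SECONDS = 146.0   # 30 copies of les_mis, single thread
MID_MIS_COPIES = 30
MORE_MIS_COPIES = 300
TOTAL_CORES = 12          # 3 workers x 4 CPUs, from the cluster setup


def extrapolate(seconds: float, copies_measured: int, copies_target: int) -> float:
    """Scale a measured runtime linearly with the number of table copies."""
    return seconds / copies_measured * copies_target


more_mis_single = extrapolate(MID_MIS_SECONDS, MID_MIS_COPIES, MORE_MIS_COPIES)
ideal_parallel = more_mis_single / TOTAL_CORES  # assumes perfect scaling, no overhead

print(f"more_mis, single thread: ~{more_mis_single:.0f} s")
print(f"more_mis, ideal on {TOTAL_CORES} cores: ~{ideal_parallel:.0f} s")
```

That is roughly 24 minutes single-threaded against about two minutes in the ideal parallel case, which is the gap the approaches below try to close.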

This is where we create single references to the data using `ray.put()`. This is essential for memory management: if we pass the dataframes directly to the `.remote()` call, it can cause major memory issues on the cluster. Even with the `More Mis` dataset, we had no issues on the worker nodes, because each task received only a small chunk of the large dataset.

In [0]:
les_mis_ref = ray.put(les_mis)
mid_mis_ref = ray.put(mid_mis)
more_mis_ref = ray.put(more_mis)
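
The "small chunks" idea can be sketched without Ray at all. The helper below is hypothetical (it is not part of the notebook) and only computes row ranges; each range could then be used to slice the table, for example with `df.iloc[start:stop]`, so that a task receives just its slice.

```python
from typing import Iterator, Tuple


def chunk_bounds(n_rows: int, chunk_size: int) -> Iterator[Tuple[int, int]]:
    """Yield half-open (start, stop) row ranges that together cover range(n_rows)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, n_rows, chunk_size):
        # The last chunk may be shorter than chunk_size.
        yield start, min(start + chunk_size, n_rows)


bounds = list(chunk_bounds(10, 4))
print(bounds)  # [(0, 4), (4, 8), (8, 10)]
```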

## Approach 1 - Simple Map
This simple map is, well, the simplest way to create a Ray job, and it works surprisingly well. The Ray futures mechanism deals with the data shuffling and avoids overloading the memory. It is also very fast and universally applicable.

In [0]:
@ray.remote(num_cpus=1)
def map_analyze_sentiment(data_dict):
    data_dict['sentiment'] = analyze_sentiment(data_dict)
    return data_dict

In [0]:
import time
start_time = time.time()
futures = [
  map_analyze_sentiment.remote(data) 
  for data in more_mis.to_dict(orient='records')
  ]
results = ray.get(futures)
print(f"Processing time: {time.time() - start_time:.2f} seconds")

## Approach 2 - put() with Reference
This code is meant to demonstrate the hazard of referring to an external object within a remote task. If we pass a `ray.put()` reference, it works well (although not as fast as the simple map above), but if we pass the dataframe directly, as in `remote(i, df)`, we start to see a lot of memory problems. This is generally a poor way to reference Ray objects, but it gets the point across.

In [0]:
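@ray.remote(num_cpus=1)
def analyze_sentiment_row_idx(row_idx, df_ref):
  # Ray resolves a top-level ObjectRef argument before the task runs,
  # so `df_ref` is the actual DataFrame by the time this body executes.
  row = df_ref.iloc[row_idx]
  return analyze_sentiment(row)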

In [0]:
ray.get(analyze_sentiment_row_idx.remote(0, more_mis_ref))

In [0]:
start_time = time.time()
row_indices = list(range(len(more_mis)))
results_entire_df = ray.get(
  [analyze_sentiment_row_idx.remote(i, more_mis_ref) 
   for i in row_indices
   ])
print(f"Processing time: {time.time() - start_time:.2f} seconds")
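
To get a feel for why passing the table by value hurts, the sketch below compares the serialized size of a task's arguments in the two cases. It uses `pickle` and a plain list as stand-ins: the data is hypothetical, `handle` is just a string and not a real Ray `ObjectRef`, and the numbers say nothing about Ray's internals beyond the order of magnitude.

```python
import pickle

# Stand-in for a large table: 100,000 short text rows (hypothetical data).
table = [f"row {i}: some passage text" for i in range(100_000)]

# Hypothetical handle standing in for an object reference; NOT a real Ray ObjectRef.
handle = "object-ref-0001"

# Arguments of one task: (row index, table) versus (row index, handle).
by_value_bytes = len(pickle.dumps((0, table)))
by_ref_bytes = len(pickle.dumps((0, handle)))

n_tasks = 1_000
print(f"by value:     ~{by_value_bytes * n_tasks / 1e6:.0f} MB serialized across {n_tasks} tasks")
print(f"by reference: ~{by_ref_bytes * n_tasks / 1e6:.3f} MB serialized across {n_tasks} tasks")
```

The by-value total grows with both the table size and the number of tasks, while the by-reference total grows only with the number of tasks.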

## Approach 3 - Ray Dataset
This approach uses a Ray Dataset to manage the mapping. There are a couple of tricks to make this work. First, we set the `.map()` concurrency to the number of cores (12 here, from 3 workers with 4 CPUs each) with `num_cpus=1`. Second, we repartition the dataset. I recommend very small partitions; the cell below uses rows / (cores x 10) partitions, which works out to roughly 120 rows per partition.

In [0]:
def analyze_sentiment(row):
    return sia.polarity_scores(row['page_content'])

In [0]:
partitions = int(more_mis.shape[0]/(12*10))
ray_dataset = ray.data.from_pandas(more_mis).repartition(partitions)
ray_dataset
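
The partition rule used above can be checked with plain arithmetic. The helper below is a hypothetical sketch that mirrors `rows / (12 * 10)`; the row count in the example is made up, and the `max(1, ...)` guard for very small tables is an addition that the notebook cell does not have.

```python
from typing import Tuple


def partition_plan(n_rows: int, total_cores: int = 12, factor: int = 10) -> Tuple[int, int]:
    """Return (number of partitions, approximate rows per partition)."""
    # Same rule as the notebook cell, with a floor of one partition (an added guard).
    n_partitions = max(1, n_rows // (total_cores * factor))
    rows_per_partition = -(-n_rows // n_partitions)  # ceiling division
    return n_partitions, rows_per_partition


print(partition_plan(120_000))  # (1000, 120)
```

With 12 cores and a factor of 10, every partition holds about 120 rows regardless of the table size, so larger tables simply produce more partitions.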

In [0]:
start_time = time.time()
results = ray_dataset.map(
  analyze_sentiment, 
  concurrency=12, 
  num_cpus=1,
  ).take_all()
print(f"Processing time: {time.time() - start_time:.2f} seconds")

## LLM Calls
Now that we've shown some of the options for controlling Ray memory and compute, let's make some LLM calls.

In [0]:
import os
from databricks.sdk import WorkspaceClient

workspace_client = WorkspaceClient()
workspace_url = workspace_client.config.host

# Inside a Databricks notebook, take the API token from the notebook context;
# otherwise fall back to the token in the SDK configuration.
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
else:
    token = workspace_client.config.token

In [0]:
import time
import pandas as pd
from openai import OpenAI

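# `.remote()` is called on this function below, so it must be a Ray task.
# The cluster above has no GPUs, so only a CPU is requested.
@ray.remote(num_cpus=1)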
def extract_data_from_passage_ray(data_dict):
    try:
        start_time = time.time()
    
        client = OpenAI(
            api_key=token,
            base_url=f"{workspace_url}/serving-endpoints",
        )
        
        response = client.chat.completions.create(
            model='azure-gpt-4o-mini',
            messages=[
                {"role": "user", "content": data_dict['extraction_prompt']}
            ],
        )
        
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        # Strip the Markdown code fence the model may wrap around its JSON answer.
        data_dict['extracted_data'] = response.choices[0].message.content.replace("```json\n", "").replace("```", "") + f"time: {elapsed_time:.2f}"
    except Exception:
        data_dict['extracted_data'] = "ERROR"

    return data_dict
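
The response clean-up in the function above can also be written as a small standalone helper. This is a minimal sketch, assuming the model wraps its JSON answer in a Markdown code fence; `strip_json_fence` and the sample response are hypothetical, and the helper works on the raw response text before any timing suffix is appended.

```python
import json

FENCE = "`" * 3  # a Markdown code fence, built this way to keep the example readable


def strip_json_fence(text: str) -> str:
    """Remove a leading fence line (with optional language tag) and a trailing fence."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE):
        # Drop the whole first line: the opening fence plus its optional tag.
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
    if cleaned.rstrip().endswith(FENCE):
        cleaned = cleaned.rstrip()[: -len(FENCE)]
    return cleaned.strip()


# Hypothetical model response, fenced and tagged as JSON.
raw = FENCE + 'json\n{"character": "Jean Valjean"}\n' + FENCE
parsed = json.loads(strip_json_fence(raw))
print(parsed["character"])  # Jean Valjean
```

Text that is not fenced passes through unchanged apart from surrounding whitespace.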

In [0]:
futures = [
  extract_data_from_passage_ray.remote(data) 
  for data in les_mis.to_dict(orient='records')
  ] 
results = ray.get(futures)

In [0]:
result_df = pd.DataFrame(results)
display(result_df)
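
Because failed calls are recorded with the `"ERROR"` sentinel, it can be useful to separate them from the successful rows before saving. The helper below is a hypothetical sketch that works on the list of result dicts; the sample rows are made up.

```python
from typing import Dict, List, Tuple


def split_results(results: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Separate successful rows from rows whose extraction hit the except branch."""
    ok = [r for r in results if r.get("extracted_data") != "ERROR"]
    failed = [r for r in results if r.get("extracted_data") == "ERROR"]
    return ok, failed


# Hypothetical results, shaped like the dicts returned by the extraction function.
sample = [
    {"page_content": "passage A", "extracted_data": '{"names": []}time: 0.42'},
    {"page_content": "passage B", "extracted_data": "ERROR"},
]
ok, failed = split_results(sample)
print(len(ok), len(failed))  # 1 1
```

The `failed` list could then be resubmitted in a second pass, or simply counted to report an error rate.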

In [0]:
# Save as files since we've hijacked our spark clusters
result_df.to_csv("/Volumes/shm/default/llm_profiling/les_mis_df-azure-o1-ray.csv")