# ➡️ Pre-Reqs

### Mount GDrive


In [None]:
# Mount Google Drive so the shared '_credentials' shortcut is reachable from this Colab session.
from google.colab import drive

GDRIVE_MOUNT_POINT = '/content/drive'
drive.mount(GDRIVE_MOUNT_POINT, force_remount=True)

Mounted at /content/drive


Verify that the credential files are accessible. Make sure you've created a shortcut to the '_credentials' folder in your GDrive.

In [None]:
import os

# 📋 Getting Setup


### Install OpenAI

In [None]:
!pip install openai --quiet

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Install & Import Featureform

In [None]:
!pip -q install featureform

import featureform as ff

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m26.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.5/11.5 MB[0m [31m118.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.5/101.5 kB[0m [31m16.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m110.5/110.5 kB[0m [31m14.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m98.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m103.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m99.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━



### Register host

In [None]:
# Featureform client for the hackathon deployment; every `client.apply()` below pushes to this host.
FEATUREFORM_HOST = "hackathon.featureform.com"
client = ff.Client(FEATUREFORM_HOST)

# Register Providers & Sources

## Register Infrastructure Provider

In [None]:
import os
from getpass import getpass

variant_number = 900
variant = f"ucb_miki_{variant_number}"


def _secret(env_name):
    """Return the secret stored in environment variable ``env_name``.

    If the variable is unset or empty, prompt for it interactively and cache the
    answer in ``os.environ`` so re-running this cell does not prompt again.
    Secrets must not be hard-coded in the notebook; any values that were
    committed here previously should be rotated.
    """
    value = os.environ.get(env_name)
    if not value:
        value = getpass(f"{env_name}: ")
        os.environ[env_name] = value
    return value


# Databricks cluster that executes the Spark transformations.
databricks = ff.DatabricksCredentials(
    host="https://adb-6520988078656996.16.azuredatabricks.net",
    token=_secret("DATABRICKS_TOKEN"),
    cluster_id="0609-193914-29dlog2f",
)

# Azure blob store used as Spark's file store for offline and inference data.
azure_blob = ff.register_blob_store(
    name=f"blob_{variant}",
    description="An azure blob store provider to store offline and inference data",
    container_name="ucbhackathoncontainer",
    root_path="ucbhackathoncontainer",
    account_name="ucbhackathonstoreaccount",
    account_key=_secret("AZURE_STORAGE_ACCOUNT_KEY"),
)

# Redis: inference store and vector DB for the QAThread features.
redis = ff.register_redis(
    name=f"redis_{variant}",
    host="10.0.209.196",
    port=6379,
)

# Spark provider = Databricks executor + Azure blob file store.
spark = ff.register_spark(
    name=f"spark_{variant}",
    executor=databricks,
    filestore=azure_blob,
)

In [None]:
# Push the provider registrations from the previous cell (blob store, Redis, Spark) to the Featureform host.
client.apply()

Applying Run: practical_bardeen
Creating user default_user
Creating provider blob_ucb_miki_900
Creating provider redis_ucb_miki_900
Creating provider spark_ucb_miki_900


## Register Data Sources (Files)

In [None]:
# Register the raw MLOps Community messages CSV (stored in Azure) as a Spark file source.
messages = spark.register_file(
    name="messages",
    variant=variant,
    description="This dataset contains all the messages from the MLOps Community.",
    file_path="abfss://ucbhackathoncontainer@ucbhackathonstoreaccount.dfs.core.windows.net/messages.csv",
)

In [None]:
# Push the `messages` file source registered above to the Featureform host.
client.apply()

Applying Run: practical_bardeen
Creating source messages ucb_miki_900


# Define batch transformations

###  Transformation 1: Group Messages By Thread

Concatenate the messages of each parent Slack thread into a single text row per thread.

In [None]:
# Input: Flat, raw data source file
# Output: Dataframe with one row per (Channel_Name, Thread_Timstamp) and the
#         thread's messages joined into a single Combined_Thread text column

@spark.df_transformation(name="combine_messages_thread",
                         variant=f"{variant}",
                         inputs=[("messages", f"{variant}")],
                         description="Group all messages by parent threads into a single column")
def combine_messages_thread(df):
    """Collapse threaded Slack messages into one text row per thread.

    Keeps only messages that belong to a thread in the "mlops-questions-answered"
    and "discussions" channels. It renders each message as
    "<Message_Timestamp> <User_ID> <__Text>" and joins a thread's messages in
    Message_Timestamp order, one message per line.

    Note: ``Thread_Timstamp`` (sic) is the column name used throughout this notebook.
    """
    from pyspark.sql.functions import col, collect_list, concat_ws, sort_array, struct

    threaded = df.filter(
        col("Channel_Name").isin("mlops-questions-answered", "discussions")
        & col("Thread_Timstamp").isNotNull()
    )

    threaded = threaded.withColumn(
        "concat_columns",
        concat_ws(" ", col("Message_Timestamp"), col("User_ID"), col("__Text")),
    )

    # collect_list has no ordering guarantee after the groupBy shuffle, so collect
    # (timestamp, rendered line) pairs, sort them by timestamp, then keep the lines.
    ordered_lines = sort_array(
        collect_list(struct("Message_Timestamp", "concat_columns"))
    ).getField("concat_columns")

    # Newline separator so consecutive messages do not run into each other.
    return threaded.groupBy("Channel_Name", "Thread_Timstamp").agg(
        concat_ws("\n", ordered_lines).alias("Combined_Thread")
    )

In [None]:
# Register the combine_messages_thread transformation on the Featureform host.
client.apply()

Applying Run: practical_bardeen
Creating source combine_messages_thread ucb_miki_900


## Transformation 2: Create Embeddings

Create an embedding of each concatenated Slack thread.

In [None]:
@spark.df_transformation(inputs=[("combine_messages_thread", f"{variant}")], variant=f"{variant}")
def embed_text(df):
  """Add an ``embedding`` column with the OpenAI text-embedding-ada-002 vector of each thread.

  Only the first 100 rows of ``combine_messages_thread`` are processed (one API
  call per row), and only the first 500 characters of each thread are embedded.
  """
  from pyspark.sql.functions import udf, col
  from pyspark.sql.types import StringType, FloatType, ArrayType

  def get_embed(txt):
    import openai
    # SECURITY(review): credentials are inlined because this UDF presumably runs on the
    # Spark workers, not in the notebook; they are exposed here and should be rotated
    # and moved to a cluster-side secret.
    openai.organization = "org-dspDi7B6opG0YQzN9mtOXBZE"
    openai.api_key = "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"
    snippet = txt[:500]
    response = openai.Embedding.create(model="text-embedding-ada-002", input=snippet)
    first_item = response["data"][0]
    return first_item["embedding"]

  to_embedding = udf(get_embed, ArrayType(FloatType()))
  thread_text = col("Combined_Thread").cast(StringType())

  return df.limit(100).withColumn("embedding", to_embedding(thread_text))

In [None]:
# Register the embed_text transformation on the Featureform host.
client.apply()

Applying Run: practical_bardeen
Creating source embed_text ucb_miki_900


# Register entity: QAThread

In [None]:
@ff.entity
class QAThread:
    # Both features come from embed_text; the first selected column (Thread_Timstamp)
    # is the entity column, the second is the feature value.
    raw_message = ff.Feature(
        embed_text["Thread_Timstamp", "Combined_Thread"],
        variant=variant,
        type=ff.String,
        inference_store=redis,
    )
    embedding = ff.Embedding(
        embed_text["Thread_Timstamp", "embedding"],
        variant=variant,
        dims=1536,  # output size of text-embedding-ada-002, the model used in embed_text
        vector_db=redis,
    )

In [None]:
# Register the QAThread entity and its raw_message / embedding features.
client.apply()

Applying Run: practical_bardeen
Creating entity qathread
Creating feature embedding ucb_miki_900
Creating feature raw_message ucb_miki_900


# ⚠️ [TODO] Define On-demand Functions ⚠️

## ⚠️ [TODO] On-demand Function 1: Embed Question
Create the question's embedding on the fly, at request time.

In [None]:
# Input: params[0] = the question text
# Output: the question's embedding
# The question embedding is used to search for, and then retrieve, the most related threads.


@ff.ondemand_feature(name="get_question_embedding", variant=f"{variant}")
def get_question_embedding(serving_client, params, entities):
    """On-demand feature: embed the question passed as the first serving parameter.

    Uses the same decorator (``ff.ondemand_feature``) as the other on-demand
    features in this notebook.

    Args:
        serving_client: Featureform client supplied at serving time (unused).
        params: serving parameters; ``params[0]`` is the question text.
        entities: entity mapping supplied at serving time (unused).

    Returns:
        The text-embedding-ada-002 embedding of the question.
    """
    import openai

    question = params[0]

    # SECURITY(review): this body presumably executes on the Featureform server, so the
    # key is still inlined; it is exposed in this notebook and should be rotated.
    openai.organization = "org-dspDi7B6opG0YQzN9mtOXBZE"
    openai.api_key = "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"
    resp = openai.Embedding.create(model="text-embedding-ada-002", input=question)
    return resp["data"][0]["embedding"]

In [None]:
# Notebook-side version of the question-embedding logic.
# NOTE: this definition shadows the on-demand `get_question_embedding` name from the previous cell.

def get_question_embedding(question):
    """Return the text-embedding-ada-002 embedding of ``question``.

    The OpenAI API key is read from the OPENAI_API_KEY environment variable
    rather than being hard-coded in the notebook.

    Raises:
        KeyError: if OPENAI_API_KEY is not set.
    """
    import os
    import openai
    openai.organization = "org-dspDi7B6opG0YQzN9mtOXBZE"
    openai.api_key = os.environ["OPENAI_API_KEY"]
    resp = openai.Embedding.create(model="text-embedding-ada-002", input=question)
    return resp["data"][0]["embedding"]

# Example: show the vector size and its first few components instead of all of it.
example_embedding = get_question_embedding("What are some good ways to deploy models on Kubernetes?")
print(len(example_embedding), example_embedding[:5])





[0.001241530291736126, -0.0037452257238328457, 0.028751228004693985, -0.0024331931490451097, -0.003989404998719692, 0.017030632123351097, -0.027114195749163628, 0.010537531226873398, 0.0011297581950202584, -0.019726919010281563, 0.015338573604822159, 0.008797325193881989, 4.336543133831583e-05, 0.016796769574284554, -0.016934335231781006, 0.0005438143271021545, 0.012690434232354164, 0.014169265516102314, 0.0015906032640486956, -0.016906822100281715, -0.026797795668244362, -0.0018382214475423098, 0.017966078594326973, -0.018254965543746948, -0.0196856502443552, 0.019314222037792206, 0.029411543160676956, -0.009670867584645748, 0.018640149384737015, -0.014568205922842026, 0.03986653685569763, -0.002061765640974045, -0.008322724141180515, -0.022340666502714157, -0.01270419079810381, 0.014747041277587414, 0.005595484282821417, -0.0019431152613833547, -0.00021290438598953187, -0.017292005941271782, 0.012085145339369774, 0.010929593816399574, 0.0104206008836627, -0.034584011882543564, 0.0057

## ⚠️ [TODO] On-demand function 2: Fetch Nearest Neighbors

In [None]:
# Input: Question_Embedding
# Output: Corresponding Raw_Messages
# The corresponding raw messages will be summarized by on-demand function no. 3
# (summarize).

@ff.ondemand_feature(name="nearest_neighbors_fetch", variant=f"{variant}")
def nearest_neighbors_fetch(client, params, entities):
  """Placeholder on-demand feature: not implemented yet, so it always yields None.

  TODO: port the notebook-side nearest-neighbour lookup from the next cell into
  this (client, params, entities) signature.
  """
  return None

In [None]:
# TODO: Logic that still needs to be converted into the on-demand syntax above

def nearest_neighbors_fetch(question, k=2):
  """Return the raw thread text of the ``k`` threads most similar to ``question``.

  Steps:
    1. embed the question with ``get_question_embedding``;
    2. ask Featureform for the ``k`` nearest ``QAThread.embedding`` vectors
       (served from the Redis vector DB the feature was registered with);
    3. look up each match's ``QAThread.raw_message`` feature.

  Args:
    question: the user question text.
    k: number of neighbouring threads to return (default 2).

  Returns:
    One ``client.features(...)`` result per matched thread.
    NOTE(review): the exact shape of that result is not confirmed here.
  """
  question_embedding = get_question_embedding(question)

  # Presumably thread entity ids, ordered by similarity to the question embedding.
  threads = client.nearest(QAThread.embedding, question_embedding, k)

  # QAThread defines only `raw_message` and `embedding`, so fetch the raw thread text.
  return [client.features([QAThread.raw_message], entities={"qathread": thread}) for thread in threads]


# Example
# print(nearest_neighbors_fetch("What are some good ways to deploy models on Kubernetes?"))
# => ["messages blah blah blah", "other messages blah blah blah"]

## ⚠️ [TODO] On-Demand Function 3: Summarize Threads

In [None]:
# Input: params[0] = list of raw thread messages
# Output: list of summaries, one per message
# The generated summaries will be fed into the final prompt constructed outside of Featureform

@ff.ondemand_feature(name="summarize", variant=f"{variant}")
def summarize(client, params, entities):
    """On-demand feature: summarize each raw thread with gpt-3.5-turbo.

    Args:
        client: Featureform client supplied at serving time (unused).
        params: serving parameters; ``params[0]`` is the list of raw thread texts.
        entities: entity mapping supplied at serving time (unused).

    Returns:
        List of summary strings, in the same order as the input messages.
    """
    import openai
    # SECURITY(review): this body presumably executes on the Featureform server, so the
    # key is still inlined; it is exposed in this notebook and should be rotated.
    openai.organization = "org-dspDi7B6opG0YQzN9mtOXBZE"
    openai.api_key = "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"

    # The raw messages list is the single serving parameter, read from the first
    # position like get_question_embedding reads its question.
    raw_messages_list = params[0]
    prompt = (
        "Summarize the following conversation on the MLOps.community slack channel. "
        "Do not use the usernames in the summary. ```"
    )

    summaries_list = []
    for message in raw_messages_list:
        # Only the first 500 characters of each thread are sent, fenced in backticks.
        content = f"{prompt} {message[:500]} ```"
        resp = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=[{"role": "user", "content": content}])
        summaries_list.append(resp["choices"][0]["message"]["content"])

    return summaries_list

In [None]:
# Notebook-side summarization logic.
# NOTE: this definition shadows the on-demand `summarize` name from the previous cell;
# get_answer() calls this version.

def summarize(raw_messages_list):
    """Summarize each raw Slack thread with gpt-3.5-turbo.

    Args:
        raw_messages_list: iterable of raw thread texts; only the first 500
            characters of each are sent to the model.

    Returns:
        List of summary strings, in the same order as the input.

    Raises:
        KeyError: if the OPENAI_API_KEY environment variable is not set.
    """
    import os
    import openai
    openai.organization = "org-dspDi7B6opG0YQzN9mtOXBZE"
    # Read the key from the environment instead of hard-coding it in the notebook.
    openai.api_key = os.environ["OPENAI_API_KEY"]

    prompt = (
        "Summarize the following conversation on the MLOps.community slack channel. "
        "Do not use the usernames in the summary. ```"
    )

    summaries_list = []
    for message in raw_messages_list:
        content = f"{prompt} {message[:500]} ```"
        resp = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=[{"role": "user", "content": content}])
        summaries_list.append(resp["choices"][0]["message"]["content"])

    return summaries_list

In [None]:
# Apply the resources defined since the last apply, including the on-demand features decorated above.
client.apply()

# Apply & Connect Host

In [None]:
# Serving client for the hackathon host.
# NOTE(review): `serving` is not referenced by any later cell.
SERVING_HOST = "hackathon.featureform.com"
serving = ff.ServingClient(SERVING_HOST)

## ⚠️ [TODO] Q & A Setup

In [None]:
# Final step: answer a question from summaries of the most similar Slack threads.
#
# Posing a question kicks off:
#   i.   generating the question embedding
#   ii.  finding the 2 most similar threads stored in Redis
#   iii. summarizing those raw threads
#   iv.  using the summaries as context in the final prompt, which answers the
#        initial question with the information from the summaries.


def get_answer(question):
    """Answer ``question`` using summaries of the most similar MLOps.community threads.

    Args:
        question: the user question; a trailing "?" is appended if missing.

    Returns:
        The answer text produced by gpt-3.5-turbo.
    """
    # `openai` is only imported inside other functions, so it must be imported here too;
    # the API key/organization are configured on the module by summarize() below.
    import openai

    # Steps i-iii: embed the question, fetch the nearest threads, then summarize the
    # thread texts (not the question string itself).
    raw_threads = nearest_neighbors_fetch(question)
    summaries_list = summarize(raw_threads)

    # Implicit string concatenation: no stray indentation ends up inside the prompt.
    prompt = (
        "Use the following summaries of conversations on the "
        "MLOps.community slack channel in backticks to generate an "
        "answer for the user question."
    )

    for i, summary in enumerate(summaries_list, start=1):
        prompt += f"\nConversation {i} Summary:\n```\n{summary}```"

    # Make it a question
    if not question.endswith("?"):
        question += "?"
    prompt += f"\nQuestion: {question}"

    print("Getting answer for the question.")

    # The final prompt carries the summaries in the context window.
    completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}])
    return completion.choices[0].message.content

# Examples

### Example 1: `"What are some good ways to deploy models on Kubernetes?"`

In [None]:
# Example 1 (original hackathon question)
question = "What are some good ways to deploy models on Kubernetes?"
answer = get_answer(question)
print("\n\nQuestion: {}\nAnswer: {}".format(question, answer))

### Example 2: `"How can I structure a good Data Science team?"`

In [None]:
# Example 2 (original hackathon question)
question = "How can I structure a good Data Science team?"
answer = get_answer(question)
print("\n\nQuestion: {}\nAnswer: {}".format(question, answer))

### Example 3: `"What is the best way to train models for tabular data?"`

In [None]:
# Example 3 (original hackathon question)
question = "What is the best way to train models for tabular data?"
answer = get_answer(question)
print("\n\nQuestion: {}\nAnswer: {}".format(question, answer))