In [7]:
import os

# Azure AI Search
AI_SEARCH_NAME = "docuemtsearch"
AI_SEARCH_INDEX_NAME = "rag-demo-index"
# Secrets are read from the environment so they are never stored in the saved
# notebook or its version history. Set AI_SEARCH_KEY before running this cell
# (or replace the lookup with your secret store of choice).
AI_SEARCH_KEY = os.environ.get("AI_SEARCH_KEY", "")

# Azure AI Services
AI_SERVICES_KEY = os.environ.get("AI_SERVICES_KEY", "")
AI_SERVICES_LOCATION = "eastus2"

# Surface a missing key here rather than as an opaque authentication failure
# in a later cell.
if not (AI_SEARCH_KEY and AI_SERVICES_KEY):
    print("WARNING: AI_SEARCH_KEY and/or AI_SERVICES_KEY is not set in the environment; service calls will be rejected.")

StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 9, Finished, Available)

In [8]:
import requests
import os

url = "https://github.com/PascalBurume/Economic-Horizons-Unveiling-Income-Patterns-through-Machine-Learning/raw/main/Data/ManagingYourPersonalFinance.pdf"
# A timeout keeps the cell from hanging indefinitely, and raise_for_status()
# stops an HTTP error page from being saved to disk as if it were the PDF.
response = requests.get(url, timeout=60)
response.raise_for_status()

# Specify your path here
path = "/lakehouse/default/Files/"

# Ensure the directory exists
os.makedirs(path, exist_ok=True)

# Write the content to a file in the specified path.
# The file name is the last path segment of the URL.
filename = url.rsplit("/", 1)[-1]
with open(os.path.join(path, filename), "wb") as f:
    f.write(response.content)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 10, Finished, Available)

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Location of the downloaded document, relative to the "Files/" area
document_path = f"Files/{filename}"

# Load the file with Spark's binaryFile reader, keep only the file name and the
# raw bytes, cap the result at 10 rows and cache it for the following cells.
binary_reader = spark.read.format("binaryFile")
df = (
    binary_reader.load(document_path)
    .select("_metadata.file_name", "content")
    .limit(10)
    .cache()
)

display(df)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, d3c13b05-4f00-49d7-a441-12604da8f83e)

In [10]:
from synapse.ml.services import AnalyzeDocument
from pyspark.sql.functions import col

# AnalyzeDocument transformer: reads the bytes in "content", runs the
# "prebuilt-layout" model and writes the service response to "result".
analyze_document = (
    AnalyzeDocument()
    .setSubscriptionKey(AI_SERVICES_KEY)
    .setLocation(AI_SERVICES_LOCATION)
    .setPrebuiltModelId("prebuilt-layout")
    .setImageBytesCol("content")
    .setOutputCol("result")
)

# Run the analysis, then lift the extracted text and the paragraph list out of
# the nested result into their own columns.
layout_df = analyze_document.transform(df)
analyzed_df = (
    layout_df
    .withColumn("output_content", col("result.analyzeResult.content"))
    .withColumn("paragraphs", col("result.analyzeResult.paragraphs"))
    .cache()
)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 12, Finished, Available)

In [11]:
# Remove the binary "content" column before displaying the analysis output
columns_to_drop = ("content",)
analyzed_df = analyzed_df.drop(*columns_to_drop)
display(analyzed_df)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4daabec4-bda7-4340-9ed7-dae0332af8b7)

In [12]:
from synapse.ml.featurize.text import PageSplitter

# Split the extracted text in "output_content" into pages of between 3000 and
# 4000 characters; the resulting array of pages is written to "chunks".
ps = (
    PageSplitter()
    .setInputCol("output_content")
    .setOutputCol("chunks")
    .setMinimumPageLength(3000)
    .setMaximumPageLength(4000)
)

splitted_df = ps.transform(analyzed_df)
display(splitted_df)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, 76defd0f-02a4-4d18-84ad-74d3756e4ced)

In [13]:
from pyspark.sql.functions import posexplode, col, concat

# "chunks" holds an array with all chunks of one document.
# posexplode emits one row per array element together with its position.
chunk_columns = posexplode(col("chunks")).alias("chunk_index", "chunk")
exploded_df = splitted_df.select("file_name", chunk_columns)

# Identify each chunk by its file name followed by its position in the document
exploded_df = exploded_df.withColumn(
    "unique_id",
    concat(col("file_name"), col("chunk_index")),
)

display(exploded_df)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4c888dbc-aea4-46a2-824d-94afe6f88921)

In [14]:
from synapse.ml.services import OpenAIEmbedding

# Embedding transformer: reads text from "chunk", writes vectors to
# "embeddings" and records per-row failures in "error".
embedding = (
    OpenAIEmbedding()
    .setTextCol("chunk")
    .setOutputCol("embeddings")
    .setErrorCol("error")
    .setDeploymentName("text-embedding-ada-002")
)

df_embeddings = embedding.transform(exploded_df)

display(df_embeddings)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1b8cfd5a-2762-4730-9869-156341333118)

In [15]:
import requests
import json

# Length of the embedding vector (OpenAI ada-002 generates embeddings of length 1536)
EMBEDDING_LENGTH = 1536

# Create index for AI Search with fields id, content, and contentVector
# Note the datatypes for each field below
url = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}?api-version=2023-11-01"
payload = json.dumps(
    {
        "name": AI_SEARCH_INDEX_NAME,
        "fields": [
            # Unique identifier for each document
            {
                "name": "id",
                "type": "Edm.String",
                "key": True,
                "filterable": True,
            },
            # Text content of the document
            {
                "name": "content",
                "type": "Edm.String",
                "searchable": True,
                "retrievable": True,
            },
            # Vector embedding of the text content
            {
                "name": "contentVector",
                "type": "Collection(Edm.Single)",
                "searchable": True,
                "retrievable": True,
                "dimensions": EMBEDDING_LENGTH,
                "vectorSearchProfile": "vectorConfig",
            },
        ],
        "vectorSearch": {
            "algorithms": [{"name": "hnswConfig", "kind": "hnsw", "hnswParameters": {"metric": "cosine"}}],
            "profiles": [{"name": "vectorConfig", "algorithm": "hnswConfig"}],
        },
    }
)
# Authenticate with the key defined in the configuration cell instead of
# repeating the secret as a literal here.
headers = {"Content-Type": "application/json", "api-key": AI_SEARCH_KEY}

# PUT creates the index or updates an existing one; the timeout keeps the cell
# from hanging if the service is unreachable.
response = requests.request("PUT", url, headers=headers, data=payload, timeout=60)
if response.status_code == 201:
    print("Index created!")
elif response.status_code == 204:
    print("Index updated!")
else:
    print(f"HTTP request failed with status code {response.status_code}")
    print(f"HTTP response body: {response.text}")


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 17, Finished, Available)

Index created!


In [16]:
pip install azure-search-documents

StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 18, Finished, Available)

Collecting azure-search-documents
  Downloading azure_search_documents-11.4.0-py3-none-any.whl (283 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m283.8/283.8 kB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
Collecting azure-common~=1.1 (from azure-search-documents)
  Downloading azure_common-1.1.28-py2.py3-none-any.whl (14 kB)
Collecting typing-extensions>=4.6.0 (from azure-core<2.0.0,>=1.28.0->azure-search-documents)
  Downloading typing_extensions-4.10.0-py3-none-any.whl (33 kB)
Installing collected packages: azure-common, typing-extensions, azure-search-documents
  Attempting uninstall: typing-extensions
    Found existing installation: typing_extensions 4.5.0
    Uninstalling typing_extensions-4.5.0:
      Successfully uninstalled typing_extensions-4.5.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
sentence-transformers 2.0.0 requi

In [17]:
import re

from pyspark.sql.functions import monotonically_increasing_id


def insert_into_index(documents):
    """Upload a list of documents to the Azure AI Search index.

    Args:
        documents: JSON-serialisable list of document dicts; it is sent as the
            "value" array of the request body.

    Returns:
        "Success" when the service answers 200 or 201, otherwise
        "Failure: <response body>".
    """

    url = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}/docs/index?api-version=2023-11-01"

    payload = json.dumps({"value": documents})
    headers = {
        "Content-Type": "application/json",
        # Use the configured key rather than a secret literal embedded in the function.
        "api-key": AI_SEARCH_KEY,
    }

    # The timeout keeps a stalled connection from blocking the caller forever.
    response = requests.request("POST", url, headers=headers, data=payload, timeout=60)

    if response.status_code in (200, 201):
        return "Success"
    return f"Failure: {response.text}"

def make_safe_id(row_id: str):
    """Return ``row_id`` with every character outside ``0-9a-zA-Z_-`` replaced by ``_``.

    The result is used as the document ID sent to Azure AI Search.
    """
    disallowed = re.compile("[^0-9a-zA-Z_-]")
    return disallowed.sub("_", row_id)


def upload_rows(rows, batch_size=1000):
    """Upload the rows of one Spark partition to Azure AI Search in batches.

    Args:
        rows: Iterable of rows exposing "unique_id", "chunk", "embeddings"
            (an object with ``tolist()``) and "row_index".
        batch_size: Maximum number of documents per upload request. Defaults
            to 1000 because of Azure AI Search API limits.

    Yields:
        ``[first_row_index, last_row_index, status]`` for each batch, where
        ``status`` is the value returned by ``insert_into_index``.
    """
    rows = list(rows)
    for start in range(0, len(rows), batch_size):
        row_batch = rows[start : start + batch_size]
        # Build documents from the current batch only. Iterating over the full
        # `rows` list here would resend the whole partition with every request
        # and ignore the batch size limit.
        documents = [
            {
                "id": make_safe_id(row["unique_id"]),
                "content": row["chunk"],
                "contentVector": row["embeddings"].tolist(),
                "@search.action": "upload",
            }
            for row in row_batch
        ]
        status = insert_into_index(documents)
        yield [row_batch[0]["row_index"], row_batch[-1]["row_index"], status]

# Tag every row with an ID so upload results can be traced back to row ranges
row_index_col = monotonically_increasing_id()
df_embeddings = df_embeddings.withColumn("row_index", row_index_col)

# Run upload_rows on each partition of the dataframe and show the per-batch status
res = df_embeddings.rdd.mapPartitions(upload_rows)
status_columns = ["start_index", "end_index", "insertion_status"]
display(res.toDF(status_columns))


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, f3ed533e-8270-4f17-9f84-fec56ebd8ba5)

In [18]:
import os

# Azure AI Search
AI_SEARCH_NAME = 'docuemtsearch'
AI_SEARCH_INDEX_NAME = 'rag-demo-index'
# The key is read from the environment so the secret is never stored in the
# saved notebook. Set AI_SEARCH_KEY before running this cell.
AI_SEARCH_API_KEY = os.environ.get('AI_SEARCH_KEY', '')

if not AI_SEARCH_API_KEY:
    print("WARNING: AI_SEARCH_KEY is not set in the environment; search requests will be rejected.")


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 20, Finished, Available)

In [19]:
def gen_question_embedding(user_question):
    """Return the embedding of ``user_question`` as a plain Python list.

    The question is placed in a one-row Spark DataFrame and run through
    SynapseML's ``OpenAIEmbedding`` transformer (deployment
    ``text-embedding-ada-002``). The ``embeddings`` value of the first
    collected row is converted with ``tolist()``.
    """
    from synapse.ml.services import OpenAIEmbedding

    # One-row frame: the question text plus a placeholder column.
    question_df = spark.createDataFrame([(user_question, 1)], ["questions", "dummy"])

    embedder = (
        OpenAIEmbedding()
        .setTextCol("questions")
        .setOutputCol("embeddings")
        .setErrorCol("errorQ")
        .setDeploymentName('text-embedding-ada-002')
    )

    first_row = embedder.transform(question_df).collect()[0]
    return first_row.embeddings.tolist()


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 21, Finished, Available)

In [20]:
import json 
import requests

def retrieve_top_chunks(k, question, question_embedding):
    """Retrieve the top K entries from Azure AI Search using hybrid search.

    Args:
        k: Number of results requested, used both as ``top`` and as the ``k``
            of the vector query.
        question: Question text, sent as the ``search`` term.
        question_embedding: Embedding vector of the question, matched against
            the ``contentVector`` field.

    Returns:
        The decoded JSON body of the search response.

    Raises:
        requests.HTTPError: If the service answers with an error status.
    """
    url = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}/docs/search?api-version=2023-11-01"

    payload = json.dumps({
        "search": question,
        "top": k,
        "vectorQueries": [
            {
                "vector": question_embedding,
                "k": k,
                "fields": "contentVector",
                "kind": "vector"
            }
        ]
    })

    headers = {
        "Content-Type": "application/json",
        "api-key": AI_SEARCH_API_KEY,
    }

    response = requests.request("POST", url, headers=headers, data=payload, timeout=60)
    # Fail here with the HTTP status instead of handing an error body to the
    # caller, which would otherwise break later on a missing "value" key.
    response.raise_for_status()
    return json.loads(response.text)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 22, Finished, Available)

In [21]:
def get_context(user_question, retrieved_k = 5):
    """Return the ``content`` of the top ``retrieved_k`` search hits for ``user_question``.

    The question is embedded with ``gen_question_embedding``, the hits are
    fetched with ``retrieve_top_chunks``, and the ``content`` field of every
    entry under ``"value"`` is returned as a list, in result order.
    """
    vector = gen_question_embedding(user_question)
    search_output = retrieve_top_chunks(retrieved_k, user_question, vector)

    # Collect the text of each retrieved entry (a list, not a joined string).
    contents = []
    for hit in search_output["value"]:
        contents.append(hit["content"])
    return contents


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 23, Finished, Available)

In [22]:
from pyspark.sql import Row
from synapse.ml.services.openai import OpenAIChatCompletion


def make_message(role, content):
    """Build a chat message ``Row`` with ``role``, ``content`` and ``name`` (set to the role)."""
    fields = {"role": role, "content": content, "name": role}
    return Row(**fields)

def get_response(user_question):
    """Answer ``user_question`` from the retrieved context.

    The context returned by ``get_context`` is embedded in a system prompt,
    the question is sent as the user message, and both go through SynapseML's
    ``OpenAIChatCompletion`` transformer. The returned message contents are
    joined with spaces into one string.
    """
    context = get_context(user_question)

    # System prompt: the retrieved context plus the answering instructions
    prompt = f"""
    context: {context}
    Answer the question based on the context above.
    If the information to answer the question is not present in the given context then reply "I don't know".
    """

    # One-row DataFrame whose single "messages" column holds the conversation
    conversation = [
        make_message("system", prompt),
        make_message("user", user_question),
    ]
    chat_df = spark.createDataFrame([(conversation,)]).toDF("messages")

    chat_completion = (
        OpenAIChatCompletion()
        .setDeploymentName("gpt-35-turbo-16k") # deploymentName could be one of {gpt-35-turbo, gpt-35-turbo-16k}
        .setMessagesCol("messages")
        .setOutputCol("chat_completions")
        .setErrorCol("error")
    )

    completed_df = chat_completion.transform(chat_df)
    result_df = completed_df.select("chat_completions.choices.message.content")

    # Each row holds a list of choice contents: join within a row, then across rows
    answers = [' '.join(row['content']) for row in result_df.collect()]
    return ' '.join(answers)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 24, Finished, Available)

In [23]:
# Ask a question through get_response and print the generated answer
user_question = "What are some strategies for managing income effectively?"
response = get_response(user_question)
print(response)


StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 25, Finished, Available)

One strategy for managing income effectively is to create a budget or spending plan. This allows you to track your income and expenses and make sure that you are not spending more than you earn. Another strategy is to save a portion of your income for future goals or emergencies. This can be done by setting up automatic savings deductions or manually transferring money into a savings account. Additionally, it is important to prioritize your expenses and make sure that your spending aligns with your financial goals. This might involve cutting back on unnecessary expenses or finding ways to save money on essential items.


In [25]:
# Ask a question through get_response and print the generated answer
user_question = "What are some ways to reduce expenses and save money?"
response = get_response(user_question)
print(response)

StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 27, Finished, Available)

Some ways to reduce expenses and save money include:

1. Keep track of your expenses: By monitoring and recording your expenses, you can identify areas where you can cut back and save money.

2. Create a budget: Establish a spending plan that outlines your income and expenses, and allocate a certain amount for savings each month.

3. Cut back on discretionary spending: Reduce expenses on non-essential items such as eating out, entertainment, and unnecessary purchases.

4. Make use of coupons and discounts: Look for coupons, deals, and discounts when shopping for groceries, clothing, and other essential items.

5. Cook meals at home: Eating out can be expensive, so try cooking meals at home more often. This can save a significant amount of money.

6. Reduce utility costs: Cut back on energy usage by turning off lights when not in use, unplugging electronics, and adjusting thermostat settings.

7. Shop for better deals: Compare prices and shop around for better deals on insurance policie

In [26]:
# Ask a question through get_response and print the generated answer
user_question = "How can I create a budget to manage my income?"
response = get_response(user_question)
print(response)

StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 28, Finished, Available)

To create a budget to manage your income, you can follow these steps:

Step 1: Establish financial goals - Determine what you want to achieve financially. Set short-term goals (such as saving for a vacation) and long-term goals (such as saving for retirement).

Step 2: Estimate your income - Calculate your total income from all sources, including salaries, bonuses, investments, and any other sources of income.

Step 3: Estimate your expenses - Track your expenses for a month to get an idea of how much you spend on different categories such as housing, transportation, groceries, entertainment, etc. Categorize your expenses and estimate how much you spend in each category.

Step 4: Compare income and expenses - Compare your estimated income with your estimated expenses. If your income is higher than your expenses, you have a surplus. If your expenses are higher than your income, you have a deficit.

Step 5: Adjust your expenses - If you have a deficit, review your expenses and identify a

In [27]:
# Ask a question through get_response and print the generated answer
user_question = "How can I invest my income to achieve financial goals?"
response = get_response(user_question)
print(response)

StatementMeta(, c643af9f-1b6d-45a2-a752-8d51704ba4ed, 29, Finished, Available)

To invest your income and achieve financial goals, you can follow these steps:

1. Develop a personal financial plan: Start by setting clear financial goals for yourself, such as saving for a down payment on a house or retirement. Create a budget to determine how much money you can allocate towards investments.

2. Establish an emergency fund: Before you start investing, it's important to have a safety net. Set aside some money in an emergency fund that can cover your expenses for at least three to six months.

3. Determine your risk tolerance: Assess how comfortable you are with taking risks and adjust your investment strategy accordingly. Generally, higher risk investments have the potential for higher returns, but they also carry greater uncertainty.

4. Diversify your investments: Spread your investments across different asset classes, such as stocks, bonds, mutual funds, and real estate. This helps mitigate risk and maximize potential returns.

5. Start early and invest consistent