In [1]:
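# Chunk the articles in CanadianPRScores.CanadaPRArticles, embed each chunk with
# Azure OpenAI (text-embedding-ada-002) and upload the chunks to the Azure AI
# Search index "fabrichackathonindex".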

StatementMeta(, 9fda3223-c832-4e0a-8a64-87657991b8c9, 3, Finished, Available)

## Uploading Articles to Azure AI Search

In [35]:
pip install python-dotenv

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 37, Finished, Available)

Note: you may need to restart the kernel to use updated packages.


In [85]:
import json
import os
import re

import openai
import requests
from dotenv import dotenv_values
from pyspark.sql import SparkSession

# Wildcard import kept from the original notebook. It stays above the explicit
# imports below so that, should it export a clashing name, the explicit
# pyspark / SynapseML names still take precedence.
from synapse.ml.cognitive import *

from pyspark.sql.functions import (
    col,
    concat,
    concat_ws,
    date_format,
    lit,
    monotonically_increasing_id,
    posexplode,
    udf,
)
from pyspark.sql.types import ArrayType, FloatType
from synapse.ml.featurize.text import PageSplitter



StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 87, Finished, Available)

In [37]:
# Secrets (API keys, endpoints) are read from a .env file in the default lakehouse's Files area
# rather than being written into the notebook.
CREDENTIALS_PATH = '/lakehouse/default/Files/Credentials.env'
config = dotenv_values(CREDENTIALS_PATH)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 39, Finished, Available)

In [38]:
# Azure AI Search connection settings.
# NOTE(review): Ai_search_location and Ai_search_endpoint are not read anywhere in this
# notebook; request URLs are built from Ai_search_name instead.
Ai_search_key, Ai_search_location, Ai_search_endpoint = (
    config[setting]
    for setting in ('Ai_search_key', 'Ai_search_location', 'Ai_search_endpoint')
)
Ai_search_index = 'fabrichackathonindex'
Ai_search_name = 'fabric-hackathon'

# Azure OpenAI settings.
# NOTE(review): openai_deployment_name is not used by the cells in this notebook.
openai_key = config['openai_api_key']
openai_deployment_name = "gpt-35-turbo"
openai_url = config['open_ai_endpoint']


StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 40, Finished, Available)

In [67]:
# Load up to 1000 articles from the lakehouse table.
# NOTE(review): LIMIT without ORDER BY does not pin down which 1000 rows are returned.
articles_query = "SELECT * FROM CanadianPRScores.CanadaPRArticles LIMIT 1000"
df = spark.sql(articles_query)
display(df)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 69, Finished, Available)

SynapseWidget(Synapse.DataFrame, 01bb8449-0786-4573-ba19-10e43eee1dbc)

In [68]:
# Tag every article row with the Azure AI Search indexing action.
# NOTE(review): upload_rows sets "@search.action" itself, so this column does not appear
# to be read by any later cell.
upload_action = lit("upload")
df = df.withColumn("SearchAction", upload_action)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 70, Finished, Available)

In [69]:
# Preview the articles after the SearchAction column has been added.
display(df)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 71, Finished, Available)

SynapseWidget(Synapse.DataFrame, e8032295-ba7f-4e5a-a114-f20a55bc2f3e)

In [70]:
# The index declares its key field "id" as Edm.String, so store the article id as a string.
df = df.withColumn("id", df["id"].cast("string"))


StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 72, Finished, Available)

In [71]:
# Check the column types after the cast (id should now be a string).
display(df.dtypes)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 73, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9144df21-986b-47fc-954b-9baaa395cb9c)

In [72]:
# Split each article body into pages (chunks) so each chunk can be embedded and indexed
# as its own search document. The bounds below are PageSplitter's min/max page length.
MAX_CHUNK_LENGTH = 2000
MIN_CHUNK_LENGTH = 1000

ps = PageSplitter()
ps.setInputCol("Article_body")
ps.setOutputCol("chunks")
ps.setMaximumPageLength(MAX_CHUNK_LENGTH)
ps.setMinimumPageLength(MIN_CHUNK_LENGTH)

splitted_df = ps.transform(df)
display(splitted_df)


StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 74, Finished, Available)

SynapseWidget(Synapse.DataFrame, 33a41803-ca85-43b0-8236-7594cacefc22)

In [73]:
# Each "chunks" value holds all chunks of one article as an array; posexplode emits one
# row per chunk together with the chunk's position within the article.
exploded_df = splitted_df.select(
    "Article_name", posexplode(col("chunks")).alias("chunk_index", "chunk")
)

# Unique identifier per chunk. A separator is required: plain concatenation lets different
# (name, index) pairs collide, e.g. ("Report1", 2) and ("Report", 12) both give "Report12".
exploded_df = exploded_df.withColumn(
    "unique_id",
    concat_ws("_", col("Article_name"), col("chunk_index").cast("string")),
)

display(exploded_df)


StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 75, Finished, Available)

SynapseWidget(Synapse.DataFrame, 4c60af6a-3e9b-422d-86c5-072c2ef2875c)

In [74]:

def get_embeddings(text):
    """Return the embedding vector for ``text`` from Azure OpenAI.

    Calls the "text-embedding-ada-002" deployment through the legacy module-level
    ``openai.Embedding.create`` interface and returns the first (only) embedding
    in the response.
    """
    # The client is configured inside the function rather than once on the driver,
    # presumably so the settings also exist on the Spark executors that run the UDF.
    azure_settings = (
        ("api_type", "azure"),
        ("api_key", openai_key),
        ("api_base", openai_url),
        ("api_version", "2023-09-01-preview"),
    )
    for attribute, value in azure_settings:
        setattr(openai, attribute, value)

    result = openai.Embedding.create(input=text, engine="text-embedding-ada-002")
    first_item = result['data'][0]
    return first_item['embedding']

# Wrap get_embeddings as a Spark UDF that returns an array of floats per chunk.
get_embeddings_udf = udf(get_embeddings, ArrayType(FloatType()))

# Apply the UDF to the 'chunk' column. The result is cached because every later action on
# this lineage (the displays below and the upload) would otherwise re-run the UDF and
# re-call the embeddings API for every chunk.
df_embeddings = (
    exploded_df
    .withColumn('embeddings', get_embeddings_udf(exploded_df['chunk']))
    .cache()
)

display(df_embeddings)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 76, Finished, Available)

SynapseWidget(Synapse.DataFrame, 496e6692-6ff7-4504-b7d6-13a107a62530)

In [75]:
# Attach each article's Date to its chunks, matching on Article_name.
# NOTE(review): if Article_name is not unique in df, this join duplicates chunk rows.
article_dates = df.select('Date', 'Article_name')
df_embeddings = df_embeddings.join(article_dates, 'Article_name', 'left')

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 77, Finished, Available)

In [80]:
# Attach each article's id to its chunks, matching on Article_name.
# NOTE(review): not idempotent - running this cell twice yields two "id" columns.
article_ids = df.select('id', 'Article_name')
df_embeddings = df_embeddings.join(article_ids, 'Article_name', 'left')

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 82, Finished, Available)

In [81]:
# Check the joined schema (Date and id should now be present alongside the embeddings).
display(df_embeddings.dtypes)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 83, Finished, Available)

SynapseWidget(Synapse.DataFrame, c40e4478-4636-4c65-8e0c-73132b381619)

In [77]:
# Posted_date is indexed as Edm.String, so render Date as a yyyy-MM-dd string.
df_embeddings = df_embeddings.withColumn("Date", date_format(col("Date"), "yyyy-MM-dd"))

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 79, Finished, Available)

In [82]:
# Preview the chunks with their embeddings, Date and id before uploading.
display(df_embeddings)

StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 84, Finished, Available)

SynapseWidget(Synapse.DataFrame, 97d9a68f-37d3-49da-9a4a-5ca47ee903f3)

In [83]:
# Length of the embedding vector (OpenAI ada-002 generates embeddings of length 1536)
EMBEDDING_LENGTH = 1536

# Create (or update) the Azure AI Search index with these fields:
#   id            - Edm.String document key
#   Article_name  - article title
#   Article_body  - text of one chunk of the article
#   Posted_date   - article date as a yyyy-MM-dd string
#   contentVector - embedding of the chunk, searched with HNSW using cosine similarity
url = f"https://{Ai_search_name}.search.windows.net/indexes/{Ai_search_index}?api-version=2023-11-01"
payload = json.dumps(
    {
        "name": Ai_search_index,
        "fields": [
            # Unique identifier for each document
            {
                "name": "id",
                "type": "Edm.String",
                "key": True,
                "filterable": True,
                "sortable": True,
                "retrievable": True,
            },
            # Article name of the document
            {
                "name": "Article_name",
                "type": "Edm.String",
                "filterable": True,
                "sortable": True,
                "retrievable": True,
            },
            # Text content of the document
            {
                "name": "Article_body",
                "type": "Edm.String",
                "filterable": True,
                "sortable": True,
                "retrievable": True,
            },
            # Date of the document
            {
                "name": "Posted_date",
                "type": "Edm.String",
                "filterable": True,
                "sortable": True,
                "retrievable": True,
            },
            # Vector embedding of the text content
            {
                "name": "contentVector",
                "type": "Collection(Edm.Single)",
                "searchable": True,
                "retrievable": True,
                "dimensions": EMBEDDING_LENGTH,
                "vectorSearchProfile": "vectorConfig",
            },
        ],
        "vectorSearch": {
            "algorithms": [{"name": "hnswConfig", "kind": "hnsw", "hnswParameters": {"metric": "cosine"}}],
            "profiles": [{"name": "vectorConfig", "algorithm": "hnswConfig"}],
        },
    }
)
headers = {"Content-Type": "application/json", "api-key": Ai_search_key}

# PUT creates the index (201) or updates the existing one (204). The timeout keeps the
# cell from hanging indefinitely if the service never answers.
response = requests.put(url, headers=headers, data=payload, timeout=60)
if response.status_code == 201:
    print("Index created!")
elif response.status_code == 204:
    print("Index updated!")
else:
    print(f"HTTP request failed with status code {response.status_code}")
    print(f"HTTP response body: {response.text}")


StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 85, Finished, Available)

Index created!


In [91]:

def insert_into_index(documents, timeout=60):
    """Upload a list of documents to the Azure AI Search index.

    Args:
        documents: list of dicts, each holding the index fields plus "@search.action".
        timeout: seconds to wait for the HTTP response. Without a timeout a stalled
            connection blocks the calling Spark task indefinitely.

    Returns:
        "Success" when the service answers 200 or 201, otherwise "Failure: <details>".
        Any other status (e.g. a partial-success or error response) is reported with the
        response body. Transport errors (timeouts, connection failures) are reported the
        same way instead of raising, so one failed batch does not abort the whole upload job.
    """
    url = f"https://{Ai_search_name}.search.windows.net/indexes/{Ai_search_index}/docs/index?api-version=2023-11-01"

    payload = json.dumps({"value": documents})
    headers = {
        "Content-Type": "application/json",
        "api-key": Ai_search_key,
    }

    try:
        response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    except requests.RequestException as exc:
        return f"Failure: {exc}"

    if response.status_code in (200, 201):
        return "Success"
    return f"Failure: {response.text}"

def make_safe_id(row_id: str):
    """Return ``row_id`` with every character outside [0-9a-zA-Z_-] replaced by "_".

    Used to turn arbitrary text into a valid Azure AI Search document ID.
    """
    disallowed_chars = re.compile("[^0-9a-zA-Z_-]")
    return disallowed_chars.sub("_", row_id)


def upload_rows(rows, batch_size=100):
    """Upload the rows of one Spark partition to Azure AI Search, one request per batch.

    Intended for ``rdd.mapPartitions``: ``rows`` is the partition's row iterator. Each row
    must provide id, chunk_index, Article_name, chunk, Date, embeddings and row_index.

    Args:
        rows: iterable of Spark Rows for one partition.
        batch_size: documents per indexing request (1 to 1000). Azure AI Search accepts at
            most 1000 documents per request and also caps the request payload (about 16 MB).
            A 1536-float vector serialized as JSON is tens of kilobytes, so 1000-document
            batches would exceed the payload cap. Hence the default of 100.

    Yields:
        [first row_index, last row_index, status] per batch, where status is the string
        returned by insert_into_index.

    Raises:
        ValueError: if batch_size is outside 1..1000.
    """
    if not 0 < batch_size <= 1000:
        raise ValueError(f"batch_size must be between 1 and 1000, got {batch_size}")

    rows = list(rows)
    for start in range(0, len(rows), batch_size):
        row_batch = rows[start : start + batch_size]
        # Build documents from this batch only (not the whole partition).
        documents = [
            {
                # One key per chunk: every chunk of an article shares the article id, so
                # using it alone makes "upload" overwrite all but the article's last chunk.
                "id": make_safe_id(f"{row['id']}_{row['chunk_index']}"),
                "Article_name": row["Article_name"],
                "Article_body": row["chunk"],
                "Posted_date": row["Date"],
                "contentVector": row["embeddings"],
                "@search.action": "upload",
            }
            for row in row_batch
        ]
        status = insert_into_index(documents)
        yield [row_batch[0]["row_index"], row_batch[-1]["row_index"], status]

# Tag rows with a unique, increasing (not necessarily consecutive) id so each batch's status
# can be traced back to the rows it covered.
df_embeddings = df_embeddings.withColumn("row_index", monotonically_increasing_id())

# Upload every partition with upload_rows. collect() evaluates each partition exactly once.
# Calling rdd.toDF(column_names) instead would first evaluate rows to infer the schema,
# re-running uploads, and display() is not guaranteed to evaluate every partition.
# The explicit schema also keeps this working when no batches were produced.
res = df_embeddings.rdd.mapPartitions(upload_rows)
upload_results = res.collect()
display(
    spark.createDataFrame(
        upload_results,
        "start_index long, end_index long, insertion_status string",
    )
)


StatementMeta(, 5d03dfdf-4b70-42d4-ba15-2df796e157f1, 93, Finished, Available)

SynapseWidget(Synapse.DataFrame, c3e07921-eef6-420a-a12c-9c6de8ff1b22)