In [21]:
# Welcome to your new notebook
# Type here in the cell editor to add code!


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 25, Finished, Available)

#### 環境変数の設定

In [22]:
# The keys are pasted inline in this sample for brevity; for real use, load them from
# a secret store such as Azure Key Vault instead of keeping them in the notebook.
# Every setting below is taken from the environment variable of the same name when
# it is set, and otherwise falls back to the placeholder literal.
import os

# Azure AI Search
AI_SEARCH_NAME = os.environ.get("AI_SEARCH_NAME", "xxxxxxx")
AI_SEARCH_INDEX_NAME = os.environ.get("AI_SEARCH_INDEX_NAME", "xxxxxxx")
AI_SEARCH_KEY = os.environ.get("AI_SEARCH_KEY", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

# Azure AI Services
AI_SERVICES_KEY = os.environ.get("AI_SERVICES_KEY", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
AI_SERVICES_LOCATION = os.environ.get("AI_SERVICES_LOCATION", "xxxxxxx")

# Azure OpenAI
AZURE_OPENAI_KEY = os.environ.get("AZURE_OPENAI_KEY", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT", "xxxxxxx")

StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 26, Finished, Available)

#### データの読み込み・Spark DataFrame への変換

In [23]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Glob that matches every PDF under the relative "Files/" path.
document_path = "Files/*.pdf"

# Read each matching file as raw bytes, keep only the file name and the binary
# payload, cap the sample at 10 rows and cache the frame for the cells that follow.
df = (
    spark.read.format("binaryFile")
    .load(document_path)
    .select("_metadata.file_name", "content")
    .limit(10)
    .cache()
)

display(df)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 27, Finished, Available)

SynapseWidget(Synapse.DataFrame, 08680516-e63b-499c-bc60-6a6503bef793)

#### 

#### テキスト情報の抽出

In [24]:
from synapse.ml.services import AnalyzeDocument
from pyspark.sql.functions import col

# SynapseML AnalyzeDocument transformer: prebuilt layout model, reading the file
# bytes from "content" and writing the service response to "result".
analyze_document = (
    AnalyzeDocument()
    .setSubscriptionKey(AI_SERVICES_KEY)
    .setLocation(AI_SERVICES_LOCATION)
    .setPrebuiltModelId("prebuilt-layout")
    .setImageBytesCol("content")
    .setOutputCol("result")
)

# Run the analysis, then surface the extracted text and the paragraph list as
# top-level columns; the frame is cached for the cells that follow.
layout_df = analyze_document.transform(df)
analyzed_df = (
    layout_df
    .withColumn("output_content", col("result.analyzeResult.content"))
    .withColumn("paragraphs", col("result.analyzeResult.paragraphs"))
    .cache()
)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 28, Finished, Available)

In [25]:
# Remove the raw file-bytes column before rendering the analysed frame.
columns_to_drop = ("content",)
analyzed_df = analyzed_df.drop(*columns_to_drop)
display(analyzed_df)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 29, Finished, Available)

SynapseWidget(Synapse.DataFrame, 50a9b8a9-7fcb-4114-89e0-09721f37e6c6)

#### テキストのチャンク化

In [26]:
from synapse.ml.featurize.text import PageSplitter

# Page-length bounds handed to PageSplitter.
MIN_PAGE_LENGTH = 500
MAX_PAGE_LENGTH = 1000

# Split the extracted text in "output_content" into an array column named "chunks".
ps = (
    PageSplitter()
    .setInputCol("output_content")
    .setOutputCol("chunks")
    .setMinimumPageLength(MIN_PAGE_LENGTH)
    .setMaximumPageLength(MAX_PAGE_LENGTH)
)

splitted_df = ps.transform(analyzed_df)
display(splitted_df)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 30, Finished, Available)

SynapseWidget(Synapse.DataFrame, fc80eca3-7e66-4253-a946-a983c6ca1f51)

In [27]:
from pyspark.sql.functions import posexplode, col, concat

# "chunks" holds one array of chunks per document; posexplode emits one row per
# array element together with that element's position in the array.
chunk_columns = posexplode(col("chunks")).alias("chunk_index", "chunk")
exploded_df = splitted_df.select("file_name", chunk_columns)

# Build an identifier for each chunk by appending its position to the file name.
exploded_df = exploded_df.withColumn(
    "unique_id", concat(col("file_name"), col("chunk_index"))
)

display(exploded_df)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 31, Finished, Available)

SynapseWidget(Synapse.DataFrame, e6ad4871-2d58-48eb-9b9d-9e86370caf12)

#### 

#### チャンク化したテキストの Embeddings 計算 (F64以上)

In [28]:
from synapse.ml.services import OpenAIEmbedding

# SynapseML OpenAIEmbedding transformer: embeds the "chunk" column with the
# "text-embedding-ada-002" deployment, writing vectors to "embeddings" and any
# per-row error to "error".
# The captured output of this cell shows a 403 "FT1 SKU Not Supported" response.
embedding = (
    OpenAIEmbedding()
    .setTextCol("chunk")
    .setOutputCol("embeddings")
    .setErrorCol("error")
    .setDeploymentName("text-embedding-ada-002")
)

df_embeddings = embedding.transform(exploded_df)

display(df_embeddings)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 32, Finished, Available)

Failed to fetch cluster details
Traceback (most recent call last):
  File "/home/trusted-service-user/cluster-env/clonedenv/lib/python3.10/site-packages/synapse/ml/fabric/token_utils.py", line 188, in _get_openai_mwc_token
    raise Exception(
Exception: get openai mwc token returns 403:b'{"Message":"FT1 SKU Not Supported","Source":"ML","error_code":"PERMISSION_DENIED"}'


SynapseWidget(Synapse.DataFrame, 9591853d-1371-4b77-b306-62da0ae508e2)

#### チャンク化したテキストの Embeddings 計算 (F64未満)

In [29]:
# If the previous cell fails with '"FT1 SKU Not Supported","Source":"ML","error_code":"PERMISSION_DENIED"', use this cell instead.
# It needs a new workspace environment with the latest version of the openai package installed.
from openai import AzureOpenAI
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

def compute_embedding(text):
    """Return the "text-embedding-ada-002" embedding of ``text`` via Azure OpenAI.

    Args:
        text: The text to embed. May be ``None`` or empty when the function is
            applied as a Spark UDF to a null or blank column value.

    Returns:
        The embedding of the first (only) input as returned by the client, or
        ``None`` when ``text`` is ``None`` or an empty string. No request is sent
        in that case.
    """
    # The embeddings endpoint does not accept empty input, and a single failing
    # row would abort the whole UDF job, so skip the call and return a null.
    if not text:
        return None

    # A client is constructed on every call from the notebook-level key/endpoint.
    aoai_client = AzureOpenAI(
        api_key = AZURE_OPENAI_KEY,
        api_version = "2024-02-01",
        azure_endpoint = AZURE_OPENAI_ENDPOINT
    )
    embedding_results = aoai_client.embeddings.create(input=text, model="text-embedding-ada-002").data[0].embedding
    return embedding_results

# Wrap compute_embedding as a Spark UDF that returns array<float>, then add one
# embedding per chunk row and render the result.
embedding_type = ArrayType(FloatType())
compute_embedding_udf = udf(compute_embedding, returnType=embedding_type)
df_embeddings = exploded_df.withColumn(
    "embeddings",
    compute_embedding_udf("chunk"),
)
display(df_embeddings)

StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 33, Finished, Available)

SynapseWidget(Synapse.DataFrame, a70b5c0f-f1e3-4295-aa29-9f0a020a9278)

#### 

#### インデックスの作成
- テキストデータのアナライザーを日本語に設定しています。
- セマンティック構成を追加し、ハイブリッドセマンティック検索を行っています。

In [30]:
import requests
import json

# Length of the embedding vector (OpenAI ada-002 generates embeddings of length 1536)
EMBEDDING_LENGTH = 1536

# Create (or update) the AI Search index with fields id, content, and contentVector.
# Note the datatypes for each field below.
url = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}?api-version=2023-11-01"
headers = {"Content-Type": "application/json", "api-key": AI_SEARCH_KEY}

# Unique identifier for each document (the index key).
id_field = {
    "name": "id",
    "type": "Edm.String",
    "key": True,
    "filterable": True,
}

# Text content of the document, analysed with the Japanese analyzer.
content_field = {
    "name": "content",
    "type": "Edm.String",
    "searchable": True,
    "retrievable": True,
    "analyzer": "ja.microsoft",
}

# Vector embedding of the text content.
content_vector_field = {
    "name": "contentVector",
    "type": "Collection(Edm.Single)",
    "searchable": True,
    "retrievable": True,
    "dimensions": EMBEDDING_LENGTH,
    "vectorSearchProfile": "vectorConfig",
}

# Semantic configuration that ranks on the "content" field and has no title field.
semantic_settings = {
    "configurations": [
        {
            "name": "semantic-config-demo",
            "prioritizedFields": {
                "titleField": None,
                "prioritizedContentFields": [{"fieldName": "content"}],
            },
        },
    ]
}

# HNSW with cosine distance, exposed through the profile the vector field refers to.
vector_search_settings = {
    "algorithms": [{"name": "hnswConfig", "kind": "hnsw", "hnswParameters": {"metric": "cosine"}}],
    "profiles": [{"name": "vectorConfig", "algorithm": "hnswConfig"}],
}

payload = json.dumps(
    {
        "name": AI_SEARCH_INDEX_NAME,
        "fields": [id_field, content_field, content_vector_field],
        "semantic": semantic_settings,
        "vectorSearch": vector_search_settings,
    }
)

response = requests.put(url, headers=headers, data=payload)

# 201 and 204 get a success message; any other status is reported with its body.
outcome = {201: "Index created!", 204: "Index updated!"}.get(response.status_code)
if outcome is not None:
    print(outcome)
else:
    print(f"HTTP request failed with status code {response.status_code}")
    print(f"HTTP response body: {response.text}")


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 34, Finished, Available)

Index updated!


#### チャンク化したテキストデータと Embedding データをインデックスに登録

In [31]:
import re

from pyspark.sql.functions import monotonically_increasing_id


def insert_into_index(documents):
    """Send ``documents`` to the Azure AI Search index in a single POST request.

    Args:
        documents: List of JSON-serialisable document dicts; it is posted as the
            ``value`` array of the request body.

    Returns:
        ``"Success"`` when the service answers 200 or 201, otherwise
        ``"Failure: <response body>"``.
    """
    endpoint = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}/docs/index?api-version=2023-11-01"
    request_headers = {"Content-Type": "application/json", "api-key": AI_SEARCH_KEY}
    body = json.dumps({"value": documents})

    response = requests.post(endpoint, headers=request_headers, data=body)

    if response.status_code not in (200, 201):
        return f"Failure: {response.text}"
    return "Success"

def make_safe_id(row_id: str):
    """Return ``row_id`` with each character outside ``0-9a-zA-Z_-`` replaced by ``_``.

    The result is used as the Azure AI Search document ID.

    NOTE(review): the substitution is lossy, so two different row ids can map to
    the same result when they differ only in replaced characters.
    """
    disallowed_chars = "[^0-9a-zA-Z_-]"
    placeholder = "_"
    return re.sub(disallowed_chars, placeholder, row_id)


def upload_rows(rows):
    """Upload the rows of one Spark partition to Azure AI Search in batches.

    Limits each upload to 1000 rows due to Azure AI Search API limits.

    Args:
        rows: Iterable of rows (as handed over by ``mapPartitions``). Each row must
            expose ``unique_id``, ``chunk``, ``embeddings`` and ``row_index`` by key.

    Yields:
        One ``[first_row_index, last_row_index, status]`` list per uploaded batch,
        where ``status`` is whatever ``insert_into_index`` returned for that batch.
        An empty partition yields nothing.
    """
    BATCH_SIZE = 1000
    rows = list(rows)
    for i in range(0, len(rows), BATCH_SIZE):
        row_batch = rows[i : i + BATCH_SIZE]
        # Build the payload from the current batch only. Iterating over all of
        # `rows` here would resend the whole partition with every batch and
        # defeat the per-request size limit.
        documents = [
            {
                "id": make_safe_id(row["unique_id"]),
                "content": row["chunk"],
                "contentVector": row["embeddings"],  # passed through as-is, no .tolist()
                "@search.action": "upload",
            }
            for row in row_batch
        ]
        status = insert_into_index(documents)
        yield [row_batch[0]["row_index"], row_batch[-1]["row_index"], status]

# Tag every row with a monotonically increasing id so each upload result can be
# traced back to the range of rows it covers.
df_embeddings = df_embeddings.withColumn("row_index", monotonically_increasing_id())

# Upload partition by partition; upload_rows yields one result per batch.
res = df_embeddings.rdd.mapPartitions(upload_rows)
result_columns = ["start_index", "end_index", "insertion_status"]
display(res.toDF(result_columns))


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 35, Finished, Available)

SynapseWidget(Synapse.DataFrame, aab1eaaa-a14a-44b2-ab54-72625f79fc82)

#### ユーザーからの質問の Embeddings 計算 (F64以上)

In [32]:
def gen_question_embedding(user_question):
    """Embed a single question with SynapseML's OpenAIEmbedding transformer.

    Args:
        user_question: The question text to embed.

    Returns:
        The ``embeddings`` value of the first result row, converted with ``.tolist()``.
    """
    from synapse.ml.services import OpenAIEmbedding

    question_embedder = (
        OpenAIEmbedding()
        .setDeploymentName('text-embedding-ada-002')
        .setTextCol("questions")
        .setErrorCol("errorQ")
        .setOutputCol("embeddings")
    )

    # One-row frame holding the question; the second column is a filler value.
    question_df = spark.createDataFrame([(user_question, 1)], ["questions", "dummy"])
    first_row = question_embedder.transform(question_df).collect()[0]
    return first_row.embeddings.tolist()


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 36, Finished, Available)

#### ユーザーからの質問の Embeddings 計算 (F64未満)

In [33]:
def gen_question_embedding(user_question):
    """Embed a single question by calling Azure OpenAI through the ``AzureOpenAI`` client.

    Args:
        user_question: The question text to embed.

    Returns:
        The embedding of the first (only) input, using the
        "text-embedding-ada-002" model name.
    """
    client_settings = {
        "api_key": AZURE_OPENAI_KEY,
        "api_version": "2024-02-01",
        "azure_endpoint": AZURE_OPENAI_ENDPOINT,
    }
    aoai_client = AzureOpenAI(**client_settings)
    embedding_response = aoai_client.embeddings.create(
        input=user_question,
        model="text-embedding-ada-002",
    )
    return embedding_response.data[0].embedding

StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 37, Finished, Available)

#### 検索結果の取得
- セマンティック構成の指定

In [34]:
import json 
import requests

def retrieve_top_chunks(k, question, question_embedding):
    """Query Azure AI Search with a hybrid (text + vector) semantic search.

    Args:
        k: Number of results requested, used both as ``top`` and as the vector
            query's ``k``.
        question: The question text, sent as the ``search`` string.
        question_embedding: Embedding of the question, matched against the
            ``contentVector`` field.

    Returns:
        The response body parsed from JSON. The HTTP status code is not checked.
    """
    search_url = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}/docs/search?api-version=2023-11-01"
    request_headers = {
        "Content-Type": "application/json",
        "api-key": AI_SEARCH_KEY,
    }

    # Vector part of the hybrid query.
    vector_query = {
        "vector": question_embedding,
        "k": k,
        "fields": "contentVector",
        "kind": "vector",
    }
    # Hybrid + semantic search request.
    search_request = {
        "queryType": "semantic",
        "search": question,
        "top": k,
        "semanticConfiguration": "semantic-config-demo",
        "vectorQueries": [vector_query],
    }

    response = requests.post(
        search_url,
        headers=request_headers,
        data=json.dumps(search_request),
    )
    return json.loads(response.text)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 38, Finished, Available)

In [35]:
def get_context(user_question, retrieved_k = 5):
    """Collect the ``content`` of the top search results for ``user_question``.

    Args:
        user_question: The question text; it is embedded and also used as the
            search text.
        retrieved_k: Number of results to request (default 5).

    Returns:
        A list with the ``content`` value of each entry in the search output's
        ``value`` array, in the order returned.
    """
    # Embed the question, then run the search with both the text and the vector.
    search_output = retrieve_top_chunks(
        retrieved_k,
        user_question,
        gen_question_embedding(user_question),
    )

    context = []
    for hit in search_output["value"]:
        context.append(hit["content"])
    return context


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 39, Finished, Available)

#### プロンプトと回答の生成 (F64以上)

In [36]:
from pyspark.sql import Row
from synapse.ml.services.openai import OpenAIChatCompletion


def make_message(role, content):
    """Build a chat-message ``Row`` with fields ``role``, ``content`` and ``name``.

    ``name`` is set to the same value as ``role``.
    """
    fields = {"role": role, "content": content, "name": role}
    return Row(**fields)

def get_response(user_question):
    """Answer ``user_question`` from retrieved context using SynapseML's OpenAIChatCompletion.

    Args:
        user_question: The question to answer.

    Returns:
        The completion contents, joined with single spaces into one string.
    """
    context = get_context(user_question)

    # System prompt carrying the retrieved context. The question itself goes in
    # the separate user message below.
    system_prompt = f"""
    context: {context}
    Answer the question based on the context above.
    If the information to answer the question is not present in the given context then reply "I don't know".
    """

    # Single-row frame whose "messages" column holds the whole conversation.
    conversation = [
        make_message("system", system_prompt),
        make_message("user", user_question),
    ]
    chat_df = spark.createDataFrame([(conversation,)]).toDF("messages")

    chat_completion = (
        OpenAIChatCompletion()
        .setDeploymentName("gpt-35-turbo-16k") # deploymentName could be one of {gpt-35-turbo, gpt-35-turbo-16k}
        .setSubscriptionKey(AZURE_OPENAI_KEY)
        .setMessagesCol("messages")
        .setErrorCol("error")
        .setOutputCol("chat_completions")
    )

    completed_df = chat_completion.transform(chat_df)
    result_df = completed_df.select("chat_completions.choices.message.content")

    # Each row's "content" is a sequence of strings: join within each row, then
    # join the rows into a single string.
    per_row_text = [' '.join(row['content']) for row in result_df.collect()]
    return ' '.join(per_row_text)

StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 40, Finished, Available)

#### プロンプトと回答の生成 (F64未満)
- システムプロンプトは適宜変更

In [37]:
from pyspark.sql import Row
from synapse.ml.services.openai import OpenAIChatCompletion


def make_message(role, content):
    """Return a ``Row`` describing one chat message.

    The row carries ``role`` and ``content`` as given, plus ``name`` set to ``role``.
    """
    message = Row(role=role, content=content, name=role)
    return message

def get_response(user_question):
    """Answer ``user_question`` from retrieved context using the ``AzureOpenAI`` client.

    Args:
        user_question: The question to answer.

    Returns:
        The message content of the first choice returned by the
        "gpt-35-turbo-16k" chat completion.
    """
    context = get_context(user_question)

    # System prompt (Japanese) carrying the retrieved context. It tells the model
    # to answer from the given information and to reply "わかりません" ("I don't
    # know") when that information is insufficient. The question itself is sent
    # as the separate user message below.
    prompt = f"""
    context: {context}
    あなたはユーザーからの質問に答えるアシスタントです。次の情報に基づいて質問に答えてください。
    質問に答えるための情報が十分にない場合は、「わかりません」と答えてください。
    """

    messages = [
        {'role': 'system', 'content': prompt},
        {'role': 'user', 'content': user_question},
    ]

    aoai_client = AzureOpenAI(
        api_key = AZURE_OPENAI_KEY,
        api_version = "2024-02-01",
        azure_endpoint = AZURE_OPENAI_ENDPOINT
    )
    completion = aoai_client.chat.completions.create(messages=messages, model="gpt-35-turbo-16k")
    result = completion.choices[0].message.content

    return result

StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 41, Finished, Available)

In [38]:
# Run the whole retrieval + answer pipeline once with a sample question
# (Japanese for "Please tell me how to make a reservation.") and print the answer.
user_question = "予約の方法を教えてください。"
response = get_response(user_question=user_question)
print(response)


StatementMeta(, 94422cd1-c706-47a1-bd51-8ebdee3e504e, 42, Finished, Available)

Contoso Real Estateでの予約方法は以下の通りです。

1. ウェブサイトまたはモバイルアプリにアクセスします。
2. 目的地、チェックイン日、チェックアウト日、宿泊人数などの予約条件を入力します。
3. 検索結果から滞在先を選択し、リストを閲覧します。
4. 選択したリスティングの詳細を見て、価格やアメニティ、ハウスルールなどを確認します。
5. リスティングページの「今すぐ予約」ボタンをクリックします。
6. 支払い情報を入力し、予約を確定します。
7. ホストが予約を承認すると、予約確認書が届きます。

以上が予約の手順です。予約に関する具体的な問題がある場合は、予約IDを教えていただければ、より具体的な回答ができます。
