# **MODULE TO CREATE AGENT BUILDER AND FINE TUNING**

# **1. Data Preparation**

The initial step involves preparing the dataset, splitting it into query and corpus data, and enriching the corpus with categories.

In [None]:
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()




**1.1 Import Data from a Bucket**

Retrieve the dataset from a Google Cloud Storage bucket.
The dataset is stored as a JSONL file (here `qa_pairs_for_rag_1107.jsonl`), where each line is one JSON object.


In [None]:
!pip install google-cloud-storage
from google.cloud import storage
import pandas as pd
# Bucket name and object (file) name to download (Colab form fields).
bucket_name = 'not_confidencial_data_for_share' # @param {type:"string"}
file_name = 'qa_pairs_for_rag_1107.jsonl'# @param {type:"string"}
local_file_path = 'qa_pairs_for_rag_1107.jsonl'  # @param {type:"string"}

# Initialize the Cloud Storage client.
client = storage.Client()

# Get bucket/blob handles. Client.bucket() only builds a local reference,
# whereas get_bucket() first fetches bucket metadata, which needs the
# storage.buckets.get permission even though only one object is read here.
bucket = client.bucket(bucket_name)
blob = bucket.blob(file_name)

# Download the object to the local file system.
blob.download_to_filename(local_file_path)
print(f"Downloaded gs://{bucket_name}/{file_name} to {local_file_path}")



**1.2 Split Dataset**

The dataset is divided into two parts:

**a. Query Data:**

Shuffle the dataset, then take the first portion (e.g., the first 700 entries) as queries.
Save the extracted data as query_data.jsonl in JSONL format.

**b. Corpus Data:**

Use the remaining entries from the dataset as the corpus.
Save the corpus data as corpus_data.jsonl in JSONL format.


In [None]:
#Installation
!pip install google-cloud-discoveryengine
!pip install --upgrade --user --quiet google-cloud-aiplatform
!pip install jsonlines

Collecting google-cloud-discoveryengine
  Downloading google_cloud_discoveryengine-0.13.5-py3-none-any.whl.metadata (5.3 kB)
Downloading google_cloud_discoveryengine-0.13.5-py3-none-any.whl (2.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m26.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: google-cloud-discoveryengine
Successfully installed google-cloud-discoveryengine-0.13.5


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/6.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.6/6.9 MB[0m [31m19.1 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━[0m [32m4.2/6.9 MB[0m [31m59.5 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m6.9/6.9 MB[0m [31m77.5 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m59.4 MB/s[0m eta [36m0:00:00[0m
[0m

In [None]:
import json
import random

# Load the JSONL dataset (one JSON object per line).
file_path = "qa_pairs_for_rag_1107.jsonl" # @param {type:"string"}
# Seed for the shuffle. With a fixed seed, re-running this cell reproduces the
# same query/corpus split instead of producing a new one that no longer
# matches the corpus already uploaded to the data store in step 2.
random_seed = 42 # @param {type:"integer"}

with open(file_path, "r", encoding="utf-8") as f:
    # Skip blank lines (e.g. a trailing empty line) instead of failing in json.loads.
    dataset = [json.loads(line) for line in f if line.strip()]

# Shuffle with a private, seeded generator (leaves the global `random` state alone).
random.Random(random_seed).shuffle(dataset)

# Split into query and corpus
query_count = 700
if query_count >= len(dataset):
    print(f"Warning: the dataset has only {len(dataset)} entries; the corpus will be empty.")
query_data = dataset[:query_count]
corpus_data = dataset[query_count:]

# Save query and corpus data to separate files
query_file = "query_data.jsonl"
corpus_file = "corpus_data.jsonl"

with open(query_file, "w", encoding="utf-8") as f_query, open(corpus_file, "w", encoding="utf-8") as f_corpus:
    for query in query_data:
        f_query.write(json.dumps(query, ensure_ascii=False) + "\n")
    for corpus in corpus_data:
        f_corpus.write(json.dumps(corpus, ensure_ascii=False) + "\n")

print(f"Query data saved to: {query_file}")
print(f"Corpus data saved to: {corpus_file}")


Query data saved to: query_data.jsonl
Corpus data saved to: corpus_data.jsonl


**1.3 Add Categories to Corpus**


*   Enrich each entry in the corpus with a category.
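*   The function `get_categorize(question)` sends each question to a Gemini model, which returns a short category label.
*   The enriched corpus is saved in `output_with_categories_corpus_data.jsonl`. Each record keeps its original fields (e.g. id, question, answer) and gains a `category` field.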

In [None]:
# Import necessary libraries
import vertexai
from vertexai.generative_models import GenerativeModel
import json

# Project settings (Colab form fields).
PROJECT_ID = "monobrain-development"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

# Point the Vertex AI SDK at the project/region. A failure is printed but does
# not stop the cell.
try:
    vertexai.init(project=PROJECT_ID, location=LOCATION)
except Exception as init_error:
    print(f"Error initializing Vertex AI: {init_error}")
else:
    print("Vertex AI initialized successfully.")

# Load the generative model
def initialize_model(model_name="gemini-1.0-pro"):
    """Load a Gemini generative model for categorization.

    Args:
        model_name: Vertex AI model ID to load. The default is the model this
            notebook was written against; pass another ID (e.g. the
            "gemini-1.5-flash-001" used in step 4) if that one is not
            available in your project.

    Returns:
        The ``GenerativeModel`` instance, or ``None`` if constructing it
        raised (the error is printed).
    """
    try:
        model = GenerativeModel(model_name)
    except Exception as e:
        print(f"Error loading the model: {e}")
        return None
    print("Model loaded successfully.")
    return model


# Module-level model used by get_categorize() (None if loading failed).
model = initialize_model()

# Categorization function
def get_categorize(text):
    """Ask the Gemini model for a short category label for one customer message.

    Args:
        text: The customer's message (the corpus ``question`` field).

    Returns:
        The model's reply stripped of surrounding whitespace, or "その他"
        ("other") when the module-level ``model`` is None, the request raises,
        or the response carries no text content.
    """
    if model is None:
        print("Model is not initialized.")
        return "その他"  # Default category for errors or fallback

    # Built from adjacent literals so no source indentation leaks into the
    # prompt. "簡潔に" (concisely) is the intended word; "完結に" was a typo.
    prompt = (
        f"[{text}] はお客様から届いた連絡内容です。私たちはカスタマーサポートセンターの運営をしており、お客様からの連絡内容を解析しています。"
        "先ほどの文章を15文字程度に要約して、どんな目的でお客様が連絡したのかを簡潔にまとめてください。それ以外の出力は一切しないでください。"
        "また、内容が明確なお問い合わせではない場合や判別がつかない場合はその他と出力してください。作ってもらったデータはお問い合せをカテゴライズする時に使います。"
    )

    try:
        responses = model.generate_content(prompt, stream=False)
        # A response can come back without candidates or parts (presumably
        # when it is blocked); check before indexing instead of raising
        # IndexError ("list index out of range").
        candidates = responses.candidates
        if not candidates or not candidates[0].content.parts:
            print("Error in get_categorize: the model returned no content")
            return "その他"
        category = candidates[0].content.parts[0].text.strip()
    except Exception as e:
        print(f"Error in get_categorize: {e}")
        return "その他"

    print(f"Generated category: {category}")
    return category

# Processing JSONL with categories
def process_jsonl_with_categories(
    input_file='/content/corpus_data.jsonl',
    output_file='/content/output_with_categories_corpus_data.jsonl',
):
    """Add a model-generated ``category`` field to every record of a JSONL file.

    Each non-blank input line is parsed as JSON. Records with a non-empty
    ``question`` are categorized with ``get_categorize`` and written to
    ``output_file`` with all original fields preserved. Records without a
    question are skipped with a message. Lines that fail to parse or process
    are reported and skipped, so one bad record does not abort the run.

    Args:
        input_file: Path of the source JSONL corpus.
        output_file: Path of the JSONL file to write (overwritten).
    """
    try:
        with open(input_file, 'r', encoding='utf-8') as f_in, \
             open(output_file, 'w', encoding='utf-8') as f_out:

            for line in f_in:
                if not line.strip():
                    continue  # blank line (e.g. trailing newline): nothing to do
                try:
                    # Parse JSON from the input file
                    data = json.loads(line)
                    question = data.get('question')

                    if not question:
                        print("Skipping line with no question.")
                        continue

                    # Categorize the question
                    data['category'] = get_categorize(question)

                    # Write updated JSON to the output file
                    f_out.write(json.dumps(data, ensure_ascii=False) + '\n')
                except Exception as e:
                    print(f"Error processing line: {e}")
    except FileNotFoundError as fnf_error:
        print(f"File not found: {fnf_error}")
    except Exception as e:
        print(f"Error in process_jsonl_with_categories: {e}")


# Run the script
process_jsonl_with_categories()


Vertex AI initialized successfully.
Model loaded successfully.
Generated category: お問い合わせ
Generated category: その他
Generated category: イベント時間について問い合わせ。16時以降のみ参加可能か確認希望。
Generated category: ダウンロードに関する問い合わせと推測されます。
Generated category: 週6-1の課題提出
Generated category: Figmaイベントについて、種類や再度参加する可能性についての問い合わせです。
Generated category: 受講キャンセル・解約
Generated category: 「課題③ができました」
目的：課題の提出、完了報告
カテゴリー：課題提出
Generated category: バナー修正のフィードバック依頼
Generated category: 体調不良のため、制作ができませんでした。
Generated category: 課題提出
Generated category: ## 3-3を提出、次の進捗は不明

## お問い合わせ内容：提出完了報告、進捗確認
Generated category: 「連絡しました！」とのことで、お問い合わせ内容の詳細は不明です。その他に分類されます。
Generated category: 課題提出 完了・報告
Generated category: お客様は料金についてカスタマーサポートセンターに連絡しました。
Generated category: HerTech受講に関する相談: 平日の残業が多く、計画的に受講できるか心配。
Generated category: 2-3の修正後、明日から再開します。 (お客様からの連絡内容要約)
Generated category: その他
Generated category: 課題2-4の確認依頼
Error in get_categorize: list index out of range
Generated category: クレジットカードに関する再試行の要請
Generated category: オリジナルバナー作成時の商品画像について問

# **2. Create a Search Data Store**
A search data store is required to manage the categorized corpus data.

**2.1 Create Data Store**

*   Create a new data store with a unique identifier (data_store_id).
*   The categorized corpus (`output_with_categories_corpus_data.jsonl`) is imported into this data store in the next step.
*   Change `displayName` (DISPLAY_NAME) so this data store can be distinguished from your other data stores.

In [None]:
import requests
import subprocess
import json

# Replace with your actual project ID
PROJECT_ID = "monobrain-development" # @param {type:"string"}
DATA_STORE_ID = "indah_data_1" # @param {type:"string"}
DISPLAY_NAME = "sample_module4" # @param {type:"string"}

# Get an access token from the gcloud CLI. check=True raises CalledProcessError
# if gcloud fails, instead of continuing with an empty token that would only
# surface later as an authentication failure.
access_token = subprocess.run(
    ["gcloud", "auth", "print-access-token"],
    capture_output=True,
    text=True,
    check=True,
).stdout.strip()

# Define the API endpoint (dataStoreId is passed as a query parameter)
url = f"https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/global/collections/default_collection/dataStores?dataStoreId={DATA_STORE_ID}"

# Define the request payload
payload = {
    "displayName": DISPLAY_NAME,
    "industryVertical": "GENERIC",
    "solutionTypes": ["SOLUTION_TYPE_SEARCH"]
}

# Define the headers
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    "X-Goog-User-Project": PROJECT_ID,
}

# Make the POST request. json= serializes the payload; the timeout keeps the
# cell from hanging indefinitely if the API does not answer.
response = requests.post(url, headers=headers, json=payload, timeout=120)

# Print the response
if response.status_code == 200:
    print("Data store created successfully!")
    print(response.json())
else:
    print("Failed to create data store.")
    print(f"Status Code: {response.status_code}")
    print(response.text)


Data store created successfully!
{'name': 'projects/954273464710/locations/global/collections/default_collection/operations/create-data-store-14924246761857000088', 'done': True, 'response': {'@type': 'type.googleapis.com/google.cloud.discoveryengine.v1alpha.DataStore', 'name': 'projects/954273464710/locations/global/collections/default_collection/dataStores/indah_data_1', 'displayName': 'sample_module4', 'industryVertical': 'GENERIC', 'solutionTypes': ['SOLUTION_TYPE_SEARCH'], 'defaultSchemaId': 'default_schema', 'servingConfigDataStore': {}}}


**2.2 Import the Categorized Corpus Data from a Local Path**



1.   Ensure the data_store_id is correctly defined. This is the unique identifier for the data store where the corpus will be uploaded.
2.   Ensure the enriched corpus data (`output_with_categories_corpus_data.jsonl`, produced in step 1.3) is available on your local path.

In [None]:
import requests
import subprocess
import json

# Replace with your actual project ID and data store ID
PROJECT_ID = "monobrain-development" # @param {type:"string"}
DATA_STORE_ID = "indah_data_1" # @param {type:"string"}
JSONL_FILE_PATH = "/content/output_with_categories_corpus_data.jsonl"  # @param {type:"string"}

# Get an access token using the gcloud CLI. check=True raises
# CalledProcessError if gcloud fails, instead of continuing with an empty
# token that would make every upload below fail authentication.
access_token = subprocess.run(
    ["gcloud", "auth", "print-access-token"],
    capture_output=True,
    text=True,
    check=True,
).stdout.strip()

# Read JSONL file and upload each document
def upload_documents(jsonl_file_path):
    """Upload each JSONL record as a structured document to the data store.

    Every non-blank line of ``jsonl_file_path`` is sent as the ``structData``
    of a new document whose ID is the record's ``id`` field. The result of
    each upload is printed. Lines that are not valid JSON, records without an
    ``id``, and network errors are reported and skipped, so the rest of the
    file is still uploaded.

    Uses the module-level ``PROJECT_ID``, ``DATA_STORE_ID`` and
    ``access_token`` defined earlier in this cell.
    """
    url = (
        f"https://discoveryengine.googleapis.com/v1beta/projects/{PROJECT_ID}"
        f"/locations/global/collections/default_collection/dataStores/{DATA_STORE_ID}"
        "/branches/0/documents"
    )
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        # Same quota project header as the data-store creation request.
        "X-Goog-User-Project": PROJECT_ID,
    }

    # One session reuses the HTTPS connection across all documents.
    with requests.Session() as session, open(jsonl_file_path, "r", encoding="utf-8") as file:
        session.headers.update(headers)
        for line_number, line in enumerate(file, start=1):
            if not line.strip():
                continue
            try:
                document = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Skipping line {line_number}: invalid JSON ({e})")
                continue

            document_id = document.get("id")
            if not document_id:
                print(f"Skipping line {line_number}: no 'id' field.")
                continue

            try:
                # params= URL-encodes the document ID in the query string.
                response = session.post(
                    url,
                    params={"documentId": document_id},
                    json={"structData": document},
                    timeout=60,
                )
            except requests.RequestException as e:
                print(f"Failed to upload document {document_id}: {e}")
                continue

            # Print the response for each document
            if response.status_code == 200:
                print(f"Document {document_id} uploaded successfully!")
                print(response.json())
            else:
                print(f"Failed to upload document {document_id}.")
                print(f"Status Code: {response.status_code}")
                print(response.text)


# Call the function to upload documents
upload_documents(JSONL_FILE_PATH)


Document qa_1484 uploaded successfully!
{'name': 'projects/954273464710/locations/global/collections/default_collection/dataStores/indah_data_1/branches/0/documents/qa_1484', 'id': 'qa_1484', 'schemaId': 'default_schema', 'structData': {'question': '井関様1-3の実践課題、オススメバナー初級をトレースして提出しました♪', 'id': 'qa_1484', 'answer': '取り組み報告ありがとうございます！今日もしっかり進められていて素晴らしいです👏バナートレースはやればやるほどスキルアップできるので、紹介しているもの以外でもご自身の好きなテーマなどでぜひトレース挑戦してみてください✨今日も1日お疲れ様でした^^', 'category': 'お問い合わせ'}, 'parentDocumentId': 'qa_1484'}
Document qa_1459 uploaded successfully!
{'name': 'projects/954273464710/locations/global/collections/default_collection/dataStores/indah_data_1/branches/0/documents/qa_1459', 'id': 'qa_1459', 'schemaId': 'default_schema', 'structData': {'category': 'その他', 'id': 'qa_1459', 'answer': '【🎉任意課題「模擬案件に応募してみよう」を追加しました🎉】こんにちは！HerTech運営事務局です😌マイページ\u30004-2：案件の取り方ページに任意課題「模擬案件に応募してみよう」を追加しました！▼内容は？本物のクラウドソーシングサービスそっくり！の架空案件で、クライアントとのやりとりや納品までの流れを体験できます✨▼こんな人におすすめ・実際の案件に挑戦する前に練習をしてみたい・実際の案件の流れを体験してみたい・今すぐに副業はできな

# **3. Create Agent Builder**
The Agent Builder serves as the interface between the search system and the user.

**3.1 Create the Agent**

*   Create an agent (engine) linked to the data store using a **unique engine_id.**
*   Ensure the search system is configured with Advanced LLM Features enabled for better relevance matching.
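*   Ensure the **data store id** (DATA_STORE_IDS) is the ID of the data store created in step 2.
*   Change **display_name** (DISPLAY_APP_NAME) to a name that identifies your app.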

**3.2 Wait for Initialization**

Allow 5-10 minutes for the agent and data store to initialize fully before proceeding to training.

In [None]:
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine_v1 as discoveryengine

# Configuration (Colab form fields).
# DATA_STORE_IDS holds the ID of the data store created in step 2 (a single ID
# string); ENGINE_ID and DISPLAY_APP_NAME identify the new search app.
PROJECT_ID = "monobrain-development" # @param {type:"string"}
LOCATION = "global" # @param {type:"string"}
ENGINE_ID = "indah_app_1" # @param {type:"string"}
DATA_STORE_IDS = "indah_data_1" # @param {type:"string"}
DISPLAY_APP_NAME = "Sample1" # @param {type:"string"}

def create_engine_with_enterprise():
    """Create the search app (engine) on the configured data store(s).

    Uses the module-level PROJECT_ID, LOCATION, ENGINE_ID, DATA_STORE_IDS and
    DISPLAY_APP_NAME. The engine is a GENERIC search app on the Enterprise
    tier with the LLM add-on ("Advanced LLM Features"). The call blocks on
    ``operation.result()`` until creation finishes, prints the new engine,
    and then calls ``enable_enterprise_features``.
    """
    client_options = ClientOptions(api_endpoint=f"{LOCATION}-discoveryengine.googleapis.com")
    client = discoveryengine.EngineServiceClient(client_options=client_options)

    parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection"

    # Engine.data_store_ids is a repeated string field. The form supplies one
    # ID string, so wrap it in a list rather than relying on implicit coercion
    # of a bare str; a list/tuple of IDs is passed through unchanged.
    if isinstance(DATA_STORE_IDS, str):
        data_store_ids = [DATA_STORE_IDS]
    else:
        data_store_ids = list(DATA_STORE_IDS)

    # Creating the Engine
    engine = discoveryengine.Engine(
        display_name=DISPLAY_APP_NAME,
        industry_vertical=discoveryengine.IndustryVertical.GENERIC,
        solution_type=discoveryengine.SolutionType.SOLUTION_TYPE_SEARCH,
        search_engine_config=discoveryengine.Engine.SearchEngineConfig(
            search_tier=discoveryengine.SearchTier.SEARCH_TIER_ENTERPRISE,
            search_add_ons=[discoveryengine.SearchAddOn.SEARCH_ADD_ON_LLM],  # Advanced LLM Features
        ),
        data_store_ids=data_store_ids,
    )

    request = discoveryengine.CreateEngineRequest(
        parent=parent,
        engine=engine,
        engine_id=ENGINE_ID,
    )

    operation = client.create_engine(request=request)
    print(f"Creating engine: {operation.operation.name}")
    response = operation.result()
    print("Engine created successfully:", response)

    # Enable Enterprise Features
    enable_enterprise_features(PROJECT_ID, LOCATION, ENGINE_ID)

def enable_enterprise_features(project_id, location, engine_id):
    """Patch an existing engine to the Enterprise search tier.

    Sends an UpdateEngine request for
    ``projects/{project_id}/locations/{location}/collections/default_collection/engines/{engine_id}``
    and prints the updated engine. A failure is printed, not raised.

    Note: the update mask lists only ``search_engine_config.search_tier``, so
    the ``search_add_ons`` value in the patch body is presumably not applied
    by this call. create_engine_with_enterprise() already sets the add-on when
    it creates the engine.
    """
    endpoint = ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
    engine_client = discoveryengine.EngineServiceClient(client_options=endpoint)

    enterprise_config = discoveryengine.Engine.SearchEngineConfig(
        search_tier=discoveryengine.SearchTier.SEARCH_TIER_ENTERPRISE,
        search_add_ons=[discoveryengine.SearchAddOn.SEARCH_ADD_ON_LLM]
    )
    patch = discoveryengine.Engine(
        name=(
            f"projects/{project_id}/locations/{location}"
            f"/collections/default_collection/engines/{engine_id}"
        ),
        search_engine_config=enterprise_config,
    )
    update_request = discoveryengine.UpdateEngineRequest(
        engine=patch,
        update_mask={"paths": ["search_engine_config.search_tier"]},
    )

    try:
        updated_engine = engine_client.update_engine(request=update_request)
    except Exception as exc:
        print("Failed to enable Enterprise Edition:", exc)
        return
    print("Updated engine to enable Enterprise Edition successfully:", updated_engine)


if __name__ == "__main__":
    create_engine_with_enterprise()


Creating engine: projects/954273464710/locations/global/collections/default_collection/operations/create-engine-15774122029334750032
Engine created successfully: search_engine_config {
  search_tier: SEARCH_TIER_ENTERPRISE
  search_add_ons: SEARCH_ADD_ON_LLM
}
name: "projects/954273464710/locations/global/collections/default_collection/engines/indah_app_1"
display_name: "Sample1"
data_store_ids: "indah_data_1"
solution_type: SOLUTION_TYPE_SEARCH
industry_vertical: GENERIC

Updated engine to enable Enterprise Edition successfully: search_engine_config {
  search_tier: SEARCH_TIER_ENTERPRISE
  search_add_ons: SEARCH_ADD_ON_LLM
}
name: "projects/954273464710/locations/global/collections/default_collection/engines/indah_app_1"
display_name: "Sample1"
data_store_ids: "indah_data_1"
solution_type: SOLUTION_TYPE_SEARCH
industry_vertical: GENERIC



# **4. Generate Training Dataset**
This step prepares the training data by pairing queries with the most relevant corpus entries.

**4.1 Load Query Data:**

Ensure the query data is uploaded to the specified location in your environment, such as a local file path.

**4.2 Set Engine ID:**

The engine_id uniquely identifies the search engine created in the earlier steps.


**4.3 Match Query with Relevant Corpus Entries**

*   For each query, retrieve the top 5 corpus entries returned by the search engine.
*   Use a Gemini model to rate the similarity of each query–entry pair on a 0–5 scale.
*   Select the corpus entry with the highest similarity score for each query.

**4.4 Generate Training Data**

Save the query–corpus pairs and their similarity scores in a TSV file (train_data.tsv) that has a header row and the following columns:
*   query-id
*   corpus-id
*   score

In [None]:
import json
import pandas as pd
import time
from google.cloud import discoveryengine_v1
from google.api_core.client_options import ClientOptions
from vertexai.generative_models import GenerativeModel


PROJECT_ID = "monobrain-development" # @param {type:"string"}
LOCATION = "global" # @param {type:"string"}
ENGINE_ID = "search_sample_1" # @param {type:"string"}
OUTPUT_FILE = "train_data.tsv"  # @param {type:"string"}
QUERY_DATA_PATH = "/content/query_data.jsonl"  # @param {type:"string"}

def search_relevant_docs(client, serving_config, query_text, top_k=5):
    """
    Fetch the top-k most relevant documents for a query using Discovery Engine.

    Args:
        client: A ``discoveryengine_v1.SearchServiceClient``.
        serving_config: Full resource name of the engine's serving config.
        query_text: The search query.
        top_k: Page size requested from the search API.

    Returns:
        A list of ``{"doc_id": ..., "text": ...}`` dicts in the order the API
        returned them. ``text`` is the hit's ``question`` field, or
        "No question available" if it has none. Hits whose struct data has no
        ``id`` are skipped, because they cannot be mapped back to a corpus
        entry and a placeholder ID could otherwise become a training label.
        Returns an empty list if the search fails (the error is printed).
    """
    request = discoveryengine_v1.SearchRequest(
        serving_config=serving_config,
        query=query_text,
        page_size=top_k,
        offset=0
    )
    try:
        response = client.search(request)
        docs = []
        for result in response.results:
            struct_data = result.document.struct_data
            doc_id = struct_data.get("id")
            if not doc_id:
                continue
            docs.append({
                "doc_id": doc_id,
                "text": struct_data.get("question", "No question available"),
            })
        return docs
    except Exception as e:
        print(f"Error in search: {e}")
        return []

def similarity_assessment_model(query_text, doc_text, model):
    """
    Evaluate semantic similarity between query and document using Gemini.

    Args:
        query_text: Input query text
        doc_text: Document text to compare against
        model: Gemini model instance (anything with ``generate_content``
            returning an object with a ``.text`` JSON string)

    Returns:
        float: Similarity score clamped to the 0-5 scale. Returns 0.0 if the
        call fails, the reply is not the expected JSON, or the score is not a
        finite number.
    """
    import math

    # temperature 0: this is a rating task, and sampling at temperature 1 let
    # the same pair receive different scores on different runs.
    generation_config = {
        "max_output_tokens": 8192,
        "temperature": 0,
        "top_p": 0.95,
        "response_mime_type": "application/json"
    }
    # Prompt engineering for semantic similarity assessment
    prompt = f"""Evaluate the semantic similarity between two Japanese texts on a 5-point scale:
Scoring Criteria:
5: Highly similar in content, details, and expression.
4: Similar overall, with minor differences in details or tone.
3: Moderately similar; same general topic but different specifics.
2: Slightly similar; some overlap in vocabulary or ideas.
1: Barely similar; weak connection in style or context.
0: No similarity in content, expression, or context.
Provide the output in the following JSON format:
{{
    "similarity": <score>
}}
Text 1: {query_text}
Text 2: {doc_text}"""
    try:
        response = model.generate_content(
            prompt,
            generation_config=generation_config,
        )
        response_data = json.loads(response.text)
        # Tolerate the object being wrapped in a one-element list.
        if isinstance(response_data, list) and response_data:
            response_data = response_data[0]
        score = float(response_data.get("similarity", 0.0))
    except Exception as e:
        print(f"Error in similarity assessment: {e}")
        return 0.0
    if not math.isfinite(score):
        return 0.0
    # Keep out-of-range replies on the documented 0-5 scale.
    return min(max(score, 0.0), 5.0)

def create_train_data(project_id, location, engine_id, queries):
    """Pair each query with its most similar search hit to build training labels.

    For every query, the hits returned by ``search_relevant_docs`` from the
    engine's ``default_config`` serving config are scored with
    ``similarity_assessment_model`` using Gemini 1.5 Flash. The hit with the
    highest score above 0 becomes the label; ties keep the earlier hit.
    Queries with no hits, or with no positively scored hit, are skipped. Each
    Gemini call is followed by a 1-second sleep.

    Returns:
        A list of dicts with keys "query-id", "corpus-id" and "score" (the
        score rounded to 2 decimals).
    """
    search_client = discoveryengine_v1.SearchServiceClient(
        client_options=ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
    )
    serving_config = (
        f"projects/{project_id}/locations/{location}/collections/default_collection"
        f"/engines/{engine_id}/servingConfigs/default_config"
    )
    judge = GenerativeModel("gemini-1.5-flash-001")
    rows = []

    for query in queries:
        qid, qtext = query["id"], query["question"]

        candidates = search_relevant_docs(search_client, serving_config, qtext)
        if not candidates:
            print(f"No documents found for query {qid}")
            continue

        top_score, top_doc = 0, None
        for candidate in candidates:
            score = similarity_assessment_model(qtext, candidate["text"], judge)
            if score > top_score:
                top_score, top_doc = score, candidate["doc_id"]
            time.sleep(1)  # throttle Gemini calls

        if not top_doc:
            continue
        rows.append({
            "query-id": qid,
            "corpus-id": top_doc,
            "score": round(top_score, 2),
        })

    return rows

def main():
    """Build the training-label TSV from the query file.

    Reads QUERY_DATA_PATH (JSONL; blank lines ignored), generates labels with
    create_train_data() against ENGINE_ID, writes them tab-separated with a
    header row to OUTPUT_FILE, and prints the first rows.
    """
    # Load query data
    with open(QUERY_DATA_PATH, "r", encoding="utf-8") as f:
        queries = [json.loads(line) for line in f if line.strip()]

    # Create training data
    train_data = create_train_data(
        project_id=PROJECT_ID,
        location=LOCATION,
        engine_id=ENGINE_ID,
        queries=queries
    )

    # Save training data as TSV. The explicit columns keep the header row (and
    # column order) even when no pair was generated; otherwise an empty list
    # produces a DataFrame with no columns and a header-less file.
    train_df = pd.DataFrame(train_data, columns=["query-id", "corpus-id", "score"])
    train_df.to_csv(OUTPUT_FILE, sep="\t", header=True, index=False)
    if train_df.empty:
        print("Warning: no training pairs were generated.")
    print(train_df.head())

if __name__ == "__main__":
    main()


  query-id corpus-id  score
0   qa_790    qa_892    3.0
1  qa_1171    qa_949    2.0
2  qa_1221   qa_1220    3.0
3  qa_1335   qa_1334    4.0
4   qa_002    qa_004    3.0


# **5. Adjust Dataset Fields for Fine Tuning**

Ensure that dataset fields meet the requirements for training.

**5.1 Corpus Data**

Each entry must include _id, title, and text fields.

Example: {"_id": "doc1", "text": "Content of doc"}

In [None]:
import json
import re

# Input JSONL file: the plain (uncategorized) corpus split off in step 1.2.
input_file = "/content/corpus_data.jsonl" # @param {type:"string"}
# Output: the corpus in the {"_id", "text"} layout described in 5.1.
output_file = "/content/corpus_data1.jsonl"

# Function to reformat JSONL
def reformat_jsonl(input_path, output_path, prefix="doc"):
    """Convert JSONL records to the ``{"_id", "text"}`` layout used for tuning.

    Each record's ``id`` (e.g. "qa_1484") becomes ``f"{prefix}{digits}"``,
    built from its first run of digits ("doc1484"). Its ``question`` becomes
    ``text``. Blank lines are skipped.

    Args:
        input_path: Source JSONL file (records with ``id`` and ``question``).
        output_path: Destination JSONL file (overwritten).
        prefix: Prefix for the generated ``_id`` (default "doc" for corpus data).

    Raises:
        ValueError: if a record's ``id`` contains no digits.
    """
    with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", encoding="utf-8") as outfile:
        for line_number, line in enumerate(infile, start=1):
            if not line.strip():
                continue
            data = json.loads(line)
            # Extract numeric part of the id field
            match = re.search(r"\d+", str(data["id"]))
            if match is None:
                raise ValueError(f"line {line_number}: id {data['id']!r} contains no digits")
            new_data = {
                "_id": f"{prefix}{match.group()}",
                "text": data["question"]
            }
            outfile.write(json.dumps(new_data, ensure_ascii=False) + "\n")

# Convert the corpus file. The next cell defines another function with the
# same name (reformat_jsonl), so re-run this whole cell before calling it again.
reformat_jsonl(input_file, output_file)


**5.2 Query Data**

Each entry must include _id and text fields.

Example: {"_id": "query1", "text": "Query content"}

In [None]:
import json
import re

# Input JSONL file: the query split from step 1.2.
input_file = "/content/query_data.jsonl" # @param {type:"string"}
# Output: the queries in the {"_id", "text"} layout described in 5.2.
output_file = "/content/query_data1.jsonl"

# Function to reformat JSONL
def reformat_jsonl(input_path, output_path, prefix="query"):
    """Convert JSONL records to the ``{"_id", "text"}`` layout used for tuning.

    Each record's ``id`` (e.g. "qa_790") becomes ``f"{prefix}{digits}"``,
    built from its first run of digits ("query790"). Its ``question`` becomes
    ``text``. Blank lines are skipped.

    Args:
        input_path: Source JSONL file (records with ``id`` and ``question``).
        output_path: Destination JSONL file (overwritten).
        prefix: Prefix for the generated ``_id`` (default "query" for query data).

    Raises:
        ValueError: if a record's ``id`` contains no digits.
    """
    with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", encoding="utf-8") as outfile:
        for line_number, line in enumerate(infile, start=1):
            if not line.strip():
                continue
            data = json.loads(line)
            # Extract numeric part of the id field
            match = re.search(r"\d+", str(data["id"]))
            if match is None:
                raise ValueError(f"line {line_number}: id {data['id']!r} contains no digits")
            new_data = {
                "_id": f"{prefix}{match.group()}",
                "text": data["question"]
            }
            outfile.write(json.dumps(new_data, ensure_ascii=False) + "\n")

# Convert the query file. The previous cell defines another function with the
# same name (reformat_jsonl), so re-run this whole cell before calling it again.
reformat_jsonl(input_file, output_file)


**5.3 Train Data**

Each line in the training data (TSV file) must contain a query ID, a corpus ID, and a relevance score, separated by tabs.

In [None]:
import json
import re
# Input TSV file: the training pairs written in step 4 (train_data.tsv).
input_tsv = "/content/train_data.tsv" # @param {type:"string"}
# Output: the same pairs with tuning-format query/doc IDs.
output_tsv = "/content/train_data1.tsv"

# Function to reformat TSV
def reformat_tsv(input_path, output_path):
    """Rewrite the generated training TSV with tuning-format IDs and scores.

    Skips the input header row and blank lines. For every data row it:

    * maps the query ID ("qa_790") to "query790" and the corpus ID ("qa_892")
      to "doc892", using the first run of digits in each, the same way the
      JSONL conversions build ``_id``;
    * rounds the score to a non-negative whole number.

    The output gets a ``query-id<TAB>corpus-id<TAB>score`` header row.

    Raises:
        ValueError: if a row has fewer than 3 columns or an ID has no digits.
    """
    def _numbered(raw_id, prefix, line_number):
        # Same digit-based mapping the corpus/query JSONL conversion uses.
        match = re.search(r"\d+", raw_id)
        if match is None:
            raise ValueError(f"line {line_number}: id {raw_id!r} contains no digits")
        return f"{prefix}{match.group()}"

    with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", encoding="utf-8") as outfile:
        infile.readline()  # Skip the header
        outfile.write("query-id\tcorpus-id\tscore\n")
        for line_number, line in enumerate(infile, start=2):
            if not line.strip():
                continue
            parts = line.rstrip("\r\n").split("\t")
            if len(parts) < 3:
                raise ValueError(f"line {line_number}: expected 3 tab-separated columns, got {len(parts)}")
            query_id = _numbered(parts[0], "query", line_number)
            corpus_id = _numbered(parts[1], "doc", line_number)
            score = max(0, int(round(float(parts[2]))))
            # The old f-string used {query-id}/{corpus-id}, which Python parses
            # as subtraction expressions, not as the variables above.
            outfile.write(f"{query_id}\t{corpus_id}\t{score}\n")

# Convert the training TSV. The query JSONL is converted in the previous cell.
# It is not converted again here because that call relied on input_file /
# output_file left over from another cell.
reformat_tsv(input_tsv, output_tsv)

# **6. Fine-Tuning**
Train a custom search-tuning model using the prepared datasets.

**6.1 Configure Training**

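Use the files prepared in step 5 (corpus_data1.jsonl, query_data1.jsonl, train_data1.tsv). Upload them to Cloud Storage first and set their gs:// paths.
Optionally set a test data path; if it is left as None, the data is split automatically. Change the data store ID to your own data store.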

**6.2 Initiate Fine-Tuning**

Execute the training process on the Discovery Engine platform.
Monitor progress and ensure the model completes training successfully.

**6.3 Validate the Tuned Model**

Use evaluation metrics to confirm the model’s effectiveness for the intended use case.

In [None]:
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

# Replace these with your actual values.
# The gs:// paths must point at the reformatted files from step 5
# (corpus_data1.jsonl, query_data1.jsonl, train_data1.tsv) once they have been
# uploaded to Cloud Storage.
MODEL_TYPE = "search-tuning"

PROJECT_ID = "monobrain-development" # @param {type:"string"}
LOCATION = "global" # @param {type:"string"}
DATA_STORE_ID = "indah_data_1" # @param {type:"string"}
CORPUS_DATA_PATH = "gs://tuning_category/corpus_data1.jsonl" # @param {type:"string"}
QUERY_DATA_PATH = "gs://tuning_category/query_data1.jsonl" # @param {type:"string"}
TRAIN_DATA_PATH = "gs://tuning_category/train_data1.tsv" # @param {type:"string"}
TEST_DATA_PATH = None  # Optional: Auto-split if None

def train_and_monitor_model(
    project_id: str,
    location: str,
    data_store_id: str,
    corpus_data_path: str,
    query_data_path: str,
    train_data_path: str,
    test_data_path: str,
    model_type: str,
):
    """
    Start a Discovery Engine custom-model (search tuning) job and wait for it.

    Builds a TrainCustomModelRequest from the GCS inputs for the data store
    ``projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{data_store_id}``,
    starts the long-running operation, prints its name, and then blocks in
    ``monitor_training_operation`` until it finishes.

    Args:
        project_id (str): GCP project ID.
        location (str): Location of the data store (e.g. "global").
        data_store_id (str): ID of the data store to tune.
        corpus_data_path (str): gs:// path of the corpus JSONL.
        query_data_path (str): gs:// path of the query JSONL.
        train_data_path (str): gs:// path of the training-label TSV.
        test_data_path (str): gs:// path of test labels, or None (the notebook
            passes None, with the note that the data is then split automatically).
        model_type (str): Model type, e.g. "search-tuning".

    Returns:
        None
    """
    endpoint = ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
    tuning_client = discoveryengine.SearchTuningServiceClient(client_options=endpoint)

    data_store_name = (
        f"projects/{project_id}/locations/{location}"
        f"/collections/default_collection/dataStores/{data_store_id}"
    )

    print("Starting the training operation...")
    gcs_input = discoveryengine.TrainCustomModelRequest.GcsTrainingInput(
        corpus_data_path=corpus_data_path,
        query_data_path=query_data_path,
        train_data_path=train_data_path,
        test_data_path=test_data_path,
    )
    train_request = discoveryengine.TrainCustomModelRequest(
        gcs_training_input=gcs_input,
        data_store=data_store_name,
        model_type=model_type,
    )

    training_lro = tuning_client.train_custom_model(request=train_request)
    lro_name = training_lro.operation.name
    print(f"Training operation started: {lro_name}")

    print("Monitoring the training operation...")
    monitor_training_operation(location, lro_name)


def monitor_training_operation(location: str, operation_name: str, poll_interval: int = 60):
    """
    Poll a long-running training operation until it completes.

    Args:
        location (str): The location of the data store (e.g., "global").
        operation_name (str): The full name of the operation to monitor.
        poll_interval (int): Seconds to wait between status checks (default 60).

    Prints the operation's response on success, or its error status if the
    operation failed.

    Returns:
        None
    """
    import time

    # Initialize the client
    client_options = ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
    client = discoveryengine.SearchTuningServiceClient(client_options=client_options)

    # Poll the operation
    print(f"Checking operation status for: {operation_name}")
    while True:
        operation = client.transport.operations_client.get_operation(operation_name)
        if operation.done:
            break
        print(f"Operation is still in progress. Checking again in {poll_interval} seconds...")
        time.sleep(poll_interval)

    # An Operation's result is a oneof of `error` / `response`. A protobuf
    # message is truthy regardless of its content, so `if operation.response:`
    # cannot detect failure. Ask which member is actually set.
    if operation.HasField("error"):
        print("Operation finished with errors.")
        print(operation.error)
    else:
        print("Training completed successfully!")
        print(operation.response)


# Run the training and monitoring process. In a notebook cell __name__ is
# "__main__", so this runs whenever the cell is executed and blocks until the
# training operation finishes.
if __name__ == "__main__":
    train_and_monitor_model(
        project_id=PROJECT_ID,
        location=LOCATION,
        data_store_id=DATA_STORE_ID,
        corpus_data_path=CORPUS_DATA_PATH,
        query_data_path=QUERY_DATA_PATH,
        train_data_path=TRAIN_DATA_PATH,
        test_data_path=TEST_DATA_PATH,
        model_type=MODEL_TYPE,
    )


Starting the training operation...
Training operation started: projects/954273464710/locations/global/collections/default_collection/dataStores/indah_data_1/operations/train-custom-model-14408795834615834426
Monitoring the training operation...
Checking operation status for: projects/954273464710/locations/global/collections/default_collection/dataStores/indah_data_1/operations/train-custom-model-14408795834615834426
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Operation is still in progress. Checking again in 60 seconds...
Ope