Polled Operation

In [1]:
# import functions_framework
import requests
from google.oauth2 import service_account
import google.auth.transport.requests
import time
from google.auth.transport.requests import Request


# Get authentication
def get_access_token():
    # Path to your service account key file
    service_account_file = "ccai-dev-project.json"

    # Define the required scopes
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]

    # Authenticate and obtain the access token
    credentials = service_account.Credentials.from_service_account_file(
        service_account_file, scopes=scopes
    )

    auth_request = google.auth.transport.requests.Request()
    credentials.refresh(auth_request)

    return credentials.token


def get_operation(operation_id: str) -> str:
    # Authenticate and get the access token
    credentials, project_id = google.auth.default()
    credentials.refresh(Request())
    access_token = credentials.token

    operation_id = operation_id
    # Define the request URL
    url = f"https://contactcenterinsights.googleapis.com/v1/projects/ccai-dev-project/locations/us-central1/operations/{operation_id}"

    # Define the headers
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json; charset=utf-8",
    }

    # Send the POST request
    response = requests.post(url, headers=headers, json={})

    # Check the response
    if response.status_code == 200:
        print("Poll request successful")
        operation_name = response.json().get("name")
        if operation_name:
            print(f"Operation Name: {operation_name}")
            return operation_name
        else:
            print("Operation name not found in the response")
            return None
    else:
        print("Poll request failed")
        print(response.status_code)
        print(response.text)
        return None


def poll_operation(operation_name: str, interval: int = 30) -> None:
    while True:
        operation = get_operation(operation_name)
        if operation.done:
            print("Operation is done")
            break
        else:
            print("Operation is in progress")
            time.sleep(interval)


# Analyse conversation
def analyze_conversation(conversation_id: str) -> str:
    # Authenticate and get the access token
    credentials, project_id = google.auth.default()
    credentials.refresh(Request())
    access_token = credentials.token

    conversation_id = conversation_id
    # Define the request URL
    url = f"https://contactcenterinsights.googleapis.com/v1/projects/ccai-dev-project/locations/us-central1/conversations/{conversation_id}/analyses"

    # Define the headers
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json; charset=utf-8",
    }
    # Define the JSON body for the request
    data = {
        "annotatorSelector": {
            "run_interruption_annotator": True,
            "run_silence_annotator": True,
            "run_phrase_matcher_annotator": True,
            "run_sentiment_annotator": True,
            "run_entity_annotator": True,
            "run_intent_annotator": True,
            "run_issue_model_annotator": True,
        }
    }

    # Send the POST request
    response = requests.post(url, headers=headers, json=data)

    # Check the response
    if response.status_code == 200:
        print("Analysis request successful")
        operation_name = response.json().get("name")
        if operation_name:
            print(f"Analysis Operation Name: {operation_name}")
            return operation_name
        else:
            print("Anlysis operation name not found in the response")
            return None
    else:
        print("Analysis request failed")
        print(response.status_code)
        print(response.text)
        return None


def export_conversation() -> str:
    # Authenticate and get the access token
    credentials, project_id = google.auth.default()
    credentials.refresh(Request())
    access_token = credentials.token

    # Define the request URL
    url = "https://contactcenterinsights.googleapis.com/v1/projects/ccai-dev-project/locations/us-central1/insightsdata:export"

    # Define the headers
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json; charset=utf-8",
    }

    # Define the JSON body for the request
    data = {
        "bigQueryDestination": {
            "projectId": "ccai-dev-project",
            "dataset": "ccai_bq_looker",
            "table": "ccai_insights",
        },
        "filter": "latest_analysis:*",
        "writeDisposition": "WRITE_APPEND",
    }

    # Send the POST request
    response = requests.post(url, headers=headers, json=data)

    # Check the response
    if response.status_code == 200:
        print("Export request successful")
        operation_name = response.json().get("name")
        if operation_name:
            print(f"Export Operation Name: {operation_name}")
            return operation_name
        else:
            print("Operation name not found in the response")
            return None
    else:
        print("Export request failed")
        print(response.status_code)
        print(response.text)
        return None


# @functions_framework.cloud_event
def upload_conversation():
    # data = event.data
    # bucket = data["bucket"]
    # name = data["name"]
    # gcs_link = f"gs://{bucket}/{name}"
    # Define the URL
    url = "https://contactcenterinsights.googleapis.com/v1/projects/ccai-dev-project/locations/us-central1/conversations:upload"

    # Define the JSON body
    json_body = {
        "conversation": {
            "qualityMetadata": {
                "agentInfo": [{"agentId": "47", "displayName": "Agent-47"}]
            },
            "dataSource": {
                "gcsSource": {
                    "audioUri": "",
                    "transcriptUri": "gs://sample-insights-conversations/agent-1/chat_1.json",
                },
            },
            "agentId": "47",
            "medium": "CHAT",
            "ttl": "60s",
        },
    }

    # Get the access token
    access_token = get_access_token()

    # Define the headers with the access token
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # Send the POST request
    response = requests.post(url, json=json_body, headers=headers)
    print(response.json())

    # Check the response
    if response.status_code == 200:
        print("Upload Request successful")
        conversation_meta = response.json()
        conversation_id = conversation_meta["metadata"]["request"]["conversationId"]

        if conversation_id:
            print(f"Conversation ID: {conversation_id}")

            # Perform bulk analysis
            analyze_operation_name = analyze_conversation(conversation_id)
            if analyze_operation_name:
                # Poll the bulk analyze operation
                print(analyze_operation_name)
                poll_operation(analyze_operation_name)

                # Export insights data
                export_operation_name = export_conversation()
                if export_operation_name:
                    # Poll the export operation
                    poll_operation(export_operation_name)
            else:
                print("Operation name not found in the response")
        else:
            print("Operation name not found in the response")
    else:
        print("Upload request failed")
        print(response.status_code)
        print(response.text)


if __name__ == "__main__":
    upload_conversation()

{'name': 'projects/360020353590/locations/us-central1/operations/5335557109856273112', 'metadata': {'@type': 'type.googleapis.com/google.cloud.contactcenterinsights.v1.UploadConversationMetadata', 'createTime': '2024-07-08T07:06:10.555905Z', 'request': {'parent': 'projects/360020353590/locations/us-central1', 'conversationId': '5335557109856272591', 'redactionConfig': {}, 'speechConfig': {}}}}


In [None]:


curl --request POST \
  "https://contactcenterinsights.googleapis.com/v1/projects/ccai-dev-project/locations/us-central1/conversations:upload" \
  --header "Authorization: Bearer $(gcloud auth print-access-token)" \
  --header "Accept: application/json" \
  --header "Content-Type: application/json; charset=utf-8" \
  --data '{
  "conversation": {
    "qualityMetadata": {
      "agentInfo": [{ "agentId": "Monday", "displayName": "Monday" }]
    },
    "dataSource": {
      "gcsSource": {
        "transcriptUri": "gs://transcripts-gcs/conversation_51.json"
      }
    }
  }
}'