# Gmail Data Source Integration with Amazon Q Business

## What You Will Learn

This notebook demonstrates how to programmatically integrate Gmail as a data source with Amazon Q Business using Infrastructure as Code (IaC) principles. By following this tutorial, you will gain hands-on experience with:

- **AWS SDK Integration**: Using boto3 to interact with multiple AWS services programmatically
- **Secrets Management**: Securely storing Gmail service account credentials in AWS Secrets Manager
- **IAM Role Creation**: Setting up proper permissions for Amazon Q Business to access Gmail data
- **Amazon Q Business Configuration**: Creating applications, indexes, and data sources via API
- **Gmail API Integration**: Configuring Google Workspace service accounts for data access
- **Error Handling**: Implementing robust error handling for AWS service interactions
- **Resource Management**: Creating and managing AWS resources with proper naming conventions

This approach enables automated deployment and consistent configuration across environments, making it ideal for enterprise implementations where manual setup is not scalable.


## Prerequisites

- AWS CLI configured with appropriate credentials
- Required Python packages installed (`boto3`, `botocore`, `urllib3`)
- Appropriate IAM permissions to:
    - Create and get Amazon Q Business applications, subscriptions, and indexes
    - Create IAM roles
    - Create secrets in Secrets Manager
    - Create IAM Identity Center instances
- Google Workspace admin access for service account setup
- Gmail service account with domain-wide delegation enabled

## Begin by setting up the required dependencies and configuration.

In [1]:
import boto3
import json
import botocore
import time
import random
import string
import os
import ssl
import urllib3
from datetime import datetime

## Define variables for region, role, user configuration, application name and secrets

In [3]:
 # Generate a random 5-character suffix
def generate_suffix(length=5):
    """Build a random suffix used to make resource names unique.

    Args:
        length: Number of characters to generate (defaults to 5).

    Returns:
        A string of ``length`` characters drawn, with replacement, from the
        lowercase ASCII letters and the digits.
    """
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)


# AWS Region
region = "us-east-1"  # Replace with your desired region

# A single random suffix is shared by every generated name so that the role,
# application and secret created by one run can be matched to each other.
suffix = generate_suffix()

# Name of the IAM role for the Gmail data source
gmail_role_name = f"QBusiness-gmail-DataSource-{suffix}"

# Display name of the Amazon Q Business application
instance_name = f"my-q-business-application-{suffix}"

# Name of the Secrets Manager secret holding the Gmail credentials
gmail_secret_name = f"QBusiness-gmail-{suffix}"

# User configuration (used when creating the IAM Identity Center user)
email = "johndoe@example.com"
first_name = "John"
family_name = "Doe"

## Display Configuration:

In [None]:
# Show the suffixed names generated in the configuration cell.
print(f"Using gmail role name: {gmail_role_name}")
print(f"Using instance name: {instance_name}")
print(f"Using secret name: {gmail_secret_name}")

## Define the Gmail Credentials

In [None]:
# Gmail configuration
# NOTE: the three values below are placeholders and must be replaced before
# running the secret-creation cell, which stores them in AWS Secrets Manager.
# Avoid committing a real private key together with the notebook.

# Service account email - identifies the service account in Google Cloud
client_email = "your client email"

# Admin account email - the Google Workspace admin account that the service account will impersonate
# This account must have the necessary permissions to access Gmail data across the organization
admin_account_email = "your admin account email"

# Private key for service account authentication
# This is the RSA private key from the service account JSON file
private_key = "your private key"


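## SSL configuration and AWS credential check

> **Warning:** the next cell disables SSL certificate verification for this Python process and for the AWS clients it creates. Use this only in test environments where verification cannot succeed (for example behind an intercepting proxy).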

In [None]:
# Disable SSL warnings and verification
# NOTE(review): the two lines after disable_warnings turn off certificate
# verification for the default HTTPS context of the whole Python process, on
# top of the per-client ``verify=False`` used below. Keep this for test
# environments only.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
os.environ["PYTHONHTTPSVERIFY"] = "0"
ssl._create_default_https_context = ssl._create_unverified_context

# Check if AWS credentials are properly configured
try:
    # Try to get caller identity (lightweight AWS API call)
    sts = boto3.client("sts", region_name=region, verify=False)
    identity = sts.get_caller_identity()
except Exception as e:
    print("❌ AWS credentials are not properly configured!")
    print(f"Error: {str(e)}")
    # Stop here: later cells read identity["Account"], so continuing would
    # only fail further down with an unrelated NameError.
    raise
else:
    print("✅ AWS credentials are properly configured!")
    print(f"Account ID: {identity['Account']}")
    print(f"User ID: {identity['UserId']}")
    print(f"ARN: {identity['Arn']}")

## Create a secret in AWS Secrets Manager to store the service account credentials

In [None]:
# Create client with SSL verification disabled
# Make sure the IAM role related to the SageMaker instance has permissions to create a secret
secrets_client = boto3.client("secretsmanager", region_name=region, verify=False)

# JSON document stored in the secret; the values come from the
# "Define the Gmail Credentials" cell.
gmail_credentials = {
    "clientEmailId": client_email,  # service account email
    "adminAccountEmailId": admin_account_email,  # admin account to impersonate
    "privateKey": private_key,  # service account private key
}

try:
    response = secrets_client.create_secret(
        Name=gmail_secret_name, SecretString=json.dumps(gmail_credentials)
    )
    print(f"Secret created: {response['ARN']}")
except secrets_client.exceptions.ResourceExistsException:
    # Only the "already exists" case is handled: the existing secret's value is
    # left untouched, and any other error propagates out of the cell.
    print(f"Secret '{gmail_secret_name}' already exists")

## Create Service-linked role for Q Business if it doesn't exist

In [None]:
# Build the IAM client from a new boto3 session.
session = boto3.Session(region_name=region)
iam = session.client("iam", region_name=region, verify=False)

# ARN of the managed service-linked role used by the Amazon Q Business
# application.
app_role_arn = (
    f"arn:aws:iam::{identity['Account']}:"
    f"role/aws-service-role/qbusiness.amazonaws.com/AWSServiceRoleForQBusiness"
)

# Look the role up first and create it only when IAM reports it is missing.
try:
    iam.get_role(RoleName="AWSServiceRoleForQBusiness")
except iam.exceptions.NoSuchEntityException:
    print("Creating service-linked role for Q Business...")
    iam.create_service_linked_role(AWSServiceName="qbusiness.amazonaws.com")
    print("✅ Service-linked role for Q Business created")
else:
    print("✅ Service-linked role for Q Business already exists")

print(f"Using service-linked role for application: {app_role_arn}")

## Create an IAM role for Gmail data source

In [16]:
# Trust policy for the separate Gmail data source role.
# Only the Amazon Q Business service principal may assume the role, and only
# on behalf of an application in this account and region. The two conditions
# guard against the cross-service confused-deputy problem; the application ID
# is a wildcard because the application is created in a later cell.
gmail_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowsAmazonQToAssumeRoleForServicePrincipal",
            "Effect": "Allow",
            "Principal": {"Service": "qbusiness.amazonaws.com"},
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {"aws:SourceAccount": identity["Account"]},
                "ArnLike": {
                    "aws:SourceArn": (
                        f"arn:aws:qbusiness:{region}:{identity['Account']}:"
                        "application/*"
                    )
                },
            },
        }
    ],
}

## Create an IAM Policy to attach to the role created

In [14]:
# Inline permissions policy for the Gmail data source role.
gmail_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowsAmazonQToGetS3Objects",
            "Action": ["s3:GetObject"],
            "Resource": ["arn:aws:s3:::*/*"],
            "Effect": "Allow",
            "Condition": {"StringEquals": {"aws:ResourceAccount": identity["Account"]}},
        },
        {
            "Sid": "AllowsAmazonQToGetSecret",
            "Effect": "Allow",
            "Action": ["secretsmanager:GetSecretValue"],
            # Least privilege: only the secret created by this notebook, not
            # every secret in the account. Secrets Manager appends a hyphen
            # and six characters to the name when it builds the ARN, which
            # the "-??????" wildcard matches.
            "Resource": [
                f"arn:aws:secretsmanager:{region}:{identity['Account']}:"
                f"secret:{gmail_secret_name}-??????"
            ],
        },
        {
            "Sid": "AllowsAmazonQToDecryptSecret",
            "Effect": "Allow",
            "Action": ["kms:Decrypt"],
            "Resource": [f"arn:aws:kms:{region}:{identity['Account']}:key/*"],
            # Decryption is only allowed when requested through Secrets Manager.
            "Condition": {
                "StringLike": {"kms:ViaService": ["secretsmanager.*.amazonaws.com"]}
            },
        },
        {
            "Sid": "AllowsAmazonQToIngestDocuments",
            "Effect": "Allow",
            "Action": ["qbusiness:BatchPutDocument", "qbusiness:BatchDeleteDocument"],
            "Resource": "*",
        },
        {
            "Sid": "AllowsAmazonQToIngestPrincipalMapping",
            "Effect": "Allow",
            "Action": [
                "qbusiness:PutGroup",
                "qbusiness:CreateUser",
                "qbusiness:DeleteGroup",
                "qbusiness:UpdateUser",
                "qbusiness:ListGroups",
            ],
            "Resource": "*",
        },
    ],
}

## Create the role with IAM policy attached

In [None]:
# Create the Gmail data source role, or reuse it when it already exists.
try:
    gmail_role_response = iam.create_role(
        RoleName=gmail_role_name,
        AssumeRolePolicyDocument=json.dumps(gmail_trust_policy),
        Description="Role for Amazon Q Business Gmail",
    )
    print("Gmail role created successfully.")
except iam.exceptions.EntityAlreadyExistsException:
    print(f"Role '{gmail_role_name}' already exists.")
    gmail_role_response = iam.get_role(RoleName=gmail_role_name)

# Record the ARN before touching the policy so it is available on both paths.
gmail_role_arn = gmail_role_response["Role"]["Arn"]

# put_role_policy adds or updates the inline policy, so it is applied on both
# paths. Previously a re-run after a partial failure (role created, policy
# not attached) left the role without any permissions.
iam.put_role_policy(
    RoleName=gmail_role_name,
    PolicyName="QBusinessGmailPermissions",
    PolicyDocument=json.dumps(gmail_policy_document),
)
print("Custom policy attached to Gmail role.")
print(f"Gmail Role ARN: {gmail_role_arn}")

## Regions to check for IAM Identity Center

In [19]:
 # Define regions to check for IAM Identity Center
# Regions probed, in this order, by find_idc_instance() and create_idc_user();
# both stop at the first region that reports an instance. An instance located
# in a region not listed here will not be found.
regions_to_check = [
    "us-east-1",
    "us-east-2",
    "us-west-1",
    "us-west-2",
    "eu-west-1",
    "eu-central-1",
    "ap-northeast-1",
    "ap-southeast-1",
]

## Create an IAM Identity Center instance if it doesn't exist already

In [20]:
# Find IAM Identity Center instance across all regions
def find_idc_instance():
    """Find the first IAM Identity Center instance in ``regions_to_check``.

    Regions are probed in list order and the search stops at the first region
    that reports at least one instance.

    Returns:
        A dict with the keys ``identityType`` (always ``"AWS_IAM_IDC"``),
        ``identityCenterInstanceArn`` and ``region``, or ``None`` when no
        region reports an instance.
    """
    print("Searching for IAM Identity Center instances across regions...")
    for check_region in regions_to_check:
        try:
            # Create SSO admin client for this region and list its instances
            sso_admin = boto3.client(
                "sso-admin", region_name=check_region, verify=False
            )
            instances = sso_admin.list_instances().get("Instances", [])
        except Exception as e:
            # Best effort: a failing region must not abort the search, but the
            # reason is reported instead of being silently dropped.
            print(f"⚠️ Could not query IAM Identity Center in {check_region}: {e}")
            continue

        if instances:
            instance_arn = instances[0]["InstanceArn"]
            print(
                f"✅ Found IAM Identity Center instance in {check_region}: "
                f"{instance_arn}"
            )
            return {
                "identityType": "AWS_IAM_IDC",
                "identityCenterInstanceArn": instance_arn,
                "region": check_region,
            }

    # If no instance found, return None
    return None

In [None]:
# Try to find existing IDC instance
idc_config = find_idc_instance()

# If no instance found, try to create one
if not idc_config:
    print(
        "No existing IAM Identity Center instances found. Attempting to "
        "create one..."
    )

    try:
        # Create Organizations client to check if organization exists
        # (the result is discarded; an exception here skips instance creation).
        # NOTE(review): unlike the other clients in this notebook, this one is
        # created without region_name / verify=False - confirm that is intended.
        org_client = boto3.client("organizations")
        org_client.describe_organization()

        # Create SSO admin client in current region
        sso_admin = boto3.client("sso-admin", region_name=region, verify=False)

        # Create Identity Center instance
        response = sso_admin.create_instance()
        print(f"✅ Created new IAM Identity Center instance in {region}")

        # Wait for instance to be available
        # (fixed 10-second pause; the instance status is not polled).
        print("Waiting for instance to be available...")
        time.sleep(10)

        # Get the ARN of the new instance
        # (the first listed instance is assumed to be the one just created).
        list_response = sso_admin.list_instances()
        if list_response["Instances"]:
            instance_arn = list_response["Instances"][0]["InstanceArn"]
            print(f"New instance ARN: {instance_arn}")
            # Same shape as the dict returned by find_idc_instance().
            idc_config = {
                "identityType": "AWS_IAM_IDC",
                "identityCenterInstanceArn": instance_arn,
                "region": region,
            }
    except Exception as e:
        # Any failure leaves idc_config falsy, which is reported below.
        print(f"Error creating IAM Identity Center instance: {str(e)}")

if not idc_config:
    print("❌ Could not find or create an IAM Identity Center instance")

## Create a user in IAM Identity Center

In [25]:
def create_idc_user(username, given_name, family_name):
    """Locate the IAM Identity Center instance and its identity store.

    Despite its name, this function does not create a user, and the three
    parameters are accepted but not used; the user is created by the caller
    with the identity store ID returned here.

    Args:
        username: Unused.
        given_name: Unused.
        family_name: Unused.

    Returns:
        A 3-tuple ``(instance_arn, identity_store_id, idc_region)`` for the
        first region in ``regions_to_check`` that has an instance, or
        ``(None, None, None)`` when none is found or an error occurs. The
        result is always a 3-tuple so callers can unpack it unconditionally.
    """
    not_found = (None, None, None)
    try:
        # Check each region for an Identity Center instance
        for check_region in regions_to_check:
            try:
                sso_admin = boto3.client(
                    "sso-admin", region_name=check_region, verify=False
                )
                instances = sso_admin.list_instances().get("Instances")
                if not instances:
                    continue

                instance_arn = instances[0]["InstanceArn"]

                # Get the Identity Store ID
                described = sso_admin.describe_instance(InstanceArn=instance_arn)
                identity_store_id = described["IdentityStoreId"]
                if identity_store_id:
                    return instance_arn, identity_store_id, check_region
            except Exception as e:
                # Best effort: report the failure and try the next region.
                print(
                    f"⚠️ Could not query IAM Identity Center in {check_region}: {e}"
                )
                continue

        print("No IAM Identity Center instances found in any region")
        return not_found
    except Exception as e:
        print(f"Error finding Identity Center: {str(e)}")
        return not_found

In [None]:
# Get Identity Center information.
# ``or (None, None, None)`` keeps the unpacking safe if the helper returns a
# bare None instead of a 3-tuple.
instance_arn, identity_store_id, idc_region = create_idc_user(
    email, first_name, family_name
) or (None, None, None)

user_id = None

if identity_store_id:
    try:
        # Create an Identity Store client in the region hosting the instance
        identity_store = boto3.client(
            "identitystore", region_name=idc_region, verify=False
        )

        try:
            # Create a new user with the configured email as user name
            user_response = identity_store.create_user(
                IdentityStoreId=identity_store_id,
                UserName=email,
                Name={"GivenName": first_name, "FamilyName": family_name},
                DisplayName=f"{first_name} {family_name}",
                Emails=[
                    {
                        "Value": email if "@" in email else f"{email}@example.com",
                        "Type": "Work",
                        "Primary": True,
                    }
                ],
            )
            user_id = user_response["UserId"]
            print(
                f"\nUser '{first_name} {family_name}' created successfully. id: "
                f"{user_id}, username: {email}"
            )
            print(
                "\nNote: For security reasons, "
                "password must be set manually through the AWS Console."
            )
        except identity_store.exceptions.ConflictException:
            # The user name is not randomized, so re-running the notebook hits
            # this branch; reuse the existing user instead of ending up with
            # user_id = None and no subscription.
            existing = identity_store.get_user_id(
                IdentityStoreId=identity_store_id,
                AlternateIdentifier={
                    "UniqueAttribute": {
                        "AttributePath": "userName",
                        "AttributeValue": email,
                    }
                },
            )
            user_id = existing["UserId"]
            print(f"\nUser '{email}' already exists. Reusing id: {user_id}")
    except Exception as e:
        print(f"Error creating user in IAM Identity Center: {str(e)}")
        user_id = None
else:
    print("Cannot create user without Identity Store ID")

## Create Amazon Q Business application

In [None]:
# Amazon Q Business client for the configured region; later cells reuse it.
q_business = boto3.client("qbusiness", region_name=region, verify=False)

try:
    if not idc_config:
        # Without an Identity Center configuration nothing is created and
        # application_id is left unassigned.
        print("Cannot create Q Business application without IAM Identity Center")
    else:
        # Request parameters for an application backed by IAM Identity Center
        create_params = dict(
            displayName=instance_name,
            roleArn=app_role_arn,
            identityType=idc_config["identityType"],
            identityCenterInstanceArn=idc_config["identityCenterInstanceArn"],
        )

        idc_home_region = idc_config["region"]
        print(
            f"Creating Q Business application with IAM Identity Center from "
            f"{idc_home_region}"
        )
        response = q_business.create_application(**create_params)

        print("Q Business instance creation initiated successfully!")
        print("\nResponse: ")
        print(json.dumps(response, indent=2))

        # Keep the application ID for the following cells
        application_id = response["applicationId"]

except Exception as e:
    print(f"Error creating Q Business instance: {e!s}")

## Check application status

In [34]:
def check_instance_status(application_id):
    """Fetch and print the status of an Amazon Q Business application.

    Args:
        application_id: ID passed to ``get_application``.

    Returns:
        The value of the response's ``status`` field, or ``None`` when the
        lookup raises (the error is printed, not propagated).
    """
    try:
        app_status = q_business.get_application(applicationId=application_id)[
            "status"
        ]
    except Exception as err:
        print(f"Error checking status: {str(err)}")
        return None

    print(f"Current status: {app_status}")
    return app_status

In [None]:
# Check status periodically until the instance is ready
max_attempts = 30
attempt = 0
status = None

while attempt < max_attempts:
    status = check_instance_status(application_id)
    if status == "ACTIVE":
        print("\nQ Business instance is now active!")
        break
    elif status == "FAILED":
        print("\nQ Business instance creation failed!")
        break

    print(
        f"Waiting for instance to become active... (Attempt {attempt + 1}/{max_attempts})"
    )
    time.sleep(60)  # Wait for 60 seconds before checking again
    attempt += 1
else:
    # Reached only when every attempt was used without a break, so the
    # timeout is reported instead of the cell ending silently.
    print(
        f"\nTimed out after {max_attempts} attempts waiting for the Q Business "
        f"instance (last status: {status})"
    )

## Add the IAM Identity Center user to the application with a Pro subscription

In [41]:
def add_user_with_pro_subscription(application_id, user_id):
    """Subscribe an IAM Identity Center user to a Q Business application.

    The subscription is created with type ``"Q_BUSINESS"``.

    Parameters:
    - application_id: The ID of the Q Business application
    - user_id: The user ID from IAM Identity Center

    Returns:
        The ``create_subscription`` response, or ``None`` when ``user_id`` is
        falsy or any step raises (the error is printed, not propagated).
    """
    try:
        # A dedicated client is created for this call
        client = boto3.client("qbusiness", region_name=region, verify=False)

        # Nothing to subscribe when the user could not be created
        if not user_id:
            print("User creation failed - user_id is None")
            return None

        try:
            created = client.create_subscription(
                applicationId=application_id,
                principal={"user": user_id},
                type="Q_BUSINESS",
            )
            new_subscription_id = created["subscriptionId"]
            print(
                f"User {user_id} added with subscription "
                f"{new_subscription_id}"
            )
            return created
        except Exception as err:
            print(f"Error creating subscription: {str(err)}")
            return None
    except Exception as err:
        print(f"❌ Error adding user to Q Business: {str(err)}")
        return None

In [None]:
# Add the user with Pro subscription
subscription_response = add_user_with_pro_subscription(application_id, user_id)

# A falsy result (None) means the helper could not create the subscription; it
# has already printed the reason, so nothing more is shown here.
if subscription_response:
    print(f"✅ User with ID {user_id} added to Q Business with Pro subscription!")
    print(f"Q Business User ID: {user_id}")
    print(f"Subscription ID: {subscription_response['subscriptionId']}")
    print(json.dumps(subscription_response, indent=5))

## Print application id, user id and subscription id

In [None]:
# Summary of the identifiers created so far.
print(f"application_id: {application_id}")
print(f"user {user_id}")

print("list subscriptions: ")
# Query once and reuse the response for both the dump and the ID lookup.
subscriptions_response = q_business.list_subscriptions(applicationId=application_id)
print(json.dumps(subscriptions_response, indent=5))

# Guard against an empty list (e.g. the subscription step failed) instead of
# raising IndexError on [0].
subscriptions = subscriptions_response.get("subscriptions", [])
sub_id = subscriptions[0]["subscriptionId"] if subscriptions else None
if sub_id is None:
    print("No subscriptions found for this application")
print(f"subscription_id: {sub_id}")

## Create Amazon Q Business index

In [None]:
try:
    # Request parameters for the index that will hold the Gmail documents
    index_params = {
        "applicationId": application_id,
        "displayName": "gmail-index",
        "description": "Index for gmail data source",
        "type": "ENTERPRISE",
        "capacityConfiguration": {"units": 1},  # number of capacity units
    }
    index_response = q_business.create_index(**index_params)

    print("Index creation initiated successfully!")
    print("\nResponse: ")
    print(json.dumps(index_response, indent=2))

    # Keep the index ID for the following cells
    index_id = index_response["indexId"]

except Exception as e:
    print(f"Error creating index: {e!s}")

## Check Index status

In [69]:
def check_index_status(application_id, index_id):
    """Fetch and print the status of an Amazon Q Business index.

    Args:
        application_id: ID of the application owning the index.
        index_id: ID of the index passed to ``get_index``.

    Returns:
        The value of the response's ``status`` field, or ``None`` when the
        lookup raises (the error is printed, not propagated).
    """
    try:
        index_status = q_business.get_index(
            applicationId=application_id, indexId=index_id
        )["status"]
    except Exception as err:
        print(f"Error checking status: {str(err)}")
        return None

    print(f"Current status: {index_status}")
    return index_status

In [None]:
# Check status periodically until the index is ready
max_attempts = 30
attempt = 0
status = None

while attempt < max_attempts:
    status = check_index_status(application_id, index_id)
    if status == "ACTIVE":
        print("\nIndex is now active!")
        break
    elif status == "FAILED":
        print("\nIndex creation failed!")
        break

    print(
        f"Waiting for index to become active... (Attempt {attempt + 1}/{max_attempts})"
    )
    time.sleep(60)  # Wait for 60 seconds before checking again
    attempt += 1
else:
    # Reached only when every attempt was used without a break, so the
    # timeout is reported instead of the cell ending silently.
    print(
        f"\nTimed out after {max_attempts} attempts waiting for the index "
        f"(last status: {status})"
    )

## Create Amazon Q Business retriever 

In [None]:
try:
    # Native-index retriever bound to the index created above
    retriever_params = {
        "applicationId": application_id,
        "displayName": "gmail-retriever1",
        "type": "NATIVE_INDEX",
        "configuration": {"nativeIndexConfiguration": {"indexId": index_id}},
        "roleArn": app_role_arn,
    }
    retriever_response = q_business.create_retriever(**retriever_params)

    print("Retriever creation initiated successfully!")
    print("\nResponse: ")
    print(json.dumps(retriever_response, indent=2))

    # Keep the retriever ID for future reference
    retriever_id = retriever_response["retrieverId"]

except Exception as e:
    print(f"Error creating retriever: {e!s}")

## Configure Gmail Data source

In [100]:
# Connector configuration document, passed as ``configuration`` to
# create_data_source in the next code cell.
gmail_config = {
    "version": "1.0.0",
    "syncMode": "FORCED_FULL_CRAWL",
    # NOTE(review): this ARN is assembled from the secret *name* only; the ARN
    # printed by the secret-creation cell may carry an extra generated suffix.
    # Confirm this form is accepted, or use the ARN returned by create_secret.
    "secretArn": f"arn:aws:secretsmanager:{region}:{identity['Account']}:secret:{gmail_secret_name}",
    "connectionConfiguration": {
        "repositoryEndpointMetadata": {
            "type": "GMAIL"
        }
    },
    "repositoryConfigurations": {
        "message": {
            # Each entry maps a data source field to an index field of the
            # given type.
            "fieldMappings": [
                {
                    "indexFieldName": "subject",
                    "indexFieldType": "STRING",
                    "dataSourceFieldName": "subject"
                },
                {
                    "indexFieldName": "sentDate",
                    "indexFieldType": "DATE",
                    "dataSourceFieldName": "sentDate",
                    "dateFieldFormat": "yyyy-MM-dd'T'HH:mm:ss'Z'"
                },
                {
                    "indexFieldName": "fromAddress",
                    "indexFieldType": "STRING",
                    "dataSourceFieldName": "fromAddress"
                },
                {
                    "indexFieldName": "body",
                    "indexFieldType": "STRING",
                    "dataSourceFieldName": "body"
                }
            ]
        }
    },
    # All inclusion/exclusion lists and both date filters are left empty here;
    # presumably that means "no filtering" - verify against the connector
    # documentation before relying on it.
    "additionalProperties": {
        "isCrawlAcl": True,
        "fieldForUserId": "fromAddress",
        "inclusionLabelNamePatterns": [],
        "exclusionLabelNamePatterns": [],
        "inclusionAttachmentTypePatterns": [],
        "exclusionAttachmentTypePatterns": [],
        "inclusionAttachmentNamePatterns": [],
        "exclusionAttachmentNamePatterns": [],
        "inclusionSubjectFilter": [],
        "exclusionSubjectFilter": [],
        "isSubjectAnd": True,
        "inclusionFromFilter": [],
        "exclusionFromFilter": [],
        "inclusionToFilter": [],
        "exclusionToFilter": [],
        "inclusionCcFilter": [],
        "exclusionCcFilter": [],
        "inclusionBccFilter": [],
        "exclusionBccFilter": [],
        "beforeDateFilter": "",
        "afterDateFilter": "",
        "isCrawlAttachment": True,
        "shouldCrawlDraftMessages": True,
        # Note: given as a string, not a number.
        "maxFileSizeInMegaBytes": "50"
    },
    "type": "GMAIL",
}

## Create the data source for Gmail Connector

In [None]:
try:
    # Register the Gmail connector on the index, using the dedicated role
    data_source_params = {
        "applicationId": application_id,
        "indexId": index_id,
        "displayName": "gmail-source",
        "configuration": gmail_config,
        "roleArn": gmail_role_arn,
    }
    response = q_business.create_data_source(**data_source_params)

    # Keep the data source ID for the following cells
    data_source_id = response["dataSourceId"]
    print(f"gmail data source created: {data_source_id}")

except Exception as e:
    print(f"Error: {e!s}")

gmail data source created: 24b79e2a-f054-4b7a-891c-c00f8e3695d4


## Check data source status

In [102]:
def check_data_source_status(application_id, index_id, data_source_id):
    """Fetch and print the status of an Amazon Q Business data source.

    Args:
        application_id: ID of the application owning the index.
        index_id: ID of the index the data source is attached to.
        data_source_id: ID of the data source passed to ``get_data_source``.

    Returns:
        The value of the response's ``status`` field, or ``None`` when the
        lookup raises (the error is printed, not propagated).
    """
    try:
        source_status = q_business.get_data_source(
            applicationId=application_id,
            indexId=index_id,
            dataSourceId=data_source_id,
        )["status"]
    except Exception as err:
        print(f"Error checking status: {str(err)}")
        return None

    print(f"Current status: {source_status}")
    return source_status

In [None]:
# Check status periodically until the data source is ready
max_attempts = 30
attempt = 0
status = None

while attempt < max_attempts:
    status = check_data_source_status(application_id, index_id, data_source_id)
    if status == "ACTIVE":
        print("\nGmail data source is now active!")
        break
    elif status == "FAILED":
        print("\nGmail data source creation failed!")
        break

    print(
        f"Waiting for data source to become active... (Attempt {attempt + 1}/{max_attempts})"
    )
    time.sleep(60)  # Wait for 60 seconds before checking again
    attempt += 1
else:
    # Reached only when every attempt was used without a break, so the
    # timeout is reported instead of the cell ending silently.
    print(
        f"\nTimed out after {max_attempts} attempts waiting for the data source "
        f"(last status: {status})"
    )

## Run a sync job on the data source

In [None]:
try:
    # Start a sync job for the data source
    sync_response = q_business.start_data_source_sync_job(
        applicationId=application_id, indexId=index_id, dataSourceId=data_source_id
    )

    # The HTTP status code from the response metadata decides which branch
    # runs; exceptions raised by the call are handled by the except below.
    if sync_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
        print("Data source sync job initiated successfully!")
        print("\nResponse: ")
        print(json.dumps(sync_response, indent=2))

        # Store the execution ID for future reference
        # (only assigned on this success path).
        execution_id = sync_response["executionId"]
    else:
        print(
            f"Error starting sync job. Status code: "
            f"{sync_response['ResponseMetadata']['HTTPStatusCode']}"
        )
        print("\nResponse: ")
        print(json.dumps(sync_response, indent=2))

except Exception as e:
    print(f"Error starting data source sync job: {str(e)}")

## Check sync job status

In [107]:
def check_sync_job_status(application_id, index_id, data_source_id):
    """Print and return the status of the first sync job in the history.

    Also prints the job's ``metrics`` when that key is present and, for a
    ``FAILED`` job, its ``error`` details when present.

    Args:
        application_id: ID of the application owning the index.
        index_id: ID of the index the data source is attached to.
        data_source_id: ID of the data source whose sync jobs are listed.

    Returns:
        The ``status`` of the first entry in the response's ``history``, or
        ``None`` when the history is empty or the lookup raises (the error is
        printed, not propagated).
    """
    try:
        jobs = q_business.list_data_source_sync_jobs(
            applicationId=application_id,
            indexId=index_id,
            dataSourceId=data_source_id,
        )["history"]

        if not jobs:
            print("No sync jobs found in history")
            return None

        # The first entry is treated as the most recent job
        newest = jobs[0]
        job_status = newest["status"]
        print(f"Current sync job status: {job_status}")

        # Metrics are optional
        if "metrics" in newest:
            print("\nSync metrics: ")
            print(json.dumps(newest["metrics"], indent=2))

        # Error details are only shown for failed jobs
        if job_status == "FAILED" and "error" in newest:
            print("\nError details: ")
            print(json.dumps(newest["error"], indent=2))

        return job_status

    except Exception as err:
        print(f"Error checking sync job status: {str(err)}")
        return None

In [None]:
# Check status periodically until the sync job is complete
max_attempts = 30
attempt = 0
status = None

while attempt < max_attempts:
    status = check_sync_job_status(application_id, index_id, data_source_id)
    if status == "SUCCEEDED":
        print("\nSync job completed successfully!")
        break
    elif status == "INCOMPLETE":
        # A documented sync job status that was previously reported as
        # "Unexpected status"; no further polling is done for it.
        print(
            "\nSync job ended with status INCOMPLETE; review the sync metrics "
            "and logs for documents that were not synced."
        )
        break
    elif status in ["FAILED", "ABORTED"]:
        print("\nSync job failed!")
        break
    elif status in ["SYNCING", "SYNCING_INDEXING", "STOPPING"]:
        # STOPPING is transitional, so keep polling until the job settles.
        print(
            f"Waiting for sync job to complete... (Attempt {attempt + 1}/{max_attempts})"
        )
        time.sleep(60)  # Wait for 60 seconds before checking again
        attempt += 1
    else:
        print(f"\nUnexpected status: {status}")
        break
else:
    # Reached only when every attempt was used without a break, so the
    # timeout is reported instead of the cell ending silently.
    print(
        f"\nTimed out after {max_attempts} attempts waiting for the sync job "
        f"(last status: {status})"
    )