<strong>QuickSight Data APIs to build data sources and datasets</strong>

Author: Ying Wang (Data Visualization Engineer in ProServe GSP)
Date: April 16 2020


In [None]:
import boto3
import json
import time
from IPython.display import JSON
from IPython.display import IFrame
def display_json(doc, root='root'):
    return JSON(doc)
#qs = boto3.client('quicksight')
import logging
from typing import Any, Dict, List, Optional

<strong>construct target information <strong/>

In [None]:
def get_target(targetsession, rds,redshift,s3Bucket,s3Key,vpc,tag,targetadmin,rdscredential,redshiftcredential,region='us-east-1', namespace='default',version='1'): 
    sts_client = targetsession.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    target: Dict[str, Any] = {
        "rds": {"rdsinstanceid": ''},
"s3":{"manifestBucket": '',
"manifestkey": ''},
"vpc": '',
"tag": '',
        "credential": {"rdscredential": rdscredential,
                    "redshiftcredential": redshiftcredential},
        "datasourcepermission": [
            {'Principal': targetadmin,
            'Actions': ["quicksight:DescribeDataSource",
                        "quicksight:DescribeDataSourcePermissions",
                        "quicksight:PassDataSource",
                        "quicksight:UpdateDataSource",
                        "quicksight:DeleteDataSource",
                        "quicksight:UpdateDataSourcePermissions"]
            }
        ],
        "datasetpermission": [{'Principal': targetadmin,
            'Actions': ['quicksight:UpdateDataSetPermissions',
   'quicksight:DescribeDataSet',
   'quicksight:DescribeDataSetPermissions',
   'quicksight:PassDataSet',
   'quicksight:DescribeIngestion',
   'quicksight:ListIngestions',
   'quicksight:UpdateDataSet',
   'quicksight:DeleteDataSet',
   'quicksight:CreateIngestion',
   'quicksight:CancelIngestion']}],
        "version": '1',    
    }
    
    if rds:
        target["rds"]["rdsinstanceid"] = rds
    
    if redshift:
        target["redshift"] = redshift
        
    if s3Bucket:
        target["s3"]["manifestBucket"] = s3Bucket
        target["s3"]["manifestkey"]=s3Key
    
    if vpc:
        target["vpc"] = "arn:aws:quicksight:"+region+":"+account_id+":vpcConnection/"+vpc
    
    if tag:
        target["tag"]=tag
    
    if rdscredential:
        target["credential"]["rdscredential"]=rdscredential
        
    if redshiftcredential:
        target["credential"]["redshiftcredential"]=redshiftcredential
    
    return target



<strong>get object ids from name, or get object name from id <strong/>

In [None]:
# 
def get_dashboard_ids(name: str, session) -> List[str]:
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    
    ids: List[str] = []
    for dashboard in list_dashboards(session):
        if dashboard["Name"] == name:
            ids.append(dashboard["DashboardId"])
    return ids

def get_dashboard_name(did: str, session) -> List[str]:
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    
    name: str
    for dashboard in list_dashboards(session):
        if dashboard["DashboardId"] == did:
            name=dashboard["Name"]
    return name

def get_dataset_name(did: str, session) -> List[str]:
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    
    name: str
    for dataset in data_sets(session):
        if dataset["DataSetId"] == did:
            name=dataset["Name"]
    return name

def get_dataset_ids(name: str, session) -> List[str]:
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
   
    ids: List[str] = []
    for dataset in data_sets(session):
        if dataset["Name"] == name:
            ids.append(dataset["DataSetId"])
    return ids

def get_datasource_name(did: str, session) -> List[str]:
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    
    name: str
    for datasource in data_sources(session):
        if datasource["DataSourceId"] == did:
            name=datasource["Name"]
    return name

def get_datasource_ids(name: str, session) -> List[str]:
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
   
    ids: List[str] = []
    for datasource in data_sources(session):
        if datasource["Name"] == name:
            ids.append(datasource["DataSourceId"])
    return ids

<strong>List current Data sources/datasets<strong/>

In [None]:
#
def data_sources (session):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    token=None

    try:
        sourceslist=list_sources (session, 100)
    except Exception:
            print(traceback.format_exc())

    if sourceslist['ResponseMetadata']['HTTPStatusCode']==200:
        datasources=sourceslist['DataSources']
        if 'NextToken' in sourceslist:
            token=sourceslist['NextToken']
            #print('get next list of datasources')
            while token is not None:
                datasourcelistnext=list_sources (session, 100, token)
                datasources=datasources+datasourcelistnext['DataSources']

                if 'NextToken' not in datasourcelistnext:
                    #print("Get all the data sources!")
                    break

                elif 'NextToken' in datasourcelistnext:
                    token=datasourcelistnext['NextToken']
                    continue
        else: 
            pass
            #print('token is none. No further action.')
    else:
        print("List data sources failed!")

    return datasources

def list_sources (session, MaxResults, Token=None):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    if Token is None:
        response = qs.list_data_sources(
        AwsAccountId=AccountId,
        MaxResults=MaxResults)
    else:
        response = qs.list_data_sources(
        AwsAccountId=AccountId,
        NextToken=Token,
        MaxResults=MaxResults)
    return response
    
def data_sets (session):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    token=None

    try:
        dslist=list_datasets (session,Token=None,Max=100)
    except Exception:
            print(traceback.format_exc())

    #print(dslist)
    if dslist['ResponseMetadata']['HTTPStatusCode']==200:
        datasets=dslist['DataSetSummaries']
        if 'NextToken' in dslist:
            token=dslist['NextToken']
            print('get next list of datasets')
            while token is not None:
                datasetslistnext=list_datasets (session, token,100)
                datasets=datasets+datasetslistnext['DataSetSummaries']

                if 'NextToken' not in datasetslistnext:
                    print("Get all the data sets!")
                    break

                elif 'NextToken' in datasetslistnext:
                    token=datasetslistnext['NextToken']
                    continue
        else: pass
            #print('token is none. No further action.')
    else:
        print("List data sets failed!")

    return datasets

def list_datasets (session,Token=None,Max=100):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    if Token is None:
        response = qs.list_data_sets(
        AwsAccountId=AccountId,
        MaxResults=Max)
    else:    
        response = qs.list_data_sets(
        AwsAccountId=AccountId,
        NextToken=Token,
        MaxResults=Max)
    return response

def templates (session):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    token=None
    
    args: Dict[str, Any] = {
        "AwsAccountId": account_id,
        "MaxResults": 100,
    }
    
    tlist=qs.list_templates(**args)

    templates=tlist['TemplateSummaryList']
        
    if 'NextToken' in tlist:
        token=tlist['NextToken']
        while token is not None:
            args["NextToken"] = token
            tlist=qs.list_templates(**args)
            templates.append(tlist['TemplateSummaryList'])
            token = tlist.get("NextToken", None)         
    else: pass
        #print('token is none. No further action.')

    return templates

def dashboards(session)-> List[Dict[str, Any]]:
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    dashboards = []
    response = qs.list_dashboards(AwsAccountId=account_id)
    next_token: str = response.get("NextToken", None)
    dashboards += response["DashboardSummaryList"]
    while next_token is not None:
        response = qs.list_dashboards(AwsAccountId=account_id, NextToken=next_token)
        next_token = response.get("NextToken", None)
        dashboards += response["DashboardSummaryList"]
    return dashboards

<strong>Describe a Data source/dataset<strong/>

In [None]:
#
def describe_source (session, DSID):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    response = qs.describe_data_source(
        AwsAccountId=AccountId,
        DataSourceId=DSID)
    return response


#Describe a Dataset
def describe_dataset (session, DSID):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    response = qs.describe_data_set(
        AwsAccountId=AccountId,
        DataSetId=DSID)
    return response

def describe_dashboard(session, dashboard):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    try:
        response = qs.describe_dashboard(
        AwsAccountId=account_id,
        DashboardId=dashboard)
    except Exception as e:
                return ('Faild to describe dashboard: '+str(e))
    else: return response

def describe_template(session,tid):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    response = qs.describe_template(
        AwsAccountId=account_id,
        TemplateId=tid)
    return response

<strong>delete object <strong/>

In [None]:
def delete_source (session, DataSourceId):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    delsource = qs.delete_data_source(
        AwsAccountId=AccountId,
        DataSourceId=DataSourceId)
    return delsource

def delete_dataset (session, DataSetId):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    response = qs.delete_data_set(
        AwsAccountId=AccountId,
        DataSetId=DataSetId)
    return response

def delete_template (session, tid, version=None):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    args: Dict[str, Any] = {
        "AwsAccountId": AccountId,
        "TemplateId": tid,
    }
    if version:
        args["VersionNumber"] = version
        
    response = qs.delete_template(**args)
    return response

def delete_dashboard(session, did):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    response = qs.delete_dashboard(
    AwsAccountId=account_id,
    DashboardId=did)
    return response

<html>
<body>
<strong>Create objects in QuickSight:</strong>
</body>

</html>

def create_data_source (account, dsourceid, dsourcename, dsourcetype, parameter, ssl, vpcconnection, profile, permission, credential=None, Tags=None):
    session = boto3.Session(profile_name=profile)
    qs = session.client('quicksight')
    
    args: Dict[str, Any] = {
        "AwsAccountId": account,
        "DataSourceId": dsourceid,
        "Name": dsourcename,
        "Type": dsourcetype,
        "SslProperties": ssl,
    }
        
    if parameter is not None:
        args["DataSourceParameters"] = parameter
    
    if Tags is not None:
        _tags: List[Dict[str, str]] = [{"Key": k, "Value": v} for k, v in Tags.items()]
        args["Tags"] = _tags
        
    if credential is not None:
        args["Credentials"] = credential
    
    if vpcconnection is not None:
        args["VpcConnectionProperties"] = vpcconnection
    
    NewSource = qs.create_data_source(**args)
    
    return NewSource
        
    

<strong> Create Objects <strong/>

In [None]:
def create_data_source (source, session, target):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    credential=None
     
    #rds
    if source['Type'].lower() in ['aurora','aurorapostgresql','mariadb','mysql','postgresql','sqlserver'] and 'RdsParameters' in source['DataSourceParameters']:    
        #Update data source instance name
        instance_id=source['DataSourceParameters']['RdsParameters']
        instance_id['InstanceId'] = target['rds']['rdsinstanceid']
        credential=target['credential']['rdscredential']
        
        #redshift
    if source['Type']=="REDSHIFT":
            
        #Update data source instance name
        Cluster=source['DataSourceParameters']['RedshiftParameters']
        if 'ClusterId' in Cluster:
            Cluster['ClusterId'] = target['redshift']['ClusterId']
        Cluster['Host'] = target['redshift']['Host']
        if target['redshift']['Database'] is not None and 'Database' in Cluster:
            Cluster['Database'] = target['redshift']['Database']
        credential=target['credential']['redshiftcredential']
            
    if 'VpcConnectionProperties' in source and target['vpc'] is not None:
            source['VpcConnectionProperties']['VpcConnectionArn'] = target['vpc']
    elif 'VpcConnectionProperties' in source and target['vpc'] is None:
            raise Exception("Sorry, you need the targetvpc information")

    args: Dict[str, Any] = {
        "AwsAccountId": account_id,
        "DataSourceId": source['DataSourceId'],
        "Name": source['Name'],
        "Type": source['Type'],
    }
    
    if "SslProperties" in source: 
        args["SslProperties"] = source['SslProperties']
        
    if 'DataSourceParameters' in source:
        args["DataSourceParameters"] = source['DataSourceParameters']
    
    if target['tag'] is not None:
        args["Tags"] = target['tag']
        
    if credential is not None:
        args["Credentials"] = credential
    
    if 'VpcConnectionProperties' in source:
        args["VpcConnectionProperties"] = source['VpcConnectionProperties']
    
    args["Permissions"] = target['datasourcepermission']
    
    try:
        NewSource = qs.create_data_source(**args)
        return NewSource
    except Exception as e:
        error={"DataSource": args, "Error": str(e)}
        return error
  

In [None]:
def loaddsinput (file, part):
    import json
    with open(file) as f:
        data = json.load(f)
    
    res=data['DataSet'][part]
    
    return res




In [None]:
#AccountId string; DataSetId string; Name string; Physical: json; Logical: json; Mode: string;
#ColumnGroups: json array; Permissions: json array; RLS: json; Tags: json array
def create_dataset (session, DataSetId, Name, Physical, Logical, Mode, Permissions):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    AccountId = sts_client.get_caller_identity()["Account"]
    response = qs.create_data_set(
    AwsAccountId=AccountId,
    DataSetId=DataSetId,
    Name=Name,
    PhysicalTableMap=Physical,
    LogicalTableMap=Logical,
    ImportMode=Mode,
    Permissions=Permissions)
    return response

In [None]:
def create_template(session, TemplateId, tname, dsref, sourceanalysis, version):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    try:
        delete_template(session, TemplateId)
    except Exception:
        print(traceback.format_exc())
    #assert isinstance(TemplateId, int), ''
    finally:
        response = qs.create_template(
            AwsAccountId=account_id,    
            TemplateId=TemplateId,
            Name=tname,
            SourceEntity={
                'SourceAnalysis': {
                    'Arn': sourceanalysis,
                    'DataSetReferences': dsref
                }
            },
            VersionDescription=version
        )
        return response
    



def update_template_permission(session, TemplateId, Principal):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    response = qs.update_template_permissions(
        AwsAccountId=account_id,
        TemplateId=TemplateId,
        GrantPermissions=[
            {
                'Principal': Principal,
                'Actions': [
                    'quicksight:DescribeTemplate',
                ]
            }
        ]
    )
    return response

#res=update_template_permission('810061268089', 'covid19czbdev', 'arn:aws:iam::889399602426:root')
    
def copy_template(session, TemplateId, tname, SourceTemplatearn):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    try:
        delete_template(session, TemplateId)
    except Exception:
        print(traceback.format_exc())
    #assert isinstance(TemplateId, int), ''
    finally:
        response = qs.create_template(
            AwsAccountId=account_id,    
            TemplateId=TemplateId,
            Name=tname,
            SourceEntity={
                'SourceTemplate': {
                    'Arn': SourceTemplatearn
                }
            },
            VersionDescription='1'
        )
        return response

def create_dashboard(session, dashboard, name,principal, SourceEntity, version, filter='ENABLED',csv='ENABLED', sheetcontrol='EXPANDED'):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]
    response = qs.create_dashboard(
        AwsAccountId=account_id,
        DashboardId=dashboard,
        Name=name,
        Permissions=[
            {
                'Principal': principal,
                'Actions': [
                'quicksight:DescribeDashboard',
                'quicksight:ListDashboardVersions',
                'quicksight:UpdateDashboardPermissions',
                'quicksight:QueryDashboard',
                'quicksight:UpdateDashboard',
                'quicksight:DeleteDashboard',
                'quicksight:DescribeDashboardPermissions',
                'quicksight:UpdateDashboardPublishedVersion' 
                ]
            },
        ],
        SourceEntity=SourceEntity,
        VersionDescription=version,
        DashboardPublishOptions={
            'AdHocFilteringOption': {
                'AvailabilityStatus': filter
            },
            'ExportToCSVOption': {
                'AvailabilityStatus': csv
            },
            'SheetControlsOption': {
                'VisibilityState': sheetcontrol
            }
        }
    )

    return response

def update_dashboard(session, dashboard, name, SourceEntity, version, filter='ENABLED',csv='ENABLED', sheetcontrol='EXPANDED'):
    qs = session.client('quicksight')
    sts_client = session.client("sts")
    account_id = sts_client.get_caller_identity()["Account"]

    response = qs.update_dashboard(
    AwsAccountId=account_id,
    DashboardId=dashboard,
    Name=name,
    SourceEntity=SourceEntity,
    VersionDescription=version,
    DashboardPublishOptions={
        'AdHocFilteringOption': {
            'AvailabilityStatus': filter
        },
        'ExportToCSVOption': {
            'AvailabilityStatus': csv
        },
        'SheetControlsOption': {
            'VisibilityState': sheetcontrol
        }
    })
    
    return response