Allow SQL datasets export #638

Merged · 1 commit · Oct 4, 2023
13 changes: 9 additions & 4 deletions cid/common.py
@@ -379,8 +379,8 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
        try:
            dashboard = self.qs.discover_dashboard(dashboardId=dashboard_id)
        except CidCritical:
            pass

        if not dashboard_definition:
            if isinstance(dashboard, Dashboard):
                dashboard_definition = dashboard.definition
@@ -445,8 +445,13 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs
        dashboard_definition['datasets'] = {}

        for dataset_name in required_datasets_names:
+            dataset = None
            # First try to find the dataset with the id
-            dataset = self.qs.describe_dataset(id=dataset_name)
+            try:
+                dataset = self.qs.describe_dataset(id=dataset_name)
+            except Exception as exc:
+                logger.debug(f'Failed to describe_dataset {dataset_name} {exc}')
+
            if isinstance(dataset, Dataset):
                logger.debug(f'Found dataset {dataset_name} with id match = {dataset.arn}')
                dashboard_definition['datasets'][dataset_name] = dataset.arn
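The try/except introduced here makes a missing dataset non-fatal: describe_dataset presumably raises when the id does not exist (for example, when the dataset has not been deployed yet), and previously that aborted the whole loop. A minimal sketch of the pattern, assuming a hypothetical client object that exposes describe_dataset (not CID's actual classes):

import logging

logger = logging.getLogger(__name__)

def find_dataset_by_id(client, dataset_name):
    """Return the dataset if the id lookup succeeds, else None.

    A failed lookup is logged at debug level and treated as non-fatal,
    so the caller can fall back to other strategies (e.g. matching by name).
    """
    dataset = None
    try:
        dataset = client.describe_dataset(id=dataset_name)  # hypothetical client method
    except Exception as exc:  # broad catch: any failure just means "not found by id"
        logger.debug(f'Failed to describe_dataset {dataset_name} {exc}')
    return dataset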
@@ -515,7 +520,7 @@ def _deploy(self, dashboard_id: str=None, recursive=True, update=False, **kwargs

        print(f'Deploying dashboard {dashboard_id}')
        try:
-            dashboard = self.qs.create_dashboard(dashboard_definition, **kwargs)
+            dashboard = self.qs.create_dashboard(dashboard_definition)
            print(f"\n#######\n####### Congratulations!\n####### {dashboard_definition.get('name')} is available at: {_url}\n#######")
            self.track('created', dashboard_id)
        except self.qs.client.exceptions.ResourceExistsException:
60 changes: 35 additions & 25 deletions cid/export.py
@@ -90,7 +90,7 @@
            AnalysisId=analysis_id
        )['Analysis']

-    logger.info("analysing datasets")
+    logger.info("analyzing datasets")
    resources = {}
    resources['dashboards'] = {}
    resources['datasets'] = {}
@@ -102,9 +102,9 @@
    dataset_references = []
    datasets = {}
    all_views = []
    all_databases = []
    for dataset_arn in analysis['DataSetArns']:
-        dependancy_views = []
+        dependency_views = []
        dataset_id = dataset_arn.split('/')[-1]
        dataset = qs.describe_dataset(dataset_id)

@@ -135,31 +135,42 @@
        }

        for key, value in dataset_data['PhysicalTableMap'].items():
-            if 'RelationalTable' not in value \
-                or 'DataSourceArn' not in value['RelationalTable'] \
-                or 'Schema' not in value['RelationalTable']:
-                raise CidCritical(f'Dataset {key} does not seem to be an Athena dataset. Only Athena datasets are supported.')
-            all_databases.append(value['RelationalTable']['Schema'])
-            value['RelationalTable']['DataSourceArn'] = '${athena_datasource_arn}'
-            value['RelationalTable']['Schema'] = '${athena_database_name}'
-            athena_source = value['RelationalTable']['Name']
-            views_name = athena_source.split('.')[-1]
-            dependancy_views.append(views_name)
-            all_views.append(views_name)
+            if 'RelationalTable' in value \
+                and 'DataSourceArn' in value['RelationalTable'] \
+                and 'Schema' in value['RelationalTable']:
+                logger.debug(f"Dataset {dataset.raw['DataSetId']} looks like a classic Athena dataset")
+                value['RelationalTable']['DataSourceArn'] = '${athena_datasource_arn}'
+                all_databases.append(value['RelationalTable']['Schema'])
+                value['RelationalTable']['Schema'] = '${athena_database_name}'
+                athena_source = value['RelationalTable']['Name']
+                views_name = athena_source.split('.')[-1]
+                dependency_views.append(views_name)
+                all_views.append(views_name)
+            elif 'CustomSql' in value and 'DataSourceArn' in value['CustomSql']:
+                logger.debug(f"Dataset {dataset.raw['DataSetId']} looks like a CustomSql Athena dataset")
+                value['CustomSql']['DataSourceArn'] = '${athena_datasource_arn}'
+                databases = [db_['Name'] for db_ in athena.list_databases()]
+                for database in databases:
+                    if f'{database}.' in value['CustomSql']['SqlQuery'] or f'"{database}".' in value['CustomSql']['SqlQuery']:
+                        logger.debug(f"Replacing {database} in text")
+                        value['CustomSql']['SqlQuery'] = value['CustomSql']['SqlQuery'].replace(database, '${athena_database_name}')
+                logger.warning(f"Dataset {dataset.raw['DataSetId']} uses SqlQuery. Discovery of tables and views for that is not supported yet. Please add them manually.")
+            else:
+                raise CidCritical(f"Dataset {key} {dataset.raw['Name']} does not seem to be an Athena dataset. Only Athena datasets are supported.")

        for key, value in dataset_data.get('LogicalTableMap', {}).items():
            if 'Source' in value and "DataSetArn" in value['Source']:
-                #FIXME add value['Source']['DataSetArn'] to the list of dataset_arn s
+                #FIXME add value['Source']['DataSetArn'] to the list of dataset_arns
                raise CidCritical(f"DataSet {dataset.raw['Name']} contains an unsupported join. Please replace the join of {value.get('Alias')} from DataSet to DataSource")

        dep_cur = False
-        for dep_view in dependancy_views[:]:
+        for dep_view in dependency_views[:]:
            if cur_helper.table_is_cur(name=dep_view):
-                dependancy_views.remove(dep_view)
+                dependency_views.remove(dep_view)
                dep_cur = True
        datasets[dataset_name] = {
            'data': dataset_data,
-            'dependsOn': {'views': dependancy_views},
+            'dependsOn': {'views': dependency_views},
            'schedules': ['default'], #FIXME: need to read a real schedule
        }
        if dep_cur:
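The new CustomSql branch is what gives this PR its name: SQL-based datasets are now exported by templating the data source ARN and rewriting any known database names inside the raw query. A standalone sketch of that substitution with made-up names (the quoted-name check mirrors the exporter's logic):

def template_database_names(sql_query, databases):
    # Replace known database names, referenced either bare (mydb.table)
    # or quoted ("mydb".table), with a template placeholder.
    for database in databases:
        if f'{database}.' in sql_query or f'"{database}".' in sql_query:
            sql_query = sql_query.replace(database, '${athena_database_name}')
    return sql_query

print(template_database_names(
    'SELECT * FROM "cid_cur".cur JOIN cid_cur.accounts USING (account_id)',
    ['cid_cur'],
))
# SELECT * FROM "${athena_database_name}".cur JOIN ${athena_database_name}.accounts USING (account_id)

Note one limitation of plain str.replace here: a database name that happens to appear as a substring of another identifier would be rewritten as well.

Separately, the dependency loop above iterates over a copy (dependency_views[:]) because it removes items from the original list; removing from a list while iterating it directly skips elements:

views = ['cur', 'account_map']
for view in views[:]:   # shallow copy, so removal below is safe
    if view == 'cur':   # stand-in for cur_helper.table_is_cur(name=view)
        views.remove(view)
print(views)            # ['account_map']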
@@ -187,7 +198,7 @@
        if isinstance(view_data.get('data'), str):
            view_data['data'] = view_data['data'].replace('CREATE VIEW ', 'CREATE OR REPLACE VIEW ')

-        # Analyse dependancies: if the dependancy is CUR there is a special flag
+        # Analyse dependencies: if the dependency is CUR there is a special flag
        deps = view_data.get('dependsOn', {})
        non_cur_dep_views = []
        for dep_view in deps.get('views', []):
@@ -247,7 +258,7 @@
    dashboard_resource['dependsOn'] = {
        # Historically CID uses dataset names as dataset reference. IDs of manually created resources have uuid format.
        # We can potentially reconsider this and use IDs at some point
-        'datasets': sorted(list(set([dataset_name for dataset_name in datasets.keys()] + resources_datasets)))
+        'datasets': sorted(list(set([dataset_name for dataset_name in datasets] + resources_datasets)))
    }
    dashboard_resource['name'] = analysis['Name']
    dashboard_resource['dashboardId'] = dashboard_id
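The one-line change in this hunk drops a redundant .keys() call: iterating a dict yields its keys directly, so both comprehensions produce the same list (and the comprehension could be shortened further to list(datasets)):

datasets = {'cur': 'dataset-arn-1', 'account_map': 'dataset-arn-2'}
assert [name for name in datasets] == [name for name in datasets.keys()] == list(datasets)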
@@ -319,7 +330,7 @@
        default='*'
    )

-    time.sleep(5) # Some times update_template_permissions does not work immediatly.
+    time.sleep(5) # Sometimes update_template_permissions does not work immediately.

    res = qs.update_template_permissions(
        TemplateId=template_id,
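The fixed five-second sleep works around eventual consistency in QuickSight permissions. A hypothetical, more robust alternative (a sketch, not what this code does) would be to retry the call with a delay:

import time

def call_with_retries(func, attempts=5, delay=2.0, **kwargs):
    # Hypothetical helper: retry func(**kwargs) on any exception,
    # re-raising after the final attempt.
    for attempt in range(1, attempts + 1):
        try:
            return func(**kwargs)
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay)

# e.g. res = call_with_retries(qs.update_template_permissions, TemplateId=template_id)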
@@ -342,9 +353,9 @@
        AnalysisId=analysis_id,
    )['Definition']

-    for datasest in definition.get('DataSetIdentifierDeclarations', []):
+    for dataset in definition.get('DataSetIdentifierDeclarations', []):
        # Hide region and account number of the source account
-        datasest["DataSetArn"] = 'arn:aws:quicksight:::dataset/' + datasest["DataSetArn"].split('/')[-1]
+        dataset["DataSetArn"] = 'arn:aws:quicksight:::dataset/' + dataset["DataSetArn"].split('/')[-1]
    dashboard_resource['data'] = yaml.safe_dump(definition)
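The renamed loop above strips the region and account id from each dataset ARN so the exported definition does not leak details of the source account. A quick demonstration with a made-up ARN:

arn = 'arn:aws:quicksight:us-east-1:123456789012:dataset/abcd-1234'
print('arn:aws:quicksight:::dataset/' + arn.split('/')[-1])
# arn:aws:quicksight:::dataset/abcd-1234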
@@ -358,7 +369,7 @@
        default=f"{analysis['Name'].replace(' ', '-')}.yaml"
    )

-    with open(output, "w") as output_file:
+    with open(output, "w", encoding='utf-8') as output_file:
        output_file.write(yaml.safe_dump(resources, sort_keys=False))
    cid_print(f'Output: <BOLD>{output}<END>')
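Passing encoding='utf-8' is a real fix rather than style: without it, open() in text mode uses the platform's preferred locale encoding, which on Windows is often cp1252 and can raise UnicodeEncodeError on non-ASCII characters in resource names:

# Safe on any platform; without encoding= this may fail under a cp1252 locale.
with open('out.yaml', 'w', encoding='utf-8') as f:
    f.write('name: Coût et Usage\n')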

@@ -369,4 +380,3 @@
qs = QuickSight(boto3.session.Session(), boto3.client('sts').get_caller_identity())
athena = Athena(boto3.session.Session(), boto3.client('sts').get_caller_identity())
export_analysis(qs, athena)
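One small design note on this entry point: it builds a fresh boto3 session and calls STS get_caller_identity twice. A hypothetical tidier variant creates them once and shares them:

import boto3

# Hypothetical refactor of the lines above: one session, one identity lookup.
session = boto3.session.Session()
identity = session.client('sts').get_caller_identity()
qs = QuickSight(session, identity)
athena = Athena(session, identity)
export_analysis(qs, athena)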