# ibm_sql_query_cpd

Execute arbitrary SQL queries against CSV and Parquet files using IBM Cloud SQL Query and IBM Cloud Object Storage.

In [None]:
# Optional bootstrap cell (image build / dependency installation).
# It is disabled by wrapping it in a string literal; delete the two """ lines to enable it.
# The Dockerfile text is delimited with ''' so that it does not terminate the """ wrapper.
"""
import os
# bool() of any non-empty string is True (even for "False"), so compare against the
# usual spellings of "off" instead.
create_image = os.environ.get('create_image', '').strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off', 'none')
if (create_image):
    docker_file='''
    FROM registry.access.redhat.com/ubi8/python-39
    RUN pip install ipython nbformat numpy ibm-cos-sdk-core ibm-cos-sdk ibm-watson-machine-learning ibm-watson-studio-pipelines ibmcloudsql
    ADD ibm-sql-query-cpd.ipynb .
    '''
    with open("Dockerfile", "w") as text_file:
        text_file.write(docker_file)

    !docker build -t ibm_sql_query_cpd .
    exit()
else:
    !pip install nbformat numpy ibm-cos-sdk-core ibm-cos-sdk ibm-watson-machine-learning ibm-watson-studio-pipelines ibmcloudsql
"""

In [None]:
import glob
import logging
import ibmcloudsql
from ibmcloudsql import SQLQuery
import os
import shutil
import sys
import re
from ibm_watson_machine_learning import APIClient
from ibm_watson_studio_pipelines import WSPipelines
from ibm_watson_studio_pipelines.cpd_paths import CpdScope, CpdPath

In [None]:
def parse_flag(value, default=False):
    """Interpret an environment-style value as a boolean.

    ``bool('False')`` is ``True`` because every non-empty string is truthy, so
    flags coming from environment variables need explicit parsing.

    Args:
        value: raw value; ``None``, a ``bool`` or anything convertible to ``str``.
        default: result returned when ``value`` is ``None``.

    Returns:
        ``False`` for the usual "off" spellings (empty, 0, false, f, no, n, off,
        none - case-insensitive), ``True`` for every other value.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off', 'none')


# target dir_path
target_dir_path = os.environ.get('target_dir_path')

# target asset name (name of the data asset registered for the query result)
target_asset_name = os.environ.get('target_asset_name')

# CPD path of the COS result location, e.g.
# cpd:///projects/<project_id>/connections/<connection_id>/files/<bucket>/<dir_path>
cos_location = os.environ.get('cos_location')

# SQL statement to execute (without the INTO clause, which is generated)
sql = os.environ.get('sql')

# IBM Cloud Token (alternative to API key)
token = os.environ.get('token')

# (unique) Cloud Resource Name (CRN) of the IBM SQL Query service instance
sql_query_crn = os.environ.get('sql_query_crn')

# default: CSV - (will be generated into the according STORED AS ... clause in the INTO clause)
format = os.environ.get('format', 'CSV')

# if set - will be generated into the according PARTITIONED BY (<columns>) clause in the INTO clause
partition_columns = os.environ.get('partition_columns')

# will be generated into the according PARTITIONED INTO <num> OBJECTS clause in the INTO clause
number_of_objects = os.environ.get('number_of_objects')

# will be generated into the according PARTITIONED EVERY <num> ROWS clause in the INTO clause
rows_per_object = os.environ.get('rows_per_object')

# default: False - only valid when no partitioning option is specified.
# Will be generated into sqlClient.rename_exact_result(jobid) after the SQL has run.
exact_name = parse_flag(os.environ.get('exact_name'))

# default: False - will be generated into JOBPREFIX NONE in the INTO clause. Results of previous
# runs with the same output location are then overwritten, because no unique sub folder is
# created for the result.
no_jobid_folder = parse_flag(os.environ.get('no_jobid_folder'))

# default: output.txt - output file name containing the CPD path of the resulting asset
output_file_name = os.environ.get('output_file_name', 'output.txt')


In [None]:
# Development defaults for a manual run of the notebook.
#
# No token is set here on purpose: credentials must never be committed with the
# notebook. Supply it through the `token` environment variable or a `token=...`
# command-line parameter instead.
#
# The remaining values are fallbacks only - a value provided through the
# environment takes precedence over the sample value.
cos_location = os.environ.get(
    'cos_location',
    'cpd:///projects/e0bce158-a9e4-4be6-a1da-20a04a7770f5/connections/de544f56-05c0-4dda-9e00-06458052c467/files/cos-rkie-sql-engine-test/sql_results'
)
sql = os.environ.get(
    'sql',
    'SELECT * FROM cos://eu-de/claimed-test/data.parquet stored as parquet'
)
sql_query_crn = os.environ.get(
    'sql_query_crn',
    'crn:v1:bluemix:public:sql-query:us-south:a/9b13b857a32341b7167255de717172f5:f9dd6c9e-b24b-4506-819e-e038c92339e4::'
)
target_asset_name = os.environ.get('target_asset_name', 'target_asset_name')

In [None]:
def parse_parameters(argv):
    """Extract ``name=value`` overrides from a command line.

    Only arguments whose part before the first ``=`` is a valid identifier are
    accepted. The value is everything after the first ``=`` and is taken
    verbatim, so it may itself contain ``=``, quotes or spaces.

    Args:
        argv: iterable of command-line arguments (e.g. ``sys.argv``).

    Returns:
        dict mapping parameter name to its string value; for repeated names the
        last occurrence wins.
    """
    overrides = {}
    for argument in argv:
        name, separator, value = argument.partition('=')
        if separator and re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', name):
            overrides[name] = value
    return overrides


def parse_flag(value, default=False):
    """Interpret a string such as ``'False'`` or ``'0'`` as a boolean.

    Returns ``default`` for ``None``, the value itself for a ``bool``, ``False``
    for the usual "off" spellings and ``True`` for every other value.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off', 'none')


def apply_parameters(overrides, namespace):
    """Assign every override as a string variable in ``namespace`` and log it.

    Values are assigned directly instead of being passed through ``exec()``, so
    a command-line argument can never be executed as code. Values of
    parameters whose name suggests a credential are masked in the log.
    """
    for name, value in overrides.items():
        is_secret = any(hint in name.lower() for hint in ('token', 'key', 'secret', 'password'))
        logging.warning('Parameter: %s="%s"', name, '***' if is_secret else value)
        namespace[name] = value


parameters = parse_parameters(sys.argv)
apply_parameters(parameters, globals())

# Normalise the flags only after the overrides were applied: command-line
# values arrive as strings and bool('False') would be True.
globals().update({
    flag: parse_flag(globals().get(flag))
    for flag in ('exact_name', 'no_jobid_folder')
})

In [None]:
# Base URL of the Cloud Pak for Data as a Service API.
# Can be overridden with the `cpdaas_url` environment variable; the default is
# the endpoint that used to be hard-coded here.
cpdaas_url = os.environ.get('cpdaas_url', 'https://api.dataplatform.cloud.ibm.com')

In [None]:
# Parse the CPD path given in `cos_location` and pull out its components.
cos_location_path = CpdPath.from_string(cos_location)
# Scope id of the path; used below as the default project of the WML client.
project_id = cos_location_path.scope_id()
# Resource id of the path; used below to look up the connection details.
connection_id = cos_location_path.resource_id()
# NOTE(review): not referenced by the later cells - the bucket is taken from the
# connection properties instead; confirm which of the two is intended.
bucket_name = cos_location_path.bucket_name()
# File path part; appended to the target location and used as the data asset content name.
dir_path = cos_location_path.file_path()

In [None]:
# Create the Watson Studio Pipelines client from the bearer token and use it to
# obtain the WML credentials for the scope of the result location.
pipelines_client = WSPipelines.from_token(token, url=cpdaas_url)
wml_credentials = pipelines_client.get_wml_credentials(cos_location_path.scope())

In [None]:
# Authenticate the WML client with the same token and work inside the project
# that the result location belongs to.
wml_credentials['token'] = token
wml_client = APIClient(wml_credentials)
wml_client.set.default_project(project_id)

In [None]:
# Check that the connection can be resolved. Only the metadata is displayed:
# the connection properties carry the COS access and secret keys, which must
# not end up in the saved notebook output.
wml_client.connections.get_details(connection_id).get('metadata')

In [None]:
# Fetch the connection details; the COS credentials are read from them in the next cell.
cos_props = wml_client.connections.get_details(connection_id)
# Display the available property names only - the values include the access
# and secret keys and must not be written to the notebook output.
sorted(cos_props['entity']['properties'])

In [None]:
# Reduce the connection details to the connection properties. The guard keeps
# the cell re-runnable: on a second run `cos_props` already is the properties dict.
if 'entity' in cos_props:
    cos_props = cos_props['entity']['properties']
access_key = cos_props['access_key']
secret_key = cos_props['secret_key']
# Keep only the host part of the endpoint: [-1] also works for a URL without a
# scheme, and stripping a trailing '/' avoids 'cos://host//bucket'.
url = cos_props['url'].split('//')[-1].rstrip('/')
bucket = cos_props['bucket']
# COS URI the query result is written to (used in the generated INTO clause).
target_location = "cos://{}/{}{}".format(url, bucket, dir_path)
target_location

In [None]:
def build_result_clause(target_location, result_format, partition_columns=None,
                        number_of_objects=None, rows_per_object=None, no_jobid_folder=False):
    """Build the ``INTO ...`` result clause that is appended to the SQL statement.

    The options are emitted in the order of the result clause grammar:
    target URI, ``JOBPREFIX``, ``STORED AS`` and finally ``PARTITIONED``.

    Args:
        target_location: COS URI the result is written to.
        result_format: value for the ``STORED AS`` option (e.g. ``CSV``).
        partition_columns: column list for ``PARTITIONED BY (...)``; skipped when empty.
        number_of_objects: value for ``PARTITIONED INTO <n> OBJECTS``; skipped when empty.
        rows_per_object: value for ``PARTITIONED EVERY <n> ROWS``; skipped when empty.
        no_jobid_folder: when true, ``JOBPREFIX NONE`` is added.

    Returns:
        The clause as a string with a leading blank.
    """
    clause = ' INTO {}'.format(target_location)
    if no_jobid_folder:
        clause += ' JOBPREFIX NONE'
    clause += ' STORED AS {}'.format(result_format)

    # NOTE(review): every option that is set is emitted after a single
    # PARTITIONED keyword, as before - confirm whether the service accepts
    # more than one partitioning option at a time.
    partition_options = []
    if partition_columns:
        partition_options.append('BY ({})'.format(partition_columns))
    if number_of_objects:
        partition_options.append('INTO {} OBJECTS'.format(number_of_objects))
    if rows_per_object:
        partition_options.append('EVERY {} ROWS'.format(rows_per_object))
    if partition_options:
        clause += ' PARTITIONED ' + ' '.join(partition_options)
    return clause


sqlClient = SQLQuery(api_key=None, token=token, instance_crn=sql_query_crn)

sql = sql + build_result_clause(
    target_location,
    format,
    partition_columns=partition_columns,
    number_of_objects=number_of_objects,
    rows_per_object=rows_per_object,
    no_jobid_folder=no_jobid_folder,
)

if exact_name:
    job_id = sqlClient.submit_sql(sql)
    job_status = sqlClient.wait_for_job(job_id)
    print("Job " + job_id + " terminated with status: " + job_status)
    # Do not rename (and do not register an asset later on) for a job that did not complete.
    if job_status != 'completed':
        raise RuntimeError('SQL job {} terminated with status: {}'.format(job_id, job_status))
    sqlClient.rename_exact_result(job_id)
else:
    run_result = sqlClient.run_sql(sql)
    # NOTE(review): run_sql appears to report a failed submission or job by
    # returning the error text instead of raising - confirm for the installed
    # ibmcloudsql version. Without this check a failed query goes unnoticed.
    if isinstance(run_result, str):
        raise RuntimeError(run_result)

In [None]:
# Show the complete statement that was submitted, including the generated INTO clause.
print(sql)

In [None]:
# Register the query result location as a data asset that is backed by the
# COS connection of the project.
meta_names = wml_client.data_assets.ConfigurationMetaNames
metadata = dict()
metadata[meta_names.NAME] = target_asset_name
metadata[meta_names.DESCRIPTION] = 'Data asset created for SQL Query result'
metadata[meta_names.CONNECTION_ID] = connection_id
metadata[meta_names.DATA_CONTENT_NAME] = dir_path
asset_details = wml_client.data_assets.store(meta_props=metadata)


In [None]:
# Derive the CPD path of the new asset: keep everything in front of the
# 'connections' part of the original path and point to the asset id instead.
path_prefix = cos_location.partition('connections')[0]
asset_id = asset_details['metadata']['asset_id']
new_cpd_path = '{}assets/{}'.format(path_prefix, asset_id)

# Hand the path over to the caller through the output file.
with open(output_file_name, 'w') as output_file:
    output_file.write(new_cpd_path)

1. Create the new CPD path with asset_details.id instead of the connection id (keep the project id).
2. Add a new parameter for the output file name, add it to the `outputs:` section in the YAML, and write the CPD path into that file.


outputs:
 {name: data_asset, }
 
implementation:

...
  - {outputPath: data_asset}