# Integration between IBM CP4D Notebook of Analytics Project and DataStage of Transformation Project

## Libraries and Connection import

In [1]:
# @hidden_cell

# Importing project to access connections
from project_lib import Project

# Project handle used below to look up stored connections
# (presumably the project this notebook runs in - TODO confirm)
project = Project.access()

# Fetching the COS details; create_connections() reads the 'api_key',
# 'iam_url' and 'url' entries of this metadata
cos_con_metadata = project.get_connection(name="IBM_COS_CONN")
# Fetching the DB2 details; create_connections() reads the 'database',
# 'host', 'port', 'username' and 'password' entries of this metadata
db2_con_metadata = project.get_connection(name="DB2_CONN")

# Importing all the header files
# File processing tools
import pandas as pd
import io
from io import StringIO
import datetime
# COS libraries
import ibm_boto3
from botocore.client import Config
# DB2 Warehouse libraries
import ibm_db
import ibm_db_dbi
# Library for calling DataStage API
import requests
import base64

# Validation flags passed to file_operations() by the main cell.
# 0 means "no discrepancy found"; file_operations() sets its own copy to 1
# when the column count / row count check fails.
COL_COUNT_CON = ROW_COUNT_CON = 0

## Creating connections
IBM COS and DB2 connections are created in the code below.


In [2]:
def create_connections(cos_con_metadata, db2_con_metadata):
    """Open the IBM COS client and the DB2 connections used by the notebook.

    Args:
        cos_con_metadata: Mapping read for the keys 'api_key', 'iam_url'
            and 'url'.
        db2_con_metadata: Mapping read for the keys 'database', 'host',
            'port', 'username' and 'password'.

    Returns:
        Tuple ``(conn, connect, client_cos_con)``: the raw ``ibm_db``
        connection handle, the ``ibm_db_dbi.Connection`` wrapping it, and
        the ``ibm_boto3`` S3 client.
    """
    # COS client: S3 API authenticated with the IAM API key (OAuth signature)
    cos_client = ibm_boto3.client(
        service_name='s3',
        ibm_api_key_id=cos_con_metadata['api_key'],
        ibm_auth_endpoint=cos_con_metadata['iam_url'],
        config=Config(signature_version='oauth'),
        endpoint_url=cos_con_metadata['url'],
    )

    # DB2 connection string assembled from the stored connection details
    dsn_pieces = [
        "DATABASE=", db2_con_metadata['database'],
        ";HOSTNAME=", db2_con_metadata['host'],
        ";PORT=", db2_con_metadata['port'],
        ";PROTOCOL=TCPIP;QUERYTIMEOUTINTERVAL=0;UID=",
        db2_con_metadata['username'],
        ";PWD=", db2_con_metadata['password'],
        ";",
    ]
    dsn = "".join(dsn_pieces)

    # Credentials are carried in the DSN, so user/password args stay empty
    db2_handle = ibm_db.connect(dsn, "", "")
    dbi_connection = ibm_db_dbi.Connection(db2_handle)
    return db2_handle, dbi_connection, cos_client

## Parsing the configuration
The configuration file is kept in the COS bucket. Users enter the configuration values in this file.

In [3]:
def parse_config_file(bucket_name, cos_client=None,
                      config_key='CONFIG/CONFIGURATION.INI'):
    """Read the pipe-delimited configuration record from IBM COS.

    The object is read in full and decoded as UTF-8. Blank lines are
    ignored and, if several records are present, the last one is used.
    The record must hold exactly seven '|'-separated values.

    Args:
        bucket_name: COS bucket that holds the configuration object.
        cos_client: Object exposing ``get_object(Bucket=..., Key=...)``.
            Defaults to the notebook-level ``client_cos_con``.
        config_key: Object key of the configuration file.

    Returns:
        Tuple ``(client_name, file_name, target_table_name, dsjob,
        proj_name, userid, passwd)`` of strings.

    Raises:
        ValueError: If the file has no record or the record does not have
            exactly seven fields.
    """
    if cos_client is None:
        # Fall back to the client created by the main cell
        cos_client = client_cos_con

    raw_body = cos_client.get_object(
        Bucket=bucket_name, Key=config_key)['Body'].read()
    # Decode properly instead of parsing the repr of a bytes object, which
    # left a literal "\n" in the last field and dropped every apostrophe
    text = raw_body.decode('utf-8') if isinstance(raw_body, bytes) \
        else str(raw_body)

    records = [line for line in text.splitlines() if line.strip()]
    if not records:
        raise ValueError(
            "Configuration file " + config_key + " contains no record")

    fields = records[-1].split("|")
    if len(fields) != 7:
        # Report only the count: the record contains a password
        raise ValueError(
            "Configuration record must have 7 '|'-separated values, found "
            + str(len(fields)))

    client_name, file_name, target_table_name, dsjob, proj_name, userid, \
        passwd = fields
    return \
        client_name, file_name, target_table_name, dsjob, \
        proj_name, userid, passwd

## File Operations
This function performs:
1. Validation on user file.
2. Save the success and failure status in DB2 audit table.
3. Call the datastage API.
4. Call the function to store the validated file into IBM COS.

In [4]:
def _parse_trailer_count(trailer_value):
    """Return the count text of a 'TRAILER: <n> Data Records' cell."""
    return str(trailer_value).replace(
        "TRAILER:", "").replace("Data Records", "").strip()


def _run_sql(conn, sql, params=()):
    """Prepare ``sql``, execute it with bound ``params``, return the stmt."""
    stmt = ibm_db.prepare(conn, sql)
    if params:
        ibm_db.execute(stmt, tuple(params))
    else:
        ibm_db.execute(stmt)
    return stmt


def file_operations(
        bucket_name, file_name, schema_name, table_name, conn,
        dsjob, proj_name, userid, passwd, COL_COUNT_CON,
        ROW_COUNT_CON):
    """Validate a landed file, audit the outcome and trigger the DataStage job.

    Steps:
      1. Load the tab-separated file from IBM COS into a DataFrame.
      2. Compare its column count with the target table's column count
         (from ``sysibm.syscolumns``).
      3. Compare its data-row count with the count stated in the trailer
         (first column of the last row).
      4. On success: insert the audit rows, drop the trailer row, store the
         file under ``<client>/processed_zone/`` and call the DataStage job.
         On failure: insert one FAILURE audit row per failed check.

    Uses the notebook-level ``client_cos_con`` and the sibling functions
    ``copy_to_s3`` and ``call_dsjob_api``.

    Args:
        bucket_name: COS bucket holding the file.
        file_name: Object key, ``<client>/landing_zone/<file>``.
        schema_name: Schema (tbcreator) of the target table.
        table_name: Name (tbname) of the target table.
        conn: ``ibm_db`` connection handle.
        dsjob, proj_name, userid, passwd: Forwarded to ``call_dsjob_api``.
        COL_COUNT_CON: Column-check flag; 0 = OK, 1 = mismatch.
        ROW_COUNT_CON: Row-check flag; 0 = OK, 1 = mismatch.
    """
    # Defined up front so the failure branch can never hit an unbound name
    col_error = "Unequal column count between file and table!"
    row_error = "Unequal number of rows between file rows and Trailer count!"

    # Load File in dataframe to perform operations
    obj = client_cos_con.get_object(Bucket=bucket_name, Key=file_name)

    # Display all the digits after decimal
    pd.options.display.float_format = '{:.9f}'.format
    content = obj['Body'].read()

    # dtype=object keeps every value as read, so nothing is reformatted
    load_df = pd.read_csv(
        io.BytesIO(content), sep='\t', dtype=object, low_memory=False,
        float_precision='round_trip')

    # sysibm.syscolumns holds all the column names for all tables.
    # Parameter markers keep the config-supplied names out of the SQL text.
    stmt = _run_sql(
        conn,
        "select count(*) from sysibm.syscolumns "
        "where tbcreator = ? and tbname = ?",
        (schema_name, table_name))
    # '1' is the key of the unnamed count(*) column
    target_table_col_count = ibm_db.fetch_assoc(stmt)['1']

    # Check whether target table and file column count is matching or not
    if str(target_table_col_count) == str(len(load_df.columns)):
        print("Equal Column count between file and table!")
    else:
        print(col_error)
        COL_COUNT_CON = 1

    # The last line of each file states the number of data records.
    # A file without any row has no trailer and fails the row check.
    if load_df.empty:
        num_of_lines = ""
    else:
        num_of_lines = _parse_trailer_count(load_df.iloc[-1, 0])

    # Data rows in the file, i.e. all rows except the trailer
    len_of_df = len(load_df) - 1

    # Comparison of user provided last line and number of rows
    if num_of_lines == str(len_of_df):
        print("Equal number of rows between file rows and Trailer count!")
    else:
        print(row_error)
        ROW_COUNT_CON = 1

    # Fetch the maximum id from audit status table
    # NOTE(review): the id is read from demo.audit_status but every insert
    # below targets demo.audit_table (with two different column lists) -
    # confirm the intended table names.
    stmt = _run_sql(conn, "select max(ID) from demo.audit_status")
    max_id = ibm_db.fetch_assoc(stmt)['1']
    audit_id = int(0 if max_id is None else max_id) + 1

    # file_name is '<client>/landing_zone/<file>'
    path_parts = str(file_name).split("/")
    client_name = path_parts[0]
    file_tab_name = path_parts[2]

    # Value of today's date
    etl_ld_dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    if COL_COUNT_CON == 0 and ROW_COUNT_CON == 0:
        # Record the source row count (equal to the trailer count here)
        _run_sql(
            conn,
            "insert into demo.audit_table (ID,CLIENT_NAME,RECON_LYR,"
            "TABLE_NM,METRIC_TYP,METRIC_VAL,ETL_LD_DT) "
            "values (?, ?, 'SOURCE', ?, 'COUNT', ?, ?)",
            (audit_id, client_name, file_tab_name, len_of_df, etl_ld_dt))

        # Record the success status of the file
        _run_sql(
            conn,
            "insert into demo.audit_table (ID,CLIENT_ID,DATA_LYR,JOB_NAME,"
            "ETL_LD_DT,JOB_STATUS) values (?, ?, 'SOURCE', ?, ?, 'SUCCESS')",
            (audit_id, client_name, file_tab_name, etl_ld_dt))

        # Strip off the trailer record line from the file
        load_df.drop(load_df.tail(1).index, inplace=True)

        # Store the validated file in the client's processed zone
        storepath = client_name + "/processed_zone/" + file_tab_name
        copy_to_s3(client_cos_con, load_df, bucket_name, storepath)

        # Call the datastage api to run the internal transformation command
        call_dsjob_api(dsjob, proj_name, userid, passwd)
    elif COL_COUNT_CON == 1 or ROW_COUNT_CON == 1:
        # One FAILURE row per failed check; both rows share the same ID,
        # as before
        failures = []
        if COL_COUNT_CON == 1:
            failures.append(col_error)
        if ROW_COUNT_CON == 1:
            failures.append(row_error)
        for description in failures:
            _run_sql(
                conn,
                "insert into demo.audit_table (ID,CLIENT_ID,DATA_LYR,"
                "JOB_NAME,ETL_LD_DT,JOB_STATUS,DESCRIPTION) "
                "values (?, ?, 'SOURCE', ?, ?, 'FAILURE', ?)",
                (audit_id, client_name, file_tab_name, etl_ld_dt,
                 description))
        print("File is failed due to above discrepancy!")

## Copy file to IBM COS
This function writes the DataFrame content as a tab-separated file to IBM COS.

In [5]:
def copy_to_s3(client, df, bucket, filepath):
    """Serialise ``df`` as tab-separated text and upload it to IBM COS.

    Args:
        client: Object exposing ``put_object(Bucket=..., Body=..., Key=...)``.
        df: DataFrame to upload; written with a header row, without the
            index, and with every field quoted.
        bucket: Destination bucket name.
        filepath: Destination object key.
    """
    csv_options = {
        'header': True,
        'index': False,
        'sep': '\t',
        'encoding': 'utf-8',
        'quoting': 1,  # same value as csv.QUOTE_ALL
    }

    # Render the frame in memory; getvalue() returns the full text
    # regardless of the buffer position
    text_buffer = StringIO()
    df.to_csv(text_buffer, **csv_options)
    payload = text_buffer.getvalue()

    # Put the rendered text into the IBM COS bucket path
    client.put_object(Bucket=bucket, Body=payload, Key=filepath)

    row_total = df.shape[0]
    print(f'Copy {row_total} rows to S3 Bucket {bucket} at {filepath}, Done!')

## Call DataStage API
The below function will call DataStage API for running the Transformation Job.

In [6]:
def call_dsjob_api(dsjob, proj_name, userid, passwd,
                   cp4d_url="https://xxx.xxxx.com", poll_interval=5,
                   request_timeout=None):
    """Run a DataStage job through the CP4D REST API and report its outcome.

    Flow:
      1. POST ``runDSJob``.
      2. If the run did not succeed and the job status is 'FWF' (aborted),
         POST ``compileDSJob`` and stop.
      3. Otherwise POST ``getDSJobStatus`` and keep polling while the status
         is one of RUN / RNW / RNF / RNS / CMP, then print the final status.

    Any exception is caught and printed (best-effort call, as before).

    Args:
        dsjob: Name of the DataStage job.
        proj_name: Name of the DataStage project.
        userid: User id for HTTP basic authentication.
        passwd: Password for HTTP basic authentication.
        cp4d_url: Base URL of the CP4D cluster.
        poll_interval: Seconds to wait between two status polls.
        request_timeout: Timeout passed to each HTTP request; ``None``
            waits indefinitely, as before.
    """
    import time  # local import: only the polling loop needs it

    api_path = "/ibm/iis/api/dscdesignerapi?api={apiName}&jobName" \
               "={jobName}&projectName={projectName}&hostName" \
               "=is-en-conductor-0.en-cond&getFullOutput" \
               "={getFullOutput}"
    # Statuses that mean the job is still running or compiling
    active_states = {"RUN", "RNW", "RNF", "RNS", "CMP"}

    # Basic-auth header built once and reused for every request
    token = base64.b64encode(
        '{}:{}'.format(userid, passwd).encode('utf-8')).decode('utf-8')
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + token}

    def api_url(api_name):
        # Only the path is formatted, so braces in cp4d_url cannot break it
        return cp4d_url + api_path.format(
            apiName=api_name, jobName=dsjob,
            projectName=proj_name, getFullOutput="true")

    def report_http_error(response):
        if response.status_code != 200:
            print(
                "Error occurred. Http status code is " +
                str(response.status_code))

    try:
        # One session for all calls, closed on exit
        with requests.Session() as http_session:

            def post(api_name):
                return http_session.post(
                    api_url(api_name), headers=headers,
                    timeout=request_timeout)

            # Call Data Stage API for running job
            run_response = post('runDSJob')
            print("Data Stage job is Running!")
            # Report the status before parsing: a non-JSON error body
            # would otherwise hide the HTTP status code
            report_http_error(run_response)
            run_data = run_response.json()

            if str(run_data.get("JobStatus")) == 'FWF':
                # Aborted job: it must be re-compiled before it can run
                if not run_data.get("succeeded"):
                    print(
                        "The job is not in a runnable state, it may need to be"
                        " re-compiled. Its current state is 'Aborted'. "
                        "Job is re-compiled!")
                    report_http_error(post('compileDSJob'))
                    print("Process is finished as Job is re-compiling!")
                return

            # Job is not aborted: fetch its status
            status_response = post('getDSJobStatus')
            report_http_error(status_response)
            status_data = status_response.json()

            if "JobStatus" not in status_data:
                print(status_data)
                return

            # Poll until the job leaves the running/compiling states,
            # pausing between polls instead of busy-looping
            while str(status_data.get("JobStatus")) in active_states:
                time.sleep(poll_interval)
                status_data = post('getDSJobStatus').json()

            # Print the status of Job
            final_status = str(status_data.get("JobStatus"))
            if final_status == "FWW":
                print("Job is Finished with Warnings")
            elif final_status in {"SUCCESS", "FOK"}:
                print("Job is Succeeded!")
            elif final_status == "FAILED":
                print("Job is Failed!")
            elif final_status == "FWF":
                print("Job is Failed - Aborted!")
            else:
                # Unrecognised status: show the payload instead of nothing
                print(status_data)

    except Exception as e:
        print("Error Occurred.")
        print(e)

## Main Cell
Program executes from here. This cell will call all the required functions for performing the job.

### User Inputs
1. **client_name:** Name of the client that owns this file.
2. **file_name:** Name of the file that needs to be processed.
3. **target_table_name:** Name of the table (in `SCHEMA.TABLE` form) where this file will be uploaded.
4. **dsjob:** Name of the job that needs to be run to transform the data of this file.
5. **proj_name:** Name of the DataStage Transformation project.
6. **userid:** User ID of the user on this IBM CP4D cluster.
7. **passwd:** Password of the user on this IBM CP4D cluster.

In [7]:
# Name of the bucket of s3 (IBM COS)
bucket_name = 'demo-files'

# Calling function to create connections
conn, connect, client_cos_con = create_connections(
    cos_con_metadata, db2_con_metadata)

# Calling functions to parse the user provided file
client_name, file_name, target_table_name, dsjob, proj_name, userid, \
    passwd = parse_config_file(bucket_name)

# Validate BEFORE the values are used: the parsed fields are strings, so
# a missing value shows up as an empty string, never as None
config_values = {
    "client_name": client_name,
    "file_name": file_name,
    "target_table_name": target_table_name,
    "dsjob": dsjob,
    "proj_name": proj_name,
    "userid": userid,
    "passwd": passwd,
}
missing_fields = [
    field for field, value in config_values.items()
    if value is None or not str(value).strip()]

if missing_fields:
    # Only field names are printed, never the values (passwd is among them)
    print(
        "Configuration is incomplete, missing value(s): " +
        ", ".join(missing_fields))
elif target_table_name.count(".") != 1:
    print(
        "target_table_name must have the form SCHEMA.TABLE: " +
        str(target_table_name))
else:
    # Constructing file name
    file_name = client_name + '/landing_zone/' + file_name

    # Extract the schema and table names
    schema_name, table_name = target_table_name.split(".")

    print(
        "Processing File: " + str(file_name) +
        " of client " + str(client_name))
    file_operations(
        bucket_name, file_name, schema_name, table_name, conn,
        dsjob, proj_name, userid, passwd, COL_COUNT_CON, ROW_COUNT_CON)

Processing File: BANK/landing_zone/BANK_LAT_TAB of client BANK
Equal Column count between file and table!
Equal number of rows between file rows and Trailer count!
insert into demo.audit_table (ID,CLIENT_ID,RECON_LYR,TABLE_NM,METRIC_TYP,METRIC_VAL,ETL_LD_DT) values (18, 'BANK', 'SOURCE', 'BANK_LAT_TAB', 'COUNT', 49559, '2021-10-19 13:52:08');
Copy 49559 rows to S3 Bucket demo-files at BANK/processed_zone/BANK_LAT_TAB, Done!
Data Stage job is Running!
Job is Succeeded!
