How to connect to the IBM Analytics Engine on Cloud via HDFS Toolkit

This document provides step-by-step instructions for connecting to the Hadoop file system on IBM Cloud via the streamsx.hdfs toolkit.

1 - Create an IBM Analytics Engine (IAE) service

The following sample shows the service credentials of an IBM Analytics Engine instance:

{
  "apikey": "xyxyxyxyxyxyxyxyxyxyxyxyxyx",
  "cluster": {
    "cluster_id": "20180404-125209-123-VNhbnQRZ",
    "service_endpoints": {
      "ambari_console": "https://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:9443",
      "hive_hdfs": "hdfs:hive2://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/;ssl=true;transportMode=http;httpPath=gateway/default/hive",
      "livy": "https://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/gateway/default/livy/v1/batches",
      "notebook_gateway": "https://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/gateway/default/jkg/",
      "notebook_gateway_websocket": "wss://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/gateway/default/jkgws/",
      "oozie_rest": "https://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/gateway/default/oozie",
      "phoenix_hdfs": "hdfs:phoenix:thin:url=https://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/gateway/default/avatica;authentication=BASIC;serialization=PROTOBUF",
      "spark_history_server": "https://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/gateway/default/sparkhistory",
      "spark_sql": "hdfs:hive2://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/;ssl=true;transportMode=http;httpPath=gateway/default/spark",
      "ssh": "ssh clsadmin@chs-nxr-123-mn003.bi.services.us-south.bluemix.net",
      "webhdfs": "https://chs-nxr-123-mn001.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/"
    },
    "service_endpoints_ip": {
      "ssh": "ssh clsadmin@19.60.137.126",
      "webhdfs": "https://19.73.137.126:8443/gateway/default/webhdfs/v1/"
    },
    ....

The user name and the password are no longer part of the credentials:
the user is clsadmin, and you have to reset the password manually.
For a webhdfs connection to the Hadoop file system, we need three parameters from the Analytics Engine service credentials:

The hdfsUri has to be set to "webhdfs://IAE-server:port".
You can find the IP address and the port of the HDFS server in the following line of the service credentials:
"webhdfs": "https://19.73.137.126:8443/gateway/default/webhdfs/v1/"
In this example you would set the value of hdfsUri to:
hdfsUri : "webhdfs://19.73.137.126:8443";

The value of hdfsUser has to be set to the user name of the IAE cluster.
You can find the user name in the following line of the service credentials:
"ssh": "ssh clsadmin@19.60.137.126"

The value of hdfsPassword has to be set to the password of the IAE cluster.
Reset the password as described in:
https://cloud.ibm.com/docs/services/AnalyticsEngine?topic=AnalyticsEngine-retrieve-cluster-credentials
and copy the generated password.

For example:

 $hdfsUri      = "webhdfs://19.73.137.126:8443";
 $hdfsUser     = "clsadmin";
 $hdfsPassword = "IAEPASSWORD";  

Replace the values of the "$hdfsUri", "$hdfsUser" and "$hdfsPassword" parameters in the following SPL file with the values from your IBM Analytics Engine service credentials.
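
Before building the application, you can verify that the credentials work by calling the webhdfs REST API directly with curl. This is a minimal sketch that assumes the IP address, port and placeholder password from the sample credentials above; the -k option skips certificate verification and can be replaced with a proper CA certificate:

$> curl -k -u clsadmin:IAEPASSWORD "https://19.73.137.126:8443/gateway/default/webhdfs/v1/user/clsadmin?op=LISTSTATUS"

If the credentials are correct, the gateway returns a JSON document that lists the files in the user's HDFS home directory.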

2 - Set TLS version of the SSL handshake

If you get the following SSL connection error:

javax.net.ssl.SSLHandshakeException: Received fatal alert: handshake_failure

you have to set the TLS version on your Java client.
There are two system properties that a Java application can use to specify the TLS version of the SSL handshake:
jdk.tls.client.protocols="TLSv1.2" and https.protocols="TLSv1.2"
The Streams Java operators provide the vmArg parameter to specify additional JVM arguments.
Add the following parameter to every HDFS operator:

vmArg : "-Dhttps.protocols=TLSv1.2";
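
You can also check whether the gateway accepts a TLSv1.2 handshake at all, for example with openssl. The command below is a sketch that assumes the IP address and port from the sample credentials above:

$> openssl s_client -connect 19.73.137.126:8443 -tls1_2 < /dev/null

If the handshake succeeds, openssl prints the server certificate chain and the negotiated cipher instead of a handshake failure.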

3 - Create a Streams application

Create a new project CloudSample in your workspace:

~/workspace/CloudSample/CloudSample.spl
~/workspace/CloudSample/Makefile

You can find the SPL file and the Makefile in:

https://github.com/IBMStreams/streamsx.hdfs/tree/master/samples/CloudSample

The CloudSample runs with HDFS Toolkit version 4.0.0 or higher.

/**
 * This SPL application shows how to connect to a Hadoop instance running on Cloud via webhdfs
 * Specify the name of a directory to read HDFS files as a submission time parameter.
 * Additional required parameters are the hdfsUri of the HDFS server, username and password for authentication.
 * To get these credentials:
 * Create an Analytics Engine service on IBM Cloud.
 * https://console.bluemix.net/catalog/?search=Analytics%20Engine
 * IBM Analytics Engine documentation 
 * https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction
 * Create a service credential for the Analytics Engine service on IBM Cloud
 * and replace the values of hdfsUser and hdfsPassword in this SPL file with
 * the user and password from the IAE credentials.
 * The value of $hdfsUri is  webhdfs://<host>:<port>
 * Reset the password as described in:
 * https://cloud.ibm.com/docs/services/AnalyticsEngine?topic=AnalyticsEngine-retrieve-cluster-credentials
 * and copy the generated password.
 * It is also possible to set the values of these parameters at submission time.
 * 
 * If you want to run this sample on an on-premises Streams server, you have to unset
 * the HADOOP_HOME environment variable:
 * unset HADOOP_HOME
 * 
 * The CloudSample composite first creates some files in a directory via HDFS2FileSink.
 * Then HDFS2DirectoryScan reads the file names located in the test directory,
 * and HDFS2FileSource reads lines from the files located in the test directory.
 * See the toolkit's documentation for compile and run instructions.
 * @param hdfsUri HDFS URI to connect to, of the form  webhdfs://<host>:<port>
 * @param hdfsUser User to connect to HDFS.
 * @param hdfsPassword Password to connect to HDFS.
 * @param directory directory to read and write files.
 */

use com.ibm.streamsx.hdfs::* ;

composite CloudSample
{
  param
    expression<rstring> $hdfsUri      : getSubmissionTimeValue("hdfsUri", "webhdfs://19.73.137.126:8443") ;
    expression<rstring> $hdfsUser     : getSubmissionTimeValue("hdfsUser", "clsadmin") ;
    expression<rstring> $hdfsPassword : getSubmissionTimeValue("hdfsPassword", "IAE-Password") ;
    expression<rstring> $directory    : getSubmissionTimeValue("directory", "testDirectory") ;

  graph


    // The pulse stream is produced by a Beacon operator that generates a counter.
    stream<int32 counter> pulse = Beacon()
    {
        logic
            state : mutable int32 i = 0 ;
        param
            initDelay    : 1.0 ;
            iterations   : 25u ;
        output
            pulse : counter = i ++ ;
    }

    // creates lines and file names for HDFS2FileSink
    stream<rstring line, rstring filename> CreateLinesFileNames= Custom(pulse)
    {
        logic
            state :
            {
                mutable int32 count = 0 ;
                mutable timestamp ts = getTimestamp() ;
                mutable rstring strTimestamp = "" ;
            }

            onTuple pulse :
            {
                // start a new file every 5 lines
                if ( (counter % 5) == 0)
                {
                    ts = getTimestamp() ;
                }
                // create a date-time string in yyyyMMdd-HHmmss format
                strTimestamp =  (rstring) year(ts) +((month(ts) < 9u) ? "0" : "") 
                              + (rstring)(month(ts) + 1u) +((day(ts) < 10u) ? "0": "") 
                              + (rstring) day(ts) + "-" +((hour(ts) < 10u) ? "0" : "") 
                              + (rstring) hour(ts) +((minute(ts) < 10u) ? "0" :"") 
                              + (rstring) minute(ts) +((second(ts) < 10u) ? "0" : "") 
                              + (rstring) second(ts) ;
                submit({ line = "HDFS 4.0 and Streams test with webhdfs " + strTimestamp + " " +(rstring) counter, 
                filename = "/user/" + $hdfsUser + "/" + $directory + "/" + strTimestamp + "-hdfs.out" }, CreateLinesFileNames) ;
            }

    }

    // writes tuples that arrive on the input port from CreateLinesFileNames to output files.
    // The file names are also provided by CreateLinesFileNames on the input port.
    stream<rstring out_file_name, uint64 size> HdfsFileSink = HDFS2FileSink(CreateLinesFileNames)
    {
        logic
            onTuple CreateLinesFileNames :
            {
                printStringLn("HDFS2FileSink message : " + line) ;
            }

        param
            hdfsUri           : $hdfsUri ;
            hdfsUser          : $hdfsUser ;
            hdfsPassword      : $hdfsPassword ;
            fileAttributeName : "filename" ;
            vmArg        : "-Dhttps.protocols=TLSv1.2";
    }

    // print out the file name and the size of file
    () as PrintHdfsSink = Custom(HdfsFileSink)
    {
        logic
            onTuple HdfsFileSink :
            {
                printStringLn("HDFS2FileSink  Wrote " +(rstring) size + " bytes to file " + out_file_name) ;
            }

    }

   // scan the given directory from HDFS, default to . which is the user's home directory
    stream<rstring fileNames> HdfsDirectoryScan = HDFS2DirectoryScan()
    {
        param
            initDelay    : 10.0 ;
            directory    : $directory ;
            hdfsUri      : $hdfsUri ;
            hdfsUser     : $hdfsUser ;
            hdfsPassword : $hdfsPassword ;
            vmArg        : "-Dhttps.protocols=TLSv1.2";
    }

    //print out the names of each file found in the directory
    () as PrintDirectoryScan = Custom(HdfsDirectoryScan)
    {
        logic
            onTuple HdfsDirectoryScan :
            {
                printStringLn("HDFS2DirectoryScan found file in directory: " + fileNames) ;
            }

    }

    // use the file name from directory scan to read the file
    stream<rstring lines> HdfsFileSource = HDFS2FileSource(HdfsDirectoryScan)
    {
        param
            hdfsUri      : $hdfsUri ;
            hdfsUser     : $hdfsUser ;
            hdfsPassword : $hdfsPassword ;
            vmArg        : "-Dhttps.protocols=TLSv1.2";
    }

    //print out the lines from file found in the directory
    () as PrintHdfsFileSource = Custom(HdfsFileSource)
    {
        logic
            onTuple HdfsFileSource :
            {
                printStringLn("HdfsFileSource line : " + lines) ;
            }

    }

}

4 - Build the SPL application

To build this SPL application, HDFS Toolkit version 4.0.0 or later is required.

Download and copy the latest version of streamsx.hdfs into your workspace.

https://github.com/IBMStreams/streamsx.hdfs

The Makefile also builds the toolkit.

#####################################################################
# Copyright (C)2014, 2018 International Business Machines Corporation and
# others. All Rights Reserved.
#####################################################################


.PHONY: all distributed clean 

#HDFS_TOOLKIT_INSTALL = $(STREAMS_INSTALL)/toolkits/com.ibm.streamsx.hdfs
HDFS_TOOLKIT_INSTALL = ../../com.ibm.streamsx.hdfs
SPLC_FLAGS ?= -a
SPLC = $(STREAMS_INSTALL)/bin/sc

SPL_CMD_ARGS ?= -t $(HDFS_TOOLKIT_INSTALL)
SPL_MAIN_COMPOSITE = CloudSample

all: distributed

distributed:
	cd ../../com.ibm.streamsx.hdfs; ant; 
	JAVA_HOME=$(STREAMS_INSTALL)/java $(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS) --data-directory data

clean: 
	$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
	rm -rf output

Be aware that the recipe lines in the Makefile must be indented with tabs.

5 - Run the SPL application

Download the HDFS Toolkit version 4.0.0 from:

https://github.com/IBMStreams/streamsx.hdfs/releases/tag/v4.0.0

and extract it into your workspace directory.

Replace the credentials in the samples/CloudSample/CloudSample.spl file with your IBM Analytics Engine credentials and run:

$> cd samples/CloudSample
$> make

If you want to run this sample on an on-premises Streams server, you have to unset the HADOOP_HOME environment variable:

unset HADOOP_HOME   

Start the application with

$> output/bin/standalone
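
Instead of editing the SPL file, you can pass the credentials as submission-time values on the command line. The following sketch uses the placeholder values from the example above:

$> output/bin/standalone hdfsUri="webhdfs://19.73.137.126:8443" hdfsUser="clsadmin" hdfsPassword="IAEPASSWORD" directory="testDirectory"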

Or you can submit the job on your local Streams server with:

$> streamtool submitjob output/CloudSample.sab
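
Submission-time values can be passed to streamtool as well, using the -P option; again a sketch with the placeholder values from above:

$> streamtool submitjob output/CloudSample.sab -P hdfsUri=webhdfs://19.73.137.126:8443 -P hdfsUser=clsadmin -P hdfsPassword=IAEPASSWORD -P directory=testDirectory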

6 - Submit the SPL application to the Streaming Analytics service on IBM Cloud

Create a Streaming Analytics service on IBM Cloud:

https://console.bluemix.net/catalog/?search=streams

Start the service

Launch the application.

This starts the IBM Streams Console.

Now it is possible to submit the SAB file as a Streams job via the IBM Streams Console.

The SAB file is located in your project output directory:

output/CloudSample.sab

The SAB file includes all Hadoop client JAR libraries.

7 - Check the result on the Hadoop server

To check the result, log in to the IAE server and check the contents of the Hadoop file system.

$> ssh clsadmin@<your-IAE-ip-address>
$> hadoop fs -ls /user/clsadmin/testDirectory

[clsadmin@chs-nxr-593-mn003 ~]$ hadoop fs -ls /user/clsadmin/testDirectory
Found 4 items
-rw-r--r--   3 clsadmin biusers        285 2018-04-19 12:23 /user/clsadmin/testDirectory/20180419-142316-hdfs.out
-rw-r--r--   3 clsadmin biusers        285 2018-04-19 12:23 /user/clsadmin/testDirectory/20180419-142317-hdfs.out
-rw-r--r--   3 clsadmin biusers        290 2018-04-19 12:23 /user/clsadmin/testDirectory/20180419-142319-hdfs.out
-rw-r--r--   3 clsadmin biusers        290 2018-04-19 12:23 /user/clsadmin/testDirectory/20180419-142321-hdfs.out
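
To inspect the contents of one of the files written by the sample, you can use the hadoop fs -cat command; the file name below is taken from the listing above:

$> hadoop fs -cat /user/clsadmin/testDirectory/20180419-142316-hdfs.out

Each file should contain the lines written by the CreateLinesFileNames operator, five lines per file.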

For more details about the Hadoop file system shell commands, see:

https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html