How to use the error output port in streamsx.hbase
This document describes how to use the error output port in the streamsx.hbase toolkit. Beginning with version 3.4.0, the toolkit provides a new optional output port that returns error messages.
The tuple on the error output port contains a single string attribute holding the error message.
Each error message contains the current date-time stamp, the operator name, the error description, and the input tuple.
This makes it easy to find the root cause of errors.
Prerequisites:
IBM Streams 4.3.0.0 or higher
streamsx.hbase toolkit version 3.4.0 or higher
https://github.com/IBMStreams/streamsx.hbase/releases/tag/v3.4.0
Apache HBase version 1.2 or higher
Here is an example:
The input tuple has the following type:
<rstring tableName, rstring character, rstring colF, rstring colQ, rstring value>
And the corresponding error message looks like this:
20190219-110421 PutToTable: Table 'streamsSample_lotrrr' does not exists. ,TUPLE{streamsSample_lotrrr,Eowyn,appearance,overall,beautiful}
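Because the error port delivers the whole message as one string, downstream tooling often needs to split it back into its parts. The following Python sketch is not part of the toolkit; the field layout is assumed from the example line above and may differ between toolkit versions:

```python
import re

# Assumed layout (taken from the example line above):
#   <YYYYmmdd-HHMMSS> <operator>: <message> ,TUPLE{<attr1>,<attr2>,...}
ERROR_RE = re.compile(
    r"^(?P<ts>\d{8}-\d{6})\s+(?P<op>\S+?):\s*(?P<msg>.*?)\s*,TUPLE\{(?P<tuple>.*)\}$"
)

def parse_error(line):
    """Split one error-port message into timestamp, operator, message, and tuple fields."""
    m = ERROR_RE.match(line.strip())
    if m is None:
        return None
    return {
        "timestamp": m.group("ts"),
        "operator": m.group("op"),
        "message": m.group("msg"),
        "tuple": m.group("tuple").split(","),
    }

line = ("20190219-110421 PutToTable: Table 'streamsSample_lotrrr' does not exists. "
        ",TUPLE{streamsSample_lotrrr,Eowyn,appearance,overall,beautiful}")
print(parse_error(line))
```

With the parsed fields available, the failing table name and input tuple can be routed to monitoring or replay logic instead of being grepped out of a log file.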
The following SPL application also shows how to use the successAttr parameter and a dynamic table name (tableNameAttribute) that arrives with the input tuple.
At the end it shows how to use a dynamic file name in the FileSink operator to create a new log file daily or hourly.
The following file parameter for FileSink creates a new log file every hour:
file : "{localtime:%Y%m%d-%H}-HBASE-errors.log" ;
For example:
data/20190220-14-HBASE-errors.log
Alternatively, the following file parameter for the FileSink operator creates a new log file every day:
file : "{localtime:%Y%m%d}-HBASE-errors.log" ;
For example:
data/20190220-HBASE-errors.log
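The %Y%m%d and %H tokens in the file parameter follow the standard strftime conventions, so you can preview which file name a given timestamp produces. This Python sketch (not part of the toolkit) mirrors the two FileSink patterns above:

```python
from datetime import datetime

def log_file_name(ts, hourly=True):
    """Mimic the FileSink patterns "{localtime:%Y%m%d-%H}-HBASE-errors.log"
    (hourly) and "{localtime:%Y%m%d}-HBASE-errors.log" (daily)."""
    pattern = "%Y%m%d-%H" if hourly else "%Y%m%d"
    return ts.strftime(pattern) + "-HBASE-errors.log"

ts = datetime(2019, 2, 20, 14, 30)
print(log_file_name(ts, hourly=True))   # 20190220-14-HBASE-errors.log
print(log_file_name(ts, hourly=False))  # 20190220-HBASE-errors.log
```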
The log file names include date-time information, which makes the error logs easy to find.
Before you begin the test, create some test tables on your HBase server:
echo "create 'streamsSample_lotr', 'appearance','location'" | hbase shell
echo "create 'streamsSample_lotr1', 'appearance','location'" | hbase shell
echo "create 'streamsSample_lotr2', 'appearance','location'" | hbase shell
Copy the HBase configuration file from your server into the 'etc' directory of your project. In this case it is not necessary to install a Hadoop or HBase client on your Streams server.
Copy the test files allAttributes.csv and queries.csv into the 'data' directory.
data/allAttributes.csv
#tableName,character,columnFamily,columnQualifier,value
streamsSample_lotr,Boromir,location,beginFellowship,travelling
streamsSample_lotr,Boromir,location,beginFellowship,tall
streamsSample_lotr,Boromir,location,beginTwoTowers,dead
streamsSample_lotr1,Aragorn,appearance,height,tall
streamsSample_lotr1,Boromir,location,beginFellowship,travelling
streamsSample_lotr2,Boromir,location,beginFellowship,travelling
streamsSample_lotr2,Boromir,location,beginTwoTowers,dead
streamsSample_lotr2,Gandalf,location,beginTwoTowers,dead
streamsSample_lotr5,Boromir,location,beginFellowship,travelling
data/queries.csv
streamsSample_lotr,Boromir,location,beginFellowship
streamsSample_lotr1,Gandalf,location,beginTwoTowers
streamsSample_lotr,Gandalf,location,tall
streamsSample_lotr2,Arwen,appearance,overall
streamsSample_lotr3,Eowyn,appearance,overall
streamsSample_lotr4,Harry,location,beginFellowship
#####################################################################
# Copyright (C)2014, 2019 International Business Machines Corporation and
# others. All Rights Reserved.
#####################################################################
.PHONY: all clean
SPLC_FLAGS = -t $(STREAMS_INSTALL)/toolkits/com.ibm.streamsx.hbase --data-directory data
#SPLC_FLAGS = -t ../streamsx.hbase/com.ibm.streamsx.hbase --data-directory data
SPLC = $(STREAMS_INSTALL)/bin/sc
SPL_CMD_ARGS ?=
SPL_COMP1NAME=HBaseErrorPort
SPL_MAIN_COMPOSITE1 = application::$(SPL_COMP1NAME)
BUILD_OUTPUT_DIR = output
all: data clean
$(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE1) --output-dir ./$(BUILD_OUTPUT_DIR) $(SPL_CMD_ARGS)
data:
mkdir data
clean:
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE1) --output-dir output
-rm -rf toolkit.xml
/*
* This SPL sample demonstrates how to use the error output port in the streamsx.hbase toolkit.
* The FileSource operator ReadAttributes reads attributes from a CSV file,
* and the HBASEPut operator PutAttributes puts them into HBase tables.
* The HBase table names are defined in the first column of each line in the "allAttributes.csv" file.
* When the processing of "allAttributes.csv" is finished (onPunct in ReadAttributes),
* PrintAttributes submits the query file name queries.csv to the next stream (ReadQueries),
* which reads the queries from a CSV file and forwards them to GetQueries.
* The HBase table names are defined in the first column of each line in the "queries.csv" file.
* GetQueries is an HBASEGet operator that retrieves the result from the HBase table if the query matches.
* PrintResultQueries prints the results from GetQueries.
* In case of any HBase error, the error output ports PutErrors and GetErrors
* return the error messages.
* CollectErrors collects the incoming errors and forwards them to the FileSink
* operator SaveErrors.
* The FileSink operator uses a dynamic file name.
* Depending on the date-time pattern, it can create a new log file daily or hourly.
* The names of new log files begin with the date-time.
* For example: 20190220-14-hbase-errors.log
*
*/
namespace application ;
use com.ibm.streamsx.hbase::HBASEGet ;
use com.ibm.streamsx.hbase::HBASEPut ;
composite HBaseErrorPort
{
param
// Copy the HBase configuration file "hbase-site.xml" into the "etc" directory,
// or set the path and file name of hbase-site.xml when submitting the job:
// streamtool submitjob output/application.HBaseErrorPort.sab -P hbaseSite="/home/user1/hbase-site.xml"
expression<rstring> $hbaseSite : getSubmissionTimeValue("hbaseSite", "etc/hbase-site.xml") ;
graph
stream<rstring tableName, rstring character, rstring colF, rstring colQ, rstring value> ReadAttributes = FileSource()
{
param
file : "allAttributes.csv" ;
format : csv ;
}
stream<rstring fileName> PrintAttributes = Custom(ReadAttributes)
{
logic
onTuple ReadAttributes : printStringLn("Attributes : " + (rstring)ReadAttributes);
onPunct ReadAttributes :
{
if(currentPunct() == Sys.FinalMarker)
{
submit ({ fileName = "queries.csv"}, PrintAttributes);
}
}
}
// Read queries from a csv file.
stream<rstring tableName, rstring who, rstring infoType, rstring requestedDetail> ReadQueries = FileSource(PrintAttributes)
{
param
format : csv ;
initDelay : 5.0 ;
}
() as printQueries = Custom(ReadQueries)
{
logic
onTuple ReadQueries : printStringLn("Queries : " +(rstring) ReadQueries) ;
}
// Put tuples into HBase.
// In case of any error during the put, the error message is returned on the PutErrors output port.
(stream<boolean success, rstring tableName> PutAttributes ; stream<rstring errorText> PutErrors) = HBASEPut(ReadAttributes)
{
param
hbaseSite : $hbaseSite;
tableNameAttribute : tableName ;
rowAttrName : "character" ;
columnFamilyAttrName : "colF" ;
columnQualifierAttrName : "colQ" ;
valueAttrName : "value" ;
successAttr : "success" ;
}
() as printPutAttributes = Custom(PutAttributes)
{
logic
onTuple PutAttributes : printStringLn("PutAttributes : " +(rstring) PutAttributes) ;
}
// Send the query to HBase. The value is placed in the value attribute.
// Not all queries will return results. To distinguish a value that is an
// empty string because no entry was in the database from a value that is
// empty because the database value field was empty, you can use the
// outputCountAttr parameter, which gives the number of results found.
// "streamsSample_lotr","Boromir","location","beginFellowship","tall",1 -- value found
// "streamsSample_lotr","Harry","location","beginFellowship","",0 -- no row for Harry
// "streamsSample_lotr1","Harry","location","beginFellowship","",0 -- no row for Harry
// "streamsSample_lotr2","Aragorn","location","beginFellowship","",0 -- no entry for Aragorn location:beginFellowship
// In case of any error during the get, the error message is returned on the GetErrors output port.
(stream<rstring tableName, rstring who, rstring infoType, rstring requestedDetail, rstring value, int32 numResults> GetQueries ;
stream<rstring errorText> GetErrors) = HBASEGet(ReadQueries)
{
param
hbaseSite : $hbaseSite;
tableNameAttribute : tableName ;
rowAttrName : "who" ;
columnFamilyAttrName : "infoType" ;
columnQualifierAttrName : "requestedDetail" ;
outAttrName : "value" ;
outputCountAttr : "numResults" ;
}
() as PrintResultQueries = Custom(GetQueries)
{
logic
onTuple GetQueries : printStringLn("GetQueries " + (rstring)GetQueries);
}
// The Custom operator collects the error messages from the Put and Get streams
// and forwards them to the FileSink operator SaveErrors.
stream<rstring errorMessage> CollectErrors = Custom(PutErrors ; GetErrors)
{
logic
onTuple PutErrors :
{
submit({ errorMessage = errorText }, CollectErrors) ;
}
onTuple GetErrors :
{
submit({ errorMessage = errorText }, CollectErrors) ;
}
}
// Creates a new log file and writes incoming error messages into it.
() as SaveErrors = FileSink(CollectErrors)
{
logic
onTuple CollectErrors :
{
printStringLn("Error Text : " + errorMessage) ;
}
param
// file : "{localtime:%Y%m%d}-HBASE-errors.log";
// creates a new log file every day
// file : "{localtime:%Y%m%d-%H}-HBASE-errors.log";
// creates a new log file every hour
file : "{localtime:%Y%m%d-%H%M}-HBASE-errors.log" ;
// creates a new log file every minute
closeMode : dynamic ;
}
}
Now you can build your application:
cd HBaseErrorPort
make
You can run your application in standalone mode:
output/bin/standalone
Or you can submit your SAB file as a new job:
streamtool submitjob output/application.HBaseErrorPort.sab
If everything goes well, your SPL application creates log files in the data directory.
For example:
data/20190220-1643-HBASE-errors.log
The log file contains information about the errors that occurred:
cat data/20190220-1643-HBASE-errors.log
"20190220-164352 , ErrorHbasePut , Table 'streamsSample_lotr5' does not exists. , TUPLE{streamsSample_lotr5,Boromir,location,beginFellowship,travelling}"