Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

APEXMALHAR-2515 Operator maturity - HBase output operator Multi Table feature. #638

Merged
merged 1 commit into from
Sep 14, 2017

Conversation

prasannapramod
Copy link
Contributor

Implemented HBase output operator multi-table insertion feature.

@venkateshkottapalli @tushargosavi @sanjaypujare @PramodSSImmaneni please see.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.client.Append;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did the import order change? Is the new order correct as per the import-order guideline or was the old one better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In apex checkstyle, org is listed before org.apache. IDE ordered it accordingly. Maybe the older imports were before the checkstyle configuration was created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, does it mean both the old and new orders are checkstyle compliant?

private static final Logger logger = LoggerFactory.getLogger(OutputAdapter.class);

HBaseStore store;
OutputAdapter.Driver driver;
Copy link
Contributor

@sanjaypujare sanjaypujare Jun 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need the qualifier OutputDriver. since Driver is local to this class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also can we get rid of warnings like the below:

OutputAdapter.Driver is a raw type. References to generic type OutputAdapter.Driver should be parameterized

}
}

interface Driver<T>
Copy link
Contributor

@sanjaypujare sanjaypujare Jun 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Driver always contains the store since Driver is implemented by most HBase operators you have modified. Why can't we make this more modular/OO by encapsulating the store also in the Driver? In this case Driver becomes an abstract class as follows:

abstract class Driver<T>
  {
      HBaseStore store;
      Driver(HBaseStore store) {
         this.store = store;
     }    
     void processTuple(T tuple) {
        String tableName = getTableName(tuple);
        HTable table = store.getTable(tableName);
        if (table == null) {
          logger.debug("No table found for tuple {}", tuple);
          errorTuple(tuple);
          return;
       }
       processTuple(tuple, table);
     }
    void processTuple(T tuple, HTable table); 
    String getTableName(T tuple);
    void errorTuple(T tuple);
  }

Then OutputAdapter.processTuple(T tuple) simply calls driver.processTuple(T tuple) as the logic has moved into the latter (and belongs there)? This also makes it easy to implement the batch/window modes that currently have been removed from those classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is AbstractHBaseOutputOperator series and AbstractHBaseWindowOutputOperator series operators belong to two different hierarchies that originate from generic store operators which are a common framework across different connectors such as jdbc, couch, memcache etc. The encapsulation was a way to reuse commonly needed functionality without sacrificing the hierarchy or backward compatibility.

this.driver = driver;
}

public void processTuple(T tuple)
Copy link
Contributor

@sanjaypujare sanjaypujare Jun 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment below for Driver<T>

import com.datatorrent.lib.db.AbstractStoreOutputOperator;

/**
* Created by lakshmi on 6/27/17.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More Javadoc to describe this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any Javadoc for this class yet

tuples.add(tuple);
}

@Override
public void setup(OperatorContext context)
{
mode=context.getValue(context.PROCESSING_MODE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context.PROCESSING_MODE => OperatorContext.PROCESSING_MODE

(to get rid of "The static field Context.OperatorContext.PROCESSING_MODE should be accessed in a static way")

private transient ProcessingMode mode;

@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More info about why deprecated? What is the alternative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Processing mode being a common platform feature should be set as an attribute so not sure why it was made available as a property. Even if the property is set, it wouldn't effect how the platform would process that feature for the operator.

Copy link
Contributor

@sanjaypujare sanjaypujare Jul 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add Javadoc with this explanation so readers know why it is deprecated? That was the intent of my original comment

@@ -46,8 +40,6 @@
* for the tuple in the table.<br>
*
* <br>
* This class provides batch append where tuples are collected till the end
* window and they are appended on end window
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, the operator is no different from AbstractHBaseAppendOutputOperator. So:

  • why can't we support batch append even with multi-table support? Pls see comments below.
  • Isn't the class name ...WindowAppend... misleading if there is no batch or window append support? Can we make it deprecated and prevent creation of this operator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also thinking earlier they look similar and couldn't figure out a better way to avoid this. Even though the concrete implementations look similar they will function differently because of the operators they extend. Also, with the changes, both will support multiple tables.

The difference between the non-window and the window operator is this. In the case of restart from failure, the non-window operator writes the tuples from all the windows replayed from the checkpoint. The windowed operator doesn't write tuples belonging to those windows after the checkpoint that were completely processed in the prior run, before the operator failed. It only repeats the partial writes for the window during which the operator failed. This reduces the number of duplicates in cases where the writes will result in new entries in HBase. Exactly once is not being done because transaction support is not available in HBase. This functionality requires the user to provide additional resources from hbase such as a meta column family etc, which is why I think there were two operators created and not a single one.

In either case when tuples are written, they are not flushed immediately to hbase server for performance reasons. They are given to hbase api which would internally flush it when some internal conditions are satisfied like the buffer filling up. However, we do want to make sure all pending tuples are flushed in certain conditions to ensure no data loss in case of operator failure. The non-window operator explicitly flushes before checkpoint and the window operator flushes during end of window because fully processed windows will be skipped on replay after failure.

In the earlier implementation of windowed operator, I noticed that the operator was batching all the tuples within the operator into one big batch and then sending all the tuples to the HBase api at the end of the window. I think this is not necessary as the HBase api internally would batch and flush appropriately. Batching at the operator level on a per window basis will introduce latency and will also significantly increase memory usage. That is why I changed it and one of the reasons now the concrete operators look similar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation - makes sense.

}

@Override
public void storeAggregate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By getting rid of this method/logic we are no longer supporting batch/window mode. Why can't we keep this method and do the following?

   Iterator<T> it = tuples.iterator();		
   while (it.hasNext()) {		
       T t = it.next();
       HTable table = store.getTable(getTableName(t));
       try {		
         Append append = operationAppend(t);		
         table.append(append);		
       } catch (IOException e) {		
         logger.error("Could not output tuple", e);		
         DTThrowable.rethrow(e);		
       }				
     }

Coupled with my comments on Adapter/Driver this may become even easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my description above. The window name is not for batching of tuples but rather for fault recovery behavior. The batching of tuples is not optimal as I described in the last paragraph above and hence I changed it. This also means the class hierarchy is slightly different but I don't think it will be a problem as I don't see anybody extending the class and overriding this method. Also since the operator is in contrib it is evolving in nature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

public void setup(OperatorContext context)
{
mode=context.getValue(context.PROCESSING_MODE);
if(mode==ProcessingMode.EXACTLY_ONCE){
throw new RuntimeException("This operator only supports atmost once and atleast once processing modes");
}
if(mode==ProcessingMode.AT_MOST_ONCE){
tuples.clear();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should re-instate this once batch/window mode is re-instated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see above, batching introduces unnecessary issues and is not needed.

}
}

interface Driver<T>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadocs for this interface?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be better to separate the functionality of processTuple/errorTuple and getTableName into 2 separate classes. I don't like the idea of individual operators being made responsible to map a tuple to a table-name. How about another interface like

interface TupleToTableMapper<T>
  {
    String getTableName(T tuple);
  }

And users of our HBase output operators just provide an instance of TupleToTableMapper? So declare it in AbstractHBaseOutputOperator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible to do it but do we want to introduce another interface just for getting the table name. It would have to be an additional adapter that is passed into the OutputAdapter. Also it would have to a property in the operator with a default implementation instantiated within the operator that enables the existing behavior of returning the same table for all tuples for compatibility. Is this extra complexity needed in every case? If some operator wants the pluggability they could implement the getTableName method and delegate it to another adapter. Other operators like file output operator just have the operator return the filename as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

*/
public abstract class AbstractHBaseWindowOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, HBaseWindowStore> implements OutputAdapter.Driver<T>, Operator.CheckpointNotificationListener
{
/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment looks a bit out of place here. May be we can move it to class Javadocs and/or to the respective methods such as endWindow or beforeCheckpoint?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I don't see endWindow() being defined and calling flush

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a developer comment for anyone working on this operator in future and not for end users, hence didn't put it in javadoc. The endWindow is being implemented by the parent class AbstractPassThruTransactionableStoreOutputOperator which will result in flush getting called.

import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;

/**
* Created by lakshmi on 6/27/17.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More javadocs for the class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add

}

@Override
public abstract void processTuple(T tuple, HTable table);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage in declaring this abstract method here with @Override annotation? Subclasses are anyway required to implement this because of OutputAdapter.Driver interface unless this idiom is common or has some advantages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly for easy indication to developers extending this class that they need to implement the abstract method. I can remove it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually keep it. I can see the value of having both abstract and Override annotation here.

* Get the table name for tuple.
*
* Implementations can override this method to return the table name where the tuple should be written to.
* Return null to write to default table
Copy link
Contributor

@sanjaypujare sanjaypujare Jul 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? Following the code of OutputAdapter.processTuple(T) and HBaseMultiTableStore.getTable(String) I don't see how a "default table" is returned for null value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead why can't we leave this as abstract ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HBaseStore will return default table for the null table name. The reason we can't leave it abstract and upto the implementations is that it will break existing concrete operators extending from the operator as they are not implementing this method. By default, it should be compatible with existing operators from an api and functional perspective.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -46,8 +40,6 @@
* the tuple in the table.<br>
*
* <br>
* This class provides a batch put where tuples are collected till the end
* window and they are put on end window
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All same comments for AbstractHBaseWindowAppendOutputOperator apply here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see above

*/
public class HBaseMultiTableStore extends HBaseWindowStore
{
/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this comment up as the Java doc for the class with some more explanation about what this class does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry, I need to remove this class. It was part of my initial trials and no longer needed.


protected HTable loadTable(String tableName) throws IOException
{
if ((allowedTableNames != null) && !ArrayUtils.contains(allowedTableNames, tableName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If allowedTableNames is initialized to new String[0] we don't need to check for null


private static final Logger logger = LoggerFactory.getLogger(HBaseMultiTableStore.class);

protected String[] allowedTableNames;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HBaseStore also has allowedTableNames and this is a subclass of HBaseStore. Why do we need duplication?

protected transient LoadingCache<String, HTable> tableCache;

@Min(1)
protected int maxOpenTables = Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't MAX_VALUE too high to be default value for maxOpenTables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about the confusion on HBaseMultiTableStore. I will remove it.

// HBase does support transactions so this method left empty

try {
flushTables();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct to just flushTables for commitTransaction when we don't have any other transaction semantics implemented? e.g. beginTransactions, rollback etc are empty and do nothing. So implementing commitTransaction() like this is misleading?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also these other functions have a comment "HBase does support transactions so this method left empty" . Seems to be a typo with "not" missing. Can we fix these comemnts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for doing this as part of commitTransaction is that we are piggybacking on the mechanism to minimize rewrites on a replay from failure. Please see the long description above on the workings of these operators. There are existing common store operators that implement the fault tolerance mechanism by using the transaction pattern. If transactions are supported it can result in exactly once otherwise it will help in reducing writing duplicates significantly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

@sanjaypujare sanjaypujare left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls address comments

@prasannapramod
Copy link
Contributor Author

@sanjaypujare Addressed comments and made changes where necessary. Please see.

@sanjaypujare
Copy link
Contributor

@prasannapramod both build checks are failing. Are these pre-existing failures?

@prasannapramod
Copy link
Contributor Author

Jenkins, re-test this, please.

@@ -126,14 +134,17 @@ public void connect() throws IOException {

@Override
public void beginTransaction() {
// HBase does support transactions so this method left empty
// HBase does not support transactions so this method left empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar change needed in rollbackTransaction() and isInTransaction() . And possibly others. Pls check everywhere for this comment and fix it.

Copy link
Contributor

@sanjaypujare sanjaypujare left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple of comments requesting small changes - mainly docs/Javadocs. Once they are done I'll approve

@prasannapramod prasannapramod force-pushed the APEXMALHAR-2515 branch 2 times, most recently from d476d7a to e9476e0 Compare September 11, 2017 21:58
@prasannapramod
Copy link
Contributor Author

@sanjaypujare @PramodSSImmaneni Added JavaDocs as suggested. Please see.

@pramodin
Copy link
Contributor

LGTM

Copy link
Contributor

@sanjaypujare sanjaypujare left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prasannapramod I still see missing Javadocs in a few places and where you commented you will add them. Waiting for them

@prasannapramod prasannapramod force-pushed the APEXMALHAR-2515 branch 2 times, most recently from a0e4528 to a773ffd Compare September 13, 2017 13:36
@prasannapramod
Copy link
Contributor Author

@sanjaypujare I added JavaDocs for all newly created classes now. Hoping there is sufficient documentation for methods that I changed. Please see

Copy link
Contributor

@sanjaypujare sanjaypujare left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. @PramodSSImmaneni you can merge

@pramodin
Copy link
Contributor

@prasannapramod can you fix the commit message to include jira number

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
3 participants