Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
APEXMALHAR-2515. Added support for writing into multiple HBase tables.
Browse files Browse the repository at this point in the history
  • Loading branch information
prasannapramod committed Sep 14, 2017
1 parent 712027a commit a725639
Show file tree
Hide file tree
Showing 9 changed files with 447 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@

import java.io.IOException;

import org.apache.hadoop.hbase.client.Append;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.HTable;

import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.db.AbstractStoreOutputOperator;

/**
* A base implementation of a StoreOutputOperator operator that stores tuples in HBase columns and offers non-transactional append.&nbsp; Subclasses should provide implementation for appending operations. <br>
* A base implementation of a StoreOutputOperator operator that stores tuples in HBase columns and offers
* non-transactional append.&nbsp; Subclasses should provide implementation for appending operations. <br>
* <p>
* <br>
* This class provides a HBase output operator that can be used to store tuples
Expand All @@ -47,8 +49,7 @@
* The tuple type
* @since 1.0.2
*/
public abstract class AbstractHBaseAppendOutputOperator<T>
extends AbstractStoreOutputOperator<T, HBaseStore> {
public abstract class AbstractHBaseAppendOutputOperator<T> extends AbstractHBaseOutputOperator<T> {
private static final transient Logger logger = LoggerFactory
.getLogger(AbstractHBaseAppendOutputOperator.class);

Expand All @@ -57,10 +58,10 @@ public AbstractHBaseAppendOutputOperator() {
}

@Override
public void processTuple(T tuple) {
public void processTuple(T tuple, HTable table) {
Append append = operationAppend(tuple);
try {
store.getTable().append(append);
table.append(append);
} catch (IOException e) {
logger.error("Could not append tuple", e);
DTThrowable.rethrow(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.hbase;

import org.apache.hadoop.hbase.client.HTable;

import com.datatorrent.api.Operator;
import com.datatorrent.lib.db.AbstractStoreOutputOperator;

/**
 * A base implementation of a HBase output operator that stores tuples in HBase and offers
 * non-transactional Insert.&nbsp; Subclasses should provide implementation for specific Hbase operations.
 * <p>
 * Incoming tuples are delegated to an {@link OutputAdapter}, for which this operator acts as the
 * {@code OutputAdapter.Driver}: the adapter presumably resolves the destination table (via
 * {@link #getTableName(Object)}) and calls back into {@link #processTuple(Object, HTable)} — confirm
 * against the OutputAdapter implementation. Buffered writes are flushed before every checkpoint.
 *
 * @param <T> the tuple type
 */
public abstract class AbstractHBaseOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> implements OutputAdapter.Driver<T>, Operator.CheckpointNotificationListener
{
  // Transient delegate that routes tuples to per-table HTable instances.
  // NOTE(review): constructed eagerly below with the current value of the inherited `store` field;
  // if subclasses assign `store` after this constructor runs, confirm OutputAdapter still observes
  // the correct store instance (it may have captured a stale/null reference).
  private transient OutputAdapter<T> outputAdapter;

  public AbstractHBaseOutputOperator()
  {
    // Captures `store` and registers this operator as the adapter's driver callback.
    outputAdapter = new OutputAdapter<T>(store, this);
  }

  /**
   * Hands the tuple to the output adapter, which performs table resolution and the actual write.
   *
   * @param tuple the tuple to write
   */
  @Override
  public void processTuple(T tuple)
  {
    outputAdapter.processTuple(tuple);
  }

  /**
   * Writes the tuple into the given table. Invoked by the output adapter once the destination
   * table has been resolved; subclasses implement the concrete HBase operation (e.g. Put/Append).
   *
   * @param tuple the tuple to write
   * @param table the HBase table the tuple should be written to
   */
  @Override
  public abstract void processTuple(T tuple, HTable table);

  /**
   * Get the table name for tuple.
   *
   * Implementations can override this method to return the table name where the tuple should be written to.
   * Return null to write to default table
   * @param tuple The tuple
   * @return The table name, or {@code null} for the default table
   */
  @Override
  public String getTableName(T tuple)
  {
    return null;
  }

  /**
   * Callback for a tuple that could not be written. No-op by default; subclasses may override.
   * NOTE(review): exactly when the adapter invokes this is not visible here — verify against
   * OutputAdapter before relying on it for error handling.
   *
   * @param tuple the tuple that failed
   */
  @Override
  public void errorTuple(T tuple)
  {

  }

  /**
   * Flushes any writes buffered in the adapter so the state captured at checkpoint time is
   * consistent with what has been handed to HBase.
   *
   * @param l the window id being checkpointed
   */
  @Override
  public void beforeCheckpoint(long l)
  {
    outputAdapter.flushTuples();
  }

  // No action needed once the checkpoint has been taken.
  @Override
  public void checkpointed(long l)
  {

  }

  // No action needed when a window is committed; flushing already happened in beforeCheckpoint.
  @Override
  public void committed(long l)
  {

  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@

import java.io.InterruptedIOException;

import javax.validation.constraints.Min;

import com.datatorrent.api.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;

import com.datatorrent.lib.db.AbstractStoreOutputOperator;

import com.datatorrent.netlet.util.DTThrowable;

/**
Expand All @@ -53,7 +48,7 @@
* The tuple type
* @since 1.0.2
*/
public abstract class AbstractHBasePutOutputOperator<T> extends AbstractStoreOutputOperator<T, HBaseStore> implements Operator.CheckpointNotificationListener {
public abstract class AbstractHBasePutOutputOperator<T> extends AbstractHBaseOutputOperator<T> {
private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBasePutOutputOperator.class);

public AbstractHBasePutOutputOperator()
Expand All @@ -62,9 +57,8 @@ public AbstractHBasePutOutputOperator()
}

@Override
public void processTuple(T tuple)
public void processTuple(T tuple, HTable table)
{
HTable table = store.getTable();
Put put = operationPut(tuple);
try {
table.put(put);
Expand All @@ -77,28 +71,6 @@ public void processTuple(T tuple)
}
}

@Override
public void beforeCheckpoint(long windowId)
{
try {
store.getTable().flushCommits();
} catch (InterruptedIOException e) {
DTThrowable.rethrow(e);
} catch (RetriesExhaustedWithDetailsException e) {
DTThrowable.rethrow(e);
}
}

@Override
public void checkpointed(long l) {

}

@Override
public void committed(long l) {

}

public abstract Put operationPut(T t);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@
package com.datatorrent.contrib.hbase;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.HTable;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator.ProcessingMode;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.db.AbstractAggregateTransactionableStoreOutputOperator;

/**
* A base implementation of an AggregateTransactionableStoreOutputOperator operator that stores tuples in HBase columns and provides batch append.&nbsp; Subclasses should provide implementation for appending operations. <br>
Expand All @@ -46,8 +40,6 @@
* for the tuple in the table.<br>
*
* <br>
* This class provides batch append where tuples are collected till the end
* window and they are appended on end window
*
* Note that since HBase doesn't support transactions this store cannot
* guarantee each tuple is written only once to HBase in case the operator is
Expand All @@ -63,50 +55,38 @@
* The tuple type
* @since 1.0.2
*/
public abstract class AbstractHBaseWindowAppendOutputOperator<T> extends AbstractAggregateTransactionableStoreOutputOperator<T, HBaseWindowStore> {
public abstract class AbstractHBaseWindowAppendOutputOperator<T> extends AbstractHBaseWindowOutputOperator<T> {
private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBaseWindowAppendOutputOperator.class);
private List<T> tuples;
private transient ProcessingMode mode;

/**
* Processing mode being a common platform feature should be set as an attribute.
* Even if the property is set, it wouldn't effect how the platform would process that feature for the operator. */
@Deprecated
public ProcessingMode getMode()
{
return mode;
}

@Deprecated
public void setMode(ProcessingMode mode)
{
this.mode = mode;
}

public AbstractHBaseWindowAppendOutputOperator() {
store = new HBaseWindowStore();
tuples = new ArrayList<T>();
}

@Override
public void storeAggregate() {
HTable table = store.getTable();
Iterator<T> it = tuples.iterator();
while (it.hasNext()) {
T t = it.next();
try {
Append append = operationAppend(t);
table.append(append);
} catch (IOException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
}

}
public void processTuple(T tuple, HTable table) {
try {
table.flushCommits();
} catch (RetriesExhaustedWithDetailsException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
} catch (InterruptedIOException e) {
Append append = operationAppend(tuple);
table.append(append);
} catch (IOException e) {
logger.error("Could not output tuple", e);
DTThrowable.rethrow(e);
}
tuples.clear();
}

/**
Expand All @@ -120,21 +100,13 @@ public void storeAggregate() {
*/
public abstract Append operationAppend(T t);

@Override
public void processTuple(T tuple) {
tuples.add(tuple);
}

@Override
public void setup(OperatorContext context)
{
mode=context.getValue(context.PROCESSING_MODE);
mode=context.getValue(OperatorContext.PROCESSING_MODE);
if(mode==ProcessingMode.EXACTLY_ONCE){
throw new RuntimeException("This operator only supports atmost once and atleast once processing modes");
}
if(mode==ProcessingMode.AT_MOST_ONCE){
tuples.clear();
}
super.setup(context);
}

Expand Down
Loading

0 comments on commit a725639

Please sign in to comment.