Skip to content

Commit

Permalink
0004255: HBase data loader using HBase client API
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Jan 21, 2020
1 parent 74c9ed7 commit 4b9f99a
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 4 deletions.
4 changes: 3 additions & 1 deletion symmetric-assemble/common.gradle
Expand Up @@ -287,7 +287,9 @@ subprojects { subproject ->
exclude group: 'org.slf4j'
}

provided 'com.google.cloud:google-cloud-bigquery:1.99.0'
provided ('com.google.cloud:google-cloud-bigquery:1.99.0') {
exclude group: 'com.google.protobuf'
}


testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar')
Expand Down
19 changes: 17 additions & 2 deletions symmetric-assemble/src/asciidoc/appendix/hbase.ad
@@ -1,7 +1,22 @@

=== HBase

The HBase database is a load only database in SymmetricDS. It does require the phoenix jdbc driver though to utilize it. This driver should be downloaded and placed in the /lib folder of SymmetricDS and restarted.
==== Empty HBase

Tested with jar : phoenix-5.0.0-HBase-2.0-client.jar
If you are setting up replication to HBase and the tables are not already present in HBase, SymmetricDS can create them through the Phoenix JDBC driver. This driver maintains some additional metadata about the tables so that they can be accessed using SQL through the JDBC driver.

This configuration is set up as a <<Load Only Node>> in SymmetricDS. It requires the Phoenix JDBC driver, which should be downloaded and placed in the /lib folder of SymmetricDS; SymmetricDS must then be restarted.

==== Existing HBase

If you are setting up replication to an HBase database that already has tables present, you will need to follow the steps below.

Set up a new H2 node that will contain all the SymmetricDS runtime tables. To do this, go through the <<Add Node>> setup, select type H2, and provide a name for the database (a new one will be created locally if not present). This allows SymmetricDS to create the tables it needs to maintain replication, such as incoming_batch.

Next, you will need to set up a <<Channels, channel>> (or use the default channel) and set the `data_loader_type` to *hbase*.

Finally, set up a parameter that contains the path of your hbase-site.xml file.

hbase.site.xml.path

All changes captured will now use the HBase data loader to load into an existing HBase table.
@@ -0,0 +1,61 @@
package org.jumpmind.symmetric.io;

import java.util.List;

import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;

/**
 * Data loader factory that hands out an {@link HbaseDatabaseWriter}.
 *
 * <p>The factory is identified by its type name, which defaults to
 * {@code "hbase"}. The writer is created on the first call to
 * {@link #getDataWriter} and the same instance is returned on every later call.
 */
public class HbaseDataLoaderFactory extends DefaultDataLoaderFactory
        implements ISymmetricEngineAware, IBuiltInExtensionPoint {

    /** Engine supplied through {@link #setSymmetricEngine(ISymmetricEngine)}. */
    protected ISymmetricEngine engine;

    /** Name under which this factory is looked up. */
    protected String typeName = "hbase";

    /** Value of the {@code hbase.site.xml.path} parameter, read when the writer is first built. */
    protected String hbaseSiteXmlPath;

    /** Lazily created writer that is reused for every request. */
    protected IDataWriter hbaseDataWriter;

    public HbaseDataLoaderFactory() {
        super();
    }

    /**
     * Stores the engine and takes its parameter service for later lookups.
     *
     * @param engine the engine this factory belongs to
     */
    @Override
    public void setSymmetricEngine(ISymmetricEngine engine) {
        this.engine = engine;
        this.parameterService = engine.getParameterService();
    }

    /**
     * @return the type name of this factory
     */
    @Override
    public String getTypeName() {
        return typeName;
    }

    /**
     * @param typeName the type name this factory should report
     */
    public void setTypeName(String typeName) {
        this.typeName = typeName;
    }

    /**
     * Returns the HBase writer, building it on first use from the
     * {@code hbase.site.xml.path} parameter. None of the arguments are
     * consulted by this implementation.
     *
     * @return the shared {@link HbaseDatabaseWriter} instance
     */
    @Override
    public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect,
            TransformWriter transformWriter, List<IDatabaseWriterFilter> filters,
            List<IDatabaseWriterErrorHandler> errorHandlers,
            List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

        // Fast path: the writer has already been built by an earlier call.
        if (hbaseDataWriter != null) {
            return hbaseDataWriter;
        }

        String sitePath = parameterService.getString(ParameterConstants.HBASE_SITE_XML_PATH);
        hbaseSiteXmlPath = sitePath;
        hbaseDataWriter = new HbaseDatabaseWriter(sitePath);
        return hbaseDataWriter;
    }
}
@@ -0,0 +1,129 @@
package org.jumpmind.symmetric.io;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.jumpmind.db.model.Column;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter;

/**
 * Database writer that applies captured changes to an HBase table through the
 * HBase client API.
 *
 * <p>Only source columns whose name has the form {@code family:qualifier} are
 * written; the part before the first colon is used as the column family and
 * the remainder as the qualifier. The first primary key value is used as the
 * HBase row key.
 *
 * <p>The {@link Connection} is created lazily and reused. A {@link Table}
 * handle is obtained for the current target table on every operation and
 * closed afterwards, so a change of target table is always honoured and a
 * closed handle is never reused.
 *
 * <p>NOTE(review): the connection is never closed by this class. If the
 * superclass exposes a close hook, the connection should be released there.
 */
public class HbaseDatabaseWriter extends AbstractDatabaseWriter {

    private Configuration config;
    private Connection connection;
    private final String hbaseSiteXmlPath;

    /**
     * @param hbaseSiteXmlPath path of the hbase-site.xml file to load; when
     *        null or blank only the default HBase configuration is used
     */
    public HbaseDatabaseWriter(String hbaseSiteXmlPath) {
        this.hbaseSiteXmlPath = hbaseSiteXmlPath;
    }

    /**
     * Builds the HBase configuration and opens the connection if that has not
     * happened yet (or the connection has been closed).
     *
     * @throws RuntimeException wrapping the {@link IOException} when the
     *         connection cannot be established
     */
    protected void setup() {
        try {
            if (config == null) {
                Configuration loaded = HBaseConfiguration.create();
                if (hbaseSiteXmlPath != null && !hbaseSiteXmlPath.trim().isEmpty()) {
                    loaded.addResource(new Path(hbaseSiteXmlPath));
                }
                // Assign the field (not a shadowing local) so the settings are
                // actually used when the connection is created.
                config = loaded;
            }

            if (connection == null || connection.isClosed()) {
                log.debug("Establishing connection to HBase");
                connection = ConnectionFactory.createConnection(config);
            }
        } catch (IOException e) {
            log.error("Unable to connect to HBase ", e);
            // Fail here with the real cause instead of a later NullPointerException.
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes one row to the current target table. Non-null values are stored
     * with a single {@link Put}; columns whose value is null are removed from
     * the row so that no stale cell is left behind.
     *
     * @param data the captured change holding the row data and primary key
     * @return {@link LoadStatus#SUCCESS} when the row was written
     * @throws IllegalStateException when the change carries no row key
     * @throws RuntimeException wrapping the {@link IOException} on HBase failures
     */
    protected LoadStatus put(CsvData data) {
        String[] pkData = data.getPkData(this.targetTable);
        if (pkData == null || pkData.length == 0 || pkData[0] == null) {
            throw new IllegalStateException("Unable to load data into HBase table "
                    + this.targetTable.getName() + " without a row key");
        }
        byte[] rowKey = toBytes(pkData[0]);

        String[] values = data.getParsedData(CsvData.ROW_DATA);
        Column[] columns = sourceTable.getColumns();

        Put put = new Put(rowKey);
        Delete clearedCells = new Delete(rowKey);

        log.debug("Preparing put statement into Hbase.");
        for (int i = 0; values != null && i < columns.length && i < values.length; i++) {
            String name = columns[i].getName();
            int separator = name.indexOf(':');
            if (separator < 0) {
                // Not in family:qualifier form, so there is nowhere to store it.
                continue;
            }
            byte[] columnFamily = toBytes(name.substring(0, separator));
            byte[] columnName = toBytes(name.substring(separator + 1));

            if (values[i] == null) {
                clearedCells.addColumns(columnFamily, columnName);
            } else {
                put.addColumn(columnFamily, columnName, toBytes(values[i]));
            }
        }

        try (Table table = openTable()) {
            if (!put.isEmpty()) {
                table.put(put);
            }
            // An empty Delete would remove the whole row, so only send it when
            // at least one cell has been named.
            if (!clearedCells.isEmpty()) {
                table.delete(clearedCells);
            }
            log.debug("Put row into HBase");
        } catch (IOException e) {
            log.error("Unable to load data into HBase ", e);
            throw new RuntimeException(e);
        }

        return LoadStatus.SUCCESS;
    }

    /** Inserts are handled exactly like any other write, see {@link #put(CsvData)}. */
    @Override
    protected LoadStatus insert(CsvData data) {
        return put(data);
    }

    /**
     * Deletes the row identified by the single primary key value of the change.
     * Changes that do not carry exactly one primary key value are skipped.
     *
     * @return always {@link LoadStatus#SUCCESS}
     * @throws RuntimeException wrapping the {@link IOException} on HBase failures
     */
    @Override
    protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
        String[] pkData = data.getParsedData(CsvData.PK_DATA);
        if (pkData == null || pkData.length != 1 || pkData[0] == null) {
            log.warn("Skipping HBase delete for table " + this.targetTable.getName()
                    + " because the change does not carry exactly one primary key value");
            return LoadStatus.SUCCESS;
        }

        try (Table table = openTable()) {
            table.delete(new Delete(toBytes(pkData[0])));
        } catch (IOException e) {
            log.error("Unable to delete data for table " + this.targetTable.getName() +
                    ", for primary key " + pkData[0], e);
            throw new RuntimeException(e);
        }
        return LoadStatus.SUCCESS;
    }

    /** Updates are handled exactly like any other write, see {@link #put(CsvData)}. */
    @Override
    protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
        return put(data);
    }

    /** Create events are not handled by this writer; always returns false. */
    @Override
    protected boolean create(CsvData data) {
        return false;
    }

    /** SQL events are not handled by this writer; always returns false. */
    @Override
    protected boolean sql(CsvData data) {
        return false;
    }

    /** Intentionally empty: this writer adds no failure details of its own. */
    @Override
    protected void logFailureDetails(Throwable e, CsvData data, boolean logLastDmlDetails) {
    }

    /** Ensures the connection exists and returns a fresh handle for the current target table. */
    private Table openTable() throws IOException {
        setup();
        String tableName = this.targetTable.getName();
        log.debug("Connected to HBase, now looking up table " + tableName);
        return connection.getTable(TableName.valueOf(tableName));
    }

    /** Encodes text as UTF-8 so the stored bytes do not depend on the platform charset. */
    private static byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

}
1 change: 1 addition & 0 deletions symmetric-core/build.gradle
Expand Up @@ -10,6 +10,7 @@ apply from: symAssembleDir + '/common.gradle'
compile "com.fasterxml.jackson.core:jackson-databind:2.9.8"
compile "com.google.code.gson:gson:2.8.5"
compile "org.springframework:spring-core:$springVersion"
provided "org.apache.hbase:hbase-client:1.3.6"

testCompile project(path: ':symmetric-util', configuration: 'testArtifacts')
testCompile project(path: ':symmetric-jdbc', configuration: 'testArtifacts')
Expand Down
Expand Up @@ -520,6 +520,8 @@ private ParameterConstants() {
public final static String GOOGLE_BIG_QUERY_PROJECT_ID = "google.bigquery.project.id";
public final static String GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH = "google.bigquery.security.credentials.path";

public final static String HBASE_SITE_XML_PATH = "hbase.site.xml.path";

public static Map<String, ParameterMetaData> getParameterMetaData() {
return parameterMetaData;
}
Expand Down
@@ -1,4 +1,4 @@
package org.jumpmind.db.platform.hbase;
package org.jumpmind.db.platform.hbase;

import javax.sql.DataSource;

Expand Down

0 comments on commit 4b9f99a

Please sign in to comment.