Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/apex-malhar into A…
Browse files Browse the repository at this point in the history
…PEXMALHAR-2066-JdbcPolling
  • Loading branch information
devtagare committed Jul 11, 2016
2 parents 782d393 + 9b62506 commit 4ed207d
Show file tree
Hide file tree
Showing 70 changed files with 4,974 additions and 1,070 deletions.
60 changes: 24 additions & 36 deletions benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,30 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>malhar-hive</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>malhar-hive</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
Expand Down Expand Up @@ -519,42 +543,6 @@
<version>${apex.core.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>0.13.1</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>0.13.1</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>0.13.1</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperatorPS;
import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;


/**
* <p>CassandraOutputOperator class.</p>
*
* @since 1.0.3
*/
public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Integer>{
public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>{

private int id = 0;

Expand Down
56 changes: 34 additions & 22 deletions contrib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<maxAllowedViolations>2791</maxAllowedViolations>
<maxAllowedViolations>2709</maxAllowedViolations>
<logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
</configuration>
</plugin>
Expand All @@ -244,7 +244,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<version>0.8.2.1</version>
<optional>true</optional>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -474,14 +474,14 @@
<!-- required by twitter -->
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<version>2.2.6</version>
<version>4.0.4</version>
<optional>true</optional>
</dependency>
<dependency>
<!-- required by twitter -->
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>2.2.6</version>
<version>4.0.4</version>
<optional>true</optional>
</dependency>
<dependency>
Expand Down Expand Up @@ -565,18 +565,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>0.13.1</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
Expand All @@ -603,12 +591,6 @@
<version>1.9.10</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.teknek</groupId>
<artifactId>hiveunit</artifactId>
<version>0.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.rosuda</groupId>
<artifactId>rengine</artifactId>
Expand Down Expand Up @@ -639,12 +621,42 @@
<artifactId>gemfire-core</artifactId>
<version>1.0.0-incubating.M1</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.fge</groupId>
<artifactId>json-schema-validator</artifactId>
<version>2.0.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>1.8.0.7</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@

import java.util.Collection;

import javax.annotation.Nonnull;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.lib.db.AbstractBatchTransactionableStoreOutputOperator;

/**
* <p>
* Generic base output adaptor which creates a transaction at the start of window.&nbsp; Subclasses should provide implementation for getting the update statement. <br/>
* Generic Cassandra output adaptor which creates a transaction at the start of window.&nbsp; Subclasses should provide implementation for getting the update statement and setting the statement parameters. <br/>
* </p>
*
* <p>
Expand All @@ -48,29 +54,46 @@
* @param <T>type of tuple</T>
* @since 1.0.2
*/
public abstract class AbstractCassandraTransactionableOutputOperator<T> extends AbstractBatchTransactionableStoreOutputOperator<T, CassandraTransactionalStore> {
public abstract class AbstractCassandraTransactionableOutputOperator<T> extends AbstractBatchTransactionableStoreOutputOperator<T, CassandraTransactionalStore> implements ActivationListener<Context.OperatorContext>
{
private transient PreparedStatement updateCommand;

public AbstractCassandraTransactionableOutputOperator(){
super();
@Override
public void activate(OperatorContext context)
{
updateCommand = getUpdateCommand();
}

/**
* Gets the statement which insert/update the table in the database.
*
* @return the cql statement to update a tuple in the database.
*/
@Nonnull
protected abstract PreparedStatement getUpdateCommand();

/**
* Sets the parameter of the insert/update statement with values from the tuple.
*
* @param tuple tuple
* @return statement The statement to execute
* @throws DriverException
*/
protected abstract Statement getUpdateStatement(T tuple) throws DriverException;
protected abstract Statement setStatementParameters(PreparedStatement updateCommand, T tuple) throws DriverException;


@Override
public void processBatch(Collection<T> tuples)
{
BatchStatement batchCommand = store.getBatchCommand();
for(T tuple: tuples)
{
batchCommand.add(getUpdateStatement(tuple));
batchCommand.add(setStatementParameters(updateCommand, tuple));
}
}

@Override
public void deactivate()
{
}
}

This file was deleted.

0 comments on commit 4ed207d

Please sign in to comment.