Skip to content

Commit

Permalink
SPOI-8304: Cassandra Input operator example (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
DT-Priyanka authored and bhupeshchawda committed Oct 12, 2016
1 parent 8e17f46 commit 028ed34
Show file tree
Hide file tree
Showing 10 changed files with 655 additions and 1 deletion.
1 change: 1 addition & 0 deletions tutorials/README.md
@@ -1,5 +1,6 @@
| Application | Description |
| ----------------- | ----------- |
| cassandra-input-app | Example to show how to use Cassandra Input Operator.|
| cassandra-output-app | Example to show how to use Cassandra Output Operator.|
| dedup | Example showing use of Dedup operator |
| dynamic-partition | Example showing use of StatsListener with dynamic paritioning |
Expand Down
32 changes: 32 additions & 0 deletions tutorials/cassandraInput/README.md
@@ -0,0 +1,32 @@
# Sample Cassandra InputOperator implementation

This application reads data from cassandra database using [CassandraPOJOInputOperator](https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java)
and writes the records to a file using [GenericFileOutputOperator](https://github.com/apache/apex-malhar/blob/master/library/src/main/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperator.java) from [Apex Malhar](https://github.com/apache/apex-malhar).


Follow these steps to run this application:

**Step 1**: Update/add these properties in the file `src/main/resources/META-INF/properties-CassandraInputApplication.xml`:

| Property Name | Description |
| ------------- | ----------- |
| dt.operator.CassandraReader.prop.store.node | cassandra server node hostname or IP address |
| dt.operator.CassandraReader.prop.store.userName | cassandra server userName |
| dt.operator.CassandraReader.prop.store.password | cassandra server password |

**Step 2**: Create database keyspace and table and add entries

Go to the console and run (where _{path}_ is a suitable prefix):

shell> ./cqlsh -f {path}/src/main/resources/META-INF/example.cql

After this, please verify that `testapp.dt_meta` & `testapp.TestUser` tables are created and `TestUser` table has records.

**Step 3**: Build the code:

shell> mvn clean package

Upload the `target/cassandra-input-app-1.0.0-SNAPSHOT.apa` to the UI console if available or launch it from
the commandline using `apex` cli script.

**Step 4**: During launch use `src/main/resources/META-INF/properties-CassandraInputApplication.xml` as a custom configuration file; then verify that the output by checking files created by application where table data is written.
219 changes: 219 additions & 0 deletions tutorials/cassandraInput/pom.xml
@@ -0,0 +1,219 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.datatorrent</groupId>
<artifactId>cassandra-input-app</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>cassandra-input-application</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<apex.version>3.4.0</apex.version>
<malhar.version>3.5.0</malhar.version>
<datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.9</version>
<configuration>
<downloadSources>true</downloadSources>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.7</source>
<target>1.7</target>
<debug>true</debug>
<optimize>false</optimize>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>target/deps</outputDirectory>
<includeScope>runtime</includeScope>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>app-package-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}-apexapp</finalName>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/assemble/appPackage.xml</descriptor>
</descriptors>
<archiverConfig>
<defaultDirectoryMode>0755</defaultDirectoryMode>
</archiverConfig>
<archive>
<manifestEntries>
<Class-Path>${datatorrent.apppackage.classpath}</Class-Path>
<DT-Engine-Version>${apex.version}</DT-Engine-Version>
<DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
<DT-App-Package-Version>${project.version}</DT-App-Package-Version>
<DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
<DT-App-Package-Description>${project.description}</DT-App-Package-Description>
</manifestEntries>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<phase>package</phase>
<configuration>
<target>
<move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" />
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
<execution>
<!-- create resource directory for xml javadoc-->
<id>createJavadocDirectory</id>
<phase>generate-resources</phase>
<configuration>
<tasks>
<delete dir="${project.build.directory}/generated-resources/xml-javadoc" />
<mkdir dir="${project.build.directory}/generated-resources/xml-javadoc" />
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.9.1</version>
<executions>
<execution>
<id>attach-artifacts</id>
<phase>package</phase>
<goals>
<goal>attach-artifact</goal>
</goals>
<configuration>
<artifacts>
<artifact>
<file>target/${project.artifactId}-${project.version}.apa</file>
<type>apa</type>
</artifact>
</artifacts>
<skipAttach>false</skipAttach>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-api</artifactId>
<version>${apex.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-common</artifactId>
<version>${apex.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
<version>${apex.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-library</artifactId>
<version>${malhar.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-contrib</artifactId>
<version>${malhar.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-contrib</artifactId>
<version>${malhar.version}</version>
<classifier>tests</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.2</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.7.8</version>
</dependency>
</dependencies>
</project>
47 changes: 47 additions & 0 deletions tutorials/cassandraInput/src/assemble/appPackage.xml
@@ -0,0 +1,47 @@
<!--
Copyright (c) 2015 DataTorrent, Inc.
All rights reserved.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>appPackage</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${basedir}/target/</directory>
<outputDirectory>/app</outputDirectory>
<includes>
<include>${project.artifactId}-${project.version}.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/target/deps</directory>
<outputDirectory>/lib</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/site/conf</directory>
<outputDirectory>/conf</outputDirectory>
<includes>
<include>*.xml</include>
</includes>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/META-INF</directory>
<outputDirectory>/META-INF</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/app</directory>
<outputDirectory>/app</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/src/main/resources/resources</directory>
<outputDirectory>/resources</outputDirectory>
</fileSet>
</fileSets>

</assembly>

@@ -0,0 +1,62 @@
package com.datatorrent.cassandra;

import java.util.List;

import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.cassandra.CassandraPOJOInputOperator;
import com.datatorrent.contrib.cassandra.CassandraTransactionalStore;
import com.datatorrent.lib.converter.Converter;
import com.datatorrent.lib.util.FieldInfo;
import com.google.common.collect.Lists;

/**
* Application to test Cassandra Input Operator from Apache Apex-Malhar library
* Application has following operators:<br/>
*
* @author priyanka
*
*/
@ApplicationAnnotation(name = "CassandraInputApplication")
public class Application implements StreamingApplication
{

@Override
public void populateDAG(DAG dag, Configuration conf)
{
List<FieldInfo> fieldInfos = Lists.newArrayList();
fieldInfos.add(new FieldInfo("id", "id", null));
fieldInfos.add(new FieldInfo("city", "city", null));
fieldInfos.add(new FieldInfo("fname", "firstName", null));
fieldInfos.add(new FieldInfo("lname", "lastName", null));

CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
CassandraPOJOInputOperator cassandraInput = dag.addOperator("CassandraReader", new CassandraPOJOInputOperator());
cassandraInput.setStore(transactionalStore);
cassandraInput.setFieldInfos(fieldInfos);
GenericFileOutputOperator<Object> fileOutput = dag.addOperator("fileWriter", new GenericFileOutputOperator<>());
fileOutput.setConverter(new ObjectConverter());

dag.addStream("CassandraToFile", cassandraInput.outputPort, fileOutput.input);
}

public static class ObjectConverter implements Converter<Object, byte[]>
{

private static transient Logger logger = LoggerFactory.getLogger(ObjectConverter.class);

@Override
public byte[] convert(Object userInfo)
{
logger.info("userInfo: " + userInfo.toString());
return userInfo.toString().getBytes();
}

}
}

0 comments on commit 028ed34

Please sign in to comment.