Merge doc updates #8

Merged 8 commits (Feb 3, 2016)

38 changes: 38 additions & 0 deletions docs/correction.md
@@ -0,0 +1,38 @@
#performanceResults/perfJan2016.md

For *Kafka to Kafka Conclusions*, I think we should change the title to *Kafka to Kafka Observations*; we should not conclude anything until we have strong proof.

A table structure would be easier to read.

Also list the static configuration, such as the Kafka and Hadoop configuration, and the hardware specification (CPU, memory, IO, network, ...).

|Hardware/Software | DP=8, DR=3 | DP=10, DR=3 |
| -------------------------------- | --------------------------------- |------------------------------------|
| 3 m4.large HD x 5 m4.large kafka | Bytes/sec and record/sec | Bytes/sec and record/sec |
| 4 m4.large HD x 5 m4.large kafka | Bytes/sec and record/sec | Bytes/sec and record/sec |

Legend:
- HD: Hadoop worker
- DP: Default parallelism
- DR: Default replication

#Dataflow Development

1. I think you can merge all the documents into one single md file and use internal links.
2. There is a lot of similarity and duplication between the Kafka and HDFS guides. The HDFS guide basically differs only in configuring the output and validating the output, so you just need to highlight the difference (see the sketch after this list).
3. Make sure the sample code runs and that a reader can follow the instruction steps in the document and run it successfully.
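For example, the sink definition is the one line that changes between the two guides. This is a rough sketch only, assuming the buildDataflow() context from the dev guides (where `dfl`, `kafkaZkConnect`, `outputTopic`, and `hdfsLocation` are already defined); the Kafka sink line is my assumption, mirroring the Kafka input config.

```java
//Kafka-to-Kafka: the sink is another Kafka topic (assumed, mirroring the createInput call)
DataSet<Message> kafkaOut =
    dfl.createOutput(new KafkaStorageConfig("output", kafkaZkConnect, outputTopic));

//Kafka-to-HDFS: the only difference is the sink, now an HDFS location
DataSet<Message> hdfsOut =
    dfl.createOutput(new HDFSStorageConfig("output", hdfsLocation));
```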

#Scribengin Quickstart

Same as the development docs: make sure someone like Badrish can read, understand, and follow the instructions and set everything up successfully.

#Terminology

I think you can find a better name when merging the features and terminology files. They need more introduction and description.


#Common rules for the documents

For the concept, design, and architecture documents, you will need to give plenty of explanation, descriptions, diagrams, and examples.

For the development docs, give as little as possible. Provide a workable example that is easy to set up and run, preferably with an automation script. You can document directly in the code. By the same rule, the example code should have as few lines as possible. Avoid unnecessary dependencies.
2 changes: 2 additions & 0 deletions docs/dataflowDevelopment/dataflowDevTableOfContents.md
@@ -3,6 +3,8 @@ Dataflow Development

The following guides will walk you through how to develop a dataflow, starting with setting up your development environment, and moving on to writing a dataflow and custom DataStreamOperators.

Full example code can be found [here](https://github.com/Nventdata/NeverwinterDP/tree/master/scribengin/dataflow/example/src/main/java/com/neverwinterdp/scribengin/dataflow/example) and [here](https://github.com/Nventdata/NeverwinterDP/tree/master/scribengin/dataflow/example/src/test/java/com/neverwinterdp/scribengin/dataflow/example)

- [Setting up your dev environment](devEnvironmentSetup.md)
- [A Simple Dataflow](simpleDataflowDev.md)
- [A Simple HDFS Dataflow](hdfsDataflowDev.md)
300 changes: 6 additions & 294 deletions docs/dataflowDevelopment/hdfsDataflowDev.md
@@ -17,7 +17,7 @@ HDFS Dataflow Development
This howto will show you how to develop your own dataflow and to push data from Kafka to HDFS.

#Sample code#
You can find sample code in the Scribengin package com.neverwinterdp.scribengin.example.*. The code comes complete with unit tests and full comments.
You can find sample code in the Scribengin package com.neverwinterdp.scribengin.dataflow.example.*. The code comes complete with unit tests and full comments.

#A Kafka to HDFS Dataflow

@@ -75,299 +75,33 @@ We'll use these properties to define our HDFS output DataSet.
//"output" - the dataset's name
//HDFSRegistryPath - where in the registry to HDFS config
//HDFSLocation - the path in HDFS to put our data
DataSet<Message> outputDs = dfl.createOutput(new HDFSStorageConfig("output", HDFSRegistryPath, HDFSLocation));
DataSet<Message> outputDs = dfl.createOutput(new HDFSStorageConfig("output", hdfsLocation));

```


#The Dataflow Submitter
```java
package com.neverwinterdp.scribengin.dataflow.example.hdfs;

import java.util.Properties;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataSet;
import com.neverwinterdp.scribengin.dataflow.Dataflow;
import com.neverwinterdp.scribengin.dataflow.DataflowDescriptor;
import com.neverwinterdp.scribengin.dataflow.DataflowSubmitter;
import com.neverwinterdp.scribengin.dataflow.KafkaDataSet;
import com.neverwinterdp.scribengin.dataflow.Operator;
import com.neverwinterdp.scribengin.dataflow.example.simple.SimpleDataStreamOperator;
import com.neverwinterdp.scribengin.shell.ScribenginShell;
import com.neverwinterdp.storage.hdfs.HDFSStorageConfig;
import com.neverwinterdp.storage.kafka.KafkaStorageConfig;
import com.neverwinterdp.util.JSONSerializer;
import com.neverwinterdp.vm.client.VMClient;

public class ExampleHdfsDataflowSubmitter {
private String dataflowID;
private int defaultReplication;
private int defaultParallelism;

private int numOfWorker;
private int numOfExecutorPerWorker;

private String inputTopic;

private String hdfsLocation;

private ScribenginShell shell;
private DataflowSubmitter submitter;

String localAppHome;
String dfsAppHome;

public ExampleHdfsDataflowSubmitter(ScribenginShell shell){
this(shell, new Properties());
}

/**
* Constructor - sets shell to access Scribengin and configuration properties
* @param shell ScribenginShell to connect to Scribengin with
* @param props Properties to configure the dataflow
*/
public ExampleHdfsDataflowSubmitter(ScribenginShell shell, Properties props){
//This is the shell used to communicate with Scribengin
this.shell = shell;

//The dataflow's ID. All dataflows require a unique ID when running
dataflowID = props.getProperty("dataflow.id", "ExampleDataflow");

//The default replication factor for Kafka
defaultReplication = Integer.parseInt(props.getProperty("dataflow.replication", "1"));
//The number of DataStreams to deploy
defaultParallelism = Integer.parseInt(props.getProperty("dataflow.parallelism", "8"));

//The number of workers to deploy (i.e. YARN containers)
numOfWorker = Integer.parseInt(props.getProperty("dataflow.numWorker", "2"));
//The number of executors per worker (i.e. threads per YARN container)
numOfExecutorPerWorker = Integer.parseInt(props.getProperty("dataflow.numExecutorPerWorker", "2"));

//The kafka input topic
inputTopic = props.getProperty("dataflow.inputTopic", "input.topic");

//Where in HDFS to store our data
hdfsLocation = props.getProperty("dataflow.hdfsLocation", "build/working/storage/hdfs/output");


//The example hdfs dataflow local location
localAppHome = props.getProperty("dataflow.localapphome", "N/A");

//DFS location to upload the example dataflow
dfsAppHome = props.getProperty("dataflow.dfsAppHome", "/applications/dataflow/hdfsexample");
}

/**
* The logic to submit the dataflow
* @param kafkaZkConnect [host]:[port] of Kafka's Zookeeper connection
* @throws Exception
*/
public void submitDataflow(String kafkaZkConnect) throws Exception{
//Upload the dataflow to HDFS
VMClient vmClient = shell.getScribenginClient().getVMClient();
vmClient.uploadApp(localAppHome, dfsAppHome);

Dataflow<Message, Message> dfl = buildDataflow(kafkaZkConnect);
//Get the dataflow's descriptor
DataflowDescriptor dflDescriptor = dfl.buildDataflowDescriptor();
//Output the descriptor in human-readable JSON
System.out.println(JSONSerializer.INSTANCE.toString(dflDescriptor));

//Ensure all your sources and sinks are up and running first, then...

//Submit the dataflow and wait until it starts running
submitter = new DataflowSubmitter(shell.getScribenginClient(), dfl).submit().waitForDataflowRunning(60000);

}

/**
* Wait for the dataflow to complete within the given timeout
* @param timeout Timeout in ms
* @throws Exception
*/
public void waitForDataflowCompletion(int timeout) throws Exception{
this.submitter.waitForDataflowStop(timeout);
}

/**
* The logic to build the dataflow configuration
* @param kafkaZkConnect [host]:[port] of Kafka's Zookeeper connection
* @return The configured Dataflow object
*/
public Dataflow<Message,Message> buildDataflow(String kafkaZkConnect){
//Create the new Dataflow object
// <Message,Message> pertains to the <input,output> object for the data
Dataflow<Message,Message> dfl = new Dataflow<Message,Message>(dataflowID);
dfl.
setDefaultParallelism(defaultParallelism).
setDefaultReplication(defaultReplication);

dfl.getWorkerDescriptor().setNumOfInstances(numOfWorker);
dfl.getWorkerDescriptor().setNumOfExecutor(numOfExecutorPerWorker);


//Define our input source - set name, ZK host:port, and input topic name
KafkaDataSet<Message> inputDs =
dfl.createInput(new KafkaStorageConfig("input", kafkaZkConnect, inputTopic));

//Our output sink will be HDFS
//"output" - the dataset's name
//HDFSRegistryPath - where in the registry to HDFS config
//HDFSLocation - the path in HDFS to put our data
DataSet<Message> outputDs = dfl.createOutput(new HDFSStorageConfig("output", hdfsLocation));

//Define which operator to use.
//This will be the logic that ties the input to the output
Operator<Message, Message> operator = dfl.createOperator("simpleOperator", SimpleDataStreamOperator.class);

//Connect your input to the operator
inputDs.useRawReader().connect(operator);
//Connect your operator to the output
operator.connect(outputDs);

return dfl;
}


public String getDataflowID() {
return dataflowID;
}

public String getInputTopic() {
return inputTopic;
}
public String getHDFSLocation() {
return hdfsLocation;
}


}

```
Full code can be found [here](https://github.com/Nventdata/NeverwinterDP/blob/master/scribengin/dataflow/example/src/main/java/com/neverwinterdp/scribengin/dataflow/example/hdfs/ExampleHdfsDataflowSubmitter.java)
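
To make the configuration concrete, here is a minimal usage sketch based on the constructor and methods above. The property values are illustrative, and `shell` (a ScribenginShell) and `kafkaZkConnect` (Kafka's Zookeeper [host]:[port]) are assumed to come from your environment.

```java
//Override a few of the defaults documented in the constructor above
Properties props = new Properties();
props.put("dataflow.id", "ExampleHdfsDataflow");                         //unique dataflow ID
props.put("dataflow.hdfsLocation", "build/working/storage/hdfs/output"); //where to write in HDFS
props.put("dataflow.localapphome", "path/to/local/dataflow/example");    //illustrative local path

ExampleHdfsDataflowSubmitter eds = new ExampleHdfsDataflowSubmitter(shell, props);
//Uploads the app, submits the dataflow, and waits for it to start running
eds.submitDataflow(kafkaZkConnect);
//Then wait up to 2 minutes for the dataflow to finish
eds.waitForDataflowCompletion(120000);
```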


#Writing a Unit Test

The main difference to note in this unit test is how we do the verification of our data stored in HDFS. We use an internal Scribengin class to be able to read our partitioned data easily. Study the function ```readDirsRecursive()``` for how it works.
The main difference to note in this unit test is how we do the verification of our data stored in HDFS. We use an internal Scribengin class to be able to read our partitioned data easily. Study the function ```readDirsRecursive()``` for how it works. Full code can be found [here](https://github.com/Nventdata/NeverwinterDP/blob/master/scribengin/dataflow/example/src/test/java/com/neverwinterdp/scribengin/dataflow/example/hdfs/ExampleHdfsDataflowSubmitterTest.java)


```java
import static org.junit.Assert.assertEquals;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.registry.Registry;
import com.neverwinterdp.registry.RegistryConfig;
import com.neverwinterdp.scribengin.LocalScribenginCluster;
import com.neverwinterdp.scribengin.shell.ScribenginShell;
import com.neverwinterdp.storage.hdfs.HDFSStorage;
import com.neverwinterdp.storage.hdfs.HDFSStorageConfig;
import com.neverwinterdp.storage.hdfs.source.HDFSSource;
import com.neverwinterdp.storage.source.SourcePartitionStream;
import com.neverwinterdp.storage.source.SourcePartitionStreamReader;

public class ExampleHdfsDataflowSubmitterTest {
LocalScribenginCluster localScribenginCluster ;
ScribenginShell shell;
int numMessages = 10000;
String BASE_DIR = "build/working";
Registry registry;

/**
* Setup a local Scribengin cluster
* This sets up kafka, zk, and vm-master
* @throws Exception
*/
@Before
public void setup() throws Exception{

System.setProperty("app.home", BASE_DIR + "/scribengin");
System.setProperty("vm.app.dir", BASE_DIR + "/scribengin");

localScribenginCluster = new LocalScribenginCluster(BASE_DIR) ;
localScribenginCluster.clean();
localScribenginCluster.useLog4jConfig("classpath:scribengin/log4j/vm-log4j.properties");
localScribenginCluster.start();
registry = RegistryConfig.getDefault().newInstance().connect();
shell = localScribenginCluster.getShell();

}

/**
* Destroy the local Scribengin cluster and clean up
* @throws Exception
*/
@After
public void teardown() throws Exception{
localScribenginCluster.shutdown();
}

/**
* Test our HDFS Dataflow Submitter
* 1. Write data to Kafka into the input topic
* 2. Run our dataflow
* 3. Use an HDFS stream reader to read the data in the output HDFS partition and make sure it's all present
* @throws Exception
*/
@Test
public void TestExampleSimpleDataflowSubmitterTest() throws Exception{
//Create a new DataflowSubmitter with default properties
ExampleHdfsDataflowSubmitter eds = new ExampleHdfsDataflowSubmitter(shell);

//Populate kafka input topic with data
sendKafkaData(localScribenginCluster.getKafkaCluster().getKafkaConnect(), eds.getInputTopic());

//Submit the dataflow and wait for it to start running
eds.submitDataflow(localScribenginCluster.getKafkaCluster().getZKConnect());
//Output the registry for debugging purposes
shell.execute("registry dump");

//Get basic info on the dataflow
shell.execute("dataflow info --dataflow-id "+eds.getDataflowID());

//Give the dataflow a second to get going
Thread.sleep(1000);

//Do some very simple verification to ensure our data has been moved correctly
//We'll use some basic HDFS classes to do the reading, so we'll configure our local HDFS FS here
Path path = new Path(eds.getHDFSLocation());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
int numEntries = readDirsRecursive(fs, eds.getHDFSRegistryPath(), eds.getHDFSLocation());
fs.close();

//Make sure all the messages were written
assertEquals(numEntries, numMessages);

//Get basic info on the dataflow
shell.execute("dataflow info --dataflow-id "+eds.getDataflowID());
}

/**
* Use our HDFSSource to read our data through all partitions
* @param fs HDFS File system
* @param registryPath Path to HDFS info in the registry
* @param hdfsPath Path our data is saved to
* @return count of records in HDFS
* @throws Exception
*/
private int readDirsRecursive(FileSystem fs, String registryPath, String hdfsPath) throws Exception{
private int readDirsRecursive(FileSystem fs, String hdfsPath) throws Exception{
int count = 0;

//Configure our HDFS storage object
HDFSStorageConfig storageConfig = new HDFSStorageConfig("test", registryPath, hdfsPath);
HDFSStorageConfig storageConfig = new HDFSStorageConfig("output", hdfsPath);
HDFSStorage storage = new HDFSStorage(registry, fs, storageConfig);

//Get our source object from the storage object
@@ -387,28 +121,6 @@ public class ExampleHdfsDataflowSubmitterTest {

return count;
}

/**
* Push data to Kafka
* @param kafkaConnect Kafka's [host]:[port]
* @param inputTopic Topic to write to
*/
private void sendKafkaData(String kafkaConnect, String inputTopic){
Properties props = new Properties();
props.put("metadata.broker.list", kafkaConnect);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);
for(int i = 0; i < numMessages; i++){
producer.send(new KeyedMessage<String, String>(inputTopic, "test", Integer.toString(i)));
}
producer.close();
}

}
```


2 changes: 1 addition & 1 deletion docs/dataflowDevelopment/simpleDataflowDev.md
@@ -14,7 +14,7 @@
This howto will show you how to develop your own dataflow and operator. You will then learn how to submit your dataflow to Scribengin to be run.

#Sample code#
You can find sample code in the Scribengin package com.neverwinterdp.scribengin.example.*. The code comes complete with unit tests and full comments.
You can find sample code in the Scribengin package com.neverwinterdp.scribengin.dataflow.example.*. The code comes complete with unit tests and full comments.

#A Simple Dataflow
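
Since the walkthrough itself is collapsed in this diff, here is a rough orientation sketch only, following the buildDataflow() pattern from the HDFS guide above. The class name, topic names, and the use of KafkaStorageConfig as the output sink are assumptions, not the guide's actual code.

```java
import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataSet;
import com.neverwinterdp.scribengin.dataflow.Dataflow;
import com.neverwinterdp.scribengin.dataflow.KafkaDataSet;
import com.neverwinterdp.scribengin.dataflow.Operator;
import com.neverwinterdp.scribengin.dataflow.example.simple.SimpleDataStreamOperator;
import com.neverwinterdp.storage.kafka.KafkaStorageConfig;

//Hypothetical sketch of assembling a simple Kafka-to-Kafka dataflow
public class SimpleDataflowSketch {
  public Dataflow<Message, Message> buildDataflow(String kafkaZkConnect) {
    //<Message,Message> pertains to the <input,output> object for the data
    Dataflow<Message, Message> dfl = new Dataflow<Message, Message>("SimpleExampleDataflow");
    dfl.setDefaultParallelism(8).setDefaultReplication(1);

    //Source: a Kafka input topic (topic names are illustrative)
    KafkaDataSet<Message> inputDs =
        dfl.createInput(new KafkaStorageConfig("input", kafkaZkConnect, "input.topic"));
    //Sink: another Kafka topic (assumed; the HDFS guide uses HDFSStorageConfig here)
    DataSet<Message> outputDs =
        dfl.createOutput(new KafkaStorageConfig("output", kafkaZkConnect, "output.topic"));

    //The operator ties the input to the output
    Operator<Message, Message> operator =
        dfl.createOperator("simpleOperator", SimpleDataStreamOperator.class);
    inputDs.useRawReader().connect(operator);
    operator.connect(outputDs);

    return dfl;
  }
}
```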
