From f9d0e0e6639e31dc05deb7dcca04a83e4610c536 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Tue, 9 May 2017 10:25:33 -0700 Subject: [PATCH 1/2] Migrate hadoop inputformat to website --- src/documentation/io/built-in-hadoop.md | 156 ++++++++++++++++++++++++ src/documentation/io/built-in.md | 2 +- 2 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 src/documentation/io/built-in-hadoop.md diff --git a/src/documentation/io/built-in-hadoop.md b/src/documentation/io/built-in-hadoop.md new file mode 100644 index 00000000000..435903dc447 --- /dev/null +++ b/src/documentation/io/built-in-hadoop.md @@ -0,0 +1,156 @@ +--- +layout: default +title: "Apache Hadoop InputFormat IO" +permalink: /documentation/io/built-in/hadoop/ +--- + +[Pipeline I/O Table of Contents]({{site.baseurl}}/documentation/io/io-toc/) + +# Hadoop InputFormat IO + +A HadoopInputFormatIO is a Transform for reading data from any source which +implements Hadoop InputFormat. For example- Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. + +HadoopInputFormatIO has to make several performance trade-offs in connecting to InputFormat, so if there is another Beam IO Transform specifically for connecting to your data source of choice, we would recommend using that one, but this IO Transform allows you to connect to many data sources that do not yet have a Beam IO Transform. + +You will need to pass a Hadoop Configuration with parameters specifying how the read will occur. Many properties of the Configuration are optional, and some are required for certain InputFormat classes, but the following properties must be set for all InputFormats: + +mapreduce.job.inputformat.class: The InputFormat class used to connect to your data source of choice. +key.class: The key class returned by the InputFormat in 'mapreduce.job.inputformat.class'. +value.class: The value class returned by the InputFormat in 'mapreduce.job.inputformat.class'. 
+

For example:
```java
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
  InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
```

You will need to check to see if the key and value classes output by the InputFormat have a Beam Coder available. If not, You can use withKeyTranslation/withValueTranslation to specify a method transforming instances of those classes into another class that is supported by a Beam Coder. These settings are optional and you don't need to specify translation for both key and value.

For example:
```java
SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
  public MyKeyClass apply(InputFormatKeyClass input) {
    // ...logic to transform InputFormatKeyClass to MyKeyClass
  }
};
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
new SimpleFunction<InputFormatValueClass, MyValueClass>() {
  public MyValueClass apply(InputFormatValueClass input) {
    // ...logic to transform InputFormatValueClass to MyValueClass
  }
};
```

### Reading using Hadoop InputFormat IO
Pipeline p = ...; // Create pipeline.
// Read data only with Hadoop configuration.

```java
p.apply("read",
    HadoopInputFormatIO.<InputFormatKeyClass, InputFormatValueClass>read()
        .withConfiguration(myHadoopConfiguration));
```

// Read data with configuration and key translation (Example scenario: Beam Coder is not
available for key class hence key translation is required.).

```java
p.apply("read",
    HadoopInputFormatIO.<MyKeyClass, InputFormatValueClass>read()
        .withConfiguration(myHadoopConfiguration)
        .withKeyTranslation(myOutputKeyType));
```

// Read data with configuration and value translation (Example scenario: Beam Coder is not
available for value class hence value translation is required.).
+

```java
p.apply("read",
    HadoopInputFormatIO.<InputFormatKeyClass, MyValueClass>read()
        .withConfiguration(myHadoopConfiguration)
        .withValueTranslation(myOutputValueType));
```

// Read data with configuration, value translation and key translation (Example scenario: Beam Coders are not available for both key class and value class of InputFormat hence key and value translation is required.).

```java
p.apply("read",
    HadoopInputFormatIO.<MyKeyClass, MyValueClass>read()
        .withConfiguration(myHadoopConfiguration)
        .withKeyTranslation(myOutputKeyType)
        .withValueTranslation(myOutputValueType));
```

# Examples for specific InputFormats

### Cassandra - CqlInputFormat

To read data from Cassandra, org.apache.cassandra.hadoop.cql3.CqlInputFormat
CqlInputFormat can be used which needs following properties to be set.

Create Cassandra Hadoop configuration as follows:

```java
Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", java.lang.Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
```

Call Read transform as follows:

```java
PCollection<KV<Long, String>> cassandraData =
  p.apply("read",
    HadoopInputFormatIO.<Long, String>read()
      .withConfiguration(cassandraConf)
      .withValueTranslation(cassandraOutputValueType));
```

The CqlInputFormat key class is java.lang.Long Long, which has a Beam Coder. The CqlInputFormat value class is com.datastax.driver.core.Row Row, which does not have a Beam Coder.
Rather than write a new coder, you can provide your own translation method as follows:

```java
SimpleFunction<Row, String> cassandraOutputValueType = new SimpleFunction<Row, String>()
{
  public String apply(Row row) {
    return row.getString("myColName");
  }
};
```

### Elasticsearch - EsInputFormat

To read data from Elasticsearch, EsInputFormat can be used which needs following properties to be set.

Create ElasticSearch Hadoop configuration as follows:

```java
Configuration elasticSearchConf = new Configuration();
elasticSearchConf.set("es.nodes", ElasticsearchHostIp);
elasticSearchConf.set("es.port", "9200");
elasticSearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticSearchConf.setClass("key.class", org.apache.hadoop.io.Text.class, Object.class);
elasticSearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable.class, Object.class);
elasticSearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat.class, InputFormat.class);
```

Call Read transform as follows:

```java
PCollection<KV<Text, LinkedMapWritable>> elasticData = p.apply("read",
    HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticSearchConf));
```

The org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat key class is
org.apache.hadoop.io.Text Text and value class is org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable. Both key and value classes have Beam Coders.

diff --git a/src/documentation/io/built-in.md b/src/documentation/io/built-in.md
index 6a73f6b2f0c..3bb53430d03 100644
--- a/src/documentation/io/built-in.md
+++ b/src/documentation/io/built-in.md
@@ -35,7 +35,7 @@ Consult the [Programming Guide I/O section]({{site.baseurl }}/documentation/prog

 Google Cloud PubSub
-Apache Hadoop InputFormat
+<a href="{{site.baseurl}}/documentation/io/built-in/hadoop/">Apache Hadoop InputFormat</a>
 Apache HBase
 MongoDB
 JDBC

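An editorial aside on the page added in the patch above: the three `Configuration` properties that must be set for every `InputFormat` lend themselves to a quick sanity check before constructing the read. The sketch below is dependency-free and illustrative only (`RequiredHadoopProps` and `hasAllRequired` are not Beam or Hadoop APIs); it simply encodes the rule that all three properties must be present:

```java
import java.util.Map;

// Illustrative only: the property names come from the page above, but this
// helper is not part of Beam or Hadoop. It sketches the "these three
// properties must be set for all InputFormats" rule as a plain check.
public class RequiredHadoopProps {
  static final String[] REQUIRED = {
      "mapreduce.job.inputformat.class", "key.class", "value.class"};

  static boolean hasAllRequired(Map<String, String> conf) {
    for (String key : REQUIRED) {
      if (!conf.containsKey(key)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Mirrors the Cassandra example from the page: all three present.
    System.out.println(hasAllRequired(Map.of(
        "mapreduce.job.inputformat.class",
        "org.apache.cassandra.hadoop.cql3.CqlInputFormat",
        "key.class", "java.lang.Long",
        "value.class", "com.datastax.driver.core.Row")));
  }
}
```

With a real Hadoop `Configuration` you would consult `conf.get(name)` for each required name instead of a plain `Map`.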
From 89e839735fe3fd739d1aa2850f0ec26999bb76af Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Tue, 9 May 2017 14:03:17 -0700 Subject: [PATCH 2/2] fix review comments --- src/documentation/io/built-in-hadoop.md | 91 ++++++++++++++++++------- 1 file changed, 66 insertions(+), 25 deletions(-) diff --git a/src/documentation/io/built-in-hadoop.md b/src/documentation/io/built-in-hadoop.md index 435903dc447..dda584395fa 100644 --- a/src/documentation/io/built-in-hadoop.md +++ b/src/documentation/io/built-in-hadoop.md @@ -8,16 +8,15 @@ permalink: /documentation/io/built-in/hadoop/ # Hadoop InputFormat IO -A HadoopInputFormatIO is a Transform for reading data from any source which -implements Hadoop InputFormat. For example- Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. +A `HadoopInputFormatIO` is a transform for reading data from any source that implements Hadoop's `InputFormat`. For example, Cassandra, Elasticsearch, HBase, Redis, Postgres, etc. -HadoopInputFormatIO has to make several performance trade-offs in connecting to InputFormat, so if there is another Beam IO Transform specifically for connecting to your data source of choice, we would recommend using that one, but this IO Transform allows you to connect to many data sources that do not yet have a Beam IO Transform. +`HadoopInputFormatIO` allows you to connect to many data sources that do not yet have a Beam IO transform. However, `HadoopInputFormatIO` has to make several performance trade-offs in connecting to `InputFormat`. So, if there is another Beam IO transform for connecting specifically to your data source of choice, we recommend you use that one. -You will need to pass a Hadoop Configuration with parameters specifying how the read will occur. 
Many properties of the Configuration are optional, and some are required for certain InputFormat classes, but the following properties must be set for all InputFormats: +You will need to pass a Hadoop `Configuration` with parameters specifying how the read will occur. Many properties of the `Configuration` are optional and some are required for certain `InputFormat` classes, but the following properties must be set for all `InputFormat` classes: -mapreduce.job.inputformat.class: The InputFormat class used to connect to your data source of choice. -key.class: The key class returned by the InputFormat in 'mapreduce.job.inputformat.class'. -value.class: The value class returned by the InputFormat in 'mapreduce.job.inputformat.class'. +- `mapreduce.job.inputformat.class` - The `InputFormat` class used to connect to your data source of choice. +- `key.class` - The `Key` class returned by the `InputFormat` in `mapreduce.job.inputformat.class`. +- `value.class` - The `Value` class returned by the `InputFormat` in `mapreduce.job.inputformat.class`. For example: ```java @@ -29,7 +28,11 @@ myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class); myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class); ``` -You will need to check to see if the key and value classes output by the InputFormat have a Beam Coder available. If not, You can use withKeyTranslation/withValueTranslation to specify a method transforming instances of those classes into another class that is supported by a Beam Coder. These settings are optional and you don't need to specify translation for both key and value. +```py + # The Beam SDK for Python does not support Hadoop InputFormat IO. +``` + +You will need to check if the `Key` and `Value` classes output by the `InputFormat` have a Beam `Coder` available. 
If not, you can use `withKeyTranslation` or `withValueTranslation` to specify a method transforming instances of those classes into another class that is supported by a Beam `Coder`. These settings are optional and you don't need to specify translation for both key and value.

For example:
```java
@@ -47,9 +50,13 @@ new SimpleFunction<InputFormatValueClass, MyValueClass>() {
};
```

+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
### Reading using Hadoop InputFormat IO
-Pipeline p = ...; // Create pipeline.
-// Read data only with Hadoop configuration.
+
+#### Read data only with Hadoop configuration

```java
p.apply("read",
@@ -57,8 +64,13 @@ p.apply("read",
.withConfiguration(myHadoopConfiguration));
```

-// Read data with configuration and key translation (Example scenario: Beam Coder is not
-available for key class hence key translation is required.).
+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
+#### Read data with configuration and key translation
+
+Example scenario: a Beam `Coder` is not available for the key class, so key translation is required.

```java
p.apply("read",
@@ -67,8 +79,13 @@ p.apply("read",
.withConfiguration(myHadoopConfiguration)
.withKeyTranslation(myOutputKeyType));
```

-// Read data with configuration and value translation (Example scenario: Beam Coder is not
-available for value class hence value translation is required.).
+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
+#### Read data with configuration and value translation
+
+Example scenario: a Beam `Coder` is not available for the value class, so value translation is required.

```java
p.apply("read",
@@ -77,7 +94,13 @@ p.apply("read",
.withConfiguration(myHadoopConfiguration)
.withValueTranslation(myOutputValueType));
```

-// Read data with configuration, value translation and key translation (Example scenario: Beam Coders are not available for both key class and value class of InputFormat hence key and value translation is required.).
+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
+#### Read data with configuration, value translation and key translation
+
+Example scenario: Beam Coders are not available for both the `Key` class and the `Value` class of the `InputFormat`, so key and value translation is required.

```java
p.apply("read",
@@ -87,14 +110,15 @@ p.apply("read",
.withValueTranslation(myOutputValueType));
```

+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
# Examples for specific InputFormats

### Cassandra - CqlInputFormat

-To read data from Cassandra, org.apache.cassandra.hadoop.cql3.CqlInputFormat
-CqlInputFormat can be used which needs following properties to be set.
-
-Create Cassandra Hadoop configuration as follows:
+To read data from Cassandra, use `org.apache.cassandra.hadoop.cql3.CqlInputFormat`, which needs the following properties to be set:

```java
Configuration cassandraConf = new Configuration();
@@ -108,6 +132,10 @@ cassandraConf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
```

+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
Call Read transform as follows:

```java
@@ -118,7 +146,11 @@ PCollection<KV<Long, String>> cassandraData =
.withValueTranslation(cassandraOutputValueType));
```

-The CqlInputFormat key class is java.lang.Long Long, which has a Beam Coder. The CqlInputFormat value class is com.datastax.driver.core.Row Row, which does not have a Beam Coder. Rather than write a new coder, you can provide your own translation method as follows:
+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
+The `CqlInputFormat` key class is `java.lang.Long`, which has a Beam `Coder`. The `CqlInputFormat` value class is `com.datastax.driver.core.Row`, which does not have a Beam `Coder`.
Rather than write a new coder, you can provide your own translation method, as follows:

```java
SimpleFunction<Row, String> cassandraOutputValueType = new SimpleFunction<Row, String>()
@@ -128,12 +160,14 @@ SimpleFunction<Row, String> cassandraOutputValueType = new SimpleFunction<Row, String>()
{
  public String apply(Row row) {
    return row.getString("myColName");
  }
};
```

+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
### Elasticsearch - EsInputFormat

-To read data from Elasticsearch, EsInputFormat can be used which needs following properties to be set.
-
-Create ElasticSearch Hadoop configuration as follows:
+To read data from Elasticsearch, use `org.elasticsearch.hadoop.mr.EsInputFormat`, which needs the following properties to be set:

```java
Configuration elasticSearchConf = new Configuration();
```

+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
Call Read transform as follows:

```java
PCollection<KV<Text, LinkedMapWritable>> elasticData = p.apply("read",
    HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticSearchConf));
```

-The org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat key class is
-org.apache.hadoop.io.Text Text and value class is org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable. Both key and value classes have Beam Coders.
+```py
+  # The Beam SDK for Python does not support Hadoop InputFormat IO.
+```
+
+The `org.elasticsearch.hadoop.mr.EsInputFormat` key class is `org.apache.hadoop.io.Text`, and its value class is `org.elasticsearch.hadoop.mr.LinkedMapWritable`. Both key and value classes have Beam Coders.