50 changes: 19 additions & 31 deletions docs/content.zh/docs/connectors/datastream/formats/hadoop.md
@@ -30,10 +30,9 @@ under the License.

## Project Configuration

Support for Hadoop is contained in the `flink-hadoop-compatibility` Maven module.

Add the following dependency to your `pom.xml` to use Hadoop:

```xml
<dependency>
@@ -43,8 +42,7 @@ Add the following dependency to your `pom.xml` to use hadoop
</dependency>
```

If you want to run your Flink application locally (e.g. from your IDE), you also need to add a `hadoop-client` dependency to your `pom.xml`, such as:

```xml
<dependency>
@@ -57,20 +55,13 @@ a `hadoop-client` dependency such as:

## Using Hadoop InputFormats

To use Hadoop `InputFormats` with Flink, the format must first be wrapped
using either `readHadoopFile` or `createHadoopInput` of the
`HadoopInputs` utility class.
The former is used for input formats derived
from `FileInputFormat` while the latter has to be used for general-purpose
input formats.
The resulting `InputFormat` can be used to create a data source by using
`ExecutionEnvironment#createInput`.

The resulting `DataStream` contains 2-tuples where the first field
is the key and the second field is the value retrieved from the Hadoop
`InputFormat`.

The following example shows how to use Hadoop's `TextInputFormat`.

{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
{{< tab "Java" >}}
@@ -82,7 +73,7 @@ DataStream<Tuple2<LongWritable, Text>> input =
env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class, textPath));

// Do something with the data.
[...]
```

@@ -96,7 +87,7 @@ val input: DataStream[(LongWritable, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))

// Do something with the data.
[...]
```

@@ -105,40 +96,37 @@

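Input formats that are not file-based go through `createHadoopInput` instead; the call shape is sketched below, assuming the `org.apache.hadoop.mapred` API. To keep the sketch self-contained it instantiates a `SequenceFileInputFormat` (any other `org.apache.hadoop.mapred.InputFormat` slots in the same way); `inputPath` is an illustrative placeholder, not part of the original example.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hadoop-side settings travel in the JobConf that is handed to the wrapper.
JobConf jobConf = new JobConf();
FileInputFormat.addInputPath(jobConf, new Path(inputPath));

// createHadoopInput takes an already-instantiated Hadoop InputFormat plus its
// key/value classes, and yields a Flink InputFormat producing 2-tuples.
DataStream<Tuple2<LongWritable, Text>> input =
        env.createInput(HadoopInputs.createHadoopInput(
                new SequenceFileInputFormat<LongWritable, Text>(),
                LongWritable.class, Text.class, jobConf));
```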
## Using Hadoop OutputFormats

Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
that implements `org.apache.hadoop.mapred.OutputFormat` or extends
`org.apache.hadoop.mapreduce.OutputFormat` is supported.
The `OutputFormat` wrapper expects its input data to be a `DataStream` containing
2-tuples of key and value. These are to be processed by the Hadoop `OutputFormat`.

The following example shows how to use Hadoop's `TextOutputFormat`.

{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
{{< tab "Java" >}}

```java
// Obtain the result we want to emit.
DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]

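// The wrapper below needs a Hadoop Job to carry its configuration.
// Sketch assumption (not in the original snippet): the new mapreduce API
// is in use here, so a Job instance is created up front.
Job job = Job.getInstance();
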
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
// create the Flink wrapper.
new HadoopOutputFormat<Text, IntWritable>(
// set the Hadoop OutputFormat and specify the job.
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF);
```

{{< /tab >}}
{{< tab "Scala" >}}

```scala
// Obtain your result to emit.
val hadoopResult: DataStream[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](