Skip to content
This repository has been archived by the owner on Mar 31, 2020. It is now read-only.

basic example

ouyangzhe edited this page Nov 30, 2016 · 3 revisions

简单示例

Flume的原理、架构,以及核心组件的介绍请参考 Flume-ng的原理和使用。本文将构建一个使用Datahub Sink的Flume实例,对日志文件中的结构化数据进行解析,并上传到Datahub Topic中。

需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):

# test_basic.log
some,log,line1
some,log,line2
...

下面将创建Datahub Topic,并把每行日志的第一列和第三列作为一条记录写入Topic中。

创建Datahub Topic

使用datahub console创建topic语句示例如下:

ct test_project test_topic 1 1 (string c1, string c2);

Flume配置文件

在Flume安装目录的conf/文件夹下创建名为datahub_basic.conf的文件,并输入内容如下:

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log

# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
a1.sinks.k1.datahub.project = test_project
a1.sinks.k1.datahub.topic = test_topic
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = c1,c2,
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里serializer配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。

启动Flume

配置完成后,启动Flume并指定agent的名称和配置文件路径,添加**-Dflume.root.logger=INFO,console**选项可以将日志实时输出到控制台。

$ cd {YOUR_FLUME_DIRECTORY}
$ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console

写入成功,显示日志如下:

...
Write success. Event count: 2
...
Clone this wiki locally