/
HdfsStoreBolt.java
147 lines (126 loc) · 4.84 KB
/
HdfsStoreBolt.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/**
* Copyright (c) Acroquest Technology Co, Ltd. All Rights Reserved.
* Please read the associated COPYRIGHTS file for more details.
*
* THE SOFTWARE IS PROVIDED BY Acroquest Technolog Co., Ltd.,
* WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDER BE LIABLE FOR ANY
* CLAIM, DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING
* OR DISTRIBUTING THIS SOFTWARE OR ITS DERIVATIVES.
*/
package acromusashi.stream.bolt.hdfs;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import acromusashi.stream.bolt.AmConfigurationBolt;
import acromusashi.stream.entity.StreamMessage;
import acromusashi.stream.exception.InitFailException;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
/**
* 受信したメッセージをHDFSに出力するBolt<br>
*
* @author kimura
*/
public class HdfsStoreBolt extends AmConfigurationBolt
{
/** serialVersionUID */
private static final long serialVersionUID = -2877852415844943739L;
/** logger */
private static final Logger logger = LoggerFactory.getLogger(HdfsStoreBolt.class);
/** HDFSへの出力コンポーネント */
private transient HdfsOutputSwitcher delegate = null;
/**
* パラメータを指定せずにインスタンスを生成する。
*/
public HdfsStoreBolt()
{}
/**
* {@inheritDoc}
*/
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
{
super.prepare(stormConf, context, collector);
String componentId = context.getThisComponentId();
int taskId = context.getThisTaskId();
HdfsStoreConfig config = new HdfsStoreConfig();
config.setOutputUri((String) stormConf.get("hdfsstorebolt.outputuri"));
config.setFileNameHeader((String) stormConf.get("hdfsstorebolt.filenameheader"));
config.setFileSwitchIntarval(((Long) stormConf.get("hdfsstorebolt.interval")).intValue());
config.setFileNameBody("_" + componentId + "_" + taskId + "_");
boolean isPreprocess = true;
Object isPreprocessObj = stormConf.get("hdfsstorebolt.executepreprocess");
if (isPreprocessObj != null && isPreprocessObj instanceof Boolean)
{
isPreprocess = ((Boolean) isPreprocessObj).booleanValue();
}
try
{
// HDFSファイルシステム取得
Configuration conf = new Configuration();
Path dstPath = new Path(config.getOutputUri());
FileSystem fileSystem = dstPath.getFileSystem(conf);
// HDFSに対する前処理実施。一時ファイルを本ファイルにリネームする。
if (isPreprocess)
{
HdfsPreProcessor.execute(fileSystem, config.getOutputUri(),
config.getFileNameHeader() + config.getFileNameBody(),
config.getTmpFileSuffix());
}
this.delegate = new HdfsOutputSwitcher();
this.delegate.initialize(fileSystem, config, System.currentTimeMillis());
}
catch (Exception ex)
{
logger.warn("Failed to HDFS write initialize.", ex);
throw new InitFailException(ex);
}
}
@Override
public void onMessage(StreamMessage message)
{
try
{
this.delegate.appendLine(message.toString(), System.currentTimeMillis());
}
catch (IOException ex)
{
String logFormat = "Fail write to hdfs. Dispose received message. : Message={0}";
logger.warn(MessageFormat.format(logFormat, message), ex);
}
ack();
}
@Override
public void cleanup()
{
// cleanupメソッドはLocalClusterでしか呼ばれないため注意
logger.info("HDFSSinkBolt Cleanup Start.");
try
{
this.delegate.close();
}
catch (IOException ex)
{
logger.warn("Failed to HDFS write close. Skip close.", ex);
}
logger.info("HDFSSinkBolt Cleanup finished.");
}
/**
* {@inheritDoc}
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
// This class not has downstream component.
}
}