-
Notifications
You must be signed in to change notification settings - Fork 6
/
VersionedTap.java
148 lines (118 loc) · 4.08 KB
/
VersionedTap.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.backtype.cascading.tap;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import com.backtype.hadoop.datastores.VersionedStore;
import com.backtype.support.CascadingUtils;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tap.hadoop.Hfs;
/**
 * A Cascading {@link Hfs} tap backed by a {@link VersionedStore} rooted at this tap's path.
 *
 * <p>When used as a source it reads either the version set via {@link #setVersion(long)} or, if
 * none was set, the store's most recent version. When used as a sink it asks the store for a new
 * version directory in {@link #sinkConfInit}, then either marks that version succeeded (and prunes
 * old versions) in {@link #commitResource} or marks it failed in {@link #rollbackResource}.
 */
public class VersionedTap extends Hfs {
  /** Whether the tap was constructed to read from or to write to the store. */
  public static enum TapMode {SOURCE, SINK}

  /**
   * Explicit version to read or create. When {@code null}, sources read the store's most recent
   * version and sinks call {@code store.createVersion()} without an explicit version.
   */
  public Long version = null;

  // A sane default for the number of versions of your data to keep around.
  private int versionsToKeep = 3;

  /**
   * Source/sink role given at construction. Within this class it is only consulted by
   * {@link #getIdentifier()}.
   */
  public TapMode mode;

  // Sink-only: path of the version directory being written by the current flow. Set lazily in
  // sinkConfInit and cleared once that version is committed or rolled back.
  private String newVersionPath;

  /**
   * Creates a versioned tap over {@code dir}.
   *
   * @param dir root directory of the versioned store
   * @param scheme scheme used to read/write records inside a version directory
   * @param mode whether this tap is used as a source or a sink
   * @throws IOException kept in the signature for compatibility with existing callers
   */
  public VersionedTap(String dir, Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, TapMode mode)
      throws IOException {
    super(scheme, dir);
    this.mode = mode;
  }

  /**
   * Pins the version to read (source) or create (sink).
   *
   * @param version the version identifier
   * @return this tap, for chaining
   */
  public VersionedTap setVersion(long version) {
    this.version = version;
    return this;
  }

  /**
   * Sets the number of versions of your data to keep. Unneeded versions are cleaned up on creation
   * of a new one. Pass a negative number to keep all versions.
   *
   * @param versionsToKeep number of versions passed to {@code VersionedStore.cleanup} on commit
   * @return this tap, for chaining
   */
  public VersionedTap setVersionsToKeep(int versionsToKeep) {
    this.versionsToKeep = versionsToKeep;
    return this;
  }

  /** @return the number of versions retained after a successful commit (default 3) */
  public int getVersionsToKeep() {
    return this.versionsToKeep;
  }

  /** @return this tap's path as a string; this is the root directory of the versioned store */
  public String getOutputDirectory() {
    return getPath().toString();
  }

  /**
   * Opens the {@link VersionedStore} rooted at {@link #getOutputDirectory()}.
   *
   * <p>The filesystem is resolved from the tap path itself rather than from the configuration's
   * default filesystem. A fully-qualified path on a non-default filesystem therefore gets the
   * correct {@link FileSystem}. Scheme-less paths still resolve to the default filesystem.
   *
   * @param conf job configuration used to resolve the filesystem
   * @return a store over this tap's directory
   * @throws IOException if the filesystem or store cannot be opened
   */
  public VersionedStore getStore(JobConf conf) throws IOException {
    FileSystem fs = getPath().getFileSystem(conf);
    return new VersionedStore(fs, getOutputDirectory());
  }

  /**
   * Resolves the version directory this tap should read from.
   *
   * @param conf job configuration
   * @return the path of {@link #version} if set, otherwise the store's most recent version path.
   *     NOTE(review): the latter may be {@code null} when the store holds no versions; confirm
   *     against {@code VersionedStore.mostRecentVersionPath()}.
   * @throws RuntimeException wrapping any {@link IOException} from the store
   */
  public String getSourcePath(JobConf conf) {
    try {
      VersionedStore store = getStore(conf);
      return (version != null) ? store.versionPath(version) : store.mostRecentVersionPath();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Asks the store to create a new version directory for this tap to write into.
   *
   * @param conf job configuration
   * @return the new version's path, created with {@link #version} if set
   * @throws RuntimeException wrapping any {@link IOException} from the store
   */
  public String getSinkPath(JobConf conf) {
    try {
      VersionedStore store = getStore(conf);
      return version == null ? store.createVersion() : store.createVersion(version);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Points the job's input at the resolved source version.
   *
   * @throws IllegalStateException if no source version could be resolved (e.g. an empty store)
   */
  @Override
  public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
    super.sourceConfInit(process, conf);
    String sourcePath = getSourcePath(conf);
    if (sourcePath == null) {
      // Fail here with context instead of handing null to FileInputFormat, which would fail later
      // with a far less descriptive error.
      throw new IllegalStateException(
          "No versions found in versioned store at " + getOutputDirectory());
    }
    FileInputFormat.setInputPaths(conf, sourcePath);
  }

  /**
   * Points the job's output at a new version directory. The directory is created once and reused
   * on repeated calls until the version is committed or rolled back.
   */
  @Override
  public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
    super.sinkConfInit(process, conf);
    if (newVersionPath == null) {
      newVersionPath = getSinkPath(conf);
    }
    FileOutputFormat.setOutputPath(conf, new Path(newVersionPath));
  }

  /** @return {@code true} iff the store reports at least one (most recent) version */
  @Override
  public boolean resourceExists(JobConf jc) throws IOException {
    return getStore(jc).mostRecentVersion() != null;
  }

  /** Not supported: versions are created implicitly by {@link #sinkConfInit}. */
  @Override
  public boolean createResource(JobConf jc) throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  /** Not supported. */
  @Override
  public boolean deleteResource(JobConf jc) throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
  }

  /**
   * Returns an identifier of the form {@code manhattan<sink|source>:<dir>:<version|LATEST>}.
   * The missing separator after {@code manhattan} is kept as-is, because taps may be compared by
   * identifier.
   */
  @Override
  public String getIdentifier() {
    String outDir = getOutputDirectory();
    String versionString = (version == null) ? "LATEST" : version.toString();
    return "manhattan"
        + ((mode == TapMode.SINK) ? "sink" : "source")
        + ":" + outDir + ":" + versionString;
  }

  /**
   * Marks the version written by this tap as succeeded, then prunes old versions down to
   * {@link #getVersionsToKeep()}. Does nothing if no version is pending.
   *
   * @return always {@code true}
   */
  @Override
  public boolean commitResource(JobConf conf) throws IOException {
    if (newVersionPath != null) {
      // Go through getStore so commit resolves the store exactly like every other method.
      VersionedStore store = getStore(conf);
      store.succeedVersion(newVersionPath);
      CascadingUtils.markSuccessfulOutputDir(new Path(newVersionPath), conf);
      newVersionPath = null;
      store.cleanup(getVersionsToKeep());
    }
    return true;
  }

  /**
   * Marks the pending version (if any) as failed and forgets it.
   *
   * @return always {@code true}
   */
  @Override
  public boolean rollbackResource(JobConf conf) throws IOException {
    if (newVersionPath != null) {
      getStore(conf).failVersion(newVersionPath);
      newVersionPath = null;
    }
    return true;
  }
}