This repository has been archived by the owner on Apr 13, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
AbstractColumnFamilyOutputFormat.java
159 lines (142 loc) · 6.36 KB
/
AbstractColumnFamilyOutputFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.hadoop2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.thrift.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
/**
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
* values) as Cassandra rows (and respective columns) in a given
* ColumnFamily.
*
* <p>
* As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
* Keyspace and ColumnFamily in your
* Hadoop job Configuration. The {@link ConfigHelper} class, through its
* {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
* simple.
* </p>
*
* <p>
* For the sake of performance, this class employs a lazy write-back caching
* mechanism, where its record writer batches mutations created based on the
* reduce's inputs (in a task-specific map), and periodically makes the changes
* official by sending a batch mutate request to Cassandra.
* </p>
* @param <Y>
*/
public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y>
{
public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class);
/**
* Check for validity of the output-specification for the job.
*
* @param context
* information about the job
* @throws IOException
* when output should not be attempted
*/
public void checkOutputSpecs(JobContext context)
{
checkOutputSpecs(context.getConfiguration());
}
protected void checkOutputSpecs(Configuration conf)
{
if (ConfigHelper.getOutputKeyspace(conf) == null)
throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
if (ConfigHelper.getOutputPartitioner(conf) == null)
throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
if (ConfigHelper.getOutputInitialAddress(conf) == null)
throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
}
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
{
checkOutputSpecs(job);
}
/**
* The OutputCommitter for this format does not write any data to the DFS.
*
* @param context
* the task context
* @return an output committer
* @throws IOException
* @throws InterruptedException
*/
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
{
return new NullOutputCommitter();
}
/**
* Connects to the given server:port and returns a client based on the given socket that points to the configured
* keyspace, and is logged in with the configured credentials.
*
* @param host fully qualified host name to connect to
* @param port RPC port of the server
* @param conf a job configuration
* @return a cassandra client
* @throws Exception set of thrown exceptions may be implementation defined,
* depending on the used transport factory
*/
public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
{
logger.debug("Creating authenticated client for CF output format");
TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port, conf);
TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
{
Map<String, String> creds = new HashMap<String, String>();
creds.put(IAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(conf));
creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(conf));
AuthenticationRequest authRequest = new AuthenticationRequest(creds);
client.login(authRequest);
}
logger.debug("Authenticated client for CF output format created successfully");
return client;
}
/**
* An {@link OutputCommitter} that does nothing.
*/
private static class NullOutputCommitter extends OutputCommitter
{
public void abortTask(TaskAttemptContext taskContext) { }
public void cleanupJob(JobContext jobContext) { }
public void commitTask(TaskAttemptContext taskContext) { }
public boolean needsTaskCommit(TaskAttemptContext taskContext)
{
return false;
}
public void setupJob(JobContext jobContext) { }
public void setupTask(TaskAttemptContext taskContext) { }
}
}