This repository has been archived by the owner on Sep 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
/
DocumentDBInputSplit.java
176 lines (152 loc) · 5.95 KB
/
DocumentDBInputSplit.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
package com.microsoft.azure.documentdb.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.Database;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.FeedOptions;
import com.microsoft.azure.documentdb.QueryIterable;
import com.microsoft.azure.documentdb.SqlParameter;
import com.microsoft.azure.documentdb.SqlParameterCollection;
import com.microsoft.azure.documentdb.SqlQuerySpec;
/**
 * An input split that represents one collection from DocumentDB. It reads data one page at a time
 * and feeds documents one by one to the mapper.
 * In order to use it, the required configuration properties for the input split must be set.
 */
public class DocumentDBInputSplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit {
    // Fixed: previously logged under DocumentDBWritable.class, which mis-attributed
    // every log record from this class.
    private static final Log LOG = LogFactory.getLog(DocumentDBInputSplit.class);

    // Maximum number of documents requested per feed page. A true constant, so
    // static final (was a per-instance field).
    private static final int MAX_PAGE_SIZE = 700;

    // Query used when the caller supplies no query of its own.
    private static final String DEFAULT_QUERY = "select * from root";

    // Stored as Text (not String) so the split can be serialized via Writable.
    private Text host, key, dbName, collName, query;
    private Iterator<Document> documentIterator;

    /**
     * No-arg constructor required by Hadoop for Writable deserialization;
     * fields are populated later by {@link #readFields(DataInput)}.
     */
    public DocumentDBInputSplit() {
        this.host = new Text();
        this.key = new Text();
        this.dbName = new Text();
        this.collName = new Text();
        this.query = new Text();
    }

    /**
     * Creates a split bound to one collection.
     *
     * @param host     DocumentDB account endpoint.
     * @param key      DocumentDB account key.
     * @param dbName   database name.
     * @param collName collection name this split reads.
     * @param query    optional query; {@code null} is stored as the empty string
     *                 and replaced with a full scan at read time.
     */
    public DocumentDBInputSplit(String host, String key, String dbName, String collName, String query) {
        this.host = new Text(host);
        this.key = new Text(key);
        this.dbName = new Text(dbName);
        this.collName = new Text(collName);
        if (query == null) {
            query = "";
        }
        this.query = new Text(query);
    }

    /**
     * Gets the list of DocumentDBInputSplit used — one split per collection name.
     */
    public static List<InputSplit> getSplits(Configuration conf, String dbHost, String dbKey, String dbName,
            String[] collNames, String query) {
        List<InputSplit> splits = new LinkedList<InputSplit>();
        for (String collName : collNames) {
            splits.add(new DocumentDBInputSplit(dbHost, dbKey, dbName, collName.trim(), query));
        }
        return splits;
    }

    /**
     * @inheritDoc
     */
    @Override
    public long getLength() {
        // The true size is unknown without querying the service; report a large
        // constant so the framework does not treat the split as empty.
        return Integer.MAX_VALUE;
    }

    /**
     * @inheritDoc
     */
    @Override
    public String[] getLocations() throws IOException {
        // Since we're pulling the data from DocumentDB, it's not localized
        // to any single node so just return localhost.
        return new String[] { "localhost" };
    }

    /** @return the name of the collection this split reads. */
    public String getCollectionName() {
        return this.collName.toString();
    }

    /**
     * @inheritDoc
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        // Field order must match write(DataOutput) exactly.
        this.host.readFields(in);
        this.key.readFields(in);
        this.dbName.readFields(in);
        this.collName.readFields(in);
        this.query.readFields(in);
    }

    /**
     * @inheritDoc
     */
    @Override
    public void write(DataOutput out) throws IOException {
        this.host.write(out);
        this.key.write(out);
        this.dbName.write(out);
        this.collName.write(out);
        this.query.write(out);
    }

    /**
     * Lazily connects to DocumentDB and returns an iterator over the documents in
     * the collection wrapped by this split. The iterator is cached: subsequent
     * calls return the same instance.
     *
     * @return an Iterator for documents in the collection wrapped by the split.
     * @throws IOException if the database or collection does not exist, or if a
     *                     read operation fails on DocumentDB.
     */
    public Iterator<Document> getDocumentIterator() throws IOException {
        if (this.documentIterator != null) {
            return this.documentIterator;
        }
        try {
            LOG.debug("Connecting to " + this.host + " and reading from collection " + this.collName);
            ConnectionPolicy policy = ConnectionPolicy.GetDefault();
            policy.setUserAgentSuffix(DocumentDBConnectorUtil.UserAgentSuffix);
            // NOTE(review): the client is intentionally left open — the returned
            // iterator pages lazily and needs the live connection.
            DocumentClient client = new DocumentClient(this.host.toString(), this.key.toString(), policy,
                    ConsistencyLevel.Session);
            Database db = DocumentDBConnectorUtil.GetDatabase(client, this.dbName.toString());
            if (db == null) {
                throw new IOException(String.format("Database %s doesn't exist", this.dbName));
            }
            DocumentCollection coll =
                    DocumentDBConnectorUtil.GetDocumentCollection(client, db.getSelfLink(), this.collName.toString());
            if (coll == null) {
                throw new IOException(String.format("collection %s doesn't exist", this.collName));
            }
            // Fall back to a full scan when no query was configured (the original
            // code re-assigned the same value in the non-empty branch).
            String query = this.query.toString();
            if (query.isEmpty()) {
                query = DEFAULT_QUERY;
            }
            FeedOptions options = new FeedOptions();
            options.setPageSize(MAX_PAGE_SIZE);
            this.documentIterator = client.queryDocuments(
                    coll.getSelfLink(),
                    query,
                    options).getQueryIterator();
        } catch (Exception e) {
            // Preserve the cause so connection/auth failures remain diagnosable.
            throw new IOException(e);
        }
        return this.documentIterator;
    }

    @Override
    public String toString() {
        return String.format("DocumentDBSplit(collection=%s)", this.collName);
    }
}