Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 5 commits
  • 2 files changed
  • 0 comments
  • 2 contributors
2  project.clj
... ...
@@ -1,4 +1,4 @@
1  
-(defproject com.twitter/maple "0.2.2"
  1
+(defproject com.twitter/maple "0.2.3"
2 2
   :source-path "src/clj"
3 3
   :java-source-path "src/jvm"
4 4
   :description "All the Cascading taps we have to offer."
25  src/jvm/com/twitter/maple/jdbc/db/DBInputFormat.java
@@ -79,7 +79,9 @@ protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job)
79 79
             //statement.setFetchSize(Integer.MIN_VALUE);
80 80
             String query = getSelectQuery();
81 81
             try {
  82
+                LOG.info(query);
82 83
                 results = statement.executeQuery(query);
  84
+                LOG.info("done executing select query");
83 85
             } catch (SQLException exception) {
84 86
                 LOG.error("unable to execute select query: " + query, exception);
85 87
                 throw new IOException("unable to execute select query: " + query, exception);
@@ -120,8 +122,11 @@ protected String getSelectQuery() {
120 122
                 query.append(dbConf.getInputQuery());
121 123
 
122 124
             try {
123  
-                query.append(" LIMIT ").append(split.getLength());
124  
-                query.append(" OFFSET ").append(split.getStart());
  125
+                // Only add limit and offset if you have multiple chunks
  126
+                if(split.getChunks() > 1) {
  127
+                    query.append(" LIMIT ").append(split.getLength());
  128
+                    query.append(" OFFSET ").append(split.getStart());
  129
+                }
125 130
             } catch (IOException ex) {
126 131
                 //ignore, will not throw
127 132
             }
@@ -200,6 +205,7 @@ public void write(PreparedStatement arg0) throws SQLException {
200 205
     protected static class DBInputSplit implements InputSplit {
201 206
         private long end = 0;
202 207
         private long start = 0;
  208
+        private long chunks = 0;
203 209
 
204 210
         /** Default Constructor */
205 211
         public DBInputSplit() {
@@ -211,9 +217,11 @@ public DBInputSplit() {
211 217
          * @param start the index of the first row to select
212 218
          * @param end   the index of the last row to select
213 219
          */
214  
-        public DBInputSplit(long start, long end) {
  220
+        public DBInputSplit(long start, long end, long chunks) {
215 221
             this.start = start;
216 222
             this.end = end;
  223
+            this.chunks = chunks;
  224
+            LOG.info("creating DB input split with start: " + start + ", end: " + end + ", chunks: " + chunks);
217 225
         }
218 226
 
219 227
         /** {@inheritDoc} */
@@ -237,16 +245,23 @@ public long getLength() throws IOException {
237 245
             return end - start;
238 246
         }
239 247
 
  248
+        /** @return The total number of chucks accross all splits */
  249
+        public long getChunks() {
  250
+            return chunks;
  251
+        }
  252
+
240 253
         /** {@inheritDoc} */
241 254
         public void readFields(DataInput input) throws IOException {
242 255
             start = input.readLong();
243 256
             end = input.readLong();
  257
+            chunks = input.readLong();
244 258
         }
245 259
 
246 260
         /** {@inheritDoc} */
247 261
         public void write(DataOutput output) throws IOException {
248 262
             output.writeLong(start);
249 263
             output.writeLong(end);
  264
+            output.writeLong(chunks);
250 265
         }
251 266
     }
252 267
 
@@ -343,9 +358,9 @@ protected void setTransactionIsolationLevel(Connection connection) {
343 358
                 DBInputSplit split;
344 359
 
345 360
                 if (i + 1 == chunks)
346  
-                    split = new DBInputSplit(i * chunkSize, count);
  361
+                    split = new DBInputSplit(i * chunkSize, count, chunks);
347 362
                 else
348  
-                    split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize);
  363
+                    split = new DBInputSplit(i * chunkSize, i * chunkSize + chunkSize, chunks);
349 364
 
350 365
                 splits[i] = split;
351 366
             }

No commit comments for this range

Something went wrong with that request. Please try again.