/
DataExtractorServiceSqlMap.java
73 lines (55 loc) · 4.55 KB
/
DataExtractorServiceSqlMap.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.service.impl;
import java.util.Map;
import org.jumpmind.db.platform.IDatabasePlatform;
public class DataExtractorServiceSqlMap extends AbstractSqlMap {
public DataExtractorServiceSqlMap(IDatabasePlatform platform,
Map<String, String> replacementTokens) {
super(platform, replacementTokens);
// @formatter:off
putSql("selectNodeIdsForExtractSql", "select node_id, queue from $(extract_request) where status=? and parent_request_id=0 group by node_id, queue");
putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and queue=? and status=? and parent_request_id=0 order by request_id");
putSql("selectExtractRequestForBatchSql", "select * from $(extract_request) where start_batch_id <= ? and end_batch_id >= ? and node_id = ? and load_id = ?");
putSql("selectExtractChildRequestForNodeSql", "select c.* from $(extract_request) c " +
"inner join $(extract_request) p on p.request_id = c.parent_request_id where p.node_id=? and p.queue=? and p.status=? and p.parent_request_id=0");
putSql("selectExtractChildRequestsByParentSql", "select * from $(extract_request) where parent_request_id = ?");
putSql("selectExtractChildRequestIdsMissed",
"select request_id from $(extract_request) where status = ? and parent_request_id > 0 "
+ "and parent_request_id in (select request_id from $(extract_request) where parent_request_id = 0 and status = ?)");
putSql("releaseExtractChildRequestFromParent",
"update $(extract_request) set parent_request_id = 0 where request_id = ?");
putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, queue, status, start_batch_id, end_batch_id, trigger_id, router_id, load_id, table_name, total_rows, parent_request_id, last_update_time, create_time) "
+ " values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
putSql("updateExtractRequestStatus", "update $(extract_request) set status=?, last_update_time=?, extracted_rows=?, extracted_millis=? where request_id=?");
putSql("updateExtractRequestLoadTime", "update $(extract_request) set loaded_time = (case when end_batch_id = ? then ? when 1 = 0 then last_update_time else null end), "
+ " loaded_rows = loaded_rows + ?, loaded_millis = loaded_millis + ?, last_loaded_batch_id = ?, "
+ " last_update_time=? where start_batch_id <= ? and end_batch_id >= ? and node_id=? and load_id=?");
putSql("updateExtractRequestTransferred", "update $(extract_request) set last_transferred_batch_id=?, transferred_rows = transferred_rows + ?, transferred_millis = ?"
+ " where start_batch_id <= ? and end_batch_id >= ? and node_id=? and load_id=? and (last_transferred_batch_id is null or last_transferred_batch_id < ?)");
putSql("restartExtractRequest", "update $(extract_request) set last_transferred_batch_id = null, transferred_rows = 0, transferred_millis = 0, "
+ "last_loaded_batch_id = null, loaded_rows = 0, loaded_millis = 0, parent_request_id = 0, status = ? "
+ "where request_id = ? and node_id = ?");
putSql("cancelExtractRequests", "update $(extract_request) set status=?, last_update_time=? where load_id = ?");
putSql("selectIncompleteTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is null order by request_id");
putSql("selectCompletedTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is not null order by request_id");
}
}