-
Notifications
You must be signed in to change notification settings - Fork 13
/
CopyData.py
316 lines (296 loc) · 12.2 KB
/
CopyData.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
"""
Copy data from the source to the destination, performing inserts and updates
as necessary to make the destination match the source. Note that deletes are
not performed, however.
"""
import cx_Logging
import cx_LoggingOptions
import cx_OptionParser
import cx_OracleUtils
import os
import Exceptions
import Options
# parse command line
# Every option and argument is registered on one parser; the parsed result is
# then passed to the logging options helper.
parser = cx_OptionParser.OptionParser()
for schemaOptionName in ("source-schema", "dest-schema"):
    parser.AddOption(cx_OracleUtils.SchemaOption(schemaOptionName))
parser.AddOption(
    "--key-columns",
    metavar="COLS",
    help="comma separated list of columns to use for checking if the "
         "row exists",
)
# The two "--no-check-*" switches default to a true value and are turned off
# by their presence on the command line (action "store_false").
parser.AddOption(
    "--no-check-exists",
    default=1,
    action="store_false",
    dest="checkExists",
    help="do not check to see if the row exists in the target",
)
parser.AddOption(
    "--no-check-modified",
    default=1,
    action="store_false",
    dest="checkModified",
    help="do not check to see if the row is identical to the row in the "
         "destination",
)
parser.AddOption(
    "--skip",
    metavar="N",
    type="int",
    help="number of rows to skip before starting the copy",
)
parser.AddOption(
    "--row-limit",
    metavar="N",
    type="int",
    help="number of rows at which the copy will stop",
)
# Options shared with other tools are defined in the Options module.
for sharedOption in (Options.COMMIT_POINT, Options.REPORT_POINT,
        Options.ARRAY_SIZE, Options.MAX_LONG_SIZE):
    parser.AddOption(sharedOption)
parser.AddOption(
    "--source-role",
    metavar="ROLE",
    help="enable this <role> [identified by <password>] in the source "
         "database immediately after connecting by calling "
         "dbms_session.set_role",
)
parser.AddOption(
    "--dest-role",
    metavar="ROLE",
    help="enable this <role> [identified by <password>] in the target "
         "database immediately after connecting by calling "
         "dbms_session.set_role",
)
cx_LoggingOptions.AddOptions(parser)
parser.AddArgument(
    "source",
    required=True,
    help="a select statement or the name of the table to query",
)
parser.AddArgument(
    "destination",
    help="the name of a table or view to perform the insert and update "
         "statements against",
)
options = parser.Parse()
cx_LoggingOptions.ProcessOptions(options)
# set up the source connection and the cursor the rows are fetched from
sourceConnection = cx_OracleUtils.Connect(
    options.sourceSchema, options.sourceRole)
sourceCursor = sourceConnection.cursor()
# the array size and maximum long size are only applied when the matching
# option holds a true value; otherwise the cursor is left untouched
requestedArraySize = options.arraySize
if requestedArraySize:
    sourceCursor.arraysize = requestedArraySize
requestedMaxLongSize = options.maxLongSize
if requestedMaxLongSize:
    sourceCursor.setoutputsize(requestedMaxLongSize)
# set up the destination connection; "cursor" is the general purpose cursor
# on the destination
destConnection = cx_OracleUtils.Connect(
    options.destSchema, options.destRole)
cursor = destConnection.cursor()
# determine query to execute; the "source" argument is interpreted as, in
# order: the name of an existing file holding the statement (unless the text
# itself starts with "select "), the name of a table or view (when it contains
# no spaces), or otherwise the statement itself
sourceSQL = options.source.strip()
destinationTable = options.destination
if not sourceSQL.lower().startswith("select ") and os.path.isfile(sourceSQL):
    # read the statement from the file; the context manager makes sure the
    # file handle is closed instead of being left to the garbage collector
    with open(sourceSQL) as sourceFile:
        sourceSQL = sourceFile.read().strip()
elif " " not in sourceSQL:
    # a bare object name: it also serves as the destination when none was
    # given on the command line
    if destinationTable is None:
        destinationTable = sourceSQL
    sourceInfo = cx_OracleUtils.GetObjectInfo(sourceConnection, sourceSQL)
    if sourceInfo is None:
        raise Exceptions.SourceTableNotFound(tableName = sourceSQL)
    sourceTableOwner, sourceTableName, sourceTableType = sourceInfo
    sourceSQL = "select * from %s.%s" % \
            (cx_OracleUtils.IdentifierRepr(sourceTableOwner),
             cx_OracleUtils.IdentifierRepr(sourceTableName))
# a destination is mandatory once the source is a statement or a file
if not destinationTable:
    raise Exceptions.DestinationTableNotSpecified()
# verify the destination table exists and split the lookup result into its
# owner, name and type
destInfo = cx_OracleUtils.GetObjectInfo(destConnection, destinationTable)
if destInfo is not None:
    destTableOwner, destTableName, destTableType = destInfo
else:
    raise Exceptions.TargetTableNotFound(tableName=destinationTable)
# determine columns in source query; sourceColumns maps each column name to a
# tuple of (position in the select list, column type, whether it is a LOB)
sourceCursor.execute(sourceSQL)
sourceVars = sourceCursor.fetchvars
sourceColumns = {}
for position, columnInfo in enumerate(sourceCursor.description):
    # only the first two entries of each description tuple are needed
    columnName, columnType = columnInfo[:2]
    columnIsLob = columnType in (sourceConnection.CLOB, sourceConnection.BLOB)
    sourceColumns[columnName] = (position, columnType, columnIsLob)
# lookup columns on destination table; destColumns maps each column name to
# True when the column is nullable
destColumnsQuery = """
select
column_name,
nullable
from all_tab_columns
where owner = :owner
and table_name = :name"""
cursor.execute(destColumnsQuery, owner=destTableOwner, name=destTableName)
destColumns = {
    columnName: nullableFlag == "Y"
    for columnName, nullableFlag in cursor
}
# determine the list of key columns to use, if necessary; they are only needed
# when rows are checked for existence in the destination
keyColumns = []
if options.checkExists:
    if options.keyColumns:
        # strip each name so that a list written with spaces around the commas
        # (e.g. "ID, NAME") still matches the column names of the source query
        keyColumns = [n.strip()
                for n in options.keyColumns.upper().split(",")]
    else:
        # no explicit list was given: use the columns of a primary key or
        # unique constraint on the destination table; ordering by
        # constraint_type sorts 'P' before 'U', so a primary key is picked
        # when the table has one (the statement text is kept flush left so
        # the SQL that is sent stays unchanged)
        cursor.execute("""
select constraint_name
from all_constraints
where owner = :owner
and table_name = :name
and constraint_type in ('P', 'U')
order by constraint_type""",
                owner = destTableOwner,
                name = destTableName)
        row = cursor.fetchone()
        if not row:
            raise Exceptions.NoPrimaryOrUniqueConstraintOnTable()
        constraintName, = row
        cursor.execute("""
select column_name
from all_cons_columns
where owner = :owner
and constraint_name = :name""",
                owner = destTableOwner,
                name = constraintName)
        keyColumns = [n for n, in cursor]
    # every key column has to be selected by the source query
    for name in keyColumns:
        if name not in sourceColumns:
            raise Exceptions.KeyColumnNotInSourceQuery(name = name)
# match the columns by name, keeping the order of the source query; the copy
# is only valid when the matches cover all of the source columns or all of
# the destination columns
matchingColumns = []
for columnName in sourceColumns:
    if columnName in destColumns:
        matchingColumns.append(columnName)
matchedCount = len(matchingColumns)
if matchedCount != len(sourceColumns) and matchedCount != len(destColumns):
    raise Exceptions.NotAllColumnsMatchByName()
# set up insert cursor; it reuses the general purpose destination cursor and
# binds one positional variable (:1, :2, ...) per matching column
insertNames = []
for columnName in matchingColumns:
    insertNames.append(cx_OracleUtils.IdentifierRepr(columnName))
insertValues = [":%s" % bindPosition
        for bindPosition in range(1, len(matchingColumns) + 1)]
insertStatement = "insert into %s.%s (%s) values (%s)" % (
    cx_OracleUtils.IdentifierRepr(destTableOwner),
    cx_OracleUtils.IdentifierRepr(destTableName),
    ",".join(insertNames),
    ",".join(insertValues),
)
insertCursor = cursor
insertCursor.bindarraysize = sourceCursor.arraysize
insertCursor.prepare(insertStatement)
# insertVars lists the (source variable, bind variable, is LOB) triples whose
# values have to be copied row by row; columns that need no copy bind the
# fetch variable of the source cursor directly
insertBindVars = []
insertVars = []
for columnName in matchingColumns:
    position, columnType, columnIsLob = sourceColumns[columnName]
    fetchVar = sourceVars[position]
    if not (options.checkExists or columnIsLob):
        insertBindVars.append(fetchVar)
        continue
    bindVar = insertCursor.var(columnType, fetchVar.bufferSize)
    insertVars.append((fetchVar, bindVar, columnIsLob))
    insertBindVars.append(bindVar)
insertCursor.setinputsizes(*insertBindVars)
# set up exists cursor; it counts the destination rows matching the key
# columns of a single source row
if options.checkExists:
    # "method" keeps its name because the update cursor section uses it too
    method = cx_OracleUtils.WhereClause
    whereClauses = []
    for bindPosition, keyName in enumerate(keyColumns, 1):
        whereClauses.append(method(keyName, ":%s" % bindPosition,
                destColumns[keyName], True))
    existsStatement = "select count(*) from %s.%s where %s" % (
        cx_OracleUtils.IdentifierRepr(destTableOwner),
        cx_OracleUtils.IdentifierRepr(destTableName),
        " and ".join(whereClauses),
    )
    existsCursor = destConnection.cursor()
    existsCursor.prepare(existsStatement)
    # one bind variable per key column, in the same order as the where clauses
    existsVars = []
    for keyName in keyColumns:
        position, columnType, columnIsLob = sourceColumns[keyName]
        fetchVar = sourceVars[position]
        bindVar = existsCursor.var(columnType, fetchVar.bufferSize)
        existsVars.append((fetchVar, bindVar, columnIsLob))
    existsCursor.setinputsizes(*[bindVar for _, bindVar, _ in existsVars])
# set up update cursor; it is only created when rows are checked for
# existence and there is at least one non-key column left to update
updateCursor = None
if options.checkExists and len(keyColumns) != len(matchingColumns):
    # bind order: the non-key columns first, followed by the key columns
    nonKeyColumns = [n for n in matchingColumns if n not in keyColumns]
    updateColumns = nonKeyColumns + keyColumns
    numberedColumns = list(enumerate(updateColumns, 1))
    buildWhereClause = cx_OracleUtils.WhereClause
    setClauses = []
    for bindPosition, columnName in numberedColumns:
        if columnName not in keyColumns:
            setClauses.append("%s = :%s" %
                    (cx_OracleUtils.IdentifierRepr(columnName), bindPosition))
    whereClauses = []
    for bindPosition, columnName in numberedColumns:
        if columnName in keyColumns:
            whereClauses.append(buildWhereClause(columnName,
                    ":%s" % bindPosition, destColumns[columnName], True))
    updateStatement = "update %s.%s set %s where %s" % (
        cx_OracleUtils.IdentifierRepr(destTableOwner),
        cx_OracleUtils.IdentifierRepr(destTableName),
        ",".join(setClauses),
        " and ".join(whereClauses),
    )
    if options.checkModified:
        # append one extra clause per non-key column, joined with "or", so the
        # update can be limited to rows that differ from the source
        additionalWhereClauses = []
        for bindPosition, columnName in numberedColumns:
            if columnName not in keyColumns:
                additionalWhereClauses.append(buildWhereClause(columnName,
                        ":%s" % bindPosition, destColumns[columnName], False))
        updateStatement += " and (%s)" % " or ".join(additionalWhereClauses)
    updateCursor = destConnection.cursor()
    updateCursor.bindarraysize = sourceCursor.arraysize
    updateCursor.prepare(updateStatement)
    # one bind variable per update column, each paired with its source variable
    updateVars = []
    for columnName in updateColumns:
        position, columnType, columnIsLob = sourceColumns[columnName]
        fetchVar = sourceVars[position]
        bindVar = updateCursor.var(columnType, fetchVar.bufferSize)
        updateVars.append((fetchVar, bindVar, columnIsLob))
    updateCursor.setinputsizes(*[bindVar for _, bindVar, _ in updateVars])
# tell user what is happening: a heading followed by the statement being read
# and the table being written
cx_Logging.Trace("Copying data...")
for messageFormat, messageValue in (
        (" Source: %s", sourceSQL),
        (" Destination: %s", destinationTable)):
    cx_Logging.Trace(messageFormat, messageValue)
# skip rows that are not of interest by fetching them, at most one array at a
# time, without processing them; options.skip is None when --skip was not
# given, and a negative value is treated as nothing to skip
while options.skip and options.skip > 0:
    cx_Logging.Trace(" Rows left to skip: %s", options.skip)
    rowsToFetch = min(sourceCursor.arraysize, options.skip)
    rowsSkipped = sourceCursor.fetchraw(rowsToFetch)
    if not rowsSkipped:
        # the source ran out of rows before the requested number was skipped;
        # stop instead of looping forever on an exhausted cursor
        break
    options.skip -= rowsSkipped
# initialize counters used in performing the copy
# running totals reported once the copy has finished
insertedRows = modifiedRows = unmodifiedRows = 0
totalRowsFetched = 0
# number of rows queued in the insert and update bind arrays
insertPos = updatePos = 0
# values of totalRowsFetched at the last commit and the last progress message
lastCommitted = lastReported = 0
# row positions of a full fetch array (the name shadows the builtin; it is
# kept unchanged since code further down may refer to it)
iter = list(range(sourceCursor.arraysize))
rowLimit = options.rowLimit
commitPoint = options.commitPoint
reportPoint = options.reportPoint
# without an explicit report point, progress is reported at each commit point
# (which leaves it as None when no commit point was given either)
if reportPoint is None:
    reportPoint = commitPoint
# perform the copy one fetched array of rows at a time
while True:
    rowsFetched = sourceCursor.fetchraw()
    if rowLimit is not None and totalRowsFetched + rowsFetched > rowLimit:
        # only process the rows up to the limit; clamp at zero so that a
        # negative limit cannot produce a negative row count
        rowsFetched = max(rowLimit - totalRowsFetched, 0)
    if not rowsFetched:
        break
    totalRowsFetched += rowsFetched
    if not insertVars:
        # nothing has to be copied row by row: every insert bind variable is
        # the fetch variable itself, so the whole array is inserted as is
        insertPos = rowsFetched
    else:
        # iterate over the positions of the rows actually fetched rather than
        # relying on a cached list of positions
        for pos in range(rowsFetched):
            exists = 0
            if options.checkExists:
                # bind the key values of this row and count the matches
                for sourceVar, targetVar, isLob in existsVars:
                    targetVar.copy(sourceVar, pos, 0)
                existsCursor.execute(None, [])
                exists, = existsCursor.fetchone()
            if not exists:
                # queue the row for insertion
                targetPos = insertPos
                targetVars = insertVars
                insertPos += 1
            elif updateCursor:
                # queue the row for update
                targetPos = updatePos
                targetVars = updateVars
                updatePos += 1
            else:
                # the row exists and there are no non-key columns to update
                unmodifiedRows += 1
                targetVars = []
            for sourceVar, targetVar, isLob in targetVars:
                if isLob:
                    # a false value (no LOB) is passed through unchanged;
                    # otherwise the LOB contents are read and bound
                    lob = sourceVar.getvalue(pos)
                    targetVar.setvalue(targetPos,
                            lob and lob.read())
                else:
                    targetVar.copy(sourceVar, pos, targetPos)
    if insertPos:
        insertCursor.executemanyprepared(insertPos)
        insertedRows += insertPos
        insertPos = 0
    if updatePos:
        updateCursor.executemanyprepared(updatePos)
        # the cursor's rowcount is taken as the number of rows changed; the
        # remaining queued rows are counted as unmodified
        modifiedRows += updateCursor.rowcount
        unmodifiedRows += (updatePos - updateCursor.rowcount)
        updatePos = 0
    if reportPoint and totalRowsFetched - lastReported >= reportPoint:
        lastReported = totalRowsFetched
        cx_Logging.Trace(" %s rows processed", totalRowsFetched)
    if commitPoint and totalRowsFetched - lastCommitted >= commitPoint:
        lastCommitted = totalRowsFetched
        destConnection.commit()
    if rowLimit is not None and totalRowsFetched >= rowLimit:
        # the limit has been reached; do not fetch another array from the
        # source only to discard it
        break
# commit whatever has not been committed by the commit point handling
destConnection.commit()
# print out final statistics, one trace line per counter
finalStatistics = (
    ("%s rows retrieved from source.", totalRowsFetched),
    ("%s rows created in destination.", insertedRows),
    ("%s rows modified in destination.", modifiedRows),
    ("%s rows unmodified in destination.", unmodifiedRows),
)
for messageFormat, rowCount in finalStatistics:
    cx_Logging.Trace(messageFormat, rowCount)