/
import.rb
260 lines (231 loc) · 7.7 KB
/
import.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
module Coupler
  module Models
    # An Import represents a CSV file uploaded into a project. It samples
    # the file to guess field names and types, bulk-loads the rows into a
    # per-import table in the project's local database, and provides tools
    # for detecting and repairing duplicate primary keys.
    class Import < Sequel::Model
      include CommonModel
      include Jobify

      # NOTE: yoinked from FasterCSV
      # A Regexp used to find and convert some common Date formats.
      DateMatcher = / \A(?: (\w+,?\s+)?\w+\s+\d{1,2},?\s+\d{2,4} |
                            \d{4}-\d{2}-\d{2} )\z /x
      # A Regexp used to find and convert some common DateTime formats.
      DateTimeMatcher =
        / \A(?: (\w+,?\s+)?\w+\s+\d{1,2}\s+\d{1,2}:\d{1,2}:\d{1,2},?\s+\d{2,4} |
                \d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2} )\z /x

      many_to_one :project
      plugin :serialization
      serialize_attributes :marshal, :field_types, :field_names
      mount_uploader :data, DataUploader

      # Assigns the uploaded file. Derives a default name from the original
      # filename (extension stripped, underscores/dashes turned into spaces,
      # capitalized) and immediately samples the file to guess field names
      # and types.
      def data=(value)
        result = super
        self.name ||= File.basename(data.file.original_filename).sub(/\.\w+?$/, "").gsub(/[_-]+/, " ").capitalize
        discover_fields
        result
      end

      # The primary key field name as a Symbol.
      def primary_key_sym
        primary_key_name.to_sym
      end

      # Name of this import's table in the project-local database.
      def table_name
        :"import_#{id}"
      end

      # Returns (and memoizes) up to 50 data rows from the CSV file,
      # skipping the header row when one is present.
      def preview
        if @preview.nil?
          @preview = []
          FasterCSV.open(data.file.file) do |csv|
            csv.rewind
            csv.shift if self.has_headers
            50.times do
              row = csv.shift
              break if row.nil?
              @preview << row
            end
          end
        end
        @preview
      end

      # Loads the CSV file into the project's local database.
      #
      # Recreates this import's table, coerces each value to its discovered
      # type, and counts occurrences of each primary key value. Duplicate
      # rows receive an increasing :dup_key_count (NULL for the first
      # occurrence of a key); when no duplicates exist the helper column is
      # dropped and a real primary key constraint is added instead.
      #
      # Yields the current byte offset into the file after each row when a
      # block is given (for progress reporting).
      #
      # Returns true if the primary key values were unique, false otherwise.
      def import!(&progress)
        project.local_database do |db|
          column_names = []
          column_types = []
          field_names.each_with_index do |name, i|
            name_sym = name.to_sym
            column_names << name_sym
            column_types << {
              :name => name_sym,
              :type =>
                case field_types[i]
                when 'integer' then Integer
                when 'string' then String
                end,
              # the primary key column must be NOT NULL
              :null => !(name == primary_key_name)
            }
          end
          column_names << :dup_key_count
          column_types << {:name => :dup_key_count, :type => Integer}
          db.create_table!(table_name) do
            columns.push(*column_types)
          end
          ds = db[table_name]
          key_frequencies = Hash.new { |h, k| h[k] = 0 }
          buffer = ImportBuffer.new(column_names, ds)
          skip = has_headers
          primary_key_index = field_names.index(primary_key_name)
          io = File.open(data.file.file, 'rb')
          begin
            csv = FasterCSV.new(io)
            csv.each do |row|
              if skip
                # skip header if necessary
                skip = false
                next
              end
              # convert values
              row.each_with_index do |value, i|
                row[i] = coerce(value, field_types[i])
              end
              key = row[primary_key_index]
              num = key_frequencies[key] += 1
              row.push(num > 1 ? num : nil)
              self.has_duplicate_keys = true if num > 1
              buffer.add(row)
              if block_given?
                yield io.pos
              end
            end
          ensure
            # don't leak the file handle, even if a row raises mid-import
            io.close
          end
          buffer.flush
          primary_key = self.primary_key_sym
          if has_duplicate_keys
            # flag the first occurrence of each duplicated key too (it has
            # dup_key_count NULL so far); later occurrences are already > 1
            key_frequencies.each_pair do |key, count|
              next if count == 1
              ds.filter(primary_key => key, :dup_key_count => nil).update(:dup_key_count => 1)
            end
          else
            # alter table to set primary key
            db.alter_table(table_name) do
              drop_column(:dup_key_count)
              add_primary_key([primary_key])
            end
          end
        end
        update(:occurred_at => Time.now)
        !has_duplicate_keys
      end

      # Yields this import's dataset within a project-local database
      # connection.
      def dataset
        project.local_database do |db|
          yield(db[table_name])
        end
      end

      # Resolves duplicate primary keys left behind by #import!.
      #
      # rows_to_remove - optional Hash of {key => dup_key_count(s)}
      #                  identifying duplicate rows to delete outright.
      #
      # Remaining duplicated keys are kept on the first row of each group;
      # the other rows get fresh keys generated past the current maximum
      # via #next. Finally the :dup_key_count helper column is dropped and
      # the real primary key constraint is added.
      def repair_duplicate_keys!(rows_to_remove = nil)
        pkey = primary_key_sym
        project.local_database do |db|
          ds = db[table_name]
          if rows_to_remove
            filtered_ds = nil
            rows_to_remove.each_pair do |key, dups|
              hsh = {pkey => key, :dup_key_count => dups}
              filtered_ds = filtered_ds ? filtered_ds.or(hsh) : ds.filter(hsh)
            end
            filtered_ds.delete if filtered_ds
          end
          # only reassign keys if there is more than 1 duplicate per key
          keys = ds.group(pkey).having { count(pkey) > 1 }.select_map(pkey)
          unless keys.empty?
            # guard: ds.order(pkey).last would be nil on an empty table;
            # when keys is non-empty the table necessarily has rows
            current_key = nil
            next_key = ds.order(pkey).last[pkey].next
            ds.filter(pkey => keys).order(:dup_key_count).each do |row|
              # skip the first one, since it'll retain the key
              if current_key != row[pkey]
                current_key = row[pkey]
              else
                ds.filter(pkey => row[pkey], :dup_key_count => row[:dup_key_count]).
                  update(pkey => next_key)
                next_key = next_key.next
              end
            end
          end
          db.alter_table(table_name) do
            drop_column(:dup_key_count)
            add_primary_key([pkey])
          end
        end
      end

      private

      # Samples up to 50 rows of the CSV to guess field names and types.
      # If any cell of the first row lacks a letter (or _/$), the row is
      # treated as data rather than a header row. A column named "id"
      # (case-insensitive) becomes the default primary key. Each column's
      # type is the most frequent type ('integer' or 'string') seen in the
      # sample.
      def discover_fields
        FasterCSV.open(data.file.file) do |csv|
          csv.rewind
          count = 0
          types = []
          type_counts = []
          headers = csv.shift
          if headers.any? { |h| h !~ /[A-Za-z_$]/ }
            row = headers
            headers = nil
            self.has_headers = false
          else
            self.has_headers = true
            headers.each do |name|
              if name =~ /^id$/i
                self.primary_key_name = name
              end
            end
            row = csv.shift
          end
          while row && count < 50
            row.each_with_index do |value, i|
              hash = type_counts[i] ||= {}
              type =
                case value
                when /^\d+$/ then 'integer'
                else 'string'
                end
              hash[type] = (hash[type] || 0) + 1
            end
            row = csv.shift
            count += 1
          end
          type_counts.each_with_index do |type_count, i|
            types[i] = type_count.max { |a, b| a[1] <=> b[1] }[0]
          end
          self.field_types = types
          self.field_names = headers
        end
      end

      # Sequel validation hook: requires project, name, field names and a
      # primary key; the name must be unique per project; the primary key
      # must be one of the field names; field names must match field types
      # in count and contain no duplicates.
      def validate
        super
        validates_presence [:project_id, :name, :field_names, :primary_key_name]
        if name && project_id
          validates_unique [:project_id, :name]
        end
        if field_names.is_a?(Array)
          validates_includes field_names, [:primary_key_name]
          # guard: field_types can be nil when field_names was assigned
          # without running discovery
          if field_types
            expected = field_types.length
            if field_names.length != expected
              errors.add(:field_names, "must be of length #{expected}")
            end
          end
          # check for duplicate field names
          duplicates = {}
          field_names.inject(Hash.new(0)) do |hash, field_name|
            num = hash[field_name] += 1
            duplicates[field_name] = num if num > 1
            hash
          end
          if !duplicates.empty?
            message = "have duplicates (%s)" %
              duplicates.inject("") { |s, (k, v)| s + "#{k} x #{v}, " }.chomp(", ")
            errors.add(:field_names, message)
          end
        end
      end

      # Converts a raw CSV string value according to type. For "integer",
      # non-numeric strings become nil rather than 0 (String#to_i would
      # silently return 0); the first-character check works on both Ruby
      # 1.8 (value[0] is a Fixnum) and 1.9+ (value[0] is a one-character
      # String) via #ord. Any other type passes the value through.
      def coerce(value, type)
        return nil if value.nil?
        case type
        when "integer"
          chr = value[0]
          return nil if chr.nil?
          ord = chr.ord
          # 48..57 are the codepoints for '0'..'9'
          (ord.nil? || ord < 48 || ord > 57) ? nil : value.to_i
        else
          value
        end
      end
    end
  end
end