Skip to content
This repository has been archived by the owner on Nov 7, 2018. It is now read-only.

Commit

Permalink
Merge pull request #88 from 18F/less-memory
Browse files Browse the repository at this point in the history
use less memory by indexing each row as it is parsed
  • Loading branch information
ultrasaurus committed Jul 31, 2015
2 parents 124482a + 5a03545 commit 8e79cef
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
18 changes: 0 additions & 18 deletions lib/data_magic.rb
Expand Up @@ -149,24 +149,6 @@ def self.create_index(es_index_name = nil, field_types={})
es_index_name
end

# row: a hash (keys may be strings or symbols)
# new_fields: hash current_name : new_name
# returns a hash (which may be a subset of row) where keys are new_name
# with value of corresponding row[current_name]
def self.map_field_names(row, new_fields, options={})
mapped = {}
row.each do |key, value|
new_key = new_fields[key.to_sym] || new_fields[key.to_s]
if new_key
value = value.to_f if new_key.include? "location"
mapped[new_key] = value
elsif options[:import] == 'all'
mapped[key] = value
end
end
mapped
end

# get the real index name when given either
# api: api endpoint configured in data.yaml
# index: index name
Expand Down
58 changes: 38 additions & 20 deletions lib/data_magic/index.rb
Expand Up @@ -4,15 +4,25 @@

module DataMagic

def self.parse_rows(data, fields, options, additional)
parsed = CSV.parse(
data,
headers: true,
header_converters: lambda { |str| str.strip.to_sym }
)
parsed.map { |row| parse_row(row, fields, options, additional) }
# row: a hash (keys may be strings or symbols)
# new_fields: hash current_name : new_name
# returns a hash (which may be a subset of row) where keys are new_name
# with value of corresponding row[current_name]
def self.map_field_names(row, new_fields, options={})
mapped = {}
row.each do |key, value|
new_key = new_fields[key.to_sym] || new_fields[key.to_s]
if new_key
value = value.to_f if new_key.include? "location"
mapped[new_key] = value
elsif options[:import] == 'all'
mapped[key] = value
end
end
mapped
end


def self.parse_row(row, fields, options, additional)
row = row.to_hash
row = map_field_names(row, fields, options) unless fields.empty?
Expand Down Expand Up @@ -43,26 +53,34 @@ def self.import_csv(data, options={})

new_field_names = options[:fields] || {}
new_field_names = new_field_names.merge(additional_fields)
num_rows = 0
headers = nil
begin
rows = parse_rows(data, new_field_names, options, additional_data)
CSV.parse(
data,
headers: true,
header_converters: lambda { |str| str.strip.to_sym }
) do|row|
row = parse_row(row, new_field_names, options, additional_data)
headers ||= row.keys.map(&:to_s)
client.index({
index: es_index_name,
id: get_id(row),
type: 'document',
body: row,
})
num_rows += 1
end

rescue Exception => e
Config.logger.error e.message
rows = []
end
rows.each { |row|
client.index({
index: es_index_name,
id: get_id(row),
type: 'document',
body: row,
})
}

raise InvalidData, "invalid file format or zero rows" if rows.length == 0

raise InvalidData, "invalid file format or zero rows" if num_rows == 0
client.indices.refresh index: es_index_name

fields = rows.map(&:keys).flatten.uniq
return [rows.length, fields]
return [num_rows, headers]
end

def self.import_with_dictionary(options = {})
Expand Down

0 comments on commit 8e79cef

Please sign in to comment.