Skip to content

Commit

Permalink
Various changes
Browse files Browse the repository at this point in the history
- Use URL syntax for declaring parameters to module-based transforms
- Add anonymize transform
- Add yarn lock
  • Loading branch information
datashaman committed May 15, 2017
1 parent 75a3501 commit 59f5a84
Show file tree
Hide file tree
Showing 6 changed files with 1,833 additions and 3 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ Usage: elasticdump --input SOURCE --output DESTINATION [OPTIONS]
value of field 'f1':
doc._source["f2"] = doc._source.f1 * 2;
May be used multiple times.
Additionally, transform may be performed by a module. See [Module Transform](#module-transform) below.
--awsChain
Use [standard](https://aws.amazon.com/blogs/security/a-new-and-standardized-way-to-manage-credentials-in-the-aws-sdks/) location and ordering for resolving credentials including environment variables, config files, EC2 and ECS metadata locations
_Recommended option for use with AWS_
Expand Down Expand Up @@ -324,9 +325,27 @@ For loading files that you have dumped from multielasticsearch, `--direction` sh

The new options, `--parallel` is how many forks should be run simultaneously and `--match` is used to filter which indexes should be dumped/loaded (regex).

## Module Transform

When specifying the `transform` option, prefix the value with `@` (a curl convention) to load the top-level function which is called with the document and the parsed arguments to the module.

Uses a pseudo-URL format to specify arguments to the module as follows. Given:

elasticdump --transform='@./transforms/my-transform?param1=value&param2=another-value'

with a module at `./transforms/my-transform.js` with the following:

module.exports = function (doc, options) {
// do something to doc
};

will load module `./transforms/my-transform.js', and execute the function with `doc` and `options` = `{"param1": "value", "param2": "another-value"}`.

An example transform for anonymizing data on-the-fly can be found in the `transforms` folder.

## Notes

- this tool is likley to require Elasticsearch version 1.0.0 or higher
- this tool is likely to require Elasticsearch version 1.0.0 or higher
- elasticdump (and elasticsearch in general) will create indices if they don't exist upon import
- when exporting from elasticsearch, you can have export an entire index (`--input="http://localhost:9200/index"`) or a type of object from that index (`--input="http://localhost:9200/index/type"`). This requires ElasticSearch 1.2.0 or higher
- If elasticsearch is in a sub-directory, index and type must be provided with a separate argument (`--input="http://localhost:9200/sub/directory --input-index=index/type"`). Using `--input-index=/` will include all indices and types.
Expand Down
26 changes: 24 additions & 2 deletions elasticdump.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,24 @@ var https = require('https')
var path = require('path')
var EventEmitter = require('events').EventEmitter
var isUrl = require('./lib/is-url')
var url = require('url')
var vm = require('vm')
var addAuth = require('./lib/add-auth')

var getParams = function (query) {
if (!query) {
return {}
}

return (/^[?#]/.test(query) ? query.slice(1) : query)
.split('&')
.reduce((params, param) => {
let [ key, value ] = param.split('=')
params[key] = value ? decodeURIComponent(value.replace(/\+/g, ' ')) : ''
return params
}, { })
}

var elasticdump = function (input, output, options) {
var self = this

Expand Down Expand Up @@ -81,8 +96,15 @@ var elasticdump = function (input, output, options) {
self.options.transform = [self.options.transform]
}
self.modifiers = self.options.transform.map(function (transform) {
var modificationScriptText = '(function(doc) { ' + transform + ' })'
return new vm.Script(modificationScriptText).runInThisContext()
if (transform[0] === '@') {
return function (doc) {
var parsed = url.parse(transform.slice(1))
return require(parsed.pathname)(doc, getParams(parsed.query))
}
} else {
var modificationScriptText = '(function(doc) { ' + transform + ' })'
return new vm.Script(modificationScriptText).runInNewContext()
}
})
}
}
Expand Down
8 changes: 8 additions & 0 deletions test/test-resources/transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
var crypto = require('crypto')

module.exports = function (doc) {
doc._source.bar = crypto
.createHash('md5')
.update(doc._source.foo)
.digest('hex')
}
49 changes: 49 additions & 0 deletions test/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var request = require('request')
var should = require('should')
var path = require('path')
var async = require('async')
var crypto = require('crypto')
var Elasticdump = require(path.join(__dirname, '..', 'elasticdump.js'))

var clear = function (callback) {
Expand Down Expand Up @@ -95,3 +96,51 @@ describe('multiple transform scripts should be executed for written documents',
})
})
})

describe('external transform module should be executed for written documents', function () {
before(function (done) {
this.timeout(1000 * 20)
clear(function (error) {
if (error) { return done(error) }
setup(function (error) {
if (error) { return done(error) }
var jobs = []

var dataOptions = {
limit: 100,
offset: 0,
debug: true,
type: 'data',
input: baseUrl + '/source_index',
output: baseUrl + '/destination_index',
scrollTime: '10m',
transform: '@./test/test-resources/transform'
}

var dataDumper = new Elasticdump(dataOptions.input, dataOptions.output, dataOptions)

dataDumper.on('error', function (error) { throw (error) })

jobs.push(function (next) { dataDumper.dump(next) })
jobs.push(function (next) { setTimeout(next, 5001) })

async.series(jobs, done)
})
})
})

after(function (done) { clear(done) })

it('documents should have the new field computed by external transform module', function (done) {
var url = baseUrl + '/destination_index/_search'
request.get(url, function (err, response, body) {
should.not.exist(err)
body = JSON.parse(body)
body.hits.total.should.equal(2)
body.hits.hits.forEach(function (doc) {
doc._source.bar.should.equal(crypto.createHash('md5').update(doc._source.foo).digest('hex'))
})
done()
})
})
})
77 changes: 77 additions & 0 deletions transforms/anonymize.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
var crypto = require('crypto')

function anonymize (thing, options = {}) {
if (
thing === null ||
thing instanceof Date
) {
return thing
}

options = Object.assign({
domain: 'example.com',
mobile: '+1-555-123-4567',
blacklist: []
}, options)

if (typeof options.blacklist === 'string') {
options.blacklist = options.blacklist.split(',')
}

switch (typeof thing) {
case 'object':
Object
.keys(thing)
.reduce(function (object, key) {
if (options.blacklist.includes(key)) {
return object
}

switch (typeof object[key]) {
case 'object':
anonymize(object[key], options)
break
case 'string':
object[key] = anonymize(object[key], options)
break
default:
}

return object
}, thing)
break
case 'string':
// If it looks like a date or datetime, leave it alone
if (/^\d{4}-[01]\d-[0-3]\d(?:[T ][0-2]\d:[0-5]\d:[0-5]\d)?$/.test(thing)) {
return thing
}

return [
[
// If it looks like an email, replace it with a hashed variant with the configured domain
/(([^<>()[\]\\.,;:\s@"]+(\.[^<>()[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))/ig,
function (found) {
return crypto
.createHash('md5')
.update(found)
.digest('hex')
.slice(0, 11) +
'@' + options.domain
}
],
[
// If it looks like a mobile number, replace it with the configured one
/[+0][-0-9\s]{6,}[0-9]/g,
options.mobile
]
].reduce(function (string, replacement) {
return string.replace(replacement[0], replacement[1])
}, thing)
default:
return thing
}
}

module.exports = function (doc, options = {}) {
anonymize(doc._source, options)
}
Loading

0 comments on commit 59f5a84

Please sign in to comment.