Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of AWS opensearch serverless #1021

Merged
merged 2 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,9 @@ Options
Elasticsearch versioning types. Should be `internal`, `external`, `external_gte`, `force`.
NB : Type validation is handled by the bulk endpoint and not by elasticsearch-dump

--openSearchServerless
Set to true to run dump from AWS OpenSearch serverless collection.
(default : false)

AWS specific options
--------------------
Expand Down Expand Up @@ -680,6 +683,9 @@ AWS specific options
--s3AccessKeyId
AWS access key ID

--s3SessionToken
AWS session token in case of using temporary credentials

--s3Compress
gzip data before sending to s3

Expand Down Expand Up @@ -916,7 +922,7 @@ because of its simplicity. This detection is disabled by default, to enable use
- Elasticdump v6.68.0 added support for specifying a file containing the searchBody
- Elasticdump v6.85.0 added support for ignoring auto columns in CSV
- Elasticdump v6.86.0 added support for searchBodyTemplate which allows the searchBody to be transformed

- Elasticdump v6.110.1 added support for AWS OpenSearch serverless collection. Note: by default, [AWS OpenSearch serverless does not support `/_search?scroll` API and PUT `_bulk`](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-genref.html#serverless-operations). As a workaround, the dump is implemented using `_search` and POST `_bulk` API only. This may affect the performance of the dump.


## Articles on Elasticdump
Expand Down
10 changes: 5 additions & 5 deletions bin/elasticdump
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ const packageData = require(path.join(__dirname, '..', 'package.json'))
const { isUrl } = require(path.join(__dirname, '..', 'lib', 'is-url.js'))
const ArgParser = require(path.join(__dirname, '..', 'lib', 'argv.js'))
const versionCheck = require(path.join(__dirname, '..', 'lib', 'version-check.js'))
require('aws-sdk/lib/maintenance_mode_message').suppress = true;

require('aws-sdk/lib/maintenance_mode_message').suppress = true

// For future developers. If you add options here, be sure to add the option to test suite tests where necessary
const defaults = {
Expand Down Expand Up @@ -61,6 +60,7 @@ const defaults = {
awsUrlRegex: null,
s3AccessKeyId: null,
s3SecretAccessKey: null,
s3SessionToken: null,
s3Region: null,
s3Endpoint: null,
s3SSLEnabled: true,
Expand Down Expand Up @@ -142,15 +142,16 @@ const defaults = {
csvIncludeEndRowDelimiter: false,

// opensearch compatability
'force-os-version': '7.10.2'
'force-os-version': '7.10.2',
// opensearch serverless
openSearchServerless: false
}
const options = {}

versionCheck()

const args = new ArgParser({ options, parseJSONOpts: true })
args.parse(argv, defaults)

if (argv.debug === true) {
process
.on('unhandledRejection', (reason, p) => {
Expand All @@ -174,7 +175,6 @@ if (argv.help === true) {
console.error(`File \`${options.output}\` already exists, quitting`)
process.exit(1)
}

const dumper = new Elasticdump(options)

dumper.on('log', function (message) { args.log('log', message) })
Expand Down
2 changes: 1 addition & 1 deletion bin/multielasticdump
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const ArgParser = require(path.join(__dirname, '..', 'lib', 'argv.js'))
const addAuth = require(path.join(__dirname, '..', 'lib', 'add-auth.js'))
const versionCheck = require(path.join(__dirname, '..', 'lib', 'version-check.js'))
const AWS = require('aws-sdk')
require('aws-sdk/lib/maintenance_mode_message').suppress = true;
require('aws-sdk/lib/maintenance_mode_message').suppress = true
const initAws = require(path.join(__dirname, '..', 'lib', 'init-aws.js'))
const requestUtils = require(path.join(__dirname, '..', 'lib', 'request.js'))
const aws4signer = require(path.join(__dirname, '..', 'lib', 'aws4signer.js'))
Expand Down
11 changes: 10 additions & 1 deletion lib/aws4signer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ const aws4 = require('aws4')
const AWS = require('aws-sdk')
const path = require('path')
const os = require('os')

const crypto = require('crypto')
let credentials // lazily loaded, see below
let isAwsCredentials = false

const calculateSHA256 = (body) => {
if (!body) return 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' // SHA256 hash of an empty string
return crypto.createHash('sha256').update(body).digest('hex')
}

const aws4signer = async (esRequest, parent) => {
// Consider deprecating - insecure to use on command line and credentials can be found by default at ~/.aws/credentials or as environment variables
const useAwsCredentials = ((typeof parent.options.awsAccessKeyId === 'string') && (typeof parent.options.awsSecretAccessKey === 'string'))
Expand Down Expand Up @@ -67,6 +72,10 @@ const aws4signer = async (esRequest, parent) => {

esRequest.headers = Object.assign({ host: urlObj.hostname, 'Content-Type': 'application/json' }, esRequest.headers)
esRequest.path = `${urlObj.pathname}?${urlObj.searchParams.toString()}`

if (parent.options.openSearchServerless) {
esRequest.headers = Object.assign({ 'X-Amz-Content-SHA256': calculateSHA256(esRequest.body) }, esRequest.headers)
}
return aws4.sign(esRequest, credentials)
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/init-aws.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const initAws = (options) => {
AWS.config.update({
accessKeyId: options.s3AccessKeyId,
secretAccessKey: options.s3SecretAccessKey,
sessionToken: options.s3SessionToken,
sslEnabled: options.s3SSLEnabled,
s3ForcePathStyle: options.s3ForcePathStyle
})
Expand Down
26 changes: 18 additions & 8 deletions lib/transports/__es__/_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,25 @@ class Base {
return paramsObj ? `${prefix}${Object.keys(paramsObj).map(key => `${key}=${paramsObj[key]}`).join('&')}` : ''
}

setSearchBody () {
if (!this.searchBody) {
if (this.ESversion >= 5) {
this.searchBody = { query: { match_all: {} }, stored_fields: ['*'], _source: true }
} else {
this.searchBody = { query: { match_all: {} }, fields: ['*'], _source: true }
}
}
}

version (prefix, callback) {
if (this.ESversion) { return callback() }

if (this.parent.options.openSearchServerless) {
this.parent.emit('debug', `opensearch serverless does not support version check, assuming major version: ${this.ESversion}`)
this.setSearchBody()
return callback()
}

const esRequest = {
url: `${this.base.host}/`,
method: 'GET'
Expand Down Expand Up @@ -55,14 +72,7 @@ class Base {
this.parent.emit('debug', `cannot discover elasticsearch ${prefix} major version, assuming: ${this.ESversion}`)
}

if (!this.searchBody) {
if (this.ESversion >= 5) {
this.searchBody = { query: { match_all: {} }, stored_fields: ['*'], _source: true }
} else {
this.searchBody = { query: { match_all: {} }, fields: ['*'], _source: true }
}
}

this.setSearchBody()
callback()
})
}).catch(callback)
Expand Down
24 changes: 8 additions & 16 deletions lib/transports/__es__/_data.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,8 @@ const { scrollResultSet, safeDecodeURIComponent } = require('./_helpers')
class Data {
async getData (limit, offset, callback) {
let searchRequest, uri
let searchBodyTmp

if (this.parent.options.searchWithTemplate) {
searchBodyTmp = await this.renderTemplate(this.searchBody.id, this.searchBody.params)
.then(result => {
return result
})
.catch(error => {
throw new Error(error)
})
} else {
searchBodyTmp = this.searchBody
}

const searchBody = searchBodyTmp
const searchBody = await this.searchWithTemplate(this.searchBody)

if (offset >= this.totalSearchResults && this.totalSearchResults !== 0) {
callback(null, [])
Expand Down Expand Up @@ -106,11 +93,12 @@ class Data {
const additionalParams = this.paramsToString(this.parent.options[`${this.options.type}-params`] || this.parent.options.params)

const thisUrl = `${this.base.url}/_bulk${additionalParams}`

// Note: OpenSearch Serverless does not support PUT _bulk API, instead POST _bulk API should be used
// List of supported endpoints: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-genref.html#serverless-operations
const payload = {
url: thisUrl,
body: '',
method: 'PUT',
method: this.parent.options.openSearchServerless ? 'POST' : 'PUT',
headers: Object.assign({
'User-Agent': 'elasticdump',
'Content-Type': 'application/x-ndjson'
Expand All @@ -121,6 +109,10 @@ class Data {
const bulkAction = this.parent.options.bulkAction || 'index'

data.forEach(elem => {
if (this.parent.options.openSearchServerless) {
delete elem._id
delete elem.fields
}
const actionMeta = { [bulkAction]: {} }

// use index from base otherwise fallback to elem
Expand Down
37 changes: 37 additions & 0 deletions lib/transports/__es__/_serverless.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const jsonParser = require('../../jsonparser.js')
const aws4signer = require('../../aws4signer')
const _ = require('lodash')

class Serverless {
async getOpenSearchServerlessData (limit, offset, callback) {
const searchBody = await this.searchWithTemplate(this.searchBody)
searchBody.size = limit
searchBody.from = this.currentOffset ? this.currentOffset : offset
const additionalParams = this.paramsToString(this.parent.options[`${this.options.type}-params`] || this.parent.options.params, '&')
// Note: OpenSearch Serverless does not support _search?scroll API. As a workaround pure _search is used
// List of supported endpoints: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-genref.html#serverless-operations
const uri = `${this.base.url}/_search${additionalParams}`

const searchRequest = {
uri,
method: 'GET',
body: jsonParser.stringify(searchBody)
}
aws4signer(searchRequest, this.parent).then((res) => {
this.baseRequest(searchRequest, (err, response) => {
err = this.handleError(err, response)
if (err) {
return callback(err, [])
}

const body = jsonParser.parse(response.body, this.parent)
const hits = _.get(body, 'hits.hits', [])

this.currentOffset = offset + limit
return callback(null, hits)
})
}).catch(callback)
}
}

module.exports = Serverless
10 changes: 10 additions & 0 deletions lib/transports/__es__/_template.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ class Template {
)
}

searchWithTemplate (searchBody) {
return new Promise((resolve, reject) => {
if (this.parent.options.searchWithTemplate) {
this.renderTemplate(searchBody.id, searchBody.params).then(resolve).catch(reject)
} else {
resolve(searchBody)
}
})
}

_transformTemplates (data, type) {
return _.reduce(data[`${type}s`], (obj, tmpl) => {
obj[tmpl.name] = tmpl[type]
Expand Down
3 changes: 2 additions & 1 deletion lib/transports/__es__/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ module.exports = {
Setting: require('./_setting'),
Template: require('./_template'),
Script: require('./_script'),
Data: require('./_data')
Data: require('./_data'),
Serverless: require('./_serverless')
}
10 changes: 7 additions & 3 deletions lib/transports/elasticsearch.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ const {
Policy,
Setting,
Template,
Serverless,
Script
} = require('./__es__')

class elasticsearch extends Many(Base, Alias, Analyzer, Mapping, Policy, Setting, Template, Script, Data) {
class elasticsearch extends Many(Base, Alias, Analyzer, Mapping, Policy, Setting, Template, Script, Data, Serverless) {
constructor (parent, url, options) {
super()
this.base = parseBaseURL(url, options)
Expand Down Expand Up @@ -84,7 +85,11 @@ class elasticsearch extends Many(Base, Alias, Analyzer, Mapping, Policy, Setting
if (err) { return callback(err) }

if (type === 'data') {
this.getData(limit, offset, callback)
if (this.parent.options.openSearchServerless) {
this.getOpenSearchServerlessData(limit, offset, callback)
} else {
this.getData(limit, offset, callback)
}
} else if (type === 'mapping') {
this.getMapping(limit, offset, callback)
} else if (type === 'analyzer' || type === 'settings' || type === 'index') {
Expand Down Expand Up @@ -125,7 +130,6 @@ class elasticsearch extends Many(Base, Alias, Analyzer, Mapping, Policy, Setting
const type = this.parent.options.type
this.version('output', err => {
if (err) { return callback(err) }

if (type === 'data') {
this.setData(data, limit, offset, callback)
} else if (type === 'mapping') {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"node": ">=10.0.0"
},
"dependencies": {
"JSONStream": "npm:@search-dump/jsonstream@^1.4.0",
"async": "^2.6.4",
"aws-sdk": "2.1472.0",
"aws4": "^1.12.0",
Expand All @@ -38,6 +37,7 @@
"fast-csv": "4.3.6",
"http-status": "^1.5.1",
"ini": "^2.0.0",
"JSONStream": "npm:@search-dump/jsonstream@^1.4.0",
"lodash": "^4.17.21",
"lossless-json": "^1.0.5",
"minimist": "^1.2.8",
Expand Down
Loading