/
abstract-parser.js
70 lines (61 loc) · 1.78 KB
/
abstract-parser.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import highland from 'highland'
import { defaults, difference } from 'lodash'
import csv from 'csv-parser'
import CONSTANTS from '../constants'
/* eslint class-methods-use-this:["error",{"exceptMethods":["_processData"]}] */
export default class AbstractParser {
constructor(conf = {}, moduleName) {
this.moduleName = moduleName
this.csvConfig = defaults(conf.csvConfig || {}, {
batchSize: CONSTANTS.standardOption.batchSize,
delimiter: CONSTANTS.standardOption.delimiter,
strictMode: CONSTANTS.standardOption.strictMode,
})
this.logger = defaults(conf.logger || {}, {
error: () => {},
warn: () => {},
info: () => {},
verbose: () => {},
})
}
_streamInput(input, output) {
let rowIndex = 1
return highland(input)
.through(
csv({
separator: this.csvConfig.delimiter,
strict: this.csvConfig.strictMode,
})
)
.stopOnError(err => {
this.logger.error(err)
return output.emit('error', err)
})
.batch(this.csvConfig.batchSize)
.doto(data => {
this.logger.verbose(`Parsed row-${rowIndex}: ${JSON.stringify(data)}`)
rowIndex += 1
})
.flatMap(highland)
.flatMap(data => data |> this._processData |> highland)
.stopOnError(err => {
this.logger.error(err)
return output.emit('error', err)
})
.doto(data =>
this.logger.verbose(
`Converted row-${rowIndex}: ${JSON.stringify(data)}`
)
)
}
_getMissingHeaders(data) {
const headerDiff = difference(
CONSTANTS.requiredHeaders[this.moduleName],
Object.keys(data)
)
return headerDiff
}
_processData() {
throw new Error('Method AbstractParser._processData has to be overridden!')
}
}