-
Notifications
You must be signed in to change notification settings - Fork 14
/
census-import.data.service.js
100 lines (93 loc) · 3.15 KB
/
census-import.data.service.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
const mssql = require('mssql') /* special */
const config = require('../config')
const R = require('ramda')
let pool
/**
 * Initialise the module-level mssql connection pool using the
 * pupil-census service credentials from config.
 * @param {Object} context - function context, used for pool error logging
 * @return {Promise<Object>} the connected mssql ConnectionPool
 */
module.exports.initPool = async function initPool (context) {
  const sqlConfig = config.Sql
  pool = new mssql.ConnectionPool({
    database: sqlConfig.Database,
    server: sqlConfig.Server,
    port: sqlConfig.Port,
    // generous request timeout: bulk census loads can run for several minutes
    requestTimeout: 10 * 60 * 1000,
    connectionTimeout: sqlConfig.Timeout,
    user: sqlConfig.PupilCensus.Username,
    password: sqlConfig.PupilCensus.Password,
    pool: { min: 1, max: 3 },
    options: {
      appName: sqlConfig.Application.Name, // docker default
      encrypt: sqlConfig.Encrypt
    }
  })
  pool.on('error', err => {
    context.log('SQL Pool Error:', err)
  })
  await pool.connect()
  return pool
}
/**
 * Create census import staging table and bulk-insert the parsed CSV rows.
 * @param {Object} context - function context
 * @param {Object} pool - mssql ConnectionPool; re-initialised if falsy
 * @param {String} censusTable - name of the staging table to create
 * @param {Array} blobContent - parsed CSV rows; row 0 is skipped (assumed header — TODO confirm)
 * @return {Promise<Number>} number of rows affected by the bulk insert
 */
module.exports.sqlLoadStagingTable = async (context, pool, censusTable, blobContent) => {
  if (!pool) {
    // BUG FIX: capture the returned pool. Previously `await this.initPool(context)`
    // only set the module-level `pool`, leaving this shadowing parameter
    // undefined, so the Request below was created without a connection.
    pool = await module.exports.initPool(context)
  }
  const table = new mssql.Table(censusTable)
  table.create = true
  table.columns.add('id', mssql.Int, { nullable: false, primary: true, identity: true })
  table.columns.add('lea', mssql.NVarChar(mssql.MAX), { nullable: false })
  table.columns.add('estab', mssql.NVarChar(mssql.MAX), { nullable: false })
  table.columns.add('upn', mssql.NVarChar(mssql.MAX), { nullable: false })
  table.columns.add('surname', mssql.NVarChar(mssql.MAX), { nullable: false })
  table.columns.add('forename', mssql.NVarChar(mssql.MAX), { nullable: false })
  table.columns.add('middlenames', mssql.NVarChar(mssql.MAX), { nullable: false })
  table.columns.add('gender', mssql.NVarChar(mssql.MAX), { nullable: false })
  table.columns.add('dob', mssql.NVarChar(mssql.MAX), { nullable: false })
  // Start at 1: first row is presumably the CSV header — verify against the caller.
  for (let i = 1; i < blobContent.length; i++) {
    const row = blobContent[i]
    // NOTE(review): 10 values are supplied for 9 declared columns (i + row[0..8]);
    // row[8] looks superfluous given 8 data columns — confirm the CSV shape
    // before changing, so behavior is preserved here.
    table.rows.add(i, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8])
  }
  const request = new mssql.Request(pool)
  const result = await request.bulk(table)
  return result.rowsAffected
}
/**
 * Execute stored procedure to load pupils from staging to pupils table.
 * @param {Object} context - function context
 * @param {Object} pool - mssql ConnectionPool; re-initialised if falsy
 * @param {String} censusTable - staging table name (interpolated into SQL;
 *   table names cannot be bound as parameters — assumed to be an
 *   internally-generated identifier, not user input; verify at call sites)
 * @param {Number} jobId - job identifier passed to the stored procedure
 * @return {Promise<Object>} first record of the procedure's result set
 */
module.exports.sqlLoadPupilsFromStaging = async (context, pool, censusTable, jobId) => {
  if (!pool) {
    // BUG FIX: capture the returned pool — previously the shadowing parameter
    // remained undefined after initPool, so the Request had no connection.
    pool = await module.exports.initPool(context)
  }
  const sql = `
  DECLARE @citt mtc_census_import.censusImportTableType
  INSERT INTO @citt SELECT * FROM ${censusTable}
  EXEC mtc_census_import.spPupilCensusImportFromStaging @censusImportTable = @citt, @jobId = @jobId
  `
  const request = new mssql.Request(pool)
  // Bind jobId as a typed parameter instead of string interpolation.
  request.input('jobId', mssql.Int, jobId)
  const result = await request.query(sql)
  return R.head(result.recordset)
}
/**
 * Delete (drop) the census import staging table.
 * @param {Object} context - function context
 * @param {Object} pool - mssql ConnectionPool; re-initialised if falsy
 * @param {String} censusTable - staging table name (interpolated into SQL;
 *   assumed internally generated, not user input — verify at call sites)
 * @return {Promise<Object>} mssql query result
 */
module.exports.sqlDeleteStagingTable = async (context, pool, censusTable) => {
  if (!pool) {
    // Consistency fix: the sibling functions re-initialise the pool when it is
    // absent; previously a missing pool here produced a Request bound to the
    // unconnected global connection.
    pool = await module.exports.initPool(context)
  }
  const request = new mssql.Request(pool)
  const sql = `DROP TABLE ${censusTable};`
  return request.query(sql)
}