Historical data can be imported from S3 (the data source for Athena) if needed. However, the Timestream memory store retention must then reach back as far as the oldest entry to be imported, which increases costs: memory store pricing is 720 times higher than magnetic storage, so we can't simply configure a long memory store retention for every new user. After the import, the memory store retention period can be shortened again.
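Extending the memory store retention before the import (and shortening it again afterwards) can be scripted, too. Below is a minimal sketch using the aws-sdk v2 `UpdateTable` call; the database name, table name, and retention values are placeholders:

```ts
import { TimestreamWrite } from 'aws-sdk'

const timestream = new TimestreamWrite({
	region: process.env.AWS_DEFAULT_REGION,
})

// Records can only be written to Timestream if their timestamp falls within
// the memory store retention window, so extend it to cover the oldest entry
// before the import, and shorten it again afterwards.
const setMemoryStoreRetention = async (hours: number): Promise<void> => {
	await timestream
		.updateTable({
			DatabaseName: 'historicalData', // placeholder
			TableName: 'deviceData', // placeholder
			RetentionProperties: {
				MemoryStoreRetentionPeriodInHours: hours,
				MagneticStoreRetentionPeriodInDays: 365, // placeholder
			},
		})
		.promise()
}

// Example: extend to one year (8766 hours) before the import ...
// await setMemoryStoreRetention(8766)
// ... and shorten again afterwards:
// await setMemoryStoreRetention(24)
```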
For reference, here is an import script:

```ts
import { stackOutput } from '@bifravst/cloudformation-helpers'
import { CloudFormation, S3, TimestreamWrite } from 'aws-sdk'
import { promises as fs } from 'fs'
import * as path from 'path'
import { v4 } from 'uuid'
import { CORE_STACK_NAME } from './cdk/stacks/stackName'
import { messageToTimestreamRecords } from './historicalData/messageToTimestreamRecords'
import { shadowUpdateToTimestreamRecords } from './historicalData/shadowUpdateToTimestreamRecords'
import { storeRecordsInTimeseries } from './historicalData/storeRecordsInTimeseries'

const s3 = new S3({
	region: process.env.AWS_DEFAULT_REGION,
})
const timestream = new TimestreamWrite({
	region: process.env.AWS_DEFAULT_REGION,
})

// Split an array into chunks of the given size
const chunk = (arr: any[], size: number): any[][] =>
	arr.reduce(
		(chunks, el, i) =>
			(i % size ? chunks[chunks.length - 1].push(el) : chunks.push([el])) &&
			chunks,
		[],
	)

// Download all raw update documents from the historical data bucket
// into a local ./messages directory
const fetchFiles = async () => {
	try {
		await fs.mkdir(path.resolve(process.cwd(), 'messages'))
		const files = (await s3
			.listObjects({
				Bucket: process.env.HISTORICAL_DATA_BUCKET as string,
				Prefix: 'updates',
			})
			.promise()
			.then(({ Contents }) => Contents?.map(({ Key }) => Key))) as string[]
		await Promise.all(
			files?.map((file) =>
				s3
					.getObject({
						Bucket: process.env.HISTORICAL_DATA_BUCKET as string,
						Key: file,
					})
					.promise()
					.then(async (res) => {
						await fs.writeFile(
							path.resolve(process.cwd(), 'messages', v4()),
							res.Body as string,
							'utf-8',
						)
						console.log(file, 'written')
					}),
			),
		)
	} catch {
		// Ignore errors, e.g. if the messages directory already exists
		// (in that case the download is skipped and cached files are used)
	}
}

const main = async () => {
	// Resolve the Timestream database and table from the stack output
	const { historicaldataTableInfo } = await stackOutput(
		new CloudFormation({ region: process.env.AWS_DEFAULT_REGION }),
	)(CORE_STACK_NAME)
	console.log(historicaldataTableInfo)
	const [DatabaseName, TableName] = historicaldataTableInfo.split('|')
	const store = storeRecordsInTimeseries({
		timestream,
		DatabaseName,
		TableName,
	})

	await fetchFiles()

	// Convert the downloaded documents into Timestream records
	const files = await fs.readdir(path.resolve(process.cwd(), 'messages'))
	const records = (
		await Promise.all(
			files.map((file) =>
				fs
					.readFile(path.resolve(process.cwd(), 'messages', file), 'utf-8')
					.then((s) =>
						s
							.split('\n')
							// Guard against blank lines (e.g. a trailing newline)
							.filter((l) => l.trim().length > 0)
							.map((l) => {
								const event = JSON.parse(l)
								if ('reported' in event) {
									return shadowUpdateToTimestreamRecords(event)
								}
								if ('message' in event) {
									return messageToTimestreamRecords(event)
								}
								console.error({
									error: 'Event ignored',
									event,
								})
								return []
							})
							.flat()
							.filter((s) => s !== undefined),
					),
			),
		)
	).flat() as TimestreamWrite.Records

	// Write the records sequentially in batches of 100
	// (the WriteRecords limit per request)
	await chunk(records, 100).reduce(
		(p: Promise<void>, c: Record<string, any>[], k) =>
			p.then(async () => {
				console.log(`${(k + 1) * 100} / ${records.length}`)
				return store(c)
			}),
		Promise.resolve(),
	)
}

main()
```
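Note that the script expects the `AWS_DEFAULT_REGION` and `HISTORICAL_DATA_BUCKET` environment variables to be set, and that downloaded documents are cached in a local `messages` directory: on a re-run the `fs.mkdir` call throws, so the download step is skipped and the cached files are converted again.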
BREAKING CHANGE: This switches the historical data storage to Timestream. If you need to migrate existing historical data, [see this comment](bifravst/aws#702 (comment)).

See #394