-
Notifications
You must be signed in to change notification settings - Fork 0
/
csv.ts
96 lines (86 loc) · 3.18 KB
/
csv.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import { etlHelper, Format, Source } from '@caldwell619/etl-helper'
import { parseFloatStrict, durableParseFloat } from '@caldwell619/durable-parse-float'
import { z } from 'zod'
import { readFileSync } from 'fs'
import { resolve } from 'path'
import { csvInputSchema, CovidResponseOutput, validateOutput } from './schema'
/** Input record type, inferred directly from `csvInputSchema`. */
type CovidResponseCsv = z.infer<typeof csvInputSchema>

// Local copy of the dataset used by the "data provided" run. The path is resolved
// against the process's current working directory (not this file's directory), and the
// file is read synchronously when the module loads. No encoding is passed, so `csvData`
// is a Buffer.
const csvPath = resolve(process.cwd(), 'src', 'daily.csv')
const csvData = readFileSync(csvPath)
/**
 * Checks one incoming record against `csvInputSchema`.
 *
 * @param input - the raw record handed over by the ETL helper
 * @returns `true` when the record matches the schema, otherwise `false` (after logging
 *   the zod error)
 */
const validateInput = (input: CovidResponseCsv): boolean => {
  const parsed = csvInputSchema.safeParse(input)
  if (parsed.success) {
    return true
  }
  console.error(parsed.error)
  // Do something with the error, reporting, DLQ, or if it meets your special criteria, return `true`
  return false
}
/** Remote source: the US daily CSV at api.covidtracking.com, passed to `etlHelper` as a URL. */
const urlSource: Source<CovidResponseCsv> = { url: 'https://api.covidtracking.com/v1/us/daily.csv' }
/**
 * Converts one validated CSV record into the output shape by parsing its numeric
 * string columns into numbers.
 *
 * - `date` and `states` go through `parseFloatStrict`.
 * - Every other count column goes through `durableParseFloat`. It is presumably tolerant
 *   of blank or missing cells; confirm against that package's docs.
 * - `hash`, `lastModified` and `dateChecked` are copied through unchanged.
 * - `recovered` is always emitted as `null`.
 * - `ingestionDate` is stamped with the current time, once per record.
 *
 * This step is optional for some storage engines. Engines that coerce numeric strings
 * on insert could reuse the input schema and skip this transform entirely.
 */
const transformer = (input: CovidResponseCsv): CovidResponseOutput => {
  // Lenient parser shared by all count columns.
  const toCount = durableParseFloat
  const { hash, lastModified, dateChecked } = input

  return {
    hash,
    lastModified,
    date: parseFloatStrict(input.date),
    states: parseFloatStrict(input.states),
    positive: toCount(input.positive),
    negative: toCount(input.negative),
    pending: toCount(input.pending),
    hospitalizedCurrently: toCount(input.hospitalizedCurrently),
    hospitalizedCumulative: toCount(input.hospitalizedCumulative),
    inIcuCurrently: toCount(input.inIcuCurrently),
    inIcuCumulative: toCount(input.inIcuCumulative),
    onVentilatorCurrently: toCount(input.onVentilatorCurrently),
    onVentilatorCumulative: toCount(input.onVentilatorCumulative),
    dateChecked,
    death: toCount(input.death),
    hospitalized: toCount(input.hospitalized),
    totalTestResults: toCount(input.totalTestResults),
    recovered: null,
    total: toCount(input.total),
    posNeg: toCount(input.posNeg),
    deathIncrease: toCount(input.deathIncrease),
    hospitalizedIncrease: toCount(input.hospitalizedIncrease),
    negativeIncrease: toCount(input.negativeIncrease),
    positiveIncrease: toCount(input.positiveIncrease),
    totalTestResultsIncrease: toCount(input.totalTestResultsIncrease),
    ingestionDate: new Date(),
  }
}
/**
 * Runs the shared CSV pipeline against a given source:
 * `validateInput` → `transformer` → `validateOutput` → persist.
 *
 * Both entry points below used to repeat this whole config and differed only in
 * `source`. Keeping it in one place stops the two runs from drifting apart.
 * Persisting currently only logs how many records came out of the pipeline.
 *
 * @param source - where `etlHelper` should read the CSV from (URL or in-memory data)
 */
const runCsvEtl = async (source: Source<CovidResponseCsv>): Promise<void> => {
  await etlHelper<CovidResponseCsv, CovidResponseOutput>({
    source,
    format: Format.CSV,
    validateInput,
    transformer,
    validateOutput,
    persist: async outputs => {
      console.log(outputs.length)
    },
  })
}

/** Runs the pipeline with the CSV fetched from `urlSource`. */
const urlProvided = async (): Promise<void> => {
  await runCsvEtl(urlSource)
}

/** Runs the pipeline with the CSV bytes already read from disk into `csvData`. */
const dataProvided = async (): Promise<void> => {
  await runCsvEtl({ data: csvData })
}
/**
 * Script entry point: runs the URL-sourced pipeline, then the file-sourced one.
 * The runs are sequential, and a failure in the first skips the second.
 */
const csv = async (): Promise<void> => {
  await urlProvided()
  await dataProvided()
}

// Invoked at module load. Without a handler, a rejected pipeline becomes an unhandled
// promise rejection: a warning with exit code 0 on older Node, an uncaught crash on
// newer Node. Log it and exit non-zero instead. `void` marks the fire-and-forget as
// intentional.
void csv().catch((error: unknown) => {
  console.error(error)
  process.exitCode = 1
})