redshift_writer.go (forked from taggledevel2/ratchet)

package processors

import (
	"database/sql"
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/indosatppi/ratchet/v3/data"
	"github.com/indosatppi/ratchet/v3/util"
)

type redshiftManifest struct {
	Entries []redshiftManifestEntry `json:"entries"`
}

type redshiftManifestEntry struct {
	URL       string `json:"url"`
	Mandatory bool   `json:"mandatory"`
}
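
// For illustration only: a two-file manifest serialized from these types
// (bucket and key names below are hypothetical) looks like:
//
//	{
//	  "entries": [
//	    {"url": "s3://my-bucket/events/file.0000000000.gz", "mandatory": true},
//	    {"url": "s3://my-bucket/events/file.0000000001.gz", "mandatory": true}
//	  ]
//	}
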
// RedshiftWriter gets data into a Redshift table by first uploading data batches to S3.
// Once all data is uploaded to S3, the appropriate "COPY" command is executed against the
// database to import the data files.
//
// This processor is not set up to do any fancy merging; rather, it writes every row received
// to the table defined. An ideal use case is writing data to a temporary table that is later
// merged into your production dataset.
type RedshiftWriter struct {
	awsID     string
	awsSecret string
	awsRegion string
	bucket    string
	config    *aws.Config
	db        *sql.DB
	prefix    string
	tableName string

	manifestEntries []redshiftManifestEntry
	data            []string
	manifestPath    string

	// BatchSize is the number of buffered rows that triggers a flush to S3.
	// Defaults to 1000.
	BatchSize int

	// Compress determines whether each data file is gzipped before upload.
	// Defaults to true.
	Compress bool

	// If the file name should be a fixed width, specify that here.
	// Files uploaded to S3 will be zero-padded to this width.
	// Defaults to 10.
	FileNameWidth int
}

// NewRedshiftWriter returns a reference to a new RedshiftWriter
func NewRedshiftWriter(db *sql.DB, tableName, awsID, awsSecret, awsRegion, bucket, prefix string) *RedshiftWriter {
	p := RedshiftWriter{
		awsID:         awsID,
		awsSecret:     awsSecret,
		awsRegion:     awsRegion,
		bucket:        bucket,
		db:            db,
		prefix:        prefix,
		tableName:     tableName,
		BatchSize:     1000,
		Compress:      true,
		FileNameWidth: 10,
	}

	creds := credentials.NewStaticCredentials(awsID, awsSecret, "")
	// Note: SSL is explicitly disabled here, so S3 transfers use plain HTTP.
	p.config = aws.NewConfig().WithRegion(awsRegion).WithDisableSSL(true).WithCredentials(creds)

	return &p
}
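
// A minimal usage sketch (hypothetical: the driver, DSN, table name, AWS
// values, and pipeline wiring below are placeholders, not part of this file):
//
//	db, err := sql.Open("postgres", "redshift-dsn-here")
//	if err != nil {
//		// handle error
//	}
//	writer := NewRedshiftWriter(db, "staging_events", "AWS_ID", "AWS_SECRET", "us-east-1", "my-bucket", "events/")
//	writer.BatchSize = 5000 // flush a file to S3 every 5000 rows
//	// Use writer as the final stage of a ratchet pipeline: the pipeline calls
//	// ProcessData for each incoming payload and Finish once upstream completes.
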
// ProcessData buffers incoming rows on the processor. Once enough data has been
// received (as defined by r.BatchSize), it writes a file out to S3 and resets the buffer.
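//
// For example, an incoming payload of [{"id":1},{"id":2}] is split into two
// buffered JSON rows; nothing is written to S3 until BatchSize rows accumulate.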
func (r *RedshiftWriter) ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error) {
	objects, err := data.ObjectsFromJSON(d)
	util.KillPipelineIfErr(err, killChan)

	for _, obj := range objects {
		dd, err := data.NewJSON(obj)
		util.KillPipelineIfErr(err, killChan)
		r.data = append(r.data, string(dd))

		// Flush the data if we've hit the threshold of records
		if r.BatchSize > 0 && len(r.data) >= r.BatchSize {
			r.flushFiles(killChan)
		}
	}
}

// Finish writes any remaining records to a file on S3, creates the manifest file, and then
// kicks off the query to import the S3 files into the Redshift table
func (r *RedshiftWriter) Finish(outputChan chan data.JSON, killChan chan error) {
	r.flushFiles(killChan)
	r.createManifest(killChan)
	r.copyToRedshift(killChan)
}

func (r *RedshiftWriter) flushFiles(killChan chan error) {
	// Nothing buffered (e.g. Finish ran right after a batch-boundary flush):
	// skip writing an empty file that would otherwise be listed in the manifest.
	if len(r.data) == 0 {
		return
	}

	formatString := fmt.Sprintf("%%0%vv", r.FileNameWidth)
	fileSuffix := fmt.Sprintf(formatString, len(r.manifestEntries))
	fileName := fmt.Sprintf("%vfile.%v", r.prefix, fileSuffix)

	_, err := util.WriteS3Object(r.data, r.config, r.bucket, fileName, "\n", r.Compress)
	util.KillPipelineIfErr(err, killChan)

	if r.Compress {
		fileName += ".gz"
	}

	entry := redshiftManifestEntry{
		URL:       fmt.Sprintf("s3://%v/%v", r.bucket, fileName),
		Mandatory: true,
	}
	r.manifestEntries = append(r.manifestEntries, entry)
	r.data = nil
}
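
// Naming scheme, for reference: with prefix "events/" and the default
// FileNameWidth of 10, successive flushes upload "events/file.0000000000",
// "events/file.0000000001", and so on (each manifest URL gaining a ".gz"
// suffix when Compress is enabled).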

func (r *RedshiftWriter) createManifest(killChan chan error) {
	manifest := redshiftManifest{Entries: r.manifestEntries}
	manifestData, err := data.NewJSON(manifest)
	util.KillPipelineIfErr(err, killChan)

	dd := []string{string(manifestData)}
	r.manifestPath = fmt.Sprintf("%vfile.manifest", r.prefix)

	// The manifest object itself is always uploaded uncompressed.
	_, err = util.WriteS3Object(dd, r.config, r.bucket, r.manifestPath, "\n", false)
	util.KillPipelineIfErr(err, killChan)
}

func (r *RedshiftWriter) copyToRedshift(killChan chan error) {
	err := util.ExecuteSQLQuery(r.db, r.copyQuery())
	util.KillPipelineIfErr(err, killChan)
}

func (r *RedshiftWriter) copyQuery() string {
	compression := ""
	if r.Compress {
		compression = "GZIP"
	}

	query := fmt.Sprintf(`
		COPY %v
		FROM 's3://%v/%v'
		REGION '%v'
		CREDENTIALS 'aws_access_key_id=%v;aws_secret_access_key=%v'
		MANIFEST
		JSON 'auto'
		%v
	`, r.tableName, r.bucket, r.manifestPath, r.awsRegion, r.awsID, r.awsSecret, compression)

	return query
}
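
// For reference, with hypothetical values filled in, the rendered statement
// looks like:
//
//	COPY staging_events
//	FROM 's3://my-bucket/events/file.manifest'
//	REGION 'us-east-1'
//	CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'
//	MANIFEST
//	JSON 'auto'
//	GZIP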

func (r *RedshiftWriter) String() string {
	return "RedshiftWriter"
}