// mixpanel_to_redshift.go

package main
import (
"flag"
"fmt"
"log"
"strings"
"time"
"github.com/clever/pathio"
"github.com/clever/redshifter/mixpanel"
"github.com/clever/redshifter/redshift"
"github.com/segmentio/go-env"
)
// awsRegion is taken from the AWS_REGION environment variable and is handed to
// the Redshift copy step in main.
// NOTE(review): env.MustGet presumably aborts start-up when the variable is
// unset — confirm against the go-env package.
var awsRegion = env.MustGet("AWS_REGION")

// Command-line flags. Each is registered on the default flag set and is only
// populated once flag.Parse has run in main.
//
// TODO: include flag validation
var (
	// Stage toggles: both stages are enabled by default.

	exportFromMixpanel = flag.Bool("export", true, "Whether to export from mixpanel.")

	copyToRedshift = flag.Bool("copy", true, "Whether to copy to redshift.")

	// Export stage inputs.

	mixpanelEvents = flag.String("mixpanelevents", "", "Comma separated values of events to be exported.")

	// The default is computed once, at process start: the day before "now",
	// rendered as YYYY-MM-DD.
	mixpanelExportDate = flag.String("exportdate", time.Now().AddDate(0, 0, -1).Format("2006-01-02"), "Date in YYYY-MM-DD format. Defaults to yesterday.")

	mixpanelExportDir = flag.String("exportdir", "", "Directory to store the exported mixpanel data.")

	// Copy stage inputs.

	jsonpathsFile = flag.String("jsonpathsfile", "", "s3 file with jsonpaths data.")

	redshiftTable = flag.String("redshifttable", "", "Name of the redshift table.")
)
// main parses the command-line flags and then runs up to two stages, each
// gated by its own flag: exporting one day of Mixpanel events to a file, and
// copying that file into a Redshift table. Any stage failure is fatal.
//
// log.Fatal is only called here, after the stage helpers have returned, so
// that their deferred cleanup has already run by the time the process exits.
func main() {
	flag.Parse()

	// Both stages share one location: <exportdir>/<exportdate>. The path is
	// built with a plain "/" because the same string is later passed to
	// CopyJSONDataFromS3.
	// NOTE(review): presumably exportdir is an s3:// location — confirm.
	exportFile := fmt.Sprintf("%s/%s", *mixpanelExportDir, *mixpanelExportDate)

	if *exportFromMixpanel {
		if err := exportMixpanelEvents(exportFile); err != nil {
			log.Fatal(err)
		}
	}

	if *copyToRedshift {
		if err := loadIntoRedshift(exportFile); err != nil {
			log.Fatal(err)
		}
	}
}

// exportMixpanelEvents requests the "export" endpoint for the events named in
// the -mixpanelevents flag, restricted to the single day given by -exportdate,
// and writes the response body to exportFile. It returns the first error from
// the request or the write.
func exportMixpanelEvents(exportFile string) error {
	exporter := mixpanel.NewExport()
	log.Println("Exporting mixpanel data for", *mixpanelExportDate)

	// from_date and to_date are the same value, so exactly one day is requested.
	params := map[string]interface{}{
		"event":     strings.Split(*mixpanelEvents, ","),
		"from_date": *mixpanelExportDate,
		"to_date":   *mixpanelExportDate,
	}
	body, err := exporter.Request("export", params)
	if err != nil {
		return err
	}
	return pathio.Write(exportFile, body)
}

// loadIntoRedshift opens a Redshift connection, copies the JSON data at
// exportFile into the table named by -redshifttable (using the jsonpaths file
// from -jsonpathsfile and the configured AWS region), and then runs
// VacuumAnalyze. It returns the first error encountered.
func loadIntoRedshift(exportFile string) error {
	r, err := redshift.NewRedshift()
	if err != nil {
		return err
	}
	// Deferred only after a successful connect, and inside a function that
	// returns instead of exiting, so Close runs on the failure paths too.
	defer r.Close()

	if err := r.CopyJSONDataFromS3(*redshiftTable, exportFile, *jsonpathsFile, awsRegion); err != nil {
		return err
	}
	return r.VacuumAnalyze()
}