forked from aliyun/aliyun-log-go-sdk
-
Notifications
You must be signed in to change notification settings - Fork 1
/
etl_example.go
106 lines (91 loc) · 2.64 KB
/
etl_example.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"encoding/json"
"fmt"
"time"
sls "github.com/aliyun/aliyun-log-go-sdk"
)
// Placeholder settings for this example; replace every value before running.
const (
// endpoint is handed to the client constructor and is also reused as the
// endpoint of the ETL sink.
endpoint = "your endpoint" // https://help.aliyun.com/document_detail/29008.html
// accessKeyId and accessKeySecret are used to create the client and are also
// copied into the ETL configuration and its sink.
accessKeyId = "your akId"
accessKeySecret = "your akSecret"
// securityToken is passed to the client constructor; it is left empty here.
// NOTE(review): presumably only needed for temporary (STS) credentials — confirm.
securityToken = ""
// projectName is the project every ETL call in main targets; the sink also
// writes into this project.
projectName = "your project name"
// logStoreName is set as the Logstore of the ETL configuration.
// NOTE(review): presumably the source logstore the job reads from — confirm.
logStoreName = "your logstore name"
// etlJobName is the name of the job that main creates, fetches, stops and starts.
etlJobName = "your etl job name"
// etlScript is the script the job is created with; main later replaces it
// on the fetched job before updating.
etlScript = "your etl script"
)
// main walks through the ETL job calls of the client in order: create, get,
// update, restart, list, stop and start, all against projectName/etlJobName.
//
// Every failure is printed and the walkthrough continues with the next step,
// so one failing call does not hide the remaining ones. JSON serialization
// errors are reported the same way instead of being discarded.
func main() {
	// create the client with ak and endpoint
	client := sls.CreateNormalInterface(endpoint, accessKeyId, accessKeySecret, securityToken)

	// create the ETL Job
	if err := client.CreateETL(projectName, getETLJob(etlJobName, etlScript)); err != nil {
		fmt.Println(err)
	}

	// get the ETL job
	if etlJob, err := client.GetETL(projectName, etlJobName); err != nil {
		fmt.Println(err)
	} else {
		// Report a serialization failure rather than printing an empty line.
		if detail, err := json.Marshal(etlJob); err != nil {
			fmt.Println(err)
		} else {
			fmt.Println(string(detail))
		}

		// Change the script on the fetched job so the update below has
		// something to apply.
		etlJob.Configuration.Script = "e_set(\"k\", \"v\")"

		// update the ETL Job
		if err := client.UpdateETL(projectName, *etlJob); err != nil {
			fmt.Println(err)
		}

		// update and restart the ETL Job
		if err := client.RestartETL(projectName, *etlJob); err != nil {
			fmt.Println(err)
		}
	}

	// list the ETL jobs under the project (offset 0, up to 10 entries)
	if etlJobs, err := client.ListETL(projectName, 0, 10); err != nil {
		fmt.Println(err)
	} else {
		// Report a serialization failure rather than printing an empty line.
		if detail, err := json.Marshal(etlJobs.Results); err != nil {
			fmt.Println(err)
		} else {
			fmt.Println(string(detail))
		}
		fmt.Println(etlJobs.Total)
		fmt.Println(etlJobs.Count)
	}

	// stop the ETL Job
	if err := client.StopETL(projectName, etlJobName); err != nil {
		fmt.Println(err)
	}

	// start the ETL Job
	if err := client.StartETL(projectName, etlJobName); err != nil {
		fmt.Println(err)
	}
}
// getETLJob assembles the sls.ETL definition that main submits when creating
// the job.
//
// etlJobName becomes the job's Name and etlScript its Configuration.Script.
// Credentials, endpoint, project and source logstore are taken from the
// package-level constants. FromTime is set to the Unix time at the moment of
// the call, and the schedule type is "Resident".
func getETLJob(etlJobName string, etlScript string) sls.ETL {
	return sls.ETL{
		Name:        etlJobName,
		Type:        "ETL",
		DisplayName: "ETL Job DisplayName",
		Description: "This ETL job is created by aliyun-log-go-sdk",
		Schedule: sls.ETLSchedule{
			Type: "Resident",
		},
		Configuration: sls.ETLConfiguration{
			Version:         2,
			AccessKeyId:     accessKeyId,
			AccessKeySecret: accessKeySecret,
			Logstore:        logStoreName,
			FromTime:        time.Now().Unix(),
			Script:          etlScript,
			Parameters:      map[string]string{},
			// Output targets (sinks) of the job; this example configures a
			// single one, but the slice may hold several.
			ETLSinks: []sls.ETLSink{
				{
					Name:            "target0",
					AccessKeyId:     accessKeyId,
					AccessKeySecret: accessKeySecret,
					Endpoint:        endpoint,
					Project:         projectName,
					Logstore:        "target_logstore_name",
				},
			},
		},
	}
}