/
main.go
172 lines (148 loc) · 6.15 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package main
import (
	"context"
	"errors"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/redshiftdataapiservice"
	"github.com/aws/aws-sdk-go/service/redshiftdataapiservice/redshiftdataapiserviceiface"
	"github.com/cevaris/ordered_map"
	log "github.com/sirupsen/logrus"
)
// redshiftclient is the package-level Redshift Data API client used by the
// handler; it is initialized in main(). It is declared as the service
// interface type (presumably to allow an alternative implementation to be
// injected — not shown in this file).
var redshiftclient redshiftdataapiserviceiface.RedshiftDataAPIServiceAPI
// Redshift_Event is the Lambda input payload describing which Amazon Redshift
// cluster to run the demo SQL statements against and how to run them.
// Field and JSON tag names are part of the event contract and must not change.
type Redshift_Event struct {
// Cluster identifier of the target Amazon Redshift cluster.
Redshift_cluster_id string `json:"redshift_cluster_id"`
// Database name within the cluster.
Redshift_database string `json:"redshift_database"`
// Database user with access to execute the SQL statements.
Redshift_user string `json:"redshift_user"`
// IAM role attached to the cluster; interpolated into the COPY statement
// to grant S3 read access.
Redshift_iam_role string `json:"redshift_iam_role"`
// Must be "synchronous" or "asynchronous" (validated in HandleRequest).
Run_type string `json:"run_type"`
}
// main wires up the AWS session and the Redshift Data API client, then hands
// control to the Lambda runtime.
func main() {
	// session.Must panics if the default session cannot be created.
	sess := session.Must(session.NewSession())
	redshiftclient = redshiftdataapiservice.New(sess)
	// Register the handler with the Lambda runtime.
	lambda.Start(HandleRequest)
}
// HandleRequest is the Lambda entry point. It runs a fixed, ordered set of
// demo SQL statements (CREATE, COPY, UPDATE, DELETE, SELECT) against the
// Amazon Redshift cluster described by the incoming event, using the
// Redshift Data API, and returns a "COMMAND:STATUS | " summary string in
// execution order.
//
// The event's Run_type selects synchronous (poll each statement until it
// finishes) or asynchronous (submit and continue) execution; any other value
// is rejected with an error.
func HandleRequest(ctx context.Context, name Redshift_Event) (string, error) {
	log.Print("Inside Go Handler function!")
	final_resp := ""
	// Ordered map preserves execution order when building the summary.
	responses := ordered_map.NewOrderedMap()
	// Cluster identifier for the Amazon Redshift cluster
	redshift_cluster_id := name.Redshift_cluster_id
	// Database name for the Amazon Redshift cluster
	redshift_database := name.Redshift_database
	// Database user in the Amazon Redshift cluster with access to execute relevant SQL queries
	redshift_user := name.Redshift_user
	// IAM Role of Amazon Redshift cluster having access to S3
	redshift_iam_role := name.Redshift_iam_role
	// run_type can be either asynchronous or synchronous
	run_type := name.Run_type
	if run_type != "synchronous" && run_type != "asynchronous" {
		// FIX: return the error to the Lambda runtime instead of log.Fatal —
		// log.Fatal calls os.Exit(1), which bypasses Lambda's own error
		// reporting even though this handler already returns an error.
		return "", errors.New("invalid Event run_type: run_type has to be synchronous or asynchronous")
	}
	// Collapse the redundant if/else: the comparison already yields the bool.
	isSynchronous := run_type == "synchronous"
	log.Print("Run-Type Mode ", run_type)
	// Ordered map of statement label -> SQL text, executed in insertion order.
	sql_statements := ordered_map.NewOrderedMap()
	sql_statements.Set("CREATE", "CREATE TABLE IF NOT EXISTS public.region (\n"+
		" R_REGIONKEY bigint NOT NULL,\n"+
		" R_NAME varchar(25),\n"+" R_COMMENT varchar(152))\n"+"diststyle all;")
	sql_statements.Set("COPY", "COPY region FROM 's3://redshift-immersionday-labs/data/region/region.tbl.lzo'\n"+
		"iam_role '"+redshift_iam_role+"' \n"+"region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;")
	sql_statements.Set("UPDATE", "UPDATE public.region SET r_regionkey= 5 WHERE r_name ='AFRICA';")
	sql_statements.Set("DELETE", "DELETE FROM public.region where r_name = 'MIDDLE EAST';")
	sql_statements.Set("SELECT", "SELECT r_regionkey, r_name FROM public.region;")
	log.Print("Running sql queries in ", run_type, " mode!")
	// Execute each statement in order, recording its final status.
	iter := sql_statements.IterFunc()
	for kv, ok := iter(); ok; kv, ok = iter() {
		command := kv.Key.(string)
		query := kv.Value.(string)
		log.Print("Example of ", command, ":")
		log.Print("Running Query ", query)
		responses.Set(command, execute_sql_data_api(redshift_database, command, query, redshift_user, redshift_cluster_id, isSynchronous))
	}
	// Build the result summary in the same order the statements ran.
	iter1 := responses.IterFunc()
	for kv, ok := iter1(); ok; kv, ok = iter1() {
		command := kv.Key.(string)
		status := kv.Value.(string)
		final_resp += command + ":" + status + " | "
	}
	return final_resp, nil
}
// execute_sql_data_api submits a single SQL statement to the given Amazon
// Redshift cluster via the Redshift Data API and returns the final (or last
// observed) query status string.
//
// When isSynchronous is true it polls DescribeStatement once per second for
// up to max_wait_cycles; for a FINISHED statement that produced a result set
// it fetches and logs the records. On any API error, a FAILED query, or a
// polling timeout it calls log.Fatal, which logs and exits the process
// (matching the existing error-handling style of this file).
func execute_sql_data_api(redshift_database string, command string, query string, redshift_user string, redshift_cluster_id string, isSynchronous bool) string {
	var max_wait_cycles = 20
	var attempts = 0
	var query_status = ""
	done := false
	// Submit the statement; the Data API runs it asynchronously and returns
	// a statement Id used for all subsequent status/result calls.
	execstmt_req, execstmt_err := redshiftclient.ExecuteStatement(&redshiftdataapiservice.ExecuteStatementInput{
		ClusterIdentifier: aws.String(redshift_cluster_id),
		DbUser:            aws.String(redshift_user),
		Database:          aws.String(redshift_database),
		Sql:               aws.String(query),
	})
	if execstmt_err != nil {
		// Fatal functions call os.Exit(1) after writing the log message
		log.Fatal(execstmt_err)
	}
	descstmt_req, descstmt_err := redshiftclient.DescribeStatement(&redshiftdataapiservice.DescribeStatementInput{
		Id: execstmt_req.Id,
	})
	// FIX: check the error BEFORE touching the response. The original read
	// descstmt_req.Status first; on an API error descstmt_req may be nil and
	// the field access would panic before the error check ever ran.
	if descstmt_err != nil {
		log.Fatal(descstmt_err)
	}
	query_status = aws.StringValue(descstmt_req.Status)
	// Wait until the query is finished or the max cycles limit is reached.
	for !done && isSynchronous && attempts < max_wait_cycles {
		attempts++
		time.Sleep(1 * time.Second)
		// Assign (=) rather than redeclare (:=) so the outer variables are
		// updated instead of shadowed, and check the error before use —
		// the original checked it only after reading Status and HasResultSet.
		descstmt_req, descstmt_err = redshiftclient.DescribeStatement(&redshiftdataapiservice.DescribeStatementInput{
			Id: execstmt_req.Id,
		})
		if descstmt_err != nil {
			log.Fatal(descstmt_err)
		}
		query_status = aws.StringValue(descstmt_req.Status)
		if query_status == "FAILED" {
			// Fatal functions call os.Exit(1) after writing the log message
			log.Fatal("Query status: ", query_status, " .... for query--> ", query)
		} else if query_status == "FINISHED" {
			log.Print("Query status: ", query_status, " .... for query--> ", query)
			done = true
			// Only SELECT-like statements carry a result set worth fetching.
			if *descstmt_req.HasResultSet {
				getresult_req, getresult_err := redshiftclient.GetStatementResult(&redshiftdataapiservice.GetStatementResultInput{
					Id: execstmt_req.Id,
				})
				if getresult_err != nil {
					log.Fatal(getresult_err)
				}
				log.Print(getresult_req.Records)
			}
		} else {
			log.Print("Currently working... query status: ", query_status, " .... for query--> ", query)
		}
	}
	// Timeout precaution: the query never reached a terminal state within
	// the polling budget.
	if !done && attempts >= max_wait_cycles && isSynchronous {
		log.Print("Query status: ", query_status, " .... for query--> ", query)
		// Fatal functions call os.Exit(1) after writing the log message
		log.Fatal("Limit for max_wait_cycles has been reached before the query was able to finish. We have exited out of the while-loop. You may increase the limit accordingly.")
	}
	return query_status
}