// analyzeCUR.go
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/BurntSushi/toml"
"github.com/andyfase/CURDashboard/go/curconvert"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/jcxplorer/cwlogger"
)
/*
Structs Below are used to contain configuration parsed in
*/
type General struct {
Namespace string
}
type RI struct {
Enabled bool `toml:"enableRIanalysis"`
TotalUtilization bool `toml:"enableRITotalUtilization"`
PercentThreshold int `toml:"riPercentageThreshold"`
TotalThreshold int `toml:"riTotalThreshold"`
CwName string
CwNameTotal string
CwDimension string
CwDimensionTotal string
CwType string
Sql string
Ignore map[string]int
}
type Metric struct {
Enabled bool
Hourly bool
Daily bool
Type string
SQL string
CwName string
CwDimension string
CwType string
}
type Athena struct {
DbSQL string `toml:"create_database"`
TablePrefix string `toml:"table_prefix"`
TableSQL string `toml:"create_table"`
DbName string `toml:"database_name"`
}
type AthenaResponse struct {
Rows []map[string]string
}
type MetricConfig struct {
Substring map[string]string
}
type Config struct {
General General
RI RI
Athena Athena
MetricConfig MetricConfig
Metrics []Metric
}
/*
End of configuraton structs
*/
var defaultConfigPath = "./analyzeCUR.config"
var maxConcurrentQueries = 5
func getInstanceMetadata(sess *session.Session) map[string]interface{} {
c := &http.Client{
Timeout: 100 * time.Millisecond,
}
resp, err := c.Get("http://169.254.169.254/latest/dynamic/instance-identity/document")
var m map[string]interface{}
if err == nil {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
err = json.Unmarshal(body, &m)
if err != nil {
log.Fatalln("Could not parse MetaData, erorr: " + err.Error())
}
}
}
// if we havent obtained instance meta-data fetch account from STS - likely were not on EC2
_, ok := m["region"].(string)
if !ok {
svc := sts.New(sess)
result, err := svc.GetCallerIdentity(&sts.GetCallerIdentityInput{})
if err == nil {
m = make(map[string]interface{})
m["accountId"] = *result.Account
m["region"] = *sess.Config.Region
}
}
return m
}
/*
Function reads in and validates command line parameters
*/
func getParams(configFile *string, sourceBucket *string, destBucket *string, account *string, curReportName *string, curReportPath *string, curDestPath *string, dateOverride *string) error {
// Define input command line config parameter and parse it
flag.StringVar(configFile, "config", defaultConfigPath, "Input config file for analyzeDBR")
flag.StringVar(sourceBucket, "bucket", "", "AWS Bucket where CUR files sit")
flag.StringVar(destBucket, "destbucket", "", "AWS Bucket where where Parquet files will be uploaded (Optional - use as override-only) ")
flag.StringVar(account, "account", "", "AWS Account #")
flag.StringVar(curReportName, "reportname", "", "CUR Report Name")
flag.StringVar(curReportPath, "reportpath", "", "CUR Report PAth")
flag.StringVar(curDestPath, "destpath", "", "Destination Path for converted CUR to be uploaded too")
flag.StringVar(dateOverride, "date", "", "Optional date flag to over-ride the processing CUR month")
flag.Parse()
// check input against defined regex's
regexEmpty := regexp.MustCompile(`^$`)
regexAccount := regexp.MustCompile(`^\d+$`)
if regexEmpty.MatchString(*sourceBucket) {
return errors.New("Config Error: Must provide valid AWS DBR bucket")
}
if !regexAccount.MatchString(*account) {
return errors.New("Config Error: Must provide valid AWS account number")
}
if regexEmpty.MatchString(*curReportName) {
return errors.New("Config Error: Must provide valid CUR Report Name")
}
if destBucket == nil || len(*destBucket) < 1 {
*destBucket = *sourceBucket
}
return nil
}
/*
Function reads in configuration file provided in configFile input
Config file is stored in TOML format
*/
func getConfig(conf *Config, configFile string) error {
// check for existance of file
if _, err := os.Stat(configFile); err != nil {
return errors.New("Config File " + configFile + " does not exist")
}
// read file
b, err := ioutil.ReadFile(configFile)
if err != nil {
return errors.New("Error Reading TOML config file: " + err.Error())
}
// parse TOML config file into struct
if _, err := toml.Decode(string(b), &conf); err != nil {
return errors.New("Error Decoding TOML config file: " + err.Error())
}
return nil
}
/*
Function substitutes parameters into SQL command.
Input map contains key (thing to look for in input sql) and if found replaces with given value)
*/
func substituteParams(sql string, params map[string]string) string {
for sub, value := range params {
sql = strings.Replace(sql, sub, value, -1)
}
return sql
}
/*
Function takes SQL to send to Athena converts into JSON to send to Athena HTTP proxy and then sends it.
Then recieves responses in JSON which is converted back into a struct and returned
*/
func sendQuery(svc *athena.Athena, db string, sql string, account string, region string) (AthenaResponse, error) {
var results AthenaResponse
var s athena.StartQueryExecutionInput
s.SetQueryString(sql)
var q athena.QueryExecutionContext
q.SetDatabase(db)
s.SetQueryExecutionContext(&q)
var r athena.ResultConfiguration
r.SetOutputLocation("s3://aws-athena-query-results-" + account + "-" + region + "/")
s.SetResultConfiguration(&r)
result, err := svc.StartQueryExecution(&s)
if err != nil {
return results, errors.New("Error Querying Athena, StartQueryExecution: " + err.Error())
}
var qri athena.GetQueryExecutionInput
qri.SetQueryExecutionId(*result.QueryExecutionId)
var qrop *athena.GetQueryExecutionOutput
duration := time.Duration(2) * time.Second // Pause for 2 seconds
for {
qrop, err = svc.GetQueryExecution(&qri)
if err != nil {
return results, errors.New("Error Querying Athena, GetQueryExecution: " + err.Error())
}
if *qrop.QueryExecution.Status.State != "RUNNING" {
break
}
time.Sleep(duration)
}
if *qrop.QueryExecution.Status.State != "SUCCEEDED" {
return results, errors.New("Error Querying Athena, completion state is NOT SUCCEEDED, state is: " + *qrop.QueryExecution.Status.State)
}
var ip athena.GetQueryResultsInput
ip.SetQueryExecutionId(*result.QueryExecutionId)
// loop through results (paginated call)
var colNames []string
err = svc.GetQueryResultsPages(&ip,
func(page *athena.GetQueryResultsOutput, lastPage bool) bool {
for row := range page.ResultSet.Rows {
if len(colNames) < 1 { // first row contains column names - which we use in any subsequent rows to produce map[columnname]values
for j := range page.ResultSet.Rows[row].Data {
colNames = append(colNames, *page.ResultSet.Rows[row].Data[j].VarCharValue)
}
} else {
result := make(map[string]string)
skip := false
for j := range page.ResultSet.Rows[row].Data {
if j < len(colNames) {
if page.ResultSet.Rows[row].Data[j].VarCharValue == nil {
skip = true
break
}
result[colNames[j]] = *page.ResultSet.Rows[row].Data[j].VarCharValue
}
}
if len(result) > 0 && !skip {
results.Rows = append(results.Rows, result)
}
}
}
if lastPage {
return false // return false to end paginated calls
}
return true // keep going if there are more pages to fetch
})
if err != nil {
return results, errors.New("Error Querying Athena, GetQueryResultsPages: " + err.Error())
}
return results, nil
}
/*
Function takes metric data (from Athena etal) and sends through to cloudwatch.
*/
func sendMetric(svc *cloudwatch.CloudWatch, data AthenaResponse, cwNameSpace string, cwName string, cwType string, cwDimensionName string, interval string) error {
input := cloudwatch.PutMetricDataInput{}
input.Namespace = aws.String(cwNameSpace)
i := 0
for row := range data.Rows {
// skip metric if dimension or value is empty
if len(data.Rows[row]["dimension"]) < 1 || len(data.Rows[row]["value"]) < 1 {
continue
}
// send Metric Data as we have reached 20 records, and clear MetricData Array
if i >= 20 {
_, err := svc.PutMetricData(&input)
if err != nil {
return errors.New("Could sending CW Metric: " + err.Error())
}
input.MetricData = nil
i = 0
}
var t time.Time
if interval == "hourly" {
t, _ = time.Parse("2006-01-02T15", data.Rows[row]["date"])
} else {
t, _ = time.Parse("2006-01-02", data.Rows[row]["date"])
}
v, _ := strconv.ParseFloat(data.Rows[row]["value"], 64)
metric := cloudwatch.MetricDatum{
MetricName: aws.String(cwName),
Timestamp: aws.Time(t),
Unit: aws.String(cwType),
Value: aws.Float64(v),
}
// Dimension can be a single or comma seperated list of values, or key/values
// presence of "=" sign in value designates key=value. Otherwise input cwDimensionName is used as key
d := strings.Split(data.Rows[row]["dimension"], ",")
for i := range d {
var dname, dvalue string
if strings.Contains(d[i], "=") {
dTuple := strings.Split(d[i], "=")
dname = dTuple[0]
dvalue = dTuple[1]
} else {
dname = cwDimensionName
dvalue = d[i]
}
cwD := cloudwatch.Dimension{
Name: aws.String(dname),
Value: aws.String(dvalue),
}
metric.Dimensions = append(metric.Dimensions, &cwD)
}
// add interval metric
metric.Dimensions = append(metric.Dimensions, &cloudwatch.Dimension{Name: aws.String("interval"), Value: aws.String(interval)})
// append to overall Metric Data
input.MetricData = append(input.MetricData, &metric)
i++
}
// if we still have data to send - send it
if len(input.MetricData) > 0 {
_, err := svc.PutMetricData(&input)
if err != nil {
return errors.New("Could sending CW Metric: " + err.Error())
}
}
return nil
}
/*
Function processes a single hours worth of RI usage and compares against available RIs to produce % utiization / under-utilization
*/
func riUtilizationHour(svc *cloudwatch.CloudWatch, date string, used map[string]map[string]map[string]int, azRI map[string]map[string]map[string]int, regionRI map[string]map[string]int, conf Config, region string) error {
// // Perform Deep Copy of both RI maps.
// // We need a copy of the maps as we decrement the RI's available by the hourly usage and a map is a pointer
// // hence decrementing the original maps will affect the pass-by-reference data
// cpy := deepcopy.Copy(azRI)
// t_azRI, ok := cpy.(map[string]map[string]map[string]int)
// if !ok {
// return errors.New("could not copy AZ RI map")
// }
// cpy = deepcopy.Copy(regionRI)
// t_regionRI, ok := cpy.(map[string]map[string]int)
// if !ok {
// return errors.New("could not copy Regional RI map")
// }
// // Iterate through used hours decrementing any available RI's per hour's that were used
// // AZ specific RI's are first checked and then regional RI's
// for az := range used {
// for instance := range used[az] {
// // check if azRI for this region even exist
// _, ok := t_azRI[az][instance]
// if ok {
// for platform := range used[az][instance] {
// // check if azRI for this region and platform even exists
// _, ok2 := t_azRI[az][instance][platform]
// if ok2 {
// // More RI's than we used
// if t_azRI[az][instance][platform] >= used[az][instance][platform] {
// t_azRI[az][instance][platform] -= used[az][instance][platform]
// used[az][instance][platform] = 0
// } else {
// // Less RI's than we used
// used[az][instance][platform] -= t_azRI[az][instance][platform]
// t_azRI[az][instance][platform] = 0
// }
// }
// }
// }
// // check if regionRI even exists and that instance used is in the right region
// _, ok = t_regionRI[instance]
// if ok && az[:len(az)-1] == region {
// for platform := range used[az][instance] {
// // if we still have more used instances check against regional RI's
// if used[az][instance][platform] > 0 && t_regionRI[instance][platform] > 0 {
// if t_regionRI[instance][platform] >= used[az][instance][platform] {
// t_regionRI[instance][platform] -= used[az][instance][platform]
// used[az][instance][platform] = 0
// } else {
// used[az][instance][platform] -= t_regionRI[instance][platform]
// t_regionRI[instance][platform] = 0
// }
// }
// }
// }
// }
// }
// // Now loop through the temp RI data to check if any RI's are still available
// // If they are and the % of un-use is above the configured threshold then colate for sending to cloudwatch
// // We sum up the total of regional and AZ specific RI's so that we get one instance based metric regardless of region or AZ RI
// i_unused := make(map[string]map[string]int)
// i_total := make(map[string]map[string]int)
// var unused int
// var total int
// for az := range t_azRI {
// for instance := range t_azRI[az] {
// _, ok := i_unused[instance]
// if !ok {
// i_unused[instance] = make(map[string]int)
// i_total[instance] = make(map[string]int)
// }
// for platform := range t_azRI[az][instance] {
// i_total[instance][platform] = azRI[az][instance][platform]
// i_unused[instance][platform] = t_azRI[az][instance][platform]
// total += azRI[az][instance][platform]
// unused += t_azRI[az][instance][platform]
// }
// }
// }
// for instance := range t_regionRI {
// for platform := range t_regionRI[instance] {
// _, ok := i_unused[instance]
// if !ok {
// i_unused[instance] = make(map[string]int)
// i_total[instance] = make(map[string]int)
// }
// i_total[instance][platform] += regionRI[instance][platform]
// i_unused[instance][platform] += t_regionRI[instance][platform]
// total += regionRI[instance][platform]
// unused += t_regionRI[instance][platform]
// }
// }
// // loop over per-instance utilization and build metrics to send
// metrics := AthenaResponse{}
// for instance := range i_unused {
// _, ok := conf.RI.Ignore[instance]
// if !ok { // instance not on ignore list
// for platform := range i_unused[instance] {
// percent := (float64(i_unused[instance][platform]) / float64(i_total[instance][platform])) * 100
// if int(percent) > conf.RI.PercentThreshold && i_total[instance][platform] > conf.RI.TotalThreshold {
// metrics.Rows = append(metrics.Rows, map[string]string{"dimension": "instance=" + instance + ",platform=" + platform, "date": date, "value": strconv.FormatInt(int64(percent), 10)})
// }
// }
// }
// }
// // send per instance type under-utilization
// if len(metrics.Rows) > 0 {
// if err := sendMetric(svc, metrics, conf.General.Namespace, conf.RI.CwName, conf.RI.CwType, conf.RI.CwDimension); err != nil {
// log.Fatal(err)
// }
// }
// // If confured send overall total utilization
// if conf.RI.TotalUtilization {
// percent := 100 - ((float64(unused) / float64(total)) * 100)
// total := AthenaResponse{}
// total.Rows = append(total.Rows, map[string]string{"dimension": "hourly", "date": date, "value": strconv.FormatInt(int64(percent), 10)})
// if err := sendMetric(svc, total, conf.General.Namespace, conf.RI.CwNameTotal, conf.RI.CwType, conf.RI.CwDimensionTotal); err != nil {
// log.Fatal(err)
// }
// }
return nil
}
/*
Main RI function. Gest RI and usage data (from Athena).
Then loops through every hour and calls riUtilizationHour to process each hours worth of data
*/
func riUtilization(sess *session.Session, svcAthena *athena.Athena, conf Config, key string, secret string, region string, account string, date string) error {
// svc := ec2.New(sess)
// params := &ec2.DescribeReservedInstancesInput{
// DryRun: aws.Bool(false),
// Filters: []*ec2.Filter{
// {
// Name: aws.String("state"),
// Values: []*string{
// aws.String("active"),
// },
// },
// },
// }
// resp, err := svc.DescribeReservedInstances(params)
// if err != nil {
// return err
// }
// az_ri := make(map[string]map[string]map[string]int)
// region_ri := make(map[string]map[string]int)
// // map in number of RI's available both AZ specific and regional
// for i := range resp.ReservedInstances {
// ri := resp.ReservedInstances[i]
// // Trim VPC identifier of Platform type as its not relevant for RI Utilization calculations
// platform := strings.TrimSuffix(*ri.ProductDescription, " (Amazon VPC)")
// if *ri.Scope == "Availability Zone" {
// _, ok := az_ri[*ri.AvailabilityZone]
// if !ok {
// az_ri[*ri.AvailabilityZone] = make(map[string]map[string]int)
// }
// _, ok = az_ri[*ri.AvailabilityZone][*ri.InstanceType]
// if !ok {
// az_ri[*ri.AvailabilityZone][*ri.InstanceType] = make(map[string]int)
// }
// az_ri[*ri.AvailabilityZone][*ri.InstanceType][platform] += int(*ri.InstanceCount)
// } else if *ri.Scope == "Region" {
// _, ok := region_ri[*ri.InstanceType]
// if !ok {
// region_ri[*ri.InstanceType] = make(map[string]int)
// }
// region_ri[*ri.InstanceType][platform] += int(*ri.InstanceCount)
// }
// }
// // Fetch RI hours used
// data, err := sendQuery(svcAthena, conf.Athena.DbName, substituteParams(conf.RI.Sql, map[string]string{"**DATE**": date}), region, account)
// if err != nil {
// log.Fatal(err)
// }
// // loop through response data and generate map of hourly usage, per AZ, per instance, per platform
// hours := make(map[string]map[string]map[string]map[string]int)
// for row := range data.Rows {
// _, ok := hours[data.Rows[row]["date"]]
// if !ok {
// hours[data.Rows[row]["date"]] = make(map[string]map[string]map[string]int)
// }
// _, ok = hours[data.Rows[row]["date"]][data.Rows[row]["az"]]
// if !ok {
// hours[data.Rows[row]["date"]][data.Rows[row]["az"]] = make(map[string]map[string]int)
// }
// _, ok = hours[data.Rows[row]["date"]][data.Rows[row]["az"]][data.Rows[row]["instance"]]
// if !ok {
// hours[data.Rows[row]["date"]][data.Rows[row]["az"]][data.Rows[row]["instance"]] = make(map[string]int)
// }
// v, _ := strconv.ParseInt(data.Rows[row]["hours"], 10, 64)
// hours[data.Rows[row]["date"]][data.Rows[row]["az"]][data.Rows[row]["instance"]][data.Rows[row]["platform"]] += int(v)
// }
// // Create new cloudwatch client.
// svcCloudwatch := cloudwatch.New(sess)
// // Iterate through each hour and compare the number of instances used vs the number of RIs available
// // If RI leftover percentage is > 1% push to cloudwatch
// for hour := range hours {
// if err := riUtilizationHour(svcCloudwatch, hour, hours[hour], az_ri, region_ri, conf, region); err != nil {
// return err
// }
// }
return nil
}
func processCUR(sourceBucket string, reportName string, reportPath string, destPath string, destBucket string, logger *cwlogger.Logger, dateOverride string) ([]curconvert.CurColumn, string, string, error) {
var t1 time.Time
var err error
if len(dateOverride) == 8 {
t1, err = time.Parse("20060102", dateOverride)
if err != nil {
return nil, "", "", errors.New("Could not parse given date ovrride: " + dateOverride + ", " + err.Error())
}
} else {
t1 = time.Now()
}
t1First := time.Date(t1.Year(), t1.Month(), 1, 0, 0, 0, 0, time.Local)
t2 := t1First.AddDate(0, 1, 0)
t2First := time.Date(t2.Year(), t2.Month(), 1, 0, 0, 0, 0, time.Local)
curDate := fmt.Sprintf("%d%02d01-%d%02d01", t1First.Year(), t1First.Month(), t2First.Year(), t2First.Month())
manifest := reportPath + "/" + curDate + "/" + reportName + "-Manifest.json"
// Set or extend destPath
destPathDate := fmt.Sprintf("%d%02d", t1First.Year(), t1First.Month())
var destPathFull string
if len(destPath) < 1 {
destPathFull = "parquet-cur/" + destPathDate
} else {
destPathFull = destPath + "/" + destPathDate
}
// Init CUR Converter
cc := curconvert.NewCurConvert(sourceBucket, manifest, destBucket, destPathFull)
// Check current months manifest exists
if err := cc.CheckCURExists(); err != nil {
if err.(awserr.Error).Code() != s3.ErrCodeNoSuchKey {
return nil, "", "", errors.New("Error fetching CUR Manifest: " + err.Error())
}
if t1.Day() > 3 {
return nil, "", "", errors.New("Error fetching CUR Manifest, NoSuchKey and too delayed: " + err.Error())
}
// Regress to processing last months CUR. Error is ErrCodeNoSuchKey and still early in the month
doLog(logger, "Reseting to previous months CUR for "+reportName)
t1First = t1First.AddDate(0, 0, -1)
t2First = t2First.AddDate(0, 0, -1)
curDate = fmt.Sprintf("%d%02d01-%d%02d01", t1First.Year(), t1First.Month(), t2First.Year(), t2First.Month())
destPathDate = fmt.Sprintf("%d%02d", t1First.Year(), t1First.Month())
manifest := reportPath + "/" + curDate + "/" + reportName + "-Manifest.json"
cc.SetSourceManifest(manifest)
if len(destPath) < 1 {
destPathFull = "parquet-cur/" + destPathDate
} else {
destPathFull = destPath + "/" + destPathDate
}
cc.SetDestPath(destPathFull)
}
// Convert CUR
if err := cc.ConvertCur(); err != nil {
return nil, "", "", errors.New("Could not convert CUR: " + err.Error())
}
cols, err := cc.GetCURColumns()
if err != nil {
return nil, "", "", errors.New("Could not obtain CUR columns: " + err.Error())
}
return cols, "s3://" + destBucket + "/" + destPathFull + "/", destPathDate, nil
}
func createAthenaTable(svcAthena *athena.Athena, dbName string, tablePrefix string, sql string, columns []curconvert.CurColumn, s3Path string, date string, region string, account string) error {
var cols string
for col := range columns {
cols += "`" + columns[col].Name + "` " + columns[col].Type + ",\n"
}
cols = cols[:strings.LastIndex(cols, ",")]
sql = substituteParams(sql, map[string]string{"**DBNAME**": dbName, "**PREFIX**": tablePrefix, "**DATE**": date, "**COLUMNS**": cols, "**S3**": s3Path})
if _, err := sendQuery(svcAthena, dbName, sql, region, account); err != nil {
return err
}
return nil
}
func doLog(logger *cwlogger.Logger, m string) {
if logger != nil {
logger.Log(time.Now(), m)
}
log.Println(m)
}
func main() {
/// initialize AWS GO client
sess := session.Must(session.NewSessionWithOptions(session.Options{SharedConfigState: session.SharedConfigEnable}))
// Grab instance meta-data
meta := getInstanceMetadata(sess)
// re-init session now we have the region we are in
sess = sess.Copy(&aws.Config{Region: aws.String(meta["region"].(string))})
// Check if running on EC2
_, ec2 := meta["instanceId"].(string)
var logger *cwlogger.Logger
if ec2 { // Init Cloudwatch Logger class if were running on EC2
logger, err := cwlogger.New(&cwlogger.Config{
LogGroupName: "CURdashboard",
Client: cloudwatchlogs.New(sess),
})
if err != nil {
log.Fatal("Could not initalize Cloudwatch logger: " + err.Error())
}
defer logger.Close()
logger.Log(time.Now(), "CURDasboard running on "+meta["instanceId"].(string)+" in "+meta["availabilityZone"].(string))
}
// read in command line params
var configFile, account, sourceBucket, destBucket, curReportName, curReportPath, curDestPath, dateOverride string
if err := getParams(&configFile, &sourceBucket, &destBucket, &account, &curReportName, &curReportPath, &curDestPath, &dateOverride); err != nil {
doLog(logger, err.Error())
return
}
// read in config file
var conf Config
if err := getConfig(&conf, configFile); err != nil {
doLog(logger, err.Error())
}
// convert CUR
columns, s3Path, curDate, err := processCUR(sourceBucket, curReportName, curReportPath, curDestPath, destBucket, logger, dateOverride)
if err != nil {
doLog(logger, err.Error())
}
// initialize Athena class
svcAthena := athena.New(sess)
svcCW := cloudwatch.New(sess)
// make sure Athena DB exists - dont care about results
if _, err := sendQuery(svcAthena, "default", conf.Athena.DbSQL, meta["region"].(string), account); err != nil {
doLog(logger, "Could not create Athena Database: "+err.Error())
}
// make sure current Athena table exists
if err := createAthenaTable(svcAthena, conf.Athena.DbName, conf.Athena.TablePrefix, conf.Athena.TableSQL, columns, s3Path, curDate, meta["region"].(string), account); err != nil {
doLog(logger, "Could not create Athena Table: "+err.Error())
}
// // If RI analysis enabled - do it
// if conf.RI.Enabled {
// if err := riUtilization(sess, svcAthena, conf, key, secret, meta["region"].(string), account, date); err != nil {
// doLog(logger, err.Error())
// }
// }
// struct for a query job
type job struct {
svc *athena.Athena
db string
account string
region string
interval string
metric Metric
}
// channels for parallel execution
jobs := make(chan job)
done := make(chan bool)
// create upto maxConcurrentQueries workers to process metric jobs
for w := 0; w < maxConcurrentQueries; w++ {
go func() {
for {
j, ok := <-jobs
if !ok {
done <- true
return
}
sql := substituteParams(j.metric.SQL, map[string]string{"**DBNAME**": conf.Athena.DbName, "**DATE**": curDate, "**INTERVAL**": conf.MetricConfig.Substring[j.interval]})
results, err := sendQuery(j.svc, j.db, sql, j.region, j.account)
if err != nil {
doLog(logger, "Error querying Athena, SQL: "+sql+" , Error: "+err.Error())
continue
}
if err := sendMetric(svcCW, results, conf.General.Namespace, j.metric.CwName, j.metric.CwType, j.metric.CwDimension, j.interval); err != nil {
doLog(logger, "Error sending metric, name: "+j.metric.CwName+" , Error: "+err.Error())
}
}
}()
}
// pass every enabled metric into channel for processing
for metric := range conf.Metrics {
if conf.Metrics[metric].Enabled {
if conf.Metrics[metric].Hourly {
jobs <- job{svcAthena, conf.Athena.DbName, account, meta["region"].(string), "hourly", conf.Metrics[metric]}
}
if conf.Metrics[metric].Daily {
jobs <- job{svcAthena, conf.Athena.DbName, account, meta["region"].(string), "daily", conf.Metrics[metric]}
}
}
}
close(jobs)
// wait for jobs to complete
for w := 0; w < maxConcurrentQueries; w++ {
<-done
}
}