Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding activity s3Latest #6

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ A collection of AWS related Flogo contributions. This repository consists of ac
* [lambda](activity/lambda): Invoke Lambda function
* [sms](activity/sms): Send SMS message via SNS
* [sns](activity/sns): Send message via SNS
* [s3Latest](activity/s3Latest): Check s3 for a newer version of a local file

### Triggers
* [lambda](trigger/lambda): Trigger to run Flogo as AWS Lambda function
Expand Down
37 changes: 37 additions & 0 deletions activity/s3Latest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# AWS s3Latest
This activity allows you to check if an s3 file is newer than a local file and if so pull it down. The activity returns the name of the file that is newest (whether it is the original local file or the downloaded file).

## Installation

### Flogo CLI
```bash
flogo install github.com/project-flogo/aws-contrib/activity/s3Latest
```
## Configuration
To configure AWS credentials see [configuring-sdk](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html). Some of the possible options arefor the AWS credentials are either [env variables](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html) or [set up the aws cli environment](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html)

### Settings:
| Name | Type | Description
|:--- | :--- | :---

### Input:
| Name | Type | Description
|:--- | :--- | :---
| subject | string | The message subject
| message | any | The message, either a string, object or params
| Bucket |string | AWS S3 bucket
| Item |string | AWS item/prefix to check
| File2Check|string | local file/path to check
| Region |string | AWS Region
| CheckLocal|string | either 'file' or'dir' based on what is being checked locally
| CheckS3 |string | either 'item' or'prefix' depending on whether s3 is checking a specific item or a prefix

### Output:
| Name | Type | Description
|:--- | :--- | :---
| modelFile | string | the file that is newest


## Examples
Coming soon...

210 changes: 210 additions & 0 deletions activity/s3Latest/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package s3newestmodel

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"

"io/ioutil"

"fmt"
"os"
"time"

// "strings"

"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data/metadata"
)

func init() {
activity.Register(&Activity{}, New) //activity.Register(&Activity{}, New) to create instances using factory method 'New'
}

var activityMd = activity.ToMetadata(&Settings{}, &Input{}, &Output{})

//New optional factory method, should be used if one activity instance per configuration is desired
func New(ctx activity.InitContext) (activity.Activity, error) {
s := &Settings{}
err := metadata.MapToStruct(ctx.Settings(), s, true)
if err != nil {
return nil, err
}

// ctx.Logger().Debugf("Setting: %s, %s", s.ReplaceFile, s.UnZip)
// fmt.Println(s)
act := &Activity{settings: s} //add aSetting to instance

return act, nil
}

// Activity is an sample Activity that can be used as a base to create a custom activity
type Activity struct {
settings *Settings
}

// Metadata returns the activity's metadata
func (a *Activity) Metadata() *activity.Metadata {
return activityMd
}

// Eval implements api.Activity.Eval - Logs the Message
func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {

//Reading inputs
bucket := ctx.GetInput("bucket").(string)
item := ctx.GetInput("item").(string) // FYI - Item is the input, key is the output
f2check := ctx.GetInput("file2Check").(string)
region := ctx.GetInput("region").(string) // "us-east-1"
checks3 := ctx.GetInput("checkS3").(string)
checklocal := ctx.GetInput("checkLocal").(string)

// defining activity wide variables (str to be returned, do we download a file, mod time of file, etc)
var rtnfilestr string
download := false
var modifiedtime time.Time
timestamp := fmt.Sprintf("%d", time.Now().Unix())
filein := f2check
fileout := f2check

//Checking is local is looking at newsest in a dir or just at one file
if checklocal == "dir" {
ctx.Logger().Infof("checklocal is '%s' so we are checking the newest file in %s against s3", checklocal, f2check)
} else {
ctx.Logger().Infof("checklocal is '%s' so we are checking %s against s3", checklocal, f2check)
}

//Checking is local is looking at newsest in with a prefix or just at one item
if checks3 == "prefix" {
ctx.Logger().Infof("checks3 is '%s' so we are comparing the newest item with the prefix %s against local", checks3, item)
} else {
ctx.Logger().Infof("checks3 is '%s' so we are comparing %s against local", checks3, item)
}

// looking at local dir for newest file, if no files in directory defining an in/out file to fit with the file check below
if checklocal == "dir" {
// fileout=file+
files, err := ioutil.ReadDir((filein))
if err != nil {
return false, fmt.Errorf("Unable to read files in directory %s", f2check)
}
for _, f := range files {
t := f.ModTime()
if t.After(modifiedtime) {
modifiedtime = t
filein = filein + "/" + f.Name()
fileout = fileout + "/" + timestamp
}
}
if len(files) == 0 {
ctx.Logger().Infof("no file in %s", filein)
fileout = fileout + "/" + timestamp
filein = fileout
}
}

// checking if file exists and then chenging download at modtime appropriately
if file, err := os.Stat(filein); os.IsNotExist(err) {
ctx.Logger().Infof("%s does not exist, must download new file", filein)
download = true
} else {
loc, _ := time.LoadLocation("UTC")
modifiedtime = file.ModTime().In(loc)
ctx.Logger().Infof("%s was last modified at %s", filein, modifiedtime)
}

//setting up s3 env
sess, err := session.NewSession(&aws.Config{Region: aws.String(region)})
if err != nil {
return true, fmt.Errorf("Unable to start aws session, %v", err)
}
svc := s3.New(sess)

// checking if we are looking at s3 prefix or s3 item and checking times
// FYI - Item is the input, key is the output
var key string
if checks3 == "item" {

input := &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(item),
}

s3item, err := svc.GetObject(input)
if err != nil {
return false, fmt.Errorf("Unable to get info on item %s: %s", item, err)
}

t := *s3item.LastModified
if t.After(modifiedtime) {
key = item
modifiedtime = t
download = true
}

} else if checks3 == "prefix" {

params := &s3.ListObjectsV2Input{
Bucket: aws.String("flogo-ml"),
Prefix: aws.String(item),
}

resp, err := svc.ListObjectsV2(params)
if err != nil {
return true, fmt.Errorf("Unable to list items in bucket %q, %v", bucket, err)
}

for _, listitem := range resp.Contents {
t := *listitem.LastModified
k := *listitem.Key
if t.After(modifiedtime) && k[len(k)-1:] != "/" {
key = k
modifiedtime = t
download = true
}
}
}

// downloading file or not
if download {
ctx.Logger().Infof("downloading %s modified at %s", key, modifiedtime)

file, err := os.Create(fileout)

if err != nil {
return false, fmt.Errorf("Unable to download item %q, %v", item, err)
}

downloader := s3manager.NewDownloader(sess)
_, err = downloader.Download(file,
&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return false, fmt.Errorf("Unable to download item %q, %v", item, err)
}

ctx.Logger().Infof("Sucessfully downladed %s as %s", key, fileout)
rtnfilestr = fileout
} else {
ctx.Logger().Infof("Newer file not found, no need to download file.")
// fileout = f2check
if checklocal == "file" {
rtnfilestr = filein
} else {
rtnfilestr = "noFile"
}

}

// returning string of file that is newest (either downloaded or if it is already newest)
output := &Output{ModelFile: rtnfilestr}
err = ctx.SetOutputObject(output)
if err != nil {
return true, err
}

return true, nil
}
51 changes: 51 additions & 0 deletions activity/s3Latest/activity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package s3newestmodel

import (
"testing"

"github.com/project-flogo/core/activity"
"github.com/stretchr/testify/assert"
)

func TestRegister(t *testing.T) {

ref := activity.GetRef(&Activity{})
act := activity.Get(ref)

assert.NotNil(t, act)
}

func TestEval(t *testing.T) {
// settings := &Settings{ReplaceFile: true, UnZip: false}
// iCtx := test.NewActivityInitContext(settings, nil)
// act, err := New(iCtx)

// tc := test.NewActivityContext(act.Metadata())
// var p []interface{}
// p = append(p, 0)
// input := &Input{
// Bucket: "flogo-ml",
// // Item: "model_tests/Archive_20190315.zip",
// File2Check: "key.zip",
// CheckLocal: "file",
// // CheckS3: "item",
// Item: "model_tests/",
// // File2Check: "model_dir",
// // CheckLocal: "dir",
// CheckS3: "prefix",

// Region: "us-east-1",
// }
// err = tc.SetInputObject(input)
// assert.Nil(t, err)

// done, err := act.Eval(tc)
// assert.True(t, done)
// assert.Nil(t, err)

// output := &Output{}
// tc.GetOutputObject(output)
// fmt.Println("name of newest file is returned:", output.ModelFile)
// // assert.Nil(t, err)
// // assert.Equal(t, "data has been inserted into database", output.Output)
}
46 changes: 46 additions & 0 deletions activity/s3Latest/descriptor.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"name": "sample-activity",
"type": "flogo:activity",
"version": "0.0.1",
"title": "Sample Activity",
"description": "Sample Activity",
"homepage": "https://github.com/project-flogo/tree/master/examples/activity",
"settings": [
{
"name": "driverName",
"type": "string",
"required": true
},
{
"name": "psqlInfo",
"type": "string",
"required": true
}
],
"input": [
{
"name": "ind",
"type": "integer",
"required": true
},
{
"name": "act",
"type": "integer",
"required": true
}, {
"name": "pred",
"type": "array",
"required": true
}, {
"name": "t",
"type": "integer",
"required": true
}
],
"output": [
{
"name": "Output",
"type": "string"
}
]
}
12 changes: 12 additions & 0 deletions activity/s3Latest/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/abramvandergeest/outlierdatalogging

go 1.11

require (
github.com/aws/aws-sdk-go v1.19.41
github.com/lib/pq v1.0.0
github.com/project-flogo/core v0.9.0-beta.1
github.com/stretchr/testify v1.3.0
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
)
22 changes: 22 additions & 0 deletions activity/s3Latest/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
github.com/aws/aws-sdk-go v1.19.41 h1:veutzvQP/lOmYmtX26S9mTFJLO6sp7/UsxFcCjglu4A=
github.com/aws/aws-sdk-go v1.19.41/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-flogo/core v0.9.0-beta.1 h1:tiRv5Lv6U1SnDJh6vB10y8AnEdF8/Zmahj8WgCDqS6I=
github.com/project-flogo/core v0.9.0-beta.1/go.mod h1:dzmBbQfNNC0g0KClKYQxxGJLe53MHafg75Vhmw2TW8U=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=