Skip to content

Commit

Permalink
Merge pull request #11 from datasparq-ai/release/v0.6.1
Browse files Browse the repository at this point in the history
Release v0.6.1
  • Loading branch information
matt-sparq committed Nov 13, 2023
2 parents cd928fd + 030b640 commit 8830ee2
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 39 deletions.
38 changes: 18 additions & 20 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/datasparq-ai/houston/model"
"github.com/gorilla/mux"
"golang.org/x/crypto/acme/autocert"
"math/rand"
"net"
"net/http"
"os"
Expand All @@ -27,9 +26,9 @@ type API struct {
protocol string // this is set to either 'http' or 'https' depending on config.TLSConfig
}

// New creates the Houston API object.
// New creates an instance of the Houston API object.
// It will create or connect to a database depending on the settings in the config file.
// local db will only persist while program is running.
// local db will only persist while the program is running.
func New(configPath string) *API {
initLog()

Expand Down Expand Up @@ -379,14 +378,19 @@ func (a *API) updateActiveOrCompletedMissions(key string, databaseField string,

txnFunc := func(activeMissions string) (string, error) {
var newList []string
activeList := strings.Split(activeMissions, ",")

// remove missions to be removed
for _, activeMission := range activeList {
for _, missionToRemove := range missionsToRemove {
if activeMission != missionToRemove {
newList = append(newList, activeMission)
if activeMissions != "" {
activeList := strings.Split(activeMissions, ",")

// remove missions to be removed
Loop:
for _, activeMission := range activeList {
for _, missionToRemove := range missionsToRemove {
if activeMission == missionToRemove {
continue Loop
}
}
newList = append(newList, activeMission)
}
}

Expand Down Expand Up @@ -416,7 +420,6 @@ func (a *API) updateActiveOrCompletedMissions(key string, databaseField string,
}
}
return err

}

// ActiveMissions finds all missions for a plan. If plan doesn't exist then an empty list is returned.
Expand All @@ -425,7 +428,8 @@ func (a *API) ActiveMissions(key string, plan string) []string {
if missions == "" {
return []string{}
}
return strings.Split(missions, ",")
missionsList := strings.Split(missions, ",")
return missionsList
}

// AllActiveMissions finds all missions in the database for the key provided.
Expand Down Expand Up @@ -542,6 +546,7 @@ func (a *API) CompletedMissions(key string) []string {
}

// SavePlan stores a new plan in the database if that plan is valid. Current behaviour is to overwrite existing plans.
// The 'active' key for the plan, and any existing missions, will be unaffected if the plan already exists.
func (a *API) SavePlan(key string, plan model.Plan) error {

// convert plan to mission for validation of graph only
Expand All @@ -553,12 +558,8 @@ func (a *API) SavePlan(key string, plan model.Plan) error {

planBytes, _ := json.Marshal(plan)
keyLog.Infof("Converted Plan '%s' to Mission", plan.Name)
p, _ := a.db.Get(key, "p|"+plan.Name)
err = a.db.Set(key, "p|"+plan.Name, string(planBytes))
// if plan already exists, do not re-create the 'active' key
if p == "" {
err = a.db.Set(key, "a|"+plan.Name, "")
}

if err != nil {
keyLog.Errorf("Error when saving plan to database: %v", err)
log.Warnf("User %s encountered error when saving plan to database: %v", key, err)
Expand All @@ -580,6 +581,7 @@ func (a *API) ListPlans(key string) ([]string, error) {

keyLog.Debugf("Found %v saved plans", len(plans))

// find plans that aren't saved, but have missions associated
activePlans, err := a.db.List(key, "a|")
keyLog.Debugf("Found %v active plans", len(activePlans))

Expand Down Expand Up @@ -738,7 +740,3 @@ func (a *API) Run() {
}

}

func init() {
rand.Seed(time.Now().UnixNano()) // change random seed
}
8 changes: 1 addition & 7 deletions api/router-key.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,7 @@ func (a *API) GetKey(w http.ResponseWriter, r *http.Request) {
// @Router /api/v1/key [get]
func (a *API) GetKeyWebhook(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]

userAgent := r.UserAgent()
fmt.Printf("UserAgent:: %s", userAgent)

// key has been checked by checkKey middleware

key := vars["key"] // key has been checked by checkKey middleware
http.Redirect(w, r, fmt.Sprintf("/?key=%v", key), http.StatusMovedPermanently)
}

Expand Down
1 change: 0 additions & 1 deletion api/router-middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (a *API) checkKey(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
key := ""
// the webhook route does not require a key in the header, because it's in the path
fmt.Println(r.URL.Path)
if strings.HasPrefix(r.URL.Path, "/api/v1/key/") {
key = strings.TrimPrefix(r.URL.Path, "/api/v1/key/")
} else {
Expand Down
5 changes: 4 additions & 1 deletion api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import (
"math/rand"
"net/http"
"strings"
"time"

"github.com/datasparq-ai/houston/mission"
"github.com/datasparq-ai/houston/model"
)

var random = rand.New(rand.NewSource(time.Now().UnixNano()))

// reservedKeys can't be used as mission names or keys
var reservedKeys = []string{"u", "n", "a", "c", "m", "all"}

Expand All @@ -25,7 +28,7 @@ var disallowedCharacters = "| ,\n\r\t%&<>{}[]\\?;\"'`:"
func createRandomString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
b[i] = letters[random.Intn(len(letters))]
}
return string(b)
}
Expand Down
14 changes: 11 additions & 3 deletions demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ import (
// demo creates a new API instance and runs demonstration missions
func demo(createCmd *cobra.Command) {
configPath, _ := createCmd.Flags().GetString("config")
stressTest, _ := createCmd.Flags().GetBool("stress-test")

missionsToRun := 30
var timeBetweenMissions float32 = 5000.0
if stressTest {
missionsToRun = 500
timeBetweenMissions = 5.0
}

s := "\u001B[1;38;2;58;145;172m"
e := "\u001B[0m"
Expand Down Expand Up @@ -95,16 +103,16 @@ func demo(createCmd *cobra.Command) {
go func() {

missionCount := 1
for missionCount < 30 {
time.Sleep(time.Duration(rand.Float32()*5000.0) * time.Millisecond)
for missionCount < missionsToRun {
time.Sleep(time.Duration(rand.Float32()*timeBetweenMissions) * time.Millisecond)
//fmt.Printf(">>> %[1]vhouston start%[2]v \u001B[1m-p apollo -i apollo-12%[2]v\n", s, e)
missionId := fmt.Sprintf("apollo-%v", 11+missionCount)
_, err = a.CreateMissionFromPlan("demo", "apollo", missionId)
if err != nil {
panic(err)
}

time.Sleep(time.Duration(rand.Float32()*1000.0) * time.Millisecond)
time.Sleep(time.Duration(rand.Float32()*timeBetweenMissions/5) * time.Millisecond)
a.UpdateStageState("demo", missionId, "engine-ignition", "started", false)
res, _ = a.UpdateStageState("demo", missionId, "engine-ignition", "finished", false)
go continueMission(a, "demo", missionId, res.Next)
Expand Down
2 changes: 1 addition & 1 deletion docs/database_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Redis Schema:
<api key>|p|<plan-name>: # plan, stored as JSON string, identical to mission without stage timings
name: "apollo" # plan name
stages: [] # list of stages
<api key>|a|<plan-name>: m1,m2,m3 # active, list of active mission IDs (strings) for a plan
<api key>|a|<plan-name>: m1,m2,m3 # active, list of mission IDs (strings) for a plan, which get removed when deleted
<api key>|<mission id>: # mission, stored as json string, made as small as possible
n: apollo # name (plan name)
i: <mission_id> # id
Expand Down
12 changes: 7 additions & 5 deletions docs/plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Houston plans are definitions of workflows. They are represented in YAML or JSON. Multiple plans can be associated with
one key.

Plans must be a valid directed acyclic graphs (DAG), meaning it can't have any loops, and all stages must be connected.

Plan definitions have the following attributes:
- name `string`: Name of the plan
- services `[]Service`: (optional) List of services used by the plan, see [Services](./services.md)
Expand All @@ -19,22 +21,22 @@ services:
trigger:
method: pubsub
topic: topic-for-stage
- name: my-other-service
trigger:
method: http
url: https://example.com/api/houston

stages:
- name: blastoff
service: my-service
params:
my_param: foo
- name: main-engine-cutoff
service: my-other-service
service: my-service
upstream:
- blastoff
```

This creates a plan with two stages connected in a straight line. 'blastoff' is upstream of 'main-engine-cutoff',
meaning 'blastoff' must finish before 'main-engine-cutoff' can start. 'blastoff' has no dependencies and so can start
at any time.

Plans should be 'saved' using the Houston client. Saved plans can be referenced by name when starting a mission.

## Stages
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
Use: "version",
Short: "Print the version number",
Run: func(c *cobra.Command, args []string) {
fmt.Println("v0.6.0")
fmt.Println("v0.6.1")
},
}
return
Expand Down Expand Up @@ -147,6 +147,7 @@ func main() {
},
}
createCmd.Flags().String("config", "", "path to a config file")
createCmd.Flags().Bool("stress-test", false, "activate stress test with 500+ missions running at the same time")
return
}())

Expand Down

0 comments on commit 8830ee2

Please sign in to comment.