Skip to content

Commit

Permalink
Merge pull request #1283 from Shuffle/1.4.0
Browse files Browse the repository at this point in the history
1.3.1
  • Loading branch information
frikky committed Dec 7, 2023
2 parents e493d08 + acd0f1c commit d970aa8
Show file tree
Hide file tree
Showing 35 changed files with 3,739 additions and 1,732 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ SHUFFLE_SWARM_BRIDGE_DEFAULT_MTU=1500 # 1500 by default
# Used for auto-cleanup of containers. REALLY important at scale. Set to false to see all container info.
SHUFFLE_MEMCACHED=
SHUFFLE_CONTAINER_AUTO_CLEANUP=true
SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=3 # The amount of concurrent executions Orborus can handle. This is a soft limit, but it's recommended to keep it low.
SHUFFLE_ORBORUS_EXECUTION_CONCURRENCY=5 # The amount of concurrent executions Orborus can handle. This is a soft limit, but it's recommended to keep it low.
SHUFFLE_HEALTHCHECK_DISABLED=false
SHUFFLE_ELASTIC=true
SHUFFLE_LOGS_DISABLED=false
Expand All @@ -93,4 +93,4 @@ SHUFFLE_OPENSEARCH_PROXY=
SHUFFLE_OPENSEARCH_INDEX_PREFIX=
SHUFFLE_OPENSEARCH_SKIPSSL_VERIFY=true

DEBUG_MODE=false
DEBUG_MODE=false
6 changes: 3 additions & 3 deletions .github/workflows/dockerbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: dockerbuild
on:
push:
branches:
- main
- 1.4.0
paths:
- "**"
- "!.github/**"
Expand Down Expand Up @@ -77,9 +77,9 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache
tags: |
ghcr.io/shuffle/shuffle-${{ matrix.app }}:${{ matrix.version }}
ghcr.io/shuffle/shuffle-${{ matrix.app }}:latest
ghcr.io/shuffle/shuffle-${{ matrix.app }}:nightly
${{ secrets.DOCKERHUB_USERNAME }}/shuffle-${{ matrix.app }}:${{ matrix.version }}
${{ secrets.DOCKERHUB_USERNAME }}/shuffle-${{ matrix.app }}:latest
${{ secrets.DOCKERHUB_USERNAME }}/shuffle-${{ matrix.app }}:nightly
- name: Image digest
run: echo ${{ steps.docker_build.outputs.digest }}
248 changes: 177 additions & 71 deletions backend/app_sdk/app_base.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions backend/go-app/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ require (
github.com/gorilla/mux v1.8.0
github.com/h2non/filetype v1.1.3
github.com/satori/go.uuid v1.2.0
github.com/shuffle/shuffle-shared v0.4.88
golang.org/x/crypto v0.9.0
github.com/shuffle/shuffle-shared v0.5.30
golang.org/x/crypto v0.14.0
google.golang.org/api v0.125.0
google.golang.org/grpc v1.55.0
gopkg.in/src-d/go-git.v4 v4.13.1
Expand Down
16 changes: 16 additions & 0 deletions backend/go-app/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,22 @@ github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shuffle/shuffle-shared v0.4.66 h1:Aw4qOp0VsVJrRzW1sJhEy4OY4fRGlFErUD5+93RXL6g=
github.com/shuffle/shuffle-shared v0.4.66/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.4.80 h1:03OL+O8prwL9zq6Gnb9SRORPWi5+ThO0jPoxk+xctOo=
github.com/shuffle/shuffle-shared v0.4.80/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.4.95 h1:xr92/03/uQeJiDme9S8/vgF1KWyQgJ1KQXVE7nQMKis=
github.com/shuffle/shuffle-shared v0.4.95/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.4.96 h1:iaIB/HP9eKpw9DMMJZhSLDbKdHJt075kFYLHg9AaiiM=
github.com/shuffle/shuffle-shared v0.4.96/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.4.97 h1:1c8LdNteMykKNEV97vwP63oSP2tV/Uso3O4TC+oxdFQ=
github.com/shuffle/shuffle-shared v0.4.97/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.4.98 h1:pgsLdWUpxZ/q+eHpAjOCH9icOsmuO5u2olmirOldy5A=
github.com/shuffle/shuffle-shared v0.4.98/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.5.11 h1:Eqbs9o8E49QAL5/6aV6BfFtWSjLIvgET7AL3fa4OQTg=
github.com/shuffle/shuffle-shared v0.5.11/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.5.14 h1:d14u1e4k+qKgnf4Insq4x2S+0MMKlDqdyTTyVP3puRA=
github.com/shuffle/shuffle-shared v0.5.14/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shuffle/shuffle-shared v0.5.29 h1:n4vThl7v3mFVXbrIW71XREFdmZZo7mOBAWxnsdiNjDk=
github.com/shuffle/shuffle-shared v0.5.29/go.mod h1:X613gbo0dT3fnYvXDRwjQZyLC+T49T2nSQOrCV5QMlI=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
Expand Down
38 changes: 28 additions & 10 deletions backend/go-app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1965,6 +1965,7 @@ func executeCloudAction(action shuffle.CloudSyncJob, apikey string) error {
return err
}

defer newresp.Body.Close()
respBody, err := ioutil.ReadAll(newresp.Body)
if err != nil {
return err
Expand Down Expand Up @@ -3513,15 +3514,13 @@ func remoteOrgJobHandler(org shuffle.Org, interval int) error {
)

req.Header.Add("Authorization", fmt.Sprintf(`Bearer %s`, org.SyncConfig.Apikey))

//log.Printf("[INFO] Sending org sync with autho %s", org.SyncConfig.Apikey)

newresp, err := client.Do(req)
if err != nil {
//log.Printf("Failed request in org sync: %s", err)
return err
}

defer newresp.Body.Close()
respBody, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[ERROR] Failed body read in job sync: %s", err)
Expand Down Expand Up @@ -3574,6 +3573,12 @@ func runInitEs(ctx context.Context) {
log.Printf("[DEBUG] Getting organizations for Elasticsearch/Opensearch")
activeOrgs, err := shuffle.GetAllOrgs(ctx)

log.Printf("[DEBUG] Got %d organizations to look into. If this is 0, we wait 10 more seconds until DB is ready and try again.", len(activeOrgs))
if len(activeOrgs) == 0 {
time.Sleep(10 * time.Second)
activeOrgs, err = shuffle.GetAllOrgs(ctx)
}

setUsers := false
_ = setUsers
if err != nil {
Expand Down Expand Up @@ -3655,7 +3660,6 @@ func runInitEs(ctx context.Context) {
log.Printf("Successfully updated org to have users!")
}
}

}
}
}
Expand Down Expand Up @@ -3688,6 +3692,10 @@ func runInitEs(ctx context.Context) {
orgId = activeOrgs[0].Id
}

if len(schedule.Org) == 36 {
orgId = schedule.Org
}

_, _, err := handleExecution(schedule.WorkflowId, shuffle.Workflow{}, request, orgId)
if err != nil {
log.Printf("[WARNING] Failed to execute %s: %s", schedule.WorkflowId, err)
Expand All @@ -3697,15 +3705,21 @@ func runInitEs(ctx context.Context) {

for _, schedule := range schedules {
if strings.ToLower(schedule.Environment) == "cloud" {
log.Printf("Skipping cloud schedule")
log.Printf("[DEBUG] Skipping cloud schedule")
continue
}

// FIXME: Add a randomized timer to avoid all schedules running at the same time
// Many are at 5 minutes / 1 hour. The point is to spread these out
// a bit instead of all of them starting at the exact same time

//log.Printf("Schedule: %#v", schedule)
//log.Printf("Schedule time: every %d seconds", schedule.Seconds)
jobret, err := newscheduler.Every(schedule.Seconds).Seconds().NotImmediately().Run(job(schedule))
if err != nil {
log.Printf("Failed to schedule workflow: %s", err)
log.Printf("[ERROR] Failed to start schedule for workflow %s: %s", schedule.WorkflowId, err)
} else {
log.Printf("[DEBUG] Successfully started schedule for workflow %s", schedule.WorkflowId)
}

scheduledJobs[schedule.Id] = jobret
Expand Down Expand Up @@ -3977,7 +3991,7 @@ func runInitEs(ctx context.Context) {

r, err := git.Clone(storer, fs, cloneOptions)
if err != nil {
log.Printf("[WARNING] Failed loading repo into memory (init): %s", err)
log.Printf("[ERROR] Failed loading repo into memory (init): %s", err)
}

dir, err := fs.ReadDir("")
Expand Down Expand Up @@ -4014,7 +4028,7 @@ func runInitEs(ctx context.Context) {
}
_, err = git.Clone(storer, fs, cloneOptions)
if err != nil {
log.Printf("[WARNING] Failed loading repo %s into memory: %s", apis, err)
log.Printf("[ERROR] Failed loading repo %s into memory: %s", apis, err)
} else if err == nil && len(workflowapps) < 10 {
log.Printf("[INFO] Finished git clone. Looking for updates to the repo.")
dir, err := fs.ReadDir("")
Expand All @@ -4030,7 +4044,7 @@ func runInitEs(ctx context.Context) {


if os.Getenv("SHUFFLE_HEALTHCHECK_DISABLED") != "true" {
healthcheckInterval := 15
healthcheckInterval := 30
log.Printf("[INFO] Starting healthcheck job every %d minute. Stats available on /api/v1/health/stats. Disable with SHUFFLE_HEALTHCHECK_DISABLED=true", healthcheckInterval)
job := func() {
// Prepare a fake http.responsewriter
Expand Down Expand Up @@ -4725,7 +4739,7 @@ func initHandlers() {
log.Printf("[DEBUG] Initialized Shuffle database connection. Setting up environment.")

if elasticConfig == "elasticsearch" {
time.Sleep(5 * time.Second)
time.Sleep(10 * time.Second)
go runInitEs(ctx)
} else {
//go shuffle.runInit(ctx)
Expand Down Expand Up @@ -4787,6 +4801,7 @@ func initHandlers() {
// App specific
// From here down isnt checked for org specific
r.HandleFunc("/api/v1/apps/{key}/execute", executeSingleAction).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/apps/{key}/run", executeSingleAction).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/apps/categories", shuffle.GetActiveCategories).Methods("GET", "OPTIONS")
r.HandleFunc("/api/v1/apps/categories/run", shuffle.RunCategoryAction).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/apps/upload", handleAppZipUpload).Methods("POST", "OPTIONS")
Expand Down Expand Up @@ -4824,11 +4839,14 @@ func initHandlers() {
/* Everything below here increases the counters*/
r.HandleFunc("/api/v1/workflows", shuffle.GetWorkflows).Methods("GET", "OPTIONS")
r.HandleFunc("/api/v1/workflows", shuffle.SetNewWorkflow).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/workflows/search", shuffle.HandleWorkflowRunSearch).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/workflows/schedules", shuffle.HandleGetSchedules).Methods("GET", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/executions", shuffle.GetWorkflowExecutions).Methods("GET", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/executions/{key}/rerun", checkUnfinishedExecution).Methods("GET", "POST", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/executions/{key}/abort", shuffle.AbortExecution).Methods("GET", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/schedule", scheduleWorkflow).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/workflows/download_remote", loadSpecificWorkflows).Methods("POST", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/run", executeWorkflow).Methods("GET", "POST", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/execute", executeWorkflow).Methods("GET", "POST", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/schedule/{schedule}", stopSchedule).Methods("DELETE", "OPTIONS")
r.HandleFunc("/api/v1/workflows/{key}/stream", shuffle.HandleStreamWorkflow).Methods("GET", "OPTIONS")
Expand Down

0 comments on commit d970aa8

Please sign in to comment.