From 7b3663204fc4887bdc68d0724ec492f1a06bc6e6 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Thu, 29 May 2025 03:03:42 +0000 Subject: [PATCH 01/62] feat: add modal for creating new workflows and update sidebar links --- .../common/modal_create_workflow.html | 577 ++++++++++++++++++ .../common/sidebar.tmpl.html | 10 +- .../common/workflow_list.tmpl.html | 13 +- .../akoflow_admin_handler_tmpl/home.tmpl.html | 1 + 4 files changed, 596 insertions(+), 5 deletions(-) create mode 100644 pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/modal_create_workflow.html diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/modal_create_workflow.html b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/modal_create_workflow.html new file mode 100644 index 0000000..4b88204 --- /dev/null +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/modal_create_workflow.html @@ -0,0 +1,577 @@ +{{ define "modal_create_workflow" }} + + + + + + +{{ end }} \ No newline at end of file diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html index 0328def..328a16c 100755 --- a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html @@ -26,26 +26,28 @@ + + + + {{end}} \ No newline at end of file diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/home.tmpl.html b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/home.tmpl.html index 4ac3340..3a5ad2d 100755 --- a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/home.tmpl.html +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/home.tmpl.html @@ -10,6 +10,7 @@
{{ template "workflow_list" }} + {{ template "modal_create_workflow"}} From 60498668d04beaceacca1102bb19783526738287 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 16:47:42 +0000 Subject: [PATCH 02/62] fix: update documentation links to point to the GitHub wiki --- .github/workflows/gh-akf-new-release.yaml | 6 +++--- README.md | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/gh-akf-new-release.yaml b/.github/workflows/gh-akf-new-release.yaml index d615c02..74ddf7d 100644 --- a/.github/workflows/gh-akf-new-release.yaml +++ b/.github/workflows/gh-akf-new-release.yaml @@ -129,13 +129,13 @@ jobs: Download the appropriate binary for your platform, make it executable, and add it to your PATH. ### Download Client - For more information, visit [AkôFlow Client documentation](https://akoflow.com/docs/). + For more information, visit [AkôFlow Client documentation](https://github.com/UFFeScience/akoflow/wiki/). ### Download Server - For more information, visit [AkôFlow Server documentation](https://akoflow.com/docs/). + For more information, visit [AkôFlow Server documentation](https://github.com/UFFeScience/akoflow/wiki/). ## Documentation - For more information, visit [AkôFlow documentation](https://akoflow.com/docs/). + For more information, visit [AkôFlow documentation](https://github.com/UFFeScience/akoflow/wiki/). draft: false prerelease: false files: | diff --git a/README.md b/README.md index cecee53..253688f 100755 --- a/README.md +++ b/README.md @@ -37,6 +37,6 @@ AkôFlow originated as a final undergraduate project and has since expanded with ## Documentation Full documentation is available at: -📘 https://akoflow.com/docs +📘 https://github.com/UFFeScience/akoflow/wiki ⸻ From 400c57dd73d04bd0fa71537ffda79ecff4244496 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 16:48:08 +0000 Subject: [PATCH 03/62] fix: update launch configuration and improve environment variable setup --- .vscode/launch.json | 6 +----- pkg/server/config/config.go | 4 +++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 1544457..bfbd4a1 100755 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,11 +6,7 @@ "type": "go", "request": "launch", "mode": "debug", - "program": "cmd/server/main.go", - "env": { - "K8S_API_SERVER_HOST": "localhost", - "K8S_API_SERVER_TOKEN": "dummy", - } + "program": "cmd/server/main.go" }, { "name": "Launch AkoFlow Client Run", diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index 7fd2aa3..38d8821 100755 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -45,7 +45,9 @@ func SetupEnv() { os.Setenv("K8S_API_SERVER_HOST", hostEnvByKube) } - os.Setenv("K8S_API_SERVER_TOKEN", tokenData) + if tokenData != "" { + os.Setenv("K8S_API_SERVER_TOKEN", tokenData) + } } From fa4cb76ecfae4bcd518638db3a4f1ee076e293e2 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 17:00:02 +0000 Subject: [PATCH 04/62] feat: implement node and workflow execution repositories, add schedule handler and templates --- pkg/server/config/app_container.go | 36 ++-- .../connector/connector_k8s/connector_k8s.go | 7 + .../connector_node_k8s/connector_node.go | 162 ++++++++++++++++++ .../database/model/node_metrics_model.go | 35 ++++ pkg/server/database/model/node_model.go | 37 ++++ pkg/server/database/model/runtime_model.go | 1 + pkg/server/database/model/schedule_model.go | 30 ++++ .../model/workflow_execution_model.go | 35 ++++ .../node_metrics_repository.go | 44 +++++ .../node_repository/node_repository.go | 134 +++++++++++++++ .../workflow_execution_repository.go | 57 ++++++ .../akoflow_admin_handler.go | 9 + .../common/sidebar.tmpl.html | 3 +- .../schedules.tmpl.html | 17 ++ pkg/server/engine/httpserver/httpserver.go | 1 + .../engine/scripts/monitor_disk_storage.sh | 16 ++ .../engine/scripts/monitor_files_storage.sh | 17 ++ .../health_check_runtime_k8s_service.go | 43 +++++ .../kubernetes_runtime_service.go | 8 +- .../make_k8s_activity_service.go | 2 +- 20 files changed, 679 insertions(+), 15 deletions(-) create mode 100644 pkg/server/connector/connector_k8s/connector_node_k8s/connector_node.go create mode 100644 pkg/server/database/model/node_metrics_model.go create mode 100644 pkg/server/database/model/node_model.go create mode 100644 pkg/server/database/model/schedule_model.go create mode 100644 pkg/server/database/model/workflow_execution_model.go create mode 100644 pkg/server/database/repository/node_metrics_repository/node_metrics_repository.go create mode 100644 pkg/server/database/repository/node_repository/node_repository.go create mode 100644 pkg/server/database/repository/workflow_execution_repository/workflow_execution_repository.go create mode 100644 pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html create mode 100644 pkg/server/engine/scripts/monitor_disk_storage.sh create mode 100644 pkg/server/engine/scripts/monitor_files_storage.sh diff --git a/pkg/server/config/app_container.go b/pkg/server/config/app_container.go index 43a4aa6..8a8d47c 100755 --- a/pkg/server/config/app_container.go +++ b/pkg/server/config/app_container.go @@ -14,8 +14,11 @@ import ( "github.com/ovvesley/akoflow/pkg/server/database/repository/activity_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/logs_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/metrics_repository" + "github.com/ovvesley/akoflow/pkg/server/database/repository/node_metrics_repository" + "github.com/ovvesley/akoflow/pkg/server/database/repository/node_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/runtime_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/storages_repository" + "github.com/ovvesley/akoflow/pkg/server/database/repository/workflow_execution_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/workflow_repository" ) @@ -37,12 +40,15 @@ type EnvVars struct { } type AppContainerRepository struct { - WorkflowRepository workflow_repository.IWorkflowRepository - ActivityRepository activity_repository.IActivityRepository - LogsRepository logs_repository.ILogsRepository - MetricsRepository metrics_repository.IMetricsRepository - StoragesRepository storages_repository.IStorageRepository - RuntimeRepository runtime_repository.IRuntimeRepository + WorkflowRepository workflow_repository.IWorkflowRepository + ActivityRepository activity_repository.IActivityRepository + LogsRepository logs_repository.ILogsRepository + MetricsRepository metrics_repository.IMetricsRepository + StoragesRepository storages_repository.IStorageRepository + RuntimeRepository runtime_repository.IRuntimeRepository + NodeRepository node_repository.INodeRepository + NodeMetricsRepository node_metrics_repository.INodeMetricsRepository + WorkflowExecutionRepository workflow_execution_repository.IWorkflowExecutionRepository } type AppContainerConnector struct { @@ -103,6 +109,9 @@ func MakeAppContainer() AppContainer { metricsRepository := metrics_repository.New() storagesRepository := storages_repository.New() runtimeRepository := runtime_repository.New() + nodeRepository := node_repository.New() + nodesMetricsRepository := node_metrics_repository.New() + workflowExecutionRepository := workflow_execution_repository.New() // create the Connector instances k8sConnector := connector_k8s.New() @@ -118,12 +127,15 @@ func MakeAppContainer() AppContainer { appContainer := AppContainer{ DefaultNamespace: DEFAULT_NAMESPACE, Repository: AppContainerRepository{ - WorkflowRepository: workflowRepository, - ActivityRepository: activityRepository, - LogsRepository: logsRepository, - MetricsRepository: metricsRepository, - StoragesRepository: storagesRepository, - RuntimeRepository: runtimeRepository, + WorkflowRepository: workflowRepository, + ActivityRepository: activityRepository, + LogsRepository: logsRepository, + MetricsRepository: metricsRepository, + StoragesRepository: storagesRepository, + RuntimeRepository: runtimeRepository, + NodeRepository: nodeRepository, + NodeMetricsRepository: nodesMetricsRepository, + WorkflowExecutionRepository: workflowExecutionRepository, }, Connector: AppContainerConnector{ K8sConnector: k8sConnector, diff --git a/pkg/server/connector/connector_k8s/connector_k8s.go b/pkg/server/connector/connector_k8s/connector_k8s.go index ed250d9..c11a5aa 100755 --- a/pkg/server/connector/connector_k8s/connector_k8s.go +++ b/pkg/server/connector/connector_k8s/connector_k8s.go @@ -11,6 +11,7 @@ import ( "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s/connector_job_k8s" "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s/connector_metrics_k8s" "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s/connector_namespace_k8s" + "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s/connector_node_k8s" "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s/connector_pod_k8s" "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s/connector_pvc_k8s" "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s/connector_role" @@ -82,6 +83,8 @@ type IConnector interface { // HealthCheck checks the health of the Kubernetes API. // API Endpoint: /healthz Healthz(*runtime_entity.Runtime) connector_healthz.IConnectorHealthz + + Nodes(*runtime_entity.Runtime) connector_node_k8s.IConnectorNodeK8s } func NewClient() *http.Client { @@ -154,3 +157,7 @@ func (c *Connector) StorageClass(r *runtime_entity.Runtime) connector_storage_cl func (c *Connector) Healthz(r *runtime_entity.Runtime) connector_healthz.IConnectorHealthz { return connector_healthz.New(r) } + +func (c *Connector) Nodes(r *runtime_entity.Runtime) connector_node_k8s.IConnectorNodeK8s { + return connector_node_k8s.New(r) +} diff --git a/pkg/server/connector/connector_k8s/connector_node_k8s/connector_node.go b/pkg/server/connector/connector_k8s/connector_node_k8s/connector_node.go new file mode 100644 index 0000000..61eb9f2 --- /dev/null +++ b/pkg/server/connector/connector_k8s/connector_node_k8s/connector_node.go @@ -0,0 +1,162 @@ +package connector_node_k8s + +import ( + "crypto/tls" + "encoding/json" + "net/http" + + "github.com/ovvesley/akoflow/pkg/server/entities/runtime_entity" +) + +type ConnectorNodeK8s struct { + client *http.Client + runtime *runtime_entity.Runtime +} + +type IConnectorNodeK8s interface { + ListNodes() ConnectorNodeK8sResponse +} + +func New(runtime *runtime_entity.Runtime) IConnectorNodeK8s { + return &ConnectorNodeK8s{ + client: newClient(), + runtime: runtime, + } +} + +func newClient() *http.Client { + return &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + } +} + +type Node struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + Status string `json:"status"` + CpuMax string `json:"cpu_max"` + MemoryMax string `json:"memory_max"` + NetworkMax string `json:"network_max"` + DiskMax string `json:"disk_max"` + OsImage string `json:"os_image"` + CreatedAt string `json:"created_at"` +} + +func (n Node) GetCpuMax() float64 { + cpuMax := n.CpuMax + if cpuMax == "" { + return 0.0 + } + var cpu float64 + if err := json.Unmarshal([]byte(cpuMax), &cpu); err != nil { + return 0.0 + } + return cpu +} + +func (n Node) GetNodeMemoryMax() float64 { + memoryMax := n.MemoryMax + if memoryMax == "" { + return 0.0 + } + if len(memoryMax) > 2 && memoryMax[len(memoryMax)-2:] == "Ki" { + memoryMax = memoryMax[:len(memoryMax)-2] + } + var memory float64 + if err := json.Unmarshal([]byte(memoryMax), &memory); err != nil { + return 0.0 + } + return memory / 1024.0 +} + +func (n Node) GetNodeNetworkMax() float64 { + networkMax := n.NetworkMax + if networkMax == "" { + return 0.0 + } + var network float64 + if err := json.Unmarshal([]byte(networkMax), &network); err != nil { + return 0.0 + } + return network +} + +type ConnectorNodeK8sResponse struct { + Success bool `json:"success"` + Message string `json:"message"` + Data []Node `json:"data,omitempty"` +} + +func (c ConnectorNodeK8s) ListNodes() ConnectorNodeK8sResponse { + host := c.runtime.GetMetadataApiServerHost() + + req, err := http.NewRequest("GET", "https://"+host+"/api/v1/nodes", nil) + + if err != nil { + return ConnectorNodeK8sResponse{ + Success: false, + Message: "Failed to create request: " + err.Error(), + } + } + + req.Header.Set("Authorization", "Bearer "+c.runtime.GetMetadataApiServerToken()) + + resp, err := c.client.Do(req) + if err != nil { + return ConnectorNodeK8sResponse{ + Success: false, + Message: "Failed to execute request: " + err.Error(), + } + } + + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return ConnectorNodeK8sResponse{ + Success: false, + Message: "Failed to list nodes, status code: " + resp.Status, + } + } + + var data any + + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return ConnectorNodeK8sResponse{ + Success: false, + Message: "Failed to decode response: " + err.Error(), + } + } + + var nodes []Node + for _, item := range data.(map[string]interface{})["items"].([]interface{}) { + name := item.(map[string]interface{})["metadata"].(map[string]interface{})["name"].(string) + memoryMax := item.(map[string]interface{})["status"].(map[string]interface{})["allocatable"].(map[string]interface{})["memory"] + cpuMax := item.(map[string]interface{})["status"].(map[string]interface{})["allocatable"].(map[string]interface{})["cpu"] + networkMax := item.(map[string]interface{})["status"].(map[string]interface{})["allocatable"].(map[string]interface{})["ephemeral-storage"] + DiskMax := item.(map[string]interface{})["status"].(map[string]interface{})["capacity"].(map[string]interface{})["ephemeral-storage"] + osImage := item.(map[string]interface{})["status"].(map[string]interface{})["nodeInfo"].(map[string]interface{})["osImage"].(string) + createdAt := item.(map[string]interface{})["metadata"].(map[string]interface{})["creationTimestamp"].(string) + + nodes = append(nodes, Node{ + Name: name, + Status: "Ready", // Assuming all nodes are ready, adjust as needed + CpuMax: cpuMax.(string), + MemoryMax: memoryMax.(string), + NetworkMax: networkMax.(string), + DiskMax: DiskMax.(string), + OsImage: osImage, + CreatedAt: createdAt, + }) + + } + + return ConnectorNodeK8sResponse{ + Success: true, + Message: "Nodes listed successfully", + Data: nodes, + } + +} diff --git a/pkg/server/database/model/node_metrics_model.go b/pkg/server/database/model/node_metrics_model.go new file mode 100644 index 0000000..679ce67 --- /dev/null +++ b/pkg/server/database/model/node_metrics_model.go @@ -0,0 +1,35 @@ +package model + +import "github.com/ovvesley/akoflow/pkg/server/database" + +type NodeMetrics struct { + ID int `db:"id" sql:"INTEGER PRIMARY KEY AUTOINCREMENT"` + NodeID int `db:"node_id"` + CpuUsage float64 `db:"cpu_usage"` + CpuMemory float64 `db:"cpu_memory"` + MemoryUsage float64 `db:"memory_usage"` + MemoryLimit float64 `db:"memory_limit"` + NetworkUsage float64 `db:"network_usage"` + NetworkLimit float64 `db:"network_limit"` + Timestamp string `db:"timestamp"` +} + +func (NodeMetrics) TableName() string { + return "nodes_metrics" +} + +func (n NodeMetrics) GetColumns() []string { + return database.GenericGetColumns(n) +} + +func (n NodeMetrics) GetPrimaryKey() string { + return database.GenericGetPrimaryKey(n) +} + +func (n NodeMetrics) GetClausulePrimaryKey() string { + return database.GenericGetClausulePrimaryKey(n) +} + +func (n NodeMetrics) GetColumnType(column string) string { + return database.GenericGetColumnType(n, column) +} diff --git a/pkg/server/database/model/node_model.go b/pkg/server/database/model/node_model.go new file mode 100644 index 0000000..c9daec8 --- /dev/null +++ b/pkg/server/database/model/node_model.go @@ -0,0 +1,37 @@ +package model + +import "github.com/ovvesley/akoflow/pkg/server/database" + +type Node struct { + Name string `db:"name" sql:"TEXT NOT NULL UNIQUE"` + Runtime string `db:"runtime" sql:"TEXT NOT NULL"` + Status int `db:"status" sql:"INTEGER NOT NULL"` + CPUUsage float64 `db:"cpu_usage"` + CPUMax float64 `db:"cpu_max"` + MemoryUsage float64 `db:"memory_usage"` + MemoryLimit float64 `db:"memory_limit"` + NetworkLimit float64 `db:"network_limit"` + NetworkUsage float64 `db:"network_usage"` + CreatedAt string `db:"created_at"` + UpdatedAt string `db:"updated_at"` +} + +func (Node) TableName() string { + return "nodes" +} + +func (n Node) GetColumns() []string { + return database.GenericGetColumns(n) +} + +func (n Node) GetPrimaryKey() string { + return database.GenericGetPrimaryKey(n) +} + +func (n Node) GetClausulePrimaryKey() string { + return database.GenericGetClausulePrimaryKey(n) +} + +func (n Node) GetColumnType(column string) string { + return database.GenericGetColumnType(n, column) +} diff --git a/pkg/server/database/model/runtime_model.go b/pkg/server/database/model/runtime_model.go index dee9027..dd37645 100644 --- a/pkg/server/database/model/runtime_model.go +++ b/pkg/server/database/model/runtime_model.go @@ -6,6 +6,7 @@ type Runtime struct { Name string `db:"name" sql:"TEXT PRIMARY KEY"` Status int `db:"status"` Metadata string `db:"metadata"` + MaxNodes int `db:"max_nodes"` CreatedAt string `db:"created_at"` UpdatedAt string `db:"updated_at"` DeletedAt string `db:"deleted_at"` diff --git a/pkg/server/database/model/schedule_model.go b/pkg/server/database/model/schedule_model.go new file mode 100644 index 0000000..affdce2 --- /dev/null +++ b/pkg/server/database/model/schedule_model.go @@ -0,0 +1,30 @@ +package model + +import "github.com/ovvesley/akoflow/pkg/server/database" + +type ScheduleModel struct { + ID int `db:"id" sql:"INTEGER PRIMARY KEY AUTOINCREMENT"` + Type string `db:"type"` + Code string `db:"code"` + Namespace string `db:"namespace"` +} + +func (ScheduleModel) TableName() string { + return "workflow_executions" +} + +func (w ScheduleModel) GetColumns() []string { + return database.GenericGetColumns(w) +} + +func (w ScheduleModel) GetPrimaryKey() string { + return database.GenericGetPrimaryKey(w) +} + +func (w ScheduleModel) GetClausulePrimaryKey() string { + return database.GenericGetClausulePrimaryKey(w) +} + +func (w ScheduleModel) GetColumnType(column string) string { + return database.GenericGetColumnType(w, column) +} diff --git a/pkg/server/database/model/workflow_execution_model.go b/pkg/server/database/model/workflow_execution_model.go new file mode 100644 index 0000000..a937327 --- /dev/null +++ b/pkg/server/database/model/workflow_execution_model.go @@ -0,0 +1,35 @@ +package model + +import "github.com/ovvesley/akoflow/pkg/server/database" + +type WorkflowExecution struct { + ID int `db:"id" sql:"INTEGER PRIMARY KEY AUTOINCREMENT"` + Namespace string `db:"namespace"` + Name string `db:"name"` + WorkflowID int `db:"workflow_id"` + Status string `db:"status"` + Runtime string `db:"runtime"` + CreatedAt string `db:"created_at"` + UpdatedAt string `db:"updated_at"` + DeletedAt string `db:"deleted_at"` +} + +func (WorkflowExecution) TableName() string { + return "workflow_executions" +} + +func (w WorkflowExecution) GetColumns() []string { + return database.GenericGetColumns(w) +} + +func (w WorkflowExecution) GetPrimaryKey() string { + return database.GenericGetPrimaryKey(w) +} + +func (w WorkflowExecution) GetClausulePrimaryKey() string { + return database.GenericGetClausulePrimaryKey(w) +} + +func (w WorkflowExecution) GetColumnType(column string) string { + return database.GenericGetColumnType(w, column) +} diff --git a/pkg/server/database/repository/node_metrics_repository/node_metrics_repository.go b/pkg/server/database/repository/node_metrics_repository/node_metrics_repository.go new file mode 100644 index 0000000..4e16b24 --- /dev/null +++ b/pkg/server/database/repository/node_metrics_repository/node_metrics_repository.go @@ -0,0 +1,44 @@ +package node_metrics_repository + +import ( + "github.com/ovvesley/akoflow/pkg/server/database/model" + "github.com/ovvesley/akoflow/pkg/server/database/repository" +) + +type NodeMetricsRepository struct { + tableName string +} + +const STATUS_READY = 1 +const STATUS_NOT_READY = 0 + +type INodeMetricsRepository interface { + CreateOrUpdate(name string, status int, metadata map[string]string) +} + +func New() INodeMetricsRepository { + + database := repository.Database{} + model := model.NodeMetrics{} + c := database.Connect() + err := repository.CreateOrVerifyTable(c, model) + if err != nil { + return nil + } + + err = c.Close() + if err != nil { + return nil + } + + return &NodeMetricsRepository{ + tableName: model.TableName(), + } +} + +func (nmr *NodeMetricsRepository) CreateOrUpdate(name string, status int, metadata map[string]string) { + // Implementation for creating or updating a node metrics entry + // This would typically involve inserting or updating the record in the database + // using the provided name, status, and metadata. + +} diff --git a/pkg/server/database/repository/node_repository/node_repository.go b/pkg/server/database/repository/node_repository/node_repository.go new file mode 100644 index 0000000..388d5dd --- /dev/null +++ b/pkg/server/database/repository/node_repository/node_repository.go @@ -0,0 +1,134 @@ +package node_repository + +import ( + "strings" + + "github.com/ovvesley/akoflow/pkg/server/database/model" + "github.com/ovvesley/akoflow/pkg/server/database/repository" +) + +type NodeRepository struct { + tableName string +} + +const STATUS_READY = 1 +const STATUS_NOT_READY = 0 + +type INodeRepository interface { + CreateOrUpdate(runtime string, node model.Node) error +} + +func New() INodeRepository { + + database := repository.Database{} + model := model.Node{} + c := database.Connect() + err := repository.CreateOrVerifyTable(c, model) + if err != nil { + return nil + } + + err = c.Close() + if err != nil { + return nil + } + + return &NodeRepository{ + tableName: model.TableName(), + } +} + +func (nr *NodeRepository) GetByName(name string) (*model.Node, error) { + database := repository.Database{} + c := database.Connect() + defer c.Close() + + var node model.Node + cols := node.GetColumns() + rows, err := c.Query("SELECT "+strings.Join(cols, ", ")+" FROM "+nr.tableName+" WHERE name = ?", name) + if err != nil { + return nil, err + } + defer rows.Close() + + if rows.Next() { + err = rows.Scan( + &node.Name, + &node.Runtime, + &node.Status, + &node.CPUUsage, + &node.CPUMax, + &node.MemoryUsage, + &node.MemoryLimit, + &node.NetworkLimit, + &node.NetworkUsage, + &node.CreatedAt, + &node.UpdatedAt, + ) + + if err != nil { + return nil, err + } + return &node, nil + } + + return nil, nil // Node not found + +} + +func (nr *NodeRepository) UpdateNode(node model.Node) error { + database := repository.Database{} + c := database.Connect() + defer c.Close() + + // Prepare the update statement + stmt, err := c.Prepare("UPDATE " + nr.tableName + " SET status = ?, cpu_usage = ?, cpu_max = ?, memory_usage = ?, memory_limit = ?, network_limit = ?, network_usage = ?, updated_at = datetime('now') WHERE name = ?") + if err != nil { + return err + } + defer stmt.Close() + + // Execute the update statement + _, err = stmt.Exec(node.Status, node.CPUUsage, node.CPUMax, node.MemoryUsage, node.MemoryLimit, node.NetworkLimit, node.NetworkUsage, node.Name) + if err != nil { + return err + } + + return nil +} + +// CreateOrUpdate creates or updates a node in the database. +// If the node already exists, it updates the existing record. +// If the node does not exist, it creates a new record. +// The function takes the node name, status, and metadata as parameters. +func (nr *NodeRepository) CreateOrUpdate(runtime string, node model.Node) error { + database := repository.Database{} + c := database.Connect() + defer c.Close() + + existingNode, err := nr.GetByName(node.Name) + if err != nil { + return err + } + + if existingNode != nil { + // Node exists, update it + node.Name = existingNode.Name // Ensure we update the correct record + return nr.UpdateNode(node) + } + + // Node does not exist, create it + stmt, err := c.Prepare("INSERT INTO " + nr.tableName + " (name, runtime, status, cpu_usage, cpu_max, memory_usage, memory_limit, network_limit, network_usage, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))") + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec(node.Name, node.Runtime, node.Status, node.CPUUsage, node.CPUMax, node.MemoryUsage, node.MemoryLimit, node.NetworkLimit, node.NetworkUsage) + if err != nil { + return err + } + + return nil + +} diff --git a/pkg/server/database/repository/workflow_execution_repository/workflow_execution_repository.go b/pkg/server/database/repository/workflow_execution_repository/workflow_execution_repository.go new file mode 100644 index 0000000..4946292 --- /dev/null +++ b/pkg/server/database/repository/workflow_execution_repository/workflow_execution_repository.go @@ -0,0 +1,57 @@ +package workflow_execution_repository + +import ( + "github.com/ovvesley/akoflow/pkg/server/database/model" + "github.com/ovvesley/akoflow/pkg/server/database/repository" +) + +type WorkflowExecutionRepository struct { + tableName string +} + +const STATUS_RUNNING = 1 +const STATUS_COMPLETED = 2 +const STATUS_FAILED = 3 +const STATUS_CANCELLED = 4 + +type IWorkflowExecutionRepository interface { + CreateOrUpdate(workflowID string, status int, metadata map[string]string) +} + +func New() IWorkflowExecutionRepository { + + database := repository.Database{} + model := model.WorkflowExecution{} + c := database.Connect() + err := repository.CreateOrVerifyTable(c, model) + if err != nil { + return nil + } + + err = c.Close() + if err != nil { + return nil + } + + return &WorkflowExecutionRepository{ + tableName: model.TableName(), + } +} + +func (wer *WorkflowExecutionRepository) CreateOrUpdate(workflowID string, status int, metadata map[string]string) { + // Implementation for creating or updating a workflow execution entry + // This would typically involve inserting or updating the record in the database + // using the provided workflowID, status, and metadata. + + // Example implementation (pseudo-code): + /* + c := repository.Database.Connect() + defer c.Close() + + query := fmt.Sprintf("INSERT INTO %s (workflow_id, status, metadata) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE status = ?, metadata = ?", wer.tableName) + _, err := c.Exec(query, workflowID, status, metadata, status, metadata) + if err != nil { + log.Error("Failed to create or update workflow execution:", err) + } + */ +} diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler.go b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler.go index cf6b0ab..d38554b 100755 --- a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler.go +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler.go @@ -46,3 +46,12 @@ func (h *AkoflowAdminHandler) Runtime(w http.ResponseWriter, r *http.Request) { return } } + +func (h *AkoflowAdminHandler) Schedule(w http.ResponseWriter, r *http.Request) { + scheduleTemplate := h.renderViewProvider.TemplateInstance("schedules.tmpl.html") + err := scheduleTemplate.Execute(w, map[string]interface{}{}) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html index 328a16c..3218a47 100755 --- a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/common/sidebar.tmpl.html @@ -26,11 +26,12 @@ + +{{end}} \ No newline at end of file diff --git a/pkg/server/engine/httpserver/httpserver.go b/pkg/server/engine/httpserver/httpserver.go index 3b997ea..bf61900 100755 --- a/pkg/server/engine/httpserver/httpserver.go +++ b/pkg/server/engine/httpserver/httpserver.go @@ -35,6 +35,7 @@ func StartServer() { http.HandleFunc("GET /akoflow-admin/", http_config.KernelHandler(akoflow_admin_handler.New().Home)) http.HandleFunc("GET /akoflow-admin/runtimes", http_config.KernelHandler(akoflow_admin_handler.New().Runtime)) + http.HandleFunc("GET /akoflow-admin/schedules", http_config.KernelHandler(akoflow_admin_handler.New().Schedule)) http.HandleFunc("GET /akoflow-admin/workflows/{namespace}/{workflowId}/", http_config.KernelHandler(akoflow_admin_handler.New().WorkflowDetail)) http.HandleFunc("GET /akoflow-api/workflows/", http_config.KernelHandler(workflow_api_handler.New().ListAllWorkflows)) diff --git a/pkg/server/engine/scripts/monitor_disk_storage.sh b/pkg/server/engine/scripts/monitor_disk_storage.sh new file mode 100644 index 0000000..1d75eeb --- /dev/null +++ b/pkg/server/engine/scripts/monitor_disk_storage.sh @@ -0,0 +1,16 @@ +df -h > /tmp/du_output.txt; +echo "Preparing to start request"; +body=$(cat /tmp/du_output.txt); +body_length=$(printf %s "$body" | wc -c); +echo "Start request"; +{ + echo -ne "POST /akoflow-server/internal/storage/` + path + `/?activityId=$ACTIVITY_ID HTTP/1.1\r\n"; + echo -ne "Host: $AKOFLOW_SERVER_SERVICE_SERVICE_HOST\r\n"; + echo -ne "Content-Type: text/plain\r\n"; + echo -ne "Content-Length: $body_length\r\n"; + echo -ne "Connection: close\r\n"; + echo -ne "\r\n"; + echo -ne "$body"; +} | nc $AKOFLOW_SERVER_SERVICE_SERVICE_HOST ` + port + `; + +echo "End request"; \ No newline at end of file diff --git a/pkg/server/engine/scripts/monitor_files_storage.sh b/pkg/server/engine/scripts/monitor_files_storage.sh new file mode 100644 index 0000000..0317a5a --- /dev/null +++ b/pkg/server/engine/scripts/monitor_files_storage.sh @@ -0,0 +1,17 @@ +ls -lR $ACTIVITY_MOUNT_PATH > /tmp/du_output.txt; +echo "Preparing to start request"; +body=$(cat /tmp/du_output.txt); +body_length=$(printf %s "$body" | wc -c); + +echo "Start request"; +{ + echo -ne "POST /akoflow-server/internal/storage/` + path + `/?activityId=$ACTIVITY_ID HTTP/1.1\r\n"; + echo -ne "Host: $AKOFLOW_SERVER_SERVICE_SERVICE_HOST\r\n"; + echo -ne "Content-Type: text/plain\r\n"; + echo -ne "Content-Length: $body_length\r\n"; + echo -ne "Connection: close\r\n"; + echo -ne "\r\n"; + echo -ne "$body"; +} | nc $AKOFLOW_SERVER_SERVICE_SERVICE_HOST ` + port + `; + +echo "End request"; diff --git a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/health_check_runtime_k8s_service.go b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/health_check_runtime_k8s_service.go index e2b4abd..78e2188 100644 --- a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/health_check_runtime_k8s_service.go +++ b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/health_check_runtime_k8s_service.go @@ -3,18 +3,22 @@ package kubernetes_runtime_service import ( "github.com/ovvesley/akoflow/pkg/server/config" "github.com/ovvesley/akoflow/pkg/server/connector/connector_k8s" + "github.com/ovvesley/akoflow/pkg/server/database/model" + "github.com/ovvesley/akoflow/pkg/server/database/repository/node_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/runtime_repository" ) type HealthCheckRuntimeK8sService struct { k8sConnector connector_k8s.IConnector runtimeRepository runtime_repository.IRuntimeRepository + nodeRepository node_repository.INodeRepository } func NewHealthCheckRuntimeK8sService() *HealthCheckRuntimeK8sService { return &HealthCheckRuntimeK8sService{ k8sConnector: config.App().Connector.K8sConnector, runtimeRepository: config.App().Repository.RuntimeRepository, + nodeRepository: config.App().Repository.NodeRepository, } } func (h *HealthCheckRuntimeK8sService) HealthCheck(runtime string) bool { @@ -39,3 +43,42 @@ func (h *HealthCheckRuntimeK8sService) HealthCheck(runtime string) bool { return true } + +func (h *HealthCheckRuntimeK8sService) DiscoverNode(runtime string) bool { + runtimeEntity, err := h.runtimeRepository.GetByName(runtime) + if err != nil { + config.App().Logger.Infof("WORKER: Runtime not found %s", runtime) + return false + } + + response := h.k8sConnector.Nodes(runtimeEntity).ListNodes() + + if !response.Success { + config.App().Logger.Infof("WORKER: Node discovery failed for runtime %s", runtime) + return false + } + + for _, node := range response.Data { + node := model.Node{ + Name: node.Name, + Runtime: runtime, + Status: node_repository.STATUS_READY, + CPUUsage: 0.0, // Assuming initial CPU usage is 0 + CPUMax: node.GetCpuMax(), + MemoryUsage: 0.0, // Assuming initial memory usage is 0 + MemoryLimit: node.GetNodeMemoryMax(), + NetworkLimit: node.GetNodeNetworkMax(), + NetworkUsage: 0.0, // Assuming initial network usage is 0 + } + err := h.nodeRepository.CreateOrUpdate(runtime, node) + if err != nil { + config.App().Logger.Error("WORKER: Failed to create or update node %s for runtime %s: %v", node.Name, runtime, err) + return false + } + + } + + config.App().Logger.Infof("WORKER: Node discovery successful for runtime %s", runtime) + return true + +} diff --git a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go index 891b578..acab08c 100755 --- a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go +++ b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/kubernetes_runtime_service.go @@ -53,5 +53,11 @@ func (k *KubernetesRuntimeService) GetLogs(wf workflow_entity.Workflow, wfa work } func (k *KubernetesRuntimeService) HealthCheck(runtime string) bool { - return NewHealthCheckRuntimeK8sService().HealthCheck(runtime) + helthCheck := NewHealthCheckRuntimeK8sService() + + helthCheck.HealthCheck(runtime) + helthCheck.DiscoverNode(runtime) + + return true + } diff --git a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go index b9631bc..4da1ef4 100755 --- a/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go +++ b/pkg/server/runtimes/kubernetes_runtime/kubernetes_runtime_service/make_k8s_activity_service.go @@ -91,7 +91,7 @@ func (m *MakeK8sActivityService) addCommandToMonitorFilesStorage(command string, func (m *MakeK8sActivityService) addCommandToMonitorDiskSpecStorage(command string, path string) string { port := m.getPortAkoFlowServer() - command += `df -h > /tmp/du_output.txt; echo "Preparing to start request"; body=$(cat /tmp/du_output.txt); body_length=$(printf %s "$body" | wc -c); echo "Start request"; { echo -ne "POST /akoflow-server/internal/storage/` + path + `/?activityId=$ACTIVITY_ID HTTP/1.1\r\n"; echo -ne "Host: $AKOFLOW_SERVER_SERVICE_SERVICE_HOST\r\n"; echo -ne "Content-Type: text/plain\r\n"; echo -ne "Content-Length: $body_length\r\n"; echo -ne "Connection: close\r\n"; echo -ne "\r\n"; echo -ne "$body"; } | nc $AKOFLOW_SERVER_SERVICE_SERVICE_HOST ` + port + `; echo "End request"; ` + command += `df -h > /tmp/du_output.txt; echo "Preparing to start request"; body=$(cat /tmp/du_output.txt); body_length=$(printf %s "$body" | wc -c); echo "Start request"; { echo -ne "POST /akoflow-server/internal/storage/` + path + `/?activityId=$ACTIVITY_ID HTTP/1.1\r\n"; echo -ne "Host: host.docker.internal\r\n"; echo -ne "Content-Type: text/plain\r\n"; echo -ne "Content-Length: $body_length\r\n"; echo -ne "Connection: close\r\n"; echo -ne "\r\n"; echo -ne "$body"; } | nc host.docker.internal ` + port + `; echo "End request"; ` return command } From fb6652be581edbc42e4fd82777ec367ee09a7862 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 17:01:21 +0000 Subject: [PATCH 05/62] fix: correct table name in ScheduleModel to match database schema --- pkg/server/database/model/schedule_model.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/database/model/schedule_model.go b/pkg/server/database/model/schedule_model.go index affdce2..c0ac730 100644 --- a/pkg/server/database/model/schedule_model.go +++ b/pkg/server/database/model/schedule_model.go @@ -10,7 +10,7 @@ type ScheduleModel struct { } func (ScheduleModel) TableName() string { - return "workflow_executions" + return "schedules" } func (w ScheduleModel) GetColumns() []string { From d964a77c7c5e3a89986dd8bb4f1f4b14804001fe Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:20 +0000 Subject: [PATCH 06/62] feat: add ApiScheduleType struct for schedule entity representation --- .../types/types_api/api_entities_schedule_entity.go | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 pkg/server/types/types_api/api_entities_schedule_entity.go diff --git a/pkg/server/types/types_api/api_entities_schedule_entity.go b/pkg/server/types/types_api/api_entities_schedule_entity.go new file mode 100644 index 0000000..a029ecf --- /dev/null +++ b/pkg/server/types/types_api/api_entities_schedule_entity.go @@ -0,0 +1,10 @@ +package types_api + +type ApiScheduleType struct { + ID int `json:"id"` + Type string `json:"type"` + Code string `json:"code"` + Name string `json:"name,omitempty"` // Optional field, not always present + CreatedAt string `json:"createdAt"` + UpdatedAt string `json:"updatedAt"` +} From 757dc8c71fd513811bbcd7b077c45337e6eeba05 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:24 +0000 Subject: [PATCH 07/62] feat: implement ListSchedulesApiService for retrieving all schedules --- .../list_schedules_api_service.go | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 pkg/server/services/list_schedules_api_service/list_schedules_api_service.go diff --git a/pkg/server/services/list_schedules_api_service/list_schedules_api_service.go b/pkg/server/services/list_schedules_api_service/list_schedules_api_service.go new file mode 100644 index 0000000..d4dd712 --- /dev/null +++ b/pkg/server/services/list_schedules_api_service/list_schedules_api_service.go @@ -0,0 +1,39 @@ +package list_schedules_api_service + +import ( + "github.com/ovvesley/akoflow/pkg/server/config" + "github.com/ovvesley/akoflow/pkg/server/database/repository/schedule_repository" + "github.com/ovvesley/akoflow/pkg/server/types/types_api" +) + +type ListSchedulesApiService struct { + scheduleRepository schedule_repository.IScheduleRepository +} + +func New() *ListSchedulesApiService { + return &ListSchedulesApiService{ + scheduleRepository: config.App().Repository.ScheduleRepository, + } +} + +func (h *ListSchedulesApiService) ListAllSchedules() ([]types_api.ApiScheduleType, error) { + schedulesEngine, err := h.scheduleRepository.ListAllSchedules() + + if err != nil { + return nil, err + } + + schedulesApi := make([]types_api.ApiScheduleType, 0, len(schedulesEngine)) + for _, schedule := range schedulesEngine { + schedulesApi = append(schedulesApi, types_api.ApiScheduleType{ + ID: schedule.ID, + Type: schedule.Type, + Code: schedule.Code, + Name: schedule.Name, + CreatedAt: "", + UpdatedAt: "", + }) + } + + return schedulesApi, nil +} From 997aa0c443ca7afa2e61c26ae3fed2a1925dc544 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:30 +0000 Subject: [PATCH 08/62] feat: implement GetScheduleApiService for retrieving schedule details by name --- .../get_schedule_api_service.go | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 pkg/server/services/get_schedule_api_service/get_schedule_api_service.go diff --git a/pkg/server/services/get_schedule_api_service/get_schedule_api_service.go b/pkg/server/services/get_schedule_api_service/get_schedule_api_service.go new file mode 100644 index 0000000..9adb590 --- /dev/null +++ b/pkg/server/services/get_schedule_api_service/get_schedule_api_service.go @@ -0,0 +1,36 @@ +package get_schedule_api_service + +import ( + "github.com/ovvesley/akoflow/pkg/server/config" + "github.com/ovvesley/akoflow/pkg/server/database/repository/schedule_repository" + "github.com/ovvesley/akoflow/pkg/server/types/types_api" +) + +type GetScheduleApiService struct { + scheduleRepository schedule_repository.IScheduleRepository +} + +func New() *GetScheduleApiService { + return &GetScheduleApiService{ + scheduleRepository: config.App().Repository.ScheduleRepository, + } +} + +func (h *GetScheduleApiService) GetScheduleByName(scheduleId string) (*types_api.ApiScheduleType, error) { + scheduleEngine, err := h.scheduleRepository.GetScheduleByName(scheduleId) + + if err != nil { + return nil, err + } + + scheduleApi := &types_api.ApiScheduleType{ + ID: scheduleEngine.ID, + Type: scheduleEngine.Type, + Code: scheduleEngine.Code, + Name: scheduleEngine.Name, + CreatedAt: "", + UpdatedAt: "", + } + + return scheduleApi, nil +} From 8aef5de128ab5358680b40b4e310c89472fc8714 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:36 +0000 Subject: [PATCH 09/62] feat: implement CreateScheduleApiService for creating new schedules --- .../create_schedule_api_service.go | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 pkg/server/services/create_schedule_api_service/create_schedule_api_service.go diff --git a/pkg/server/services/create_schedule_api_service/create_schedule_api_service.go b/pkg/server/services/create_schedule_api_service/create_schedule_api_service.go new file mode 100644 index 0000000..c9efee3 --- /dev/null +++ b/pkg/server/services/create_schedule_api_service/create_schedule_api_service.go @@ -0,0 +1,34 @@ +package create_schedule_api_service + +import ( + "github.com/ovvesley/akoflow/pkg/server/config" + "github.com/ovvesley/akoflow/pkg/server/database/repository/schedule_repository" + "github.com/ovvesley/akoflow/pkg/server/types/types_api" +) + +type CreateScheduleApiService struct { + scheduleRepository schedule_repository.IScheduleRepository +} + +func New() *CreateScheduleApiService { + return &CreateScheduleApiService{ + scheduleRepository: config.App().Repository.ScheduleRepository, + } +} + +func (h *CreateScheduleApiService) CreateSchedule(name string, scheduleType string, code string) (types_api.ApiScheduleType, error) { + + scheduleEngine, err := h.scheduleRepository.CreateSchedule(name, scheduleType, code) + if err != nil { + return types_api.ApiScheduleType{}, err + } + + return types_api.ApiScheduleType{ + ID: scheduleEngine.ID, + Type: scheduleEngine.Type, + Code: scheduleEngine.Code, + Name: scheduleEngine.Name, + CreatedAt: "", + UpdatedAt: "", + }, nil +} From b8fd73ae9c061d2e61d3914792da525270233b95 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:42 +0000 Subject: [PATCH 10/62] feat: add ScheduleEntity struct with associated methods for schedule management --- .../schedule_entity/schedule_entity.go | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 pkg/server/entities/schedule_entity/schedule_entity.go diff --git a/pkg/server/entities/schedule_entity/schedule_entity.go b/pkg/server/entities/schedule_entity/schedule_entity.go new file mode 100644 index 0000000..ae626d9 --- /dev/null +++ b/pkg/server/entities/schedule_entity/schedule_entity.go @@ -0,0 +1,36 @@ +package schedule_entity + +type ScheduleEntity struct { + ID int `json:"id"` + Type string `json:"type"` + Code string `json:"code"` + Name string `json:"name,omitempty"` // Optional field, not always present +} + +func (s ScheduleEntity) GetId() int { + return s.ID +} + +func (s ScheduleEntity) GetType() string { + return s.Type +} + +func (s ScheduleEntity) GetCode() string { + return s.Code +} + +func (s ScheduleEntity) GetName() string { + if s.Name == "" { + return "default" + } + return s.Name +} + +func New(schedule ScheduleEntity) ScheduleEntity { + return ScheduleEntity{ + ID: schedule.ID, + Type: schedule.Type, + Code: schedule.Code, + Name: schedule.Name, + } +} From a2faef7372998a27f88f157ccaac3f87e087e2ec Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:47 +0000 Subject: [PATCH 11/62] feat: implement ScheduleApiHandler with methods for schedule management --- .../schedule_api_handler.go | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 pkg/server/engine/httpserver/handlers/schedule_api_handler/schedule_api_handler.go diff --git a/pkg/server/engine/httpserver/handlers/schedule_api_handler/schedule_api_handler.go b/pkg/server/engine/httpserver/handlers/schedule_api_handler/schedule_api_handler.go new file mode 100644 index 0000000..d399b6b --- /dev/null +++ b/pkg/server/engine/httpserver/handlers/schedule_api_handler/schedule_api_handler.go @@ -0,0 +1,79 @@ +package schedule_api_handler + +import ( + "net/http" + + "github.com/ovvesley/akoflow/pkg/server/config" + "github.com/ovvesley/akoflow/pkg/server/services/create_schedule_api_service" + "github.com/ovvesley/akoflow/pkg/server/services/get_schedule_api_service" + "github.com/ovvesley/akoflow/pkg/server/services/list_schedules_api_service" +) + +type ScheduleApiHandler struct { + listApiSchedulesService *list_schedules_api_service.ListSchedulesApiService + createApiScheduleService *create_schedule_api_service.CreateScheduleApiService + getApiScheduleService *get_schedule_api_service.GetScheduleApiService +} + +func New() *ScheduleApiHandler { + return &ScheduleApiHandler{ + listApiSchedulesService: list_schedules_api_service.New(), + createApiScheduleService: create_schedule_api_service.New(), + getApiScheduleService: get_schedule_api_service.New(), + } +} + +func (h *ScheduleApiHandler) ListAllSchedules(w http.ResponseWriter, r *http.Request) { + schedules, err := h.listApiSchedulesService.ListAllSchedules() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + config.App().HttpHelper.WriteJson(w, schedules) +} + +type CreateScheduleApiRequest struct { + Name string `json:"name"` + Type string `json:"type"` + Code string `json:"code"` +} + +func (h *ScheduleApiHandler) CreateSchedule(w http.ResponseWriter, r *http.Request) { + + var request CreateScheduleApiRequest + if err := config.App().HttpHelper.ReadJson(r, &request); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if request.Name == "" || request.Type == "" || request.Code == "" { + http.Error(w, "Name, Type, and Code are required fields", http.StatusBadRequest) + return + } + + schedule, err := h.createApiScheduleService.CreateSchedule(request.Name, request.Type, request.Code) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + config.App().HttpHelper.WriteJson(w, schedule) +} + +func (h *ScheduleApiHandler) GetSchedule(w http.ResponseWriter, r *http.Request) { + scheduleName := config.App().HttpHelper.GetUrlParam(r, "scheduleName") + + schedule, err := h.getApiScheduleService.GetScheduleByName(scheduleName) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + config.App().HttpHelper.WriteJson(w, schedule) +} From 81eec9165cdbd7b6de713ad57a82e0800c5c0615 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:52 +0000 Subject: [PATCH 12/62] feat: enhance schedule management UI with editor modal and validation functionality --- .../schedules.tmpl.html | 93 ++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html index 5c16e86..e882402 100644 --- a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html @@ -1,17 +1,106 @@ {{template "base.html" .}} {{define "title"}}AkôFlow Runtimes{{end}} + {{define "content"}}
{{ template "header" dict "title" "Schedules" }} +
+

My Schedules

+ +
+
-

Hello World

+
+ + + + {{end}} \ No newline at end of file From 95d2f6d41597c218396b8c6b1a399f5611df2815 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:01:58 +0000 Subject: [PATCH 13/62] feat: add schedule API routes for listing and creating schedules --- pkg/server/engine/httpserver/httpserver.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/server/engine/httpserver/httpserver.go b/pkg/server/engine/httpserver/httpserver.go index bf61900..13f58fa 100755 --- a/pkg/server/engine/httpserver/httpserver.go +++ b/pkg/server/engine/httpserver/httpserver.go @@ -7,6 +7,7 @@ import ( "github.com/ovvesley/akoflow/pkg/server/engine/httpserver/handlers/internal_storage_handler" "github.com/ovvesley/akoflow/pkg/server/engine/httpserver/handlers/public_static_handler" "github.com/ovvesley/akoflow/pkg/server/engine/httpserver/handlers/runtime_api_handler" + "github.com/ovvesley/akoflow/pkg/server/engine/httpserver/handlers/schedule_api_handler" "github.com/ovvesley/akoflow/pkg/server/engine/httpserver/handlers/storage_databasedump_handler" "github.com/ovvesley/akoflow/pkg/server/engine/httpserver/handlers/workflow_api_handler" "github.com/ovvesley/akoflow/pkg/server/engine/httpserver/handlers/workflow_handler" @@ -45,6 +46,10 @@ func StartServer() { http.HandleFunc("GET /akoflow-api/workflows/{workflowId}/", http_config.KernelHandler(workflow_api_handler.New().GetWorkflow)) http.HandleFunc("GET /akoflow-api/runtimes/", http_config.KernelHandler(runtime_api_handler.New().ListAllRuntimes)) + http.HandleFunc("GET /akoflow-api/schedules/", http_config.KernelHandler(schedule_api_handler.New().ListAllSchedules)) + http.HandleFunc("POST /akoflow-api/schedules/", http_config.KernelHandler(schedule_api_handler.New().CreateSchedule)) + http.HandleFunc("GET /akoflow-api/schedules/{scheduleId}/", http_config.KernelHandler(schedule_api_handler.New().GetSchedule)) + //http.HandleFunc("GET /akoflow-api/workflows/{workflowId}/activities/", http_config.KernelHandler(workflow_api_handler.New().ListAllActivities)) //http.HandleFunc("GET /akoflow-api/workflows/{workflowId}/activities/{activityId}/", http_config.KernelHandler(workflow_api_handler.New().GetActivity)) //http.HandleFunc("GET /akoflow-api/workflows/{workflowId}/activities/{activityId}/logs/", http_config.KernelHandler(workflow_api_handler.New().ListAllLogs)) From 64c6d2d87d694f611377e6470205b759e8c7b09f Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:02:03 +0000 Subject: [PATCH 14/62] feat: implement ScheduleRepository with CRUD operations for schedules --- .../schedule_repository.go | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 pkg/server/database/repository/schedule_repository/schedule_repository.go diff --git a/pkg/server/database/repository/schedule_repository/schedule_repository.go b/pkg/server/database/repository/schedule_repository/schedule_repository.go new file mode 100644 index 0000000..de237cd --- /dev/null +++ b/pkg/server/database/repository/schedule_repository/schedule_repository.go @@ -0,0 +1,135 @@ +package schedule_repository + +import ( + "github.com/ovvesley/akoflow/pkg/server/database/model" + "github.com/ovvesley/akoflow/pkg/server/database/repository" + "github.com/ovvesley/akoflow/pkg/server/entities/schedule_entity" +) + +type IScheduleRepository interface { + ListAllSchedules() ([]schedule_entity.ScheduleEntity, error) + // GetScheduleById(id int) (schedule_entity.ScheduleEntity, error) + CreateSchedule(name string, scheduleType string, code string) (schedule_entity.ScheduleEntity, error) + GetScheduleByName(name string) (schedule_entity.ScheduleEntity, error) + // UpdateSchedule(schedule schedule_entity.ScheduleEntity) (schedule_entity.ScheduleEntity, error) + // DeleteSchedule(id int) error +} + +type ScheduleRepository struct { + tableName string +} + +var TableName = "schedules" + +func New() IScheduleRepository { + + database := repository.Database{} + c := database.Connect() + err := repository.CreateOrVerifyTable(c, model.ScheduleModel{}) + if err != nil { + return nil + } + + err = c.Close() + if err != nil { + return nil + } + + return &ScheduleRepository{ + tableName: TableName, + } +} + +func (r *ScheduleRepository) ListAllSchedules() ([]schedule_entity.ScheduleEntity, error) { + database := repository.Database{} + c := database.Connect() + + rows, err := c.Query("SELECT id, type, code, name FROM " + r.tableName) + if err != nil { + return nil, err + } + + var schedules []schedule_entity.ScheduleEntity + for rows.Next() { + var schedule model.ScheduleModel + err = rows.Scan(&schedule.ID, &schedule.Type, &schedule.Code, &schedule.Name) + if err != nil { + return nil, err + } + schedules = append(schedules, schedule_entity.ScheduleEntity{ + ID: schedule.ID, + Type: schedule.Type, + Code: schedule.Code, + Name: schedule.Name, + }) + } + + err = c.Close() + if err != nil { + return nil, err + } + + return schedules, nil +} + +func (r *ScheduleRepository) CreateSchedule(name string, scheduleType string, code string) (schedule_entity.ScheduleEntity, error) { + database := repository.Database{} + c := database.Connect() + + schedule := model.ScheduleModel{ + Type: scheduleType, + Code: code, + Name: name, + } + + query := "INSERT INTO " + r.tableName + " (type, code, name) VALUES (?, ?, ?)" + result, err := c.Exec(query, schedule.Type, schedule.Code, schedule.Name) + if err != nil { + return schedule_entity.ScheduleEntity{}, err + } + + id, err := result.LastInsertId() + if err != nil { + return schedule_entity.ScheduleEntity{}, err + } + + schedule.ID = int(id) + + err = c.Close() + if err != nil { + return schedule_entity.ScheduleEntity{}, err + } + + return schedule_entity.ScheduleEntity{ + ID: schedule.ID, + Type: schedule.Type, + Code: schedule.Code, + Name: schedule.Name, + }, nil +} + +func (r *ScheduleRepository) GetScheduleByName(name string) (schedule_entity.ScheduleEntity, error) { + database := repository.Database{} + c := database.Connect() + + query := "SELECT id, type, code, name FROM " + r.tableName + " WHERE name = ?" + row := c.QueryRow(query, name) + + var schedule model.ScheduleModel + err := row.Scan(&schedule.ID, &schedule.Type, &schedule.Code, &schedule.Name) + if err != nil { + return schedule_entity.ScheduleEntity{}, err + } + + err = c.Close() + if err != nil { + return schedule_entity.ScheduleEntity{}, err + } + + return schedule_entity.ScheduleEntity{ + ID: schedule.ID, + Type: schedule.Type, + Code: schedule.Code, + Name: schedule.Name, + }, nil +} From 0165d36b3b32beac392ba10330847c784c0b7e12 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:02:08 +0000 Subject: [PATCH 15/62] feat: update ScheduleModel to include Name field and remove Namespace --- pkg/server/database/model/schedule_model.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/server/database/model/schedule_model.go b/pkg/server/database/model/schedule_model.go index c0ac730..0ddf3dc 100644 --- a/pkg/server/database/model/schedule_model.go +++ b/pkg/server/database/model/schedule_model.go @@ -3,10 +3,10 @@ package model import "github.com/ovvesley/akoflow/pkg/server/database" type ScheduleModel struct { - ID int `db:"id" sql:"INTEGER PRIMARY KEY AUTOINCREMENT"` - Type string `db:"type"` - Code string `db:"code"` - Namespace string `db:"namespace"` + ID int `db:"id" sql:"INTEGER PRIMARY KEY AUTOINCREMENT"` + Type string `db:"type"` + Code string `db:"code"` + Name string `db:"name"` } func (ScheduleModel) TableName() string { From 8206d77d44a9c7e46be0fe2ae2fd418153109ccf Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:02:13 +0000 Subject: [PATCH 16/62] feat: add ReadJson function to handle JSON decoding with error handling --- pkg/server/config/http_helper/http_helper.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/server/config/http_helper/http_helper.go b/pkg/server/config/http_helper/http_helper.go index 7c49b95..462473d 100755 --- a/pkg/server/config/http_helper/http_helper.go +++ b/pkg/server/config/http_helper/http_helper.go @@ -67,3 +67,19 @@ func GetPatternFromRequest(r *http.Request) string { return patternStr } + +func ReadJson(r *http.Request, data interface{}) error { + if r.Body == nil { + return fmt.Errorf("request body is empty") + } + + decoder := json.NewDecoder(r.Body) + decoder.DisallowUnknownFields() // Prevents unknown fields from being included in the JSON + + err := decoder.Decode(data) + if err != nil { + return fmt.Errorf("error decoding JSON: %w", err) + } + + return nil +} From eac6776f7a26b73e612101e465129f0a43703351 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 20:02:58 +0000 Subject: [PATCH 17/62] feat: add ScheduleRepository to AppContainer and implement ReadJson in HttpHelper --- pkg/server/config/app_container.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/server/config/app_container.go b/pkg/server/config/app_container.go index 8a8d47c..045973c 100755 --- a/pkg/server/config/app_container.go +++ b/pkg/server/config/app_container.go @@ -17,6 +17,7 @@ import ( "github.com/ovvesley/akoflow/pkg/server/database/repository/node_metrics_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/node_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/runtime_repository" + "github.com/ovvesley/akoflow/pkg/server/database/repository/schedule_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/storages_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/workflow_execution_repository" "github.com/ovvesley/akoflow/pkg/server/database/repository/workflow_repository" @@ -49,6 +50,7 @@ type AppContainerRepository struct { NodeRepository node_repository.INodeRepository NodeMetricsRepository node_metrics_repository.INodeMetricsRepository WorkflowExecutionRepository workflow_execution_repository.IWorkflowExecutionRepository + ScheduleRepository schedule_repository.IScheduleRepository } type AppContainerConnector struct { @@ -64,6 +66,7 @@ type AppContainerTemplateRenderer struct { type AppContainerHttpHelper struct { WriteJson func(w http.ResponseWriter, data interface{}) GetUrlParam func(r *http.Request, key string) string + ReadJson func(r *http.Request, data interface{}) error } // GetEnvVars returns the environment variables as a map @@ -112,6 +115,7 @@ func MakeAppContainer() AppContainer { nodeRepository := node_repository.New() nodesMetricsRepository := node_metrics_repository.New() workflowExecutionRepository := workflow_execution_repository.New() + scheduleRepository := schedules_repository.New() // create the Connector instances k8sConnector := connector_k8s.New() @@ -136,6 +140,7 @@ func MakeAppContainer() AppContainer { NodeRepository: nodeRepository, NodeMetricsRepository: nodesMetricsRepository, WorkflowExecutionRepository: workflowExecutionRepository, + ScheduleRepository: scheduleRepository, }, Connector: AppContainerConnector{ K8sConnector: k8sConnector, @@ -148,6 +153,7 @@ func MakeAppContainer() AppContainer { HttpHelper: AppContainerHttpHelper{ WriteJson: http_helper.WriteJson, GetUrlParam: http_helper.GetUrlPathParam, + ReadJson: http_helper.ReadJson, }, Logger: logger, EnvVars: EnvVars{ From 3d5b72c10a442c2596c059aa43c8da6398c604b7 Mon Sep 17 00:00:00 2001 From: Wesley Ferreira Date: Sat, 12 Jul 2025 21:51:39 +0000 Subject: [PATCH 18/62] feat: update schedule repository initialization and enhance schedule name input handling in the admin template --- pkg/server/config/app_container.go | 2 +- pkg/server/database/model/schedule_model.go | 11 +- .../schedules.tmpl.html | 107 ++++++++++++++++-- 3 files changed, 107 insertions(+), 13 deletions(-) diff --git a/pkg/server/config/app_container.go b/pkg/server/config/app_container.go index 045973c..54d5fab 100755 --- a/pkg/server/config/app_container.go +++ b/pkg/server/config/app_container.go @@ -115,7 +115,7 @@ func MakeAppContainer() AppContainer { nodeRepository := node_repository.New() nodesMetricsRepository := node_metrics_repository.New() workflowExecutionRepository := workflow_execution_repository.New() - scheduleRepository := schedules_repository.New() + scheduleRepository := schedule_repository.New() // create the Connector instances k8sConnector := connector_k8s.New() diff --git a/pkg/server/database/model/schedule_model.go b/pkg/server/database/model/schedule_model.go index 0ddf3dc..acf6ad7 100644 --- a/pkg/server/database/model/schedule_model.go +++ b/pkg/server/database/model/schedule_model.go @@ -3,10 +3,13 @@ package model import "github.com/ovvesley/akoflow/pkg/server/database" type ScheduleModel struct { - ID int `db:"id" sql:"INTEGER PRIMARY KEY AUTOINCREMENT"` - Type string `db:"type"` - Code string `db:"code"` - Name string `db:"name"` + ID int `db:"id" sql:"INTEGER PRIMARY KEY AUTOINCREMENT"` + Type string `db:"type"` + Code string `db:"code"` + Name string `db:"name"` + PluginSoPath string `db:"plugin_so_path"` + CreatedAt string `db:"created_at"` + UpdatedAt string `db:"updated_at"` } func (ScheduleModel) TableName() string { diff --git a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html index e882402..b67ed89 100644 --- a/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html +++ b/pkg/server/engine/httpserver/handlers/akoflow_admin_handler/akoflow_admin_handler_tmpl/schedules.tmpl.html @@ -22,11 +22,12 @@

My Schedul