-
Notifications
You must be signed in to change notification settings - Fork 4
/
api_purgedata.go
203 lines (194 loc) · 7.27 KB
/
api_purgedata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"time"
"github.com/artisoft-io/jetstore/jets/datatable"
"github.com/artisoft-io/jetstore/jets/user"
"github.com/jackc/pgx/v4"
"go.uber.org/zap"
)
type PurgeDataAction struct {
Action string `json:"action"`
WorkspaceName string `json:"workspaceName"`
RunUiDbInitScript bool `json:"run_ui_db_init_script"`
Data []map[string]interface{} `json:"data"`
}
func (pd *PurgeDataAction)getWorkspaceName() string {
if pd.WorkspaceName == "" {
return os.Getenv("WORKSPACE")
}
return pd.WorkspaceName
}
// DoPurgeDataAction ------------------------------------------------------
// Entry point function
func (server *Server) DoPurgeDataAction(w http.ResponseWriter, r *http.Request) {
var results *map[string]interface{}
var code int
body, err := io.ReadAll(r.Body)
if err != nil {
ERROR(w, http.StatusUnprocessableEntity, err)
return
}
token := user.ExtractToken(r)
user,_ := user.ExtractTokenID(token)
server.AuditLogger.Info(string(body), zap.String("user", user),zap.String("time", time.Now().Format(time.RFC3339)))
action := PurgeDataAction{}
err = json.Unmarshal(body, &action)
if err != nil {
ERROR(w, http.StatusUnprocessableEntity, err)
return
}
// Intercept specific dataTable action
switch action.Action {
case "reset_domain_tables":
results, code, err = server.ResetDomainTables(&action)
case "rerun_db_init":
results, code, err = server.RunWorkspaceDbInit(&action)
case "export_client_configuration":
results, code, err = server.ExportClientConfiguration(&action)
default:
code = http.StatusUnprocessableEntity
err = fmt.Errorf("DoPurgeDataAction: unknown action: %v", action.Action)
}
if err != nil {
log.Printf("Error: %v", err)
ERROR(w, code, err)
return
}
addToken(r, results)
JSON(w, http.StatusOK, results)
}
// ResetDomainTables ------------------------------------------------------
// Clear and rebuild all domain tables defined in workspace -- using update_db command line
// Delete all tables containing the input data, get the table name list from input_loader_status
// also clear/truncate the input_registry table
// Also migrate the system tables to latest schema and conditionally run the workspace db init script
func (server *Server) ResetDomainTables(purgeDataAction *PurgeDataAction) (*map[string]interface{}, int, error) {
// Delete the input staging tables, ignore error here since input_loader_status does not exist
// in initial deployment
stmt := "SELECT DISTINCT table_name FROM jetsapi.input_loader_status"
rows, err := server.dbpool.Query(context.Background(), stmt)
if err == nil {
defer rows.Close()
for rows.Next() {
// scan the row
var tableName string
if err = rows.Scan(&tableName); err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("while scaning staging tables: %v", err)
}
// Drop the table
stmt := fmt.Sprintf("DROP TABLE IF EXISTS %s", pgx.Identifier{"public", tableName}.Sanitize())
log.Println(stmt)
_, err := server.dbpool.Exec(context.Background(), stmt)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("while droping staging tables: %v", err)
}
}
}
// Clear and rebuild the domain table using the update_db command line
// Also migrate the system tables to latest schema
log.Println("Rebuild Domain Tables")
serverArgs := []string{ "-drop", "-migrateDb" }
if purgeDataAction.RunUiDbInitScript {
serverArgs = append(serverArgs, "-initWorkspaceDb")
}
if *usingSshTunnel {
serverArgs = append(serverArgs, "-usingSshTunnel")
}
_, err = datatable.RunUpdateDb(purgeDataAction.getWorkspaceName(), &serverArgs)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("while running updateDb: %v", err)
}
// Truncate the jetsapi.input_registry
stmt = fmt.Sprintf("TRUNCATE %s", pgx.Identifier{"jetsapi", "input_registry"}.Sanitize())
log.Println(stmt)
_, err = server.dbpool.Exec(context.Background(), stmt)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("while truncating input_registry table: %v", err)
}
// Truncate the jetsapi.session_registry
stmt = fmt.Sprintf("TRUNCATE %s", pgx.Identifier{"jetsapi", "session_registry"}.Sanitize())
log.Println(stmt)
_, err = server.dbpool.Exec(context.Background(), stmt)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("while truncating session_registry table: %v", err)
}
return &map[string]interface{}{}, http.StatusOK, nil
}
// RunWorkspaceDbInit ------------------------------------------------------
// Initialize jetstore database with workspace db init script
func (server *Server) RunWorkspaceDbInit(purgeDataAction *PurgeDataAction) (*map[string]interface{}, int, error) {
// using update_db script
log.Println("Running DB Initialization with jetsapi Schema Update Scripts Only")
serverArgs := []string{ "-initWorkspaceDb", "-migrateDb" }
if *usingSshTunnel {
serverArgs = append(serverArgs, "-usingSshTunnel")
}
if _, err := datatable.RunUpdateDb(purgeDataAction.WorkspaceName, &serverArgs); err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("while running updateDb command: %v", err)
}
return &map[string]interface{}{}, http.StatusOK, nil
}
// ExportClientConfiguration ------------------------------------------------------
// Export client configuration to jetstore bucket (depricated)
func (server *Server) ExportClientConfiguration(purgeDataAction *PurgeDataAction) (*map[string]interface{}, int, error) {
var client string
for irow := range purgeDataAction.Data {
// expecting client to be specified in the data section of the request
v := purgeDataAction.Data[irow]["client"]
if v != nil {
client = v.(string)
}
if client == "" {
return nil, http.StatusBadRequest, fmt.Errorf("client name required to export client configuration")
}
}
// using run_reports script
serverArgs := []string{
"-client",
client,
"-reportName",
"export_client_config",
"-filePath",
"workspace/exported_config",
"-originalFileName",
"notused",
}
if *usingSshTunnel {
serverArgs = append(serverArgs, "-usingSshTunnel")
}
log.Println("Exporting client configuration for client", client)
log.Printf("Executing run_reports: %s", serverArgs)
cmd := exec.Command("/usr/local/bin/run_reports", serverArgs...)
var b bytes.Buffer
cmd.Stdout = &b
cmd.Stderr = &b
err := cmd.Run()
if err != nil {
log.Printf("while executing run_reports command '%v': %v", serverArgs, err)
log.Println("=*=*=*=*=*=*=*=*=*=*=*=*=*=*")
log.Println("RUN REPORTS CAPTURED OUTPUT BEGIN")
log.Println("=*=*=*=*=*=*=*=*=*=*=*=*=*=*")
b.WriteTo(os.Stdout)
log.Println("=*=*=*=*=*=*=*=*=*=*=*=*=*=*")
log.Println("RUN REPORTS CAPTURED OUTPUT END")
log.Println("=*=*=*=*=*=*=*=*=*=*=*=*=*=*")
return nil, http.StatusInternalServerError, fmt.Errorf("while running run_reports command: %v", err)
}
log.Println("============================")
log.Println("RUN REPORTS CAPTURED OUTPUT BEGIN")
log.Println("============================")
b.WriteTo(os.Stdout)
log.Println("============================")
log.Println("RUN REPORTS CAPTURED OUTPUT END")
log.Println("============================")
return &map[string]interface{}{}, http.StatusOK, nil
}