/
datastore_export_job_check_api.go
133 lines (118 loc) · 4.09 KB
/
datastore_export_job_check_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/gcpug/ds2bq/datastore"
"github.com/morikuni/failure"
)
type DatastoreExportJobCheckRequest struct {
DS2BQJobID string
DatastoreExportJobID string
}
func HandleDatastoreExportJobCheckAPI(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
msg := fmt.Sprintf("failed ioutil.Read(request.Body).err=%+v", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
return
}
form := &DatastoreExportJobCheckRequest{}
if err := json.Unmarshal(b, form); err != nil {
msg := fmt.Sprintf("failed json.Unmarshal(request.Body).err=%+v", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
return
}
log.Printf("%s\n", string(b))
dseJobStore, err := NewDSExportJobStore(r.Context(), DatastoreClient)
if err != nil {
msg := fmt.Sprintf("failed NewDSExportJobStore.err=%+v", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
return
}
res, err := datastore.CheckJobStatus(r.Context(), form.DatastoreExportJobID)
if err != nil {
msg := fmt.Sprintf("failed datastore.CheckJobStatus.err=%+v", err)
log.Println(msg)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
_, err := w.Write([]byte(msg))
if err != nil {
log.Println(err)
}
return
}
switch res.Status {
case datastore.Running:
log.Printf("%s is Running...\n", form.DatastoreExportJobID)
_, err := dseJobStore.IncrementJobStatusCheckCount(r.Context(), form.DS2BQJobID)
if err != nil {
log.Printf("failed DSExportJobStore.IncrementJobStatusCheckCount. DS2BQJobID=%v,err=%v\n", form.DS2BQJobID, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusConflict)
case datastore.Fail:
log.Printf("%s is Fail. ErrCode=%v,ErrMessage=%v\n", form.DatastoreExportJobID, res.ErrCode, res.ErrMessage)
_, err := dseJobStore.FinishExportJob(r.Context(), form.DS2BQJobID, DSExportJobStatusFailed, fmt.Sprintf("Code=%v,MSG=%v,META=%+v", res.ErrCode, res.ErrMessage, res.Metadata))
if err != nil {
log.Printf("failed DSExportJobStore.FinishExportJob. DS2BQJobID=%v,err=%v\n", form.DS2BQJobID, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
case datastore.Done:
log.Printf("%s is Done...\n", form.DatastoreExportJobID)
_, err := dseJobStore.FinishExportJob(r.Context(), form.DS2BQJobID, DSExportJobStatusDone, "")
if err != nil {
log.Printf("failed DSExportJobStore.FinishExportJob. DS2BQJobID=%v,err=%v\n", form.DS2BQJobID, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
if err := InsertBQLoadJobs(r.Context(), form.DS2BQJobID, res.Metadata.OutputURLPrefix, r.Host); err != nil {
log.Printf("failed InsertBQLoadJobs. DS2BQJobID=%v,err=%v\n", form.DS2BQJobID, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
default:
log.Printf("%v is Unsupported Status\n", res.Status)
w.WriteHeader(http.StatusInternalServerError)
}
}
func InsertBQLoadJobs(ctx context.Context, ds2bqJobID string, outputURLPrefix string, host string) error {
bljs, err := NewBQLoadJobStore(ctx, DatastoreClient)
if err != nil {
return failure.Wrap(err, failure.Message("failed NewBQLoadJobStore"))
}
q, err := NewBQLoadJobCheckQueue(host, TasksClient)
if err != nil {
return failure.Wrap(err, failure.Message("failed NewBQLoadJobCheckQueue"))
}
ls := NewBQLoadService(bljs, q)
if err := ls.InsertBigQueryLoadJob(ctx, ds2bqJobID, outputURLPrefix); err != nil {
return failure.Wrap(err, failure.Message("failed BQLoadService.InsertBigQueryLoadJob"))
}
return nil
}