Skip to content
This repository has been archived by the owner on May 18, 2021. It is now read-only.

Commit

Permalink
s/graph_id/flow_id/ golintyvety
Browse files Browse the repository at this point in the history
  • Loading branch information
zootalures committed Nov 21, 2017
1 parent 11ee9f7 commit bb04154
Show file tree
Hide file tree
Showing 30 changed files with 682 additions and 778 deletions.
50 changes: 25 additions & 25 deletions actor/api.go
Expand Up @@ -85,8 +85,8 @@ func NewGraphManager(persistenceProvider persistence.ProviderState, blobStore bl
}

func (m *actorManager) CreateGraph(ctx context.Context, req *model.CreateGraphRequest) (*model.CreateGraphResponse, error) {
m.log.WithFields(logrus.Fields{"graph_id": req.GraphId}).Debug("Creating graph")
r, e := m.forwardRequest(req, ctx)
m.log.WithFields(logrus.Fields{"flow_id": req.FlowId}).Debug("Creating graph")
r, e := m.forwardRequest( ctx,req)
if e != nil {
return nil, e
}
Expand All @@ -96,7 +96,7 @@ func (m *actorManager) CreateGraph(ctx context.Context, req *model.CreateGraphRe

func (m *actorManager) GetGraphState(ctx context.Context, req *model.GetGraphStateRequest) (*model.GetGraphStateResponse, error) {
m.log.Debug("Getting graph stage")
r, e := m.forwardRequest(req, ctx)
r, e := m.forwardRequest(ctx,req)
if e != nil {
return nil, e
}
Expand All @@ -105,7 +105,7 @@ func (m *actorManager) GetGraphState(ctx context.Context, req *model.GetGraphSta

func (m *actorManager) AddInvokeFunction(ctx context.Context, req *model.AddInvokeFunctionStageRequest) (*model.AddStageResponse, error) {
m.log.Debug("Adding stage")
r, e := m.forwardRequest(req, ctx)
r, e := m.forwardRequest(ctx,req)
if e != nil {
return nil, e
}
Expand All @@ -115,7 +115,7 @@ func (m *actorManager) AddInvokeFunction(ctx context.Context, req *model.AddInvo

func (m *actorManager) AddDelay(ctx context.Context, req *model.AddDelayStageRequest) (*model.AddStageResponse, error) {
m.log.Debug("Adding stage")
r, e := m.forwardRequest(req, ctx)
r, e := m.forwardRequest( ctx,req)
if e != nil {
return nil, e
}
Expand All @@ -124,7 +124,7 @@ func (m *actorManager) AddDelay(ctx context.Context, req *model.AddDelayStageReq
}

func (m *actorManager) AddStage(ctx context.Context, req *model.AddStageRequest) (*model.AddStageResponse, error) {
r, e := m.forwardRequest(req, ctx)
r, e := m.forwardRequest(ctx, req)
if e != nil {
return nil, e
}
Expand All @@ -135,7 +135,7 @@ func (m *actorManager) AddStage(ctx context.Context, req *model.AddStageRequest)


func (m *actorManager) AddValueStage(ctx context.Context, req *model.AddCompletedValueStageRequest) (*model.AddStageResponse, error) {
r, e := m.forwardRequest(req, ctx)
r, e := m.forwardRequest(ctx, req)
if e != nil {
return nil, e
}
Expand All @@ -145,8 +145,8 @@ func (m *actorManager) AddValueStage(ctx context.Context, req *model.AddComplete
}

func (m *actorManager) AwaitStageResult(ctx context.Context, req *model.AwaitStageResultRequest) (*model.AwaitStageResultResponse, error) {
m.log.WithFields(logrus.Fields{"graph_id": req.GraphId}).Debug("Getting stage result")
r, e := m.forwardRequest(req, ctx)
m.log.WithFields(logrus.Fields{"flow_id": req.FlowId}).Debug("Getting stage result")
r, e := m.forwardRequest(ctx,req)
if e != nil {
return nil, e
}
Expand All @@ -155,14 +155,14 @@ func (m *actorManager) AwaitStageResult(ctx context.Context, req *model.AwaitSta
}

func (m *actorManager) CompleteStageExternally(ctx context.Context, req *model.CompleteStageExternallyRequest) (*model.CompleteStageExternallyResponse, error) {
m.log.WithFields(logrus.Fields{"graph_id": req.GraphId}).Debug("Completing stage externally")
r, e := m.forwardRequest(req, ctx)
m.log.WithFields(logrus.Fields{"flow_id": req.FlowId}).Debug("Completing stage externally")
r, e := m.forwardRequest(ctx,req)
return r.(*model.CompleteStageExternallyResponse), e
}

func (m *actorManager) Commit(ctx context.Context, req *model.CommitGraphRequest) (*model.GraphRequestProcessedResponse, error) {
m.log.WithFields(logrus.Fields{"graph_id": req.GraphId}).Debug("Committing graph")
r, e := m.forwardRequest(req, ctx)
m.log.WithFields(logrus.Fields{"flow_id": req.FlowId}).Debug("Committing graph")
r, e := m.forwardRequest(ctx,req)
if e != nil {
return nil, e
}
Expand All @@ -179,22 +179,22 @@ func (m *actorManager) lookupSupervisor(req interface{}) (*actor.PID, error) {
return nil, fmt.Errorf("Ignoring request of unknown type %v", reflect.TypeOf(req))
}

shardID, err := m.shardExtractor.ShardID(msg.GetGraphId())
shardID, err := m.shardExtractor.ShardID(msg.GetFlowId())
if err != nil {
m.log.Warnf("Failed to extract shard for graph %s: %v", msg.GetGraphId(), err)
return nil, model.NewGraphNotFoundError(msg.GetGraphId())
m.log.Warnf("Failed to extract shard for graph %s: %v", msg.GetFlowId(), err)
return nil, model.NewGraphNotFoundError(msg.GetFlowId())
}

pid, ok := m.shardSupervisors[shardID]

if !ok {
m.log.Warnf("No local supervisor found for shard %d", shardID)
return nil, model.NewGraphNotFoundError(msg.GetGraphId())
return nil, model.NewGraphNotFoundError(msg.GetFlowId())
}
return pid, nil
}

func (m *actorManager) forwardRequest(req interface{}, ctx context.Context) (interface{}, error) {
func (m *actorManager) forwardRequest(ctx context.Context, req interface{} ) (interface{}, error) {
supervisor, err := m.lookupSupervisor(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -233,16 +233,16 @@ func (m *actorManager) StreamNewEvents(predicate persistence.StreamPredicate, fn

}

func (m *actorManager) SubscribeGraphEvents(graphID string, fromIndex int, fn persistence.StreamCallBack) (*eventstream.Subscription, error) {
graphPath, err := m.graphActorPath(graphID)
func (m *actorManager) SubscribeGraphEvents(flowID string, fromIndex int, fn persistence.StreamCallBack) (*eventstream.Subscription, error) {
graphPath, err := m.graphActorPath(flowID)
if err != nil {
return nil, err
}
return m.persistenceProvider.GetStreamingState().SubscribeActorJournal(graphPath, fromIndex, fn), nil
}

func (m *actorManager) QueryGraphEvents(graphID string, fromIndex int, p persistence.StreamPredicate, fn persistence.StreamCallBack) error {
graphPath, err := m.graphActorPath(graphID)
func (m *actorManager) QueryGraphEvents(flowID string, fromIndex int, p persistence.StreamPredicate, fn persistence.StreamCallBack) error {
graphPath, err := m.graphActorPath(flowID)
if err != nil {
return err
}
Expand All @@ -254,10 +254,10 @@ func (m *actorManager) UnsubscribeStream(sub *eventstream.Subscription) {
m.persistenceProvider.GetStreamingState().UnsubscribeStream(sub)
}

func (m *actorManager) graphActorPath(graphID string) (string, error) {
shardID, err := m.shardExtractor.ShardID(graphID)
func (m *actorManager) graphActorPath(flowID string) (string, error) {
shardID, err := m.shardExtractor.ShardID(flowID)
if err != nil {
return "", err
}
return fmt.Sprintf("%s/%s", supervisorName(shardID), graphID), nil
return fmt.Sprintf("%s/%s", supervisorName(shardID), flowID), nil
}
20 changes: 10 additions & 10 deletions actor/executor.go
Expand Up @@ -62,7 +62,7 @@ func (exec *graphExecutor) Receive(context actor.Context) {
}

func (exec *graphExecutor) HandleInvokeStage(msg *model.InvokeStageRequest) *model.FaasInvocationResponse {
stageLog := exec.log.WithFields(logrus.Fields{"graph_id": msg.GraphId, "stage_id": msg.StageId, "function_id": msg.FunctionId, "operation": msg.Operation})
stageLog := exec.log.WithFields(logrus.Fields{"flow_id": msg.FlowId, "stage_id": msg.StageId, "function_id": msg.FunctionId})
stageLog.Info("Running Stage")

buf := new(bytes.Buffer)
Expand Down Expand Up @@ -90,7 +90,7 @@ func (exec *graphExecutor) HandleInvokeStage(msg *model.InvokeStageRequest) *mod

req, _ := http.NewRequest("POST", exec.faasAddr+"/"+msg.FunctionId, buf)
req.Header.Set("Content-type", fmt.Sprintf("multipart/form-data; boundary=\"%s\"", partWriter.Boundary()))
req.Header.Set(protocol.HeaderFlowID, msg.GraphId)
req.Header.Set(protocol.HeaderFlowID, msg.FlowId)
req.Header.Set(protocol.HeaderStageRef, msg.StageId)
resp, err := exec.client.Do(req)

Expand All @@ -106,7 +106,7 @@ func (exec *graphExecutor) HandleInvokeStage(msg *model.InvokeStageRequest) *mod
stageLog.WithField("fn_call_id", callID).WithField("fn_lb_delay", lbDelayHeader).WithField("http_status", fmt.Sprintf("%d", resp.StatusCode)).Error("Got non-200 error from FaaS endpoint")

if resp.StatusCode == 504 {
return &model.FaasInvocationResponse{GraphId: msg.GraphId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: model.NewInternalErrorResult(model.ErrorDatumType_stage_timeout, "stage timed out"), CallId: callID}
return &model.FaasInvocationResponse{FlowId: msg.FlowId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: model.NewInternalErrorResult(model.ErrorDatumType_stage_timeout, "stage timed out"), CallId: callID}
}
return stageFailed(msg, model.ErrorDatumType_stage_failed, fmt.Sprintf("Invalid http response from functions platform code %d", resp.StatusCode), callID)
}
Expand All @@ -119,25 +119,25 @@ func (exec *graphExecutor) HandleInvokeStage(msg *model.InvokeStageRequest) *mod
}
stageLog.WithField("fn_call_id", callID).WithField("fn_lb_delay", lbDelayHeader).WithField("successful", fmt.Sprintf("%t", result.Successful)).Info("Got stage response")

return &model.FaasInvocationResponse{GraphId: msg.GraphId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: result, CallId: callID}
return &model.FaasInvocationResponse{FlowId: msg.FlowId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: result, CallId: callID}
}

func stageFailed(msg *model.InvokeStageRequest, errorType model.ErrorDatumType, errorMessage string, callID string) *model.FaasInvocationResponse {
return &model.FaasInvocationResponse{GraphId: msg.GraphId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: model.NewInternalErrorResult(errorType, errorMessage), CallId: callID}
return &model.FaasInvocationResponse{FlowId: msg.FlowId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: model.NewInternalErrorResult(errorType, errorMessage), CallId: callID}
}

func (exec *graphExecutor) HandleInvokeFunction(msg *model.InvokeFunctionRequest) *model.FaasInvocationResponse {
datum := msg.Arg

method := strings.ToUpper(model.HTTPMethod_name[int32(datum.Method)])
stageLog := exec.log.WithFields(logrus.Fields{"graph_id": msg.GraphId, "stage_id": msg.StageId, "target_function_id": msg.FunctionId, "method": method})
stageLog := exec.log.WithFields(logrus.Fields{"flow_id": msg.FlowId, "stage_id": msg.StageId, "target_function_id": msg.FunctionId, "method": method})
stageLog.Info("Sending function invocation")

var bodyReader io.Reader

if datum.Body != nil {
var err error
bodyReader, err = exec.blobStore.Read(msg.GraphId, datum.Body.BlobId)
bodyReader, err = exec.blobStore.Read(msg.FlowId, datum.Body.BlobId)
if err != nil {
stageLog.WithError(err).Warn("Failed to fetch blob from store")
return exec.invokeFailed(msg, "Failed to read data for invocation", "")
Expand Down Expand Up @@ -193,7 +193,7 @@ func (exec *graphExecutor) HandleInvokeFunction(msg *model.InvokeFunctionRequest
}
}

blob, err := exec.blobStore.Create(msg.GraphId, contentType, resp.Body)
blob, err := exec.blobStore.Create(msg.FlowId, contentType, resp.Body)
if err != nil {
stageLog.WithError(err).Warn("failed to persist data in blob store")
return exec.invokeFailed(msg, "Failed to persist HTTP response data", callID)
Expand All @@ -207,7 +207,7 @@ func (exec *graphExecutor) HandleInvokeFunction(msg *model.InvokeFunctionRequest
StatusCode: uint32(resp.StatusCode)}}}

result := &model.CompletionResult{Successful: exec.successfulResponse(resp), Datum: resultDatum}
return &model.FaasInvocationResponse{GraphId: msg.GraphId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: result, CallId: callID}
return &model.FaasInvocationResponse{FlowId: msg.FlowId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: result, CallId: callID}
}

func (exec *graphExecutor) successfulResponse(resp *http.Response) bool {
Expand All @@ -218,5 +218,5 @@ func (exec *graphExecutor) successfulResponse(resp *http.Response) bool {

func (exec *graphExecutor) invokeFailed(msg *model.InvokeFunctionRequest, errorMessage string, callID string) *model.FaasInvocationResponse {

return &model.FaasInvocationResponse{GraphId: msg.GraphId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: model.NewInternalErrorResult(model.ErrorDatumType_function_invoke_failed, errorMessage), CallId: callID}
return &model.FaasInvocationResponse{FlowId: msg.FlowId, StageId: msg.StageId, FunctionId: msg.FunctionId, Result: model.NewInternalErrorResult(model.ErrorDatumType_function_invoke_failed, errorMessage), CallId: callID}
}
7 changes: 3 additions & 4 deletions actor/executor_test.go
Expand Up @@ -301,10 +301,9 @@ func givenValidInvokeStageRequest(store blobs.Store, m *MockClient) *model.FaasI
}

result := exec.HandleInvokeStage(&model.InvokeStageRequest{
GraphId: "graph-id",
FlowId: "graph-id",
StageId: "stage-id",
FunctionId: "/function/id/",
Operation: model.CompletionOperation_thenApply,
Closure: model.NewBlob("closure", uint64(200), "content/type"),
Args: []*model.CompletionResult{model.NewSuccessfulResult(model.NewBlobDatum(model.NewBlob("arg1", uint64(200), "content/type"))), model.NewEmptyResult()},
})
Expand All @@ -320,7 +319,7 @@ func givenValidFunctionRequest(store blobs.Store, m *MockClient, body *model.Blo
}

result := exec.HandleInvokeFunction(&model.InvokeFunctionRequest{
GraphId: "graph-id",
FlowId: "graph-id",
StageId: "stage-id",
FunctionId: "/function/id/",
Arg: &model.HTTPReqDatum{
Expand All @@ -339,7 +338,7 @@ func givenValidFunctionRequest(store blobs.Store, m *MockClient, body *model.Blo
func hasValidResult(t *testing.T, result *model.FaasInvocationResponse) {
assert.Equal(t, "/function/id/", result.FunctionId)
assert.Equal(t, "stage-id", result.StageId)
assert.Equal(t, "graph-id", result.GraphId)
assert.Equal(t, "graph-id", result.FlowId)
require.NotNil(t, result.Result)
require.NotNil(t, result.Result.GetDatum())
}
Expand Down

0 comments on commit bb04154

Please sign in to comment.