Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
package org.apache.texera.amber.engine.architecture.controller

import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import org.apache.texera.workflow.LogicalPlan
import org.apache.texera.amber.compiler.model.LogicalPlan

case class Workflow(
context: WorkflowContext,
logicalPlan: LogicalPlan,
// The logical plan is only retained for in-JVM compilation paths (amber's WorkflowCompiler,
// e2e TestUtils). At runtime the client ships a pre-compiled physical plan, so the Computing
// Unit builds a Workflow with `logicalPlan = None`; nothing on the execution path reads it.
logicalPlan: Option[LogicalPlan] = None,
physicalPlan: PhysicalPlan
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import io.dropwizard.Configuration
import io.dropwizard.configuration.{EnvironmentVariableSubstitutor, SubstitutingSourceProvider}
import io.dropwizard.setup.{Bootstrap, Environment}
import io.dropwizard.websockets.WebsocketBundle
import org.apache.texera.amber.config.{ApplicationConfig, StorageConfig}
import org.apache.texera.amber.config.{ApplicationConfig, EnvironmentalVariable, StorageConfig}
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.virtualidentity.ExecutionIdentity
import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowContext}
Expand All @@ -40,12 +40,11 @@ import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.common.storage.SequentialRecordStorage
import org.apache.texera.amber.engine.common.{AmberRuntime, Utils}
import org.apache.texera.amber.util.JSONUtils.objectMapper
import org.apache.texera.amber.util.ObjectMapperUtils
import org.apache.texera.amber.util.{ObjectMapperUtils, PhysicalPlanSerdeModule}
import org.apache.commons.jcs3.access.exception.InvalidArgumentException
import org.apache.texera.auth.SessionUser
import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
import org.apache.texera.web.auth.JwtAuth.setupJwtAuth
import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import org.apache.texera.web.resource.{
SyncExecutionResource,
Expand Down Expand Up @@ -136,16 +135,28 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
)
// register scala module to dropwizard default object mapper
bootstrap.getObjectMapper.registerModule(DefaultScalaModule)
// The execution request carries a pre-compiled PhysicalPlan; register its serializers so the
// CU deserializes it byte-for-byte compatibly with the workflow-compiling-service's output.
PhysicalPlanSerdeModule.register(bootstrap.getObjectMapper)
}

override def run(configuration: Configuration, environment: Environment): Unit = {
ObjectMapperUtils.warmupObjectMapperForOperatorsSerde()

SqlServer.initConnection(
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)
// In remote mode the CU routes execution-metadata operations over HTTP to the dashboard
// service and holds no Postgres credentials of its own (issue #5011).
val remote =
EnvironmentalVariable
.get(EnvironmentalVariable.ENV_EXECUTION_METADATA_REMOTE)
.contains("true")

if (!remote) {
SqlServer.initConnection(
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)
}

environment.jersey.setUrlPattern("/api/*")

Expand All @@ -163,42 +174,47 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with

environment.jersey.register(classOf[PveResource])

setupJwtAuth(environment)

// The Computing Unit performs no JWT authentication and holds no JWT secret (issue #5011): no
// JwtAuthFilter and no RolesAllowedDynamicFeature are registered, so @RolesAllowed is not
// enforced and the execution endpoints are open — the client ships a pre-compiled physical
// plan. The value-factory binder below is kept ONLY so that @Auth-annotated parameters on
// co-registered dashboard resources stay injectable (resolving to no authenticated user);
// it does not validate tokens. Contrast TexeraWebApplication, which keeps full JWT auth.
environment.jersey.register(
new io.dropwizard.auth.AuthValueFactoryProvider.Binder[SessionUser](classOf[SessionUser])
)
environment.jersey.register(
classOf[org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature]
)
environment
.servlets()
.addServletListeners(
new WebsocketPayloadSizeTuner(ApplicationConfig.maxWorkflowWebsocketRequestPayloadSizeKb)
)

val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs
if (ApplicationConfig.cleanupAllExecutionResults) {
// do one time cleanup of collections that were not closed gracefully before restart/crash
// retrieve all executions that were executing before the reboot.
val allExecutionsBeforeRestart: List[WorkflowExecutions] =
WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1)
cleanExecutions(
allExecutionsBeforeRestart,
statusByte => {
if (statusByte != maptoStatusCode(COMPLETED)) {
maptoStatusCode(FAILED) // for incomplete executions, mark them as failed.
} else {
statusByte
// Result/log cleanup needs a database connection and is owned by the dashboard service in
// remote mode, so skip it on the computing unit when running remotely.
if (!remote) {
val timeToLive: Int = ApplicationConfig.sinkStorageTTLInSecs
if (ApplicationConfig.cleanupAllExecutionResults) {
// do one time cleanup of collections that were not closed gracefully before restart/crash
// retrieve all executions that were executing before the reboot.
val allExecutionsBeforeRestart: List[WorkflowExecutions] =
WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(-1)
cleanExecutions(
allExecutionsBeforeRestart,
statusByte => {
if (statusByte != maptoStatusCode(COMPLETED)) {
maptoStatusCode(FAILED) // for incomplete executions, mark them as failed.
} else {
statusByte
}
}
}
)
}
scheduleRecurringCallThroughActorSystem(
2.seconds,
ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds
) {
recurringCheckExpiredResults(timeToLive)
)
}
scheduleRecurringCallThroughActorSystem(
2.seconds,
ApplicationConfig.sinkStorageCleanUpCheckIntervalInSecs.seconds
) {
recurringCheckExpiredResults(timeToLive)
}
}

environment.jersey.register(classOf[WorkflowExecutionsResource])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
package org.apache.texera.web

import com.typesafe.scalalogging.LazyLogging
import org.apache.http.client.utils.URLEncodedUtils
import org.apache.texera.auth.JwtAuth.jwtConsumer
import org.apache.texera.auth.util.HeaderField
import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum
import org.apache.texera.dao.jooq.generated.tables.pojos.User

import java.net.URI
import java.nio.charset.Charset
import javax.websocket.HandshakeResponse
import javax.websocket.server.{HandshakeRequest, ServerEndpointConfig}
import scala.jdk.CollectionConverters.{ListHasAsScala, _}
Expand Down Expand Up @@ -82,36 +78,14 @@ class ServletAwareConfigurator extends ServerEndpointConfig.Configurator with La
)
logger.debug(s"User created from headers: ID=$userId, Name=$userName")
} else {
// SINGLE NODE MODE: Construct the User object from JWT in query parameters.
val params =
URLEncodedUtils.parse(new URI("?" + request.getQueryString), Charset.defaultCharset())
// SINGLE NODE MODE: the Computing Unit does not authenticate. It grants WRITE access and
// runs whatever physical plan the client sends — no JWT is parsed or validated, so the CU
// holds no JWT secret. The execution owner is resolved downstream from the CU's
// USER_JWT_TOKEN when metadata is persisted.
config.getUserProperties.put(
HeaderField.UserComputingUnitAccess,
PrivilegeEnum.WRITE.name()
)
params.asScala
.map(pair => pair.getName -> pair.getValue)
.toMap
.get("access-token")
.map(token => {
val claims = jwtConsumer.process(token).getJwtClaims
config.getUserProperties.put(
classOf[User].getName,
new User(
claims.getClaimValue("userId").asInstanceOf[Long].toInt,
claims.getSubject,
String.valueOf(claims.getClaimValue("email").asInstanceOf[String]),
null,
null,
null,
null,
null,
null,
null,
null
)
)
})
}
} catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class TexeraWebApplication
environment.jersey.register(classOf[ProjectResource])
environment.jersey.register(classOf[ProjectAccessResource])
environment.jersey.register(classOf[WorkflowExecutionsResource])
environment.jersey.register(classOf[InternalExecutionMetadataResource])
environment.jersey.register(classOf[DashboardResource])
environment.jersey.register(classOf[GmailResource])
environment.jersey.register(classOf[AdminExecutionResource])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.apache.texera.web.model.websocket.request

import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo}
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.workflow.LogicalLink

case class EditingTimeCompilationRequest(
operators: List[LogicalOp],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,28 @@
package org.apache.texera.web.model.websocket.request

import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.texera.amber.core.workflow.WorkflowSettings
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.workflow.LogicalLink
import org.apache.texera.amber.core.workflow.{PhysicalPlan, WorkflowSettings}

case class ReplayExecutionInfo(
@JsonDeserialize(contentAs = classOf[java.lang.Long])
eid: Long,
interaction: String
)

/**
* Execution request the client sends to the ComputingUnitMaster. The client (frontend / agent
* service) compiles the workflow against the workflow-compiling-service and ships the resulting
* ready-to-run [[PhysicalPlan]] here, so the CU neither compiles nor authenticates — it just runs
* the plan. `opsToViewResult` (logical operator ids) is used to mark which output ports need
* result storage.
*/
case class WorkflowExecuteRequest(
executionName: String,
engineVersion: String,
logicalPlan: LogicalPlanPojo,
physicalPlan: PhysicalPlan,
opsToViewResult: List[String] = List.empty,
replayFromExecution: Option[ReplayExecutionInfo], // contains execution Id, interaction Id.
workflowSettings: WorkflowSettings,
emailNotificationEnabled: Boolean,
computingUnitId: Int
) extends TexeraWebSocketRequest

case class LogicalPlanPojo(
operators: List[LogicalOp],
links: List[LogicalLink],
opsToViewResult: List[String],
opsToReuseResult: List[String]
)
Loading