Skip to content

Commit

Permalink
Presto - Reduce number of queries run (flyteorg#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor committed Jun 16, 2020
1 parent fb08a9f commit 3a54059
Showing 1 changed file with 15 additions and 34 deletions.
49 changes: 15 additions & 34 deletions go/tasks/plugins/presto/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func HandleExecutionState(
newState, transformError = MonitorQuery(ctx, tCtx, currentState, executionsCache)

case PhaseQuerySucceeded:
if currentState.QueryCount < 4 {
if currentState.QueryCount < 1 {
// If there are still Presto statements to execute, increment the query count, reset the phase to 'queued'
// and continue executing the remaining statements. In this case, we won't request another allocation token
// as the 5 statements that get executed are all considered to be part of the same "query"
// as the 2 statements that get executed are all considered to be part of the same "query"
currentState.PreviousPhase = currentState.CurrentPhase
currentState.CurrentPhase = PhaseQueued
} else {
Expand Down Expand Up @@ -304,7 +304,18 @@ func GetNextQuery(
user = tCtx.TaskExecutionMetadata().GetNamespace()
}

statement = fmt.Sprintf(`CREATE TABLE hive.flyte_temporary_tables."%s_temp" AS %s`, tempTableName, statement)
externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "")
if err != nil {
return Query{}, err
}

queryWrapTemplate := `
CREATE TABLE hive.flyte_temporary_tables."%s_temp"
WITH (format = 'PARQUET', external_location = '%s')
AS (%s)
`

statement = fmt.Sprintf(queryWrapTemplate, tempTableName, externalLocation, statement)

prestoQuery := Query{
Statement: statement,
Expand All @@ -317,46 +328,16 @@ func GetNextQuery(
},
TempTableName: tempTableName + "_temp",
ExternalTableName: tempTableName + "_external",
ExternalLocation: externalLocation.String(),
}

return prestoQuery, nil

case 1:
externalLocation, err := tCtx.DataStore().ConstructReference(ctx, tCtx.OutputWriter().GetRawOutputPrefix(), "")
if err != nil {
return Query{}, err
}

statement := fmt.Sprintf(`
CREATE TABLE hive.flyte_temporary_tables."%s" (LIKE hive.flyte_temporary_tables."%s")
WITH (format = 'PARQUET', external_location = '%s')`,
currentState.CurrentPrestoQuery.ExternalTableName,
currentState.CurrentPrestoQuery.TempTableName,
externalLocation,
)
currentState.CurrentPrestoQuery.Statement = statement
currentState.CurrentPrestoQuery.ExternalLocation = externalLocation.String()
return currentState.CurrentPrestoQuery, nil

case 2:
statement := `
INSERT INTO hive.flyte_temporary_tables."%s"
SELECT *
FROM hive.flyte_temporary_tables."%s"`
statement = fmt.Sprintf(statement, currentState.CurrentPrestoQuery.ExternalTableName, currentState.CurrentPrestoQuery.TempTableName)
currentState.CurrentPrestoQuery.Statement = statement
return currentState.CurrentPrestoQuery, nil

case 3:
statement := fmt.Sprintf(`DROP TABLE hive.flyte_temporary_tables."%s"`, currentState.CurrentPrestoQuery.TempTableName)
currentState.CurrentPrestoQuery.Statement = statement
return currentState.CurrentPrestoQuery, nil

case 4:
statement := fmt.Sprintf(`DROP TABLE hive.flyte_temporary_tables."%s"`, currentState.CurrentPrestoQuery.ExternalTableName)
currentState.CurrentPrestoQuery.Statement = statement
return currentState.CurrentPrestoQuery, nil

default:
return currentState.CurrentPrestoQuery, nil
}
Expand Down

0 comments on commit 3a54059

Please sign in to comment.