
Flink: fix Flink streaming query problem [Cannot get a client from a closed pool] #6614

Merged 10 commits into apache:master on Mar 21, 2023

Conversation

xuzhiwen1255 (Contributor)

Solves the problem reported in #6455.

Cause:

lazyTable() opens the catalog and then calls loadTable(), which builds tableOps into the loaded Table. Once the table is loaded, TableLoader#close is called, and close() shuts down the clientPool. But that clientPool has already been handed to tableOps. Later, when the enumerator needs to plan splits, it calls Table#refresh; because the clientPool inside tableOps is already closed, the exception [Cannot get a client from a closed pool] is thrown.

  private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
      SplitEnumeratorContext<IcebergSourceSplit> enumContext,
      @Nullable IcebergEnumeratorState enumState) {
    SplitAssigner assigner;
    if (enumState == null) {
      assigner = assignerFactory.createAssigner();
    } else {
      LOG.info(
          "Iceberg source restored {} splits from state for table {}",
          enumState.pendingSplits().size(),
          lazyTable().name());
      assigner = assignerFactory.createAssigner(enumState.pendingSplits());
    }

    if (scanContext.isStreaming()) {
      // the Table returned by lazyTable() holds a client pool that TableLoader#close has already closed
      ContinuousSplitPlanner splitPlanner =
          new ContinuousSplitPlannerImpl(lazyTable(), scanContext, planningThreadName());
      return new ContinuousIcebergEnumerator(
          enumContext, assigner, scanContext, splitPlanner, enumState);
    } else {
      List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
      assigner.onDiscoveredSplits(splits);
      return new StaticIcebergEnumerator(enumContext, assigner);
    }
  }
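In other words, the failure reduces to this sequence (a simplified sketch, not code from the PR; catalogLoader and identifier are assumed to be in scope):

    // Simplified sketch of the failure described above.
    TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, identifier);
    tableLoader.open();                     // opens the catalog and its client pool
    Table table = tableLoader.loadTable();  // the table's ops capture the client pool
    tableLoader.close();                    // closes the catalog, and with it the pool
    table.refresh();                        // fails: "Cannot get a client from a closed pool"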

@github-actions github-actions bot added the core label Jan 18, 2023
@xuzhiwen1255 (Contributor, Author)

@stevenzwu Can you have a look?

     * use pool.
     */
    private void checkIsClosedOtherwiseReuse() {
      if (connections != null && connections.isClosed()) {
xuzhiwen1255 (Contributor, Author) commented on this diff:

I think we can check whether the pool is closed before each use; if it is closed, we reopen it and then use it again.

@stevenzwu (Contributor) commented Jan 18, 2023

@xuzhiwen1255 thanks a lot for the root cause analysis. I agree with your conclusion. Let's discuss what the right fix is, though. I am not sure we should add the extra complexity to JdbcCatalog and ClientPoolImpl.

I was wondering why this doesn't happen for HiveCatalog. Turns out the difference is that JdbcCatalog is Closeable. Flink TableLoader has the following close behavior, which seems fine to me.

    @Override
    public void close() throws IOException {
      if (catalog instanceof Closeable) {
        ((Closeable) catalog).close();
      }
    }

Both FlinkSource and the new FLIP-27 IcebergSource have the same usage pattern of TableLoader in the source construction. Here is the code snippet from FlinkSource.

    public FlinkInputFormat buildFormat() {
      Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

      Schema icebergSchema;
      FileIO io;
      EncryptionManager encryption;
      if (table == null) {
        // load required fields by table loader.
        tableLoader.open();
        try (TableLoader loader = tableLoader) {
          table = loader.loadTable();
          icebergSchema = table.schema();
          io = table.io();
          encryption = table.encryption();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      } else {

I guess issue #6455 is probably the first report of using a closeable JdbcCatalog in the Flink source. Both FlinkSource and the FLIP-27 IcebergSource need to load a Table object to extract fields like schema, io, etc.

This issue/PR pointed out the problem of reusing the TableLoader object for internal table loading inside FlinkSource or IcebergSource. One alternative fix is to add a clone() method to the Flink TableLoader interface. Then the source's internal usage of TableLoader can open and close the cloned TableLoader object, which avoids closing the original TableLoader object.
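A minimal sketch of the clone() idea (illustrative only, assuming the catalog-based loader keeps its serializable CatalogLoader and table identifier; not the final implementation):

    public interface TableLoader extends Closeable, Serializable {
      void open();

      Table loadTable();

      // proposed addition: a copy whose open()/close() lifecycle is
      // independent of the original loader
      TableLoader clone();
    }

    // Sketch for the catalog-based implementation:
    @Override
    public TableLoader clone() {
      // rebuild from the same serializable inputs, so the clone opens
      // (and later closes) its own catalog and client pool
      return new CatalogTableLoader(catalogLoader, identifier);
    }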

This is a more fundamental change. Hence, I would like to get more feedback from @hililiwei @pvary @rdblue .

@stevenzwu (Contributor)

BTW, this usage pattern of TableLoader also exists in FlinkSink.

    private <T> DataStreamSink<T> chainIcebergOperators() {
      Preconditions.checkArgument(
          inputCreator != null,
          "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
      Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");

      DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);

      if (table == null) {
        tableLoader.open();
        try (TableLoader loader = tableLoader) {
          this.table = loader.loadTable();
        } catch (IOException e) {
          throw new UncheckedIOException(
              "Failed to load iceberg table from table loader: " + tableLoader, e);
        }
      }

@pvary (Contributor) commented Jan 19, 2023

I need to think about this some more, but it feels strange to me that something which does not own an object 'closes' it.

If I understand correctly, the TableLoader is serializable, so in theory it should be cloneable too. We should also check the implications of this on the underlying JDBC/HMS pools.

@hililiwei (Contributor) commented Jan 19, 2023

This issue/PR pointed out the problem of reusing the TableLoader object for internal table loading inside FlinkSource or IcebergSource. One alternative fix is to add a clone() method to the Flink TableLoader interface. Then the source's internal usage of TableLoader can open and close the cloned TableLoader object, which avoids closing the original TableLoader object.

We could actually try to solve this problem from inside TableLoader; cloning a table makes sense to me.

But I also wonder: if a catalog is closed, do tables loaded with its internal objects need to be kept available?

  • If the answer is yes, JdbcCatalog doesn't seem to guarantee it: it closes its JDBC pool when it closes, which is one of the causes of this issue. Consider the following code:

        Table table = jdbcCatalog.loadTable(tableIdent);
        jdbcCatalog.close();

    The table cannot be used afterwards, because its JdbcClientPool is closed.

  • If the answer is no, I'm going to vote for the clone-table plan.

Please correct me if I'm mistaken; I need to keep thinking about this as well.

@stevenzwu (Contributor)

If a catalog is closed, do tables loaded with its internal objects need to be kept available?

@hililiwei I think you have a good question here. If the catalog is closed, should a Table object loaded by that catalog still be usable? The answer is unclear to me. @rdblue, what's your take?

    @@ -56,7 +56,6 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedExc
    C client = get();
    try {
      return action.run(client);

Review comment (Contributor):

Please remove unnecessary whitespace changes.


    protected abstract C reconnect(C client);

    protected abstract void close(C client);
Review comment (Contributor):

Please move these back.

@rdblue (Contributor) commented Jan 23, 2023

I think that if a catalog is closed, it's reasonable for tables to stop operating as well. The catalog manages its shared resources and if it chooses to share a connection pool with tables then it makes sense for the tables to no longer be able to connect after the pool is closed.

Tables should not own their own connection pools, so some resource needs to manage them and the catalog is a good place to do that.

I think the problem is that the TableLoader is taking ownership of the catalog and closing it. That seems incorrect to me.

@hililiwei (Contributor)

I think the problem is that the TableLoader is taking ownership of the catalog and closing it. That seems incorrect to me.

So from that point of view, cloning tables doesn't seem like a good idea: the clones could not share resources. We should keep the catalog available in the loader, or create a statically shared catalog.
Alternatively, we could try to get rid of TableLoader altogether and switch to catalog, but this would be a compatibility-breaking change.
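For the statically shared catalog idea, a rough sketch (SharedCatalogs and its key scheme here are hypothetical, not an existing API): catalogs could be cached per configuration so loaders in the same JVM share one instance and its connection pool.

    // Hypothetical: a process-wide catalog cache so loaders share one catalog
    // (and its connection pool) instead of each owning and closing their own.
    public class SharedCatalogs {
      private static final Map<String, Catalog> CACHE = new ConcurrentHashMap<>();

      public static Catalog getOrCreate(String name, Map<String, String> props) {
        // key by name + properties so distinct configurations get distinct catalogs
        return CACHE.computeIfAbsent(
            name + '|' + props.toString(),
            key -> CatalogUtil.buildIcebergCatalog(name, props, new Configuration()));
      }
    }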

@stevenzwu (Contributor) commented Jan 23, 2023

I think the problem is that the TableLoader is taking ownership of the catalog and closing it. That seems incorrect to me.

Yes, that will be a bigger discussion.

Alternatively, we could try to get rid of TableLoader altogether and switch to catalog, but this would be a compatibility-breaking change.

We can deprecate the TableLoader and provide an alternative if we can come up with a reasonable solution.

We can't just get rid of TableLoader, as that would break compatibility. So we still need to figure out a short-term solution for issue #6455.

I think this Flink usage pattern of TableLoader is fine if the Table is used as a read-only table.

      if (table == null) {
        try (TableLoader loader = tableLoader) {
          loader.open();
          this.table = loader.loadTable();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }

The problem for ContinuousSplitPlannerImpl in the FLIP-27 IcebergSource is that it uses the Table as non-read-only, since it needs to refresh the table. With the catalog closed, the table is invalid for refresh. One solution is to pass a cloned TableLoader into ContinuousSplitPlannerImpl, which can then assume ownership of it. ContinuousSplitPlannerImpl also has a close method and can call TableLoader#close there to release the resources. If we can add a clone method to the TableLoader interface, this approach can work.
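Roughly, assuming TableLoader#clone exists, the planner could own its copy end to end (a sketch, not final code):

    public ContinuousSplitPlannerImpl(
        TableLoader tableLoader, ScanContext scanContext, String threadName) {
      // clone so this planner owns its own loader; closing it later will not
      // touch the pool behind the caller's original loader
      this.tableLoader = tableLoader.clone();
      this.tableLoader.open();
      this.table = this.tableLoader.loadTable();
      this.scanContext = scanContext;
      this.isSharedPool = threadName == null;
      this.workerPool =
          isSharedPool
              ? ThreadPools.getWorkerPool()
              : ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
    }

    @Override
    public void close() throws IOException {
      if (!isSharedPool) {
        workerPool.shutdown();
      }
      tableLoader.close(); // the planner owns the cloned loader, so it releases it
    }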

@xuzhiwen1255 I definitely think we should close this PR as this is not the right way to fix the problem.

@pvary (Contributor) commented Jan 23, 2023

The problem for ContinuousSplitPlannerImpl in the FLIP-27 IcebergSource is that it uses the Table as non-read-only, since it needs to refresh the table.

Table API assumes that you can have a reference to the table "forever" and you can refresh the table data whenever you want. There should be no restrictions.

This means that the table needs access to an open connection pool until the table is closed. I think that closing the JDBC pool before closing the table is the mistake here.

I think it is not by chance that we do not have a close method on the general Catalog interface. As a general rule, we expect the Catalog to be an easy static wrapper around the resources needed to access the table snapshot pointer.
This may seem strange, and I had to push back several times on closing the HMS connection pool in HiveCatalog when someone wanted to "fix" this issue.

The JDBC and maybe some other Catalog implementations did not get this pushback, so we are in a situation where the different Catalog implementations behave differently.

We should standardize the behavior (who is responsible for closing the connection pools). Hive has its own PoolCache to close unused pools; JDBC doesn't have this (if I understand correctly).

@stevenzwu (Contributor) commented Jan 23, 2023

Table API assumes that you can have a reference to the table "forever" and you can refresh the table data whenever you want. There should be no restrictions.
This means that the table needs access to an open connection pool until the table is closed. I think that closing the JDBC pool before closing the table is the mistake here.

SerializableTable is a read-only copy of a Table; we can't refresh a SerializableTable. Also note that the Table interface doesn't have a close method.
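For example, something like SerializableTable.copyOf gives a snapshot that stays usable after the catalog closes, at the cost of refresh (a short sketch; the exact exception may differ):

    // A read-only, serializable snapshot of the table: its metadata was
    // captured at copy time, so it stays usable after the catalog closes.
    Table readOnly = SerializableTable.copyOf(table);
    readOnly.schema();   // fine: served from the captured metadata
    readOnly.refresh();  // not supported on the read-only copy: throws UnsupportedOperationException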

As a general rule we expect the Catalog to be an easy static wrapper around the resources needed to access the table snapshot pointer.

A static wrapper can make object lifecycle management difficult. E.g., Flink needs to unload dynamically loaded classes in user code, which often includes connectors.

@pvary (Contributor) commented Jan 24, 2023

A static wrapper can make object lifecycle management difficult. E.g., Flink needs to unload dynamically loaded classes in user code, which often includes connectors.

What is the best way to use Connection pools in Flink tasks? Like a pool for HMSConnection, or JDBC driver?
Shall we add that, and the wrapper classes to the main Flink classpath, so the classes are not loaded/reloaded every time?

@stevenzwu (Contributor)

What is the best way to use Connection pools in Flink tasks? Like a pool for HMSConnection, or JDBC driver?
Shall we add that, and the wrapper classes to the main Flink classpath, so the classes are not loaded/reloaded every time?

That is a good question, and a bit challenging. The easier model is to share nothing across tasks (e.g. no global static connection pools). Let's say each TM has 8 subtasks, and each subtask needs to open a client (and connection pool) talking to some external system. Each subtask is responsible for the ownership and lifecycle of its client. This ties the client/connection-pool lifecycle to the Flink subtask lifecycle.

Yeah, if it is a global static, I think your suggestion of loading the classes in the Flink parent classloader is probably the way to go. But that would require some meddling with the image packaging to include the connector jars in the flink/libs dir.
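As a sketch of that share-nothing model (the client type and its methods here are placeholders, not a real API):

    // Each subtask opens and closes its own client, tying the client/pool
    // lifecycle to the subtask lifecycle. SomeClient is a placeholder type.
    public class LookupFn extends RichMapFunction<String, String> {
      private transient SomeClient client;

      @Override
      public void open(Configuration parameters) {
        this.client = SomeClient.connect(); // per-subtask connection/pool
      }

      @Override
      public String map(String value) {
        return client.lookup(value);
      }

      @Override
      public void close() {
        client.close(); // released when the subtask shuts down
      }
    }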

@stevenzwu (Contributor)

I think that if a catalog is closed, it's reasonable for tables to stop operating as well. The catalog manages its shared resources and if it chooses to share a connection pool with tables then it makes sense for the tables to no longer be able to connect after the pool is closed.

Tables should not own their own connection pools, so some resource needs to manage them and the catalog is a good place to do that.

@rdblue I agree that in this case tables aren't able to connect/refresh after the pool is closed by the catalog. But I feel it should be OK to use the table as read-only (like a SerializableTable). What do you think?

I think the problem is that the TableLoader is taking ownership of the catalog and closing it. That seems incorrect to me.

It is not necessarily wrong for TableLoader to take ownership of the catalog. Because CatalogTableLoader loads the catalog in its open method, it is fine for it to close the catalog in its close method.

I am not saying it is the most efficient way, as it implies each TableLoader object loads its own Catalog; there is no way to share a Catalog. This is probably not an issue for most Flink jobs, which don't load too many tables in one job.

@xuzhiwen1255 (Contributor, Author)

I'm sorry, I've been spending time with my family recently, so I haven't been able to join the discussion.

I would like to share my opinion.

I think that if a catalog is closed, it's reasonable for tables to stop operating as well. The catalog manages its shared resources and if it chooses to share a connection pool with tables then it makes sense for the tables to no longer be able to connect after the pool is closed.

Tables should not own their own connection pools, so some resource needs to manage them and the catalog is a good place to do that.

I think the problem is that the TableLoader is taking ownership of the catalog and closing it. That seems incorrect to me.

+1, I think that after the catalog is closed, the table should be closed as well.

I think this is a code bug.

Suppose we use another catalog (not JdbcCatalog) that also passes closeable objects into the table; the same problem would still exist once Catalog#close is called.

Therefore, we need to avoid using a table loaded by a catalog after that catalog is closed; doing so causes unexpected behavior, and some problems have already surfaced.

I thought of a way to circumvent this problem; I wonder if it is feasible. Pseudocode:

// Abstract out a table accessor; all access to the table goes through it,
// and it manages the lifecycle of the table and the catalog.
public class TableAccessor implements Closeable {
  private final TableLoader tableLoader;
  private Table table;

  public TableAccessor(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
  }

  public Table lazyTable() {
    if (table == null) {
      // keep the loader open here; it is only closed in close(), so the
      // table's client pool stays usable until the accessor is released
      tableLoader.open();
      this.table = tableLoader.loadTable();
    }
    return table;
  }

  @Override
  public void close() throws IOException {
    tableLoader.close();
  }
}

// ----- IcebergSource ---------
  private TableAccessor tableAccessor;

  IcebergSource(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
    // An accessor is created when the IcebergSource is built; subsequent
    // operations on the table, and references to it, go through the accessor.
    this.tableAccessor = new TableAccessor(tableLoader);
  }

  private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
    ExecutorService workerPool =
        ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
    try {
      List<IcebergSourceSplit> splits =
          FlinkSplitPlanner.planIcebergSourceSplits(
              tableAccessor.lazyTable(), scanContext, workerPool);
      LOG.info(
          "Discovered {} splits from table {} during job initialization",
          splits.size(),
          tableAccessor.lazyTable().name());
      return splits;
    } finally {
      workerPool.shutdown();
      // close the accessor (and with it the table loader)
      tableAccessor.close();
    }
  }

For streaming mode, we pass the accessor directly to the ContinuousSplitPlannerImpl, which closes the accessor.

  private TableAccessor tableAccessor;

  public ContinuousSplitPlannerImpl(TableAccessor tableAccessor, ...) {
    this.tableAccessor = tableAccessor;
    ...
  }

  @Override
  public void close() throws IOException {
    if (!isSharedPool) {
      workerPool.shutdown();
    }
    tableAccessor.close();
  }

In this way, the table is always accessed through its accessor, and the tableLoader is maintained by the accessor. When the accessor is closed, we close the tableLoader, ensuring that it is closed correctly and that references held by the table do not become unavailable because the catalog was closed first.

@stevenzwu @pvary @hililiwei What do you think of this plan?
Please correct me if I'm wrong.

@xuzhiwen1255 (Contributor, Author)

The TableAccessor is my experimental idea, yet to be tested. There may be a better way, but I haven't thought of it yet.

@hililiwei (Contributor)

That is a good question, and a bit challenging. The easier model is to share nothing across tasks (e.g. no global static connection pools). Let's say each TM has 8 subtasks, and each subtask needs to open a client (and connection pool) talking to some external system. Each subtask is responsible for the ownership and lifecycle of its client. This ties the client/connection-pool lifecycle to the Flink subtask lifecycle.

+1 for this.

In this way, the table is always accessed through its accessor, and the tableLoader is maintained by the accessor. When the accessor is closed, we close the tableLoader, ensuring that it is closed correctly and that references held by the table do not become unavailable because the catalog was closed first.

We should first standardize the behavior. The current approach, in my opinion, is not the best way to solve the problem.

From the above discussion, it seems that we have the following three solutions:

  1. Use catalog instead of TableLoader.
    It's a big change, but it's more in line with the current behavior of catalog, and all resources are managed by the catalog.
  2. Clone a new table.
    In FLIP-27 a read-only table does not meet our requirements, so we could try to clone a new table in the table loader. It would be independent of the catalog, and we would need to manage it manually.
  3. Clone a new TableLoader.
    Pass a cloned TableLoader into ContinuousSplitPlannerImpl, and add a clone method to the TableLoader interface.

Did I miss anything?

@stevenzwu (Contributor)

  2. Clone a new table. In FLIP-27 a read-only table does not meet our requirements, so we could try to clone a new table in the table loader. It would be independent of the catalog, and we would need to manage it manually.

@hililiwei, this won't work, as the table may still need to access the client pools that are closed by the catalog.

  3. Clone a new TableLoader. Pass a cloned TableLoader into ContinuousSplitPlannerImpl, and add a clone method to the TableLoader interface.

This is a feasible solution for fixing the current problem with TableLoader. And we do have to fix it in TableLoader, since we can't just drop it and switch to a completely new API or model, for compatibility reasons.

  1. Use catalog instead of TableLoader. It's a big change, but it's more in line with the current behavior of catalog, and all resources are managed by the catalog.

This could be the proper long-term solution. We need to think carefully here and make sure the new API makes sense.

@stevenzwu (Contributor) commented Jan 29, 2023

@xuzhiwen1255 regarding your TableAccessor proposal, I think it is simpler to just add TableLoader#clone so that we maintain the same resource management model where the TableLoader object owns the catalog (open and close).

@xuzhiwen1255 (Contributor, Author) commented Jan 30, 2023

@stevenzwu @hililiwei I made the change; please review it, thank you.
The sink does not operate on the table after loading it, so I did not change its logic; I only modified the logic of the source.

@xuzhiwen1255 (Contributor, Author)

@pvary Can you take a look? Thank you

@stevenzwu (Contributor) left a review:

This is really close, except for one nit comment on the unnecessary whitespace/empty line change.

@hililiwei @pvary do you have other comments for this PR?

@stevenzwu (Contributor)

@xuzhiwen1255 can you rebase this PR? We will need to update the REST catalog from PR #7044.

@xuzhiwen1255 (Contributor, Author)

@stevenzwu No problem, the modification has been completed.

@xuzhiwen1255 xuzhiwen1255 requested review from pvary and hililiwei and removed request for pvary and hililiwei March 14, 2023 07:28
@stevenzwu stevenzwu merged commit 2662591 into apache:master Mar 21, 2023
@stevenzwu (Contributor)

Thanks @xuzhiwen1255 for the contribution. Can you create a backport PR for 1.14 and 1.15?

@xuzhiwen1255 (Contributor, Author)

@stevenzwu Of course I will.

xuzhiwen1255 pushed commits to xuzhiwen1255/iceberg that referenced this pull request Mar 22–23, 2023
stevenzwu pushed commits that referenced this pull request Mar 23–24, 2023
@xuzhiwen1255 xuzhiwen1255 deleted the fix-pool-closed branch March 24, 2023 03:32