
[FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) #6594

Closed
wants to merge 73 commits

Conversation

@Xeli (Contributor) commented Aug 21, 2018

What is the purpose of the change
Adding a PubSub connector with support for Checkpointing

Verifying this change
This change added tests and can be verified as follows:

  • Added unit tests.

  • Added integration tests to flink-end-to-end which run against Docker.

  • An example has been added in flink-examples which runs against the actual Google PubSub service. This has been manually verified.

  • Is there a need for integration tests?
    We feel like there is and have added them.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): Yes, the Google Cloud SDK for PubSub, but because it is a connector this does not add any dependencies to Flink itself.
  • The public API, i.e., is any changed class annotated with @Public(Evolving): No
  • The serializers: No
  • The runtime per-record code paths (performance sensitive): Nothing has been changed; only a connector has been added.
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing
  • Yarn/Mesos, ZooKeeper: No
  • The S3 file system connector: No

Documentation
Does this pull request introduce a new feature? Yes
If yes, how is the feature documented? JavaDocs, an example in flink-examples, and updated website docs.
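For orientation, a minimal sketch of how such a source is typically wired into a Flink job, assuming the PubSubSource.newBuilder(schema, project, subscription) signature discussed later in this thread; project and subscription names are placeholders, and credentials configuration is omitted:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing must be enabled for the ATLEAST_ONCE guarantee:
// received messages are only acknowledged to PubSub once a checkpoint completes.
env.enableCheckpointing(1000);

env.addSource(PubSubSource.newBuilder(new SimpleStringSchema(), "my-project", "my-subscription").build())
	.print();

env.execute("pubsub-source-example");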

@yanghua (Contributor) left a comment

Hi @Xeli, thanks for your contribution. A big PR! I just had a glance and left some comments you can consider.

@Xeli (Contributor, Author) commented Aug 25, 2018

Thank you for your review, @yanghua! I made edits to fix most of them, and I've posted 2 follow-up questions on the points that were not entirely clear to me. :)

@Xeli Xeli force-pushed the Xeli:master branch from ff8a3ce to d25c840 Sep 2, 2018
Review thread on docs/dev/connectors/pubsub.md (outdated, resolved)
@sceee sceee mentioned this pull request Sep 11, 2018
@Xeli Xeli force-pushed the Xeli:master branch 2 times, most recently from 6e89012 to 36ad321 Sep 15, 2018
@Xeli Xeli force-pushed the Xeli:master branch from 5f2ad56 to 698b6f1 Oct 14, 2018
@zentol (Contributor) left a comment

I had a look at some parts of the PR.

I can see a lot of mocking in the tests; it would be good to reduce that, as history has shown that mocks just cause problems down the line.

@Fokko (Contributor) left a comment

Looks great, @Xeli! Looking forward to getting PubSub into Flink!

@nielsbasjes nielsbasjes force-pushed the Xeli:master branch from 5915e17 to df85204 Nov 12, 2018
@nielsbasjes (Contributor) commented Nov 12, 2018

I have fixed a subset of the review comments and rebased against the current master.

@nielsbasjes (Contributor) commented Nov 12, 2018

From the review comments, a few points remain open for me.
Both are about the way the 'testing' code is part of the code base:

  • The stopping of the Source (Bound being a separate Source)
  • The connection to the local PubSub emulator (hostAndPort).

How do we proceed on these points?

@nielsbasjes (Contributor) commented Nov 14, 2018

I checked why the build failed, and all I could find is this:

[FAIL] 'Kafka 0.10 end-to-end test' failed after 0 minutes and 54 seconds! Test exited with exit code 0 but the logs contained errors, exceptions or non-empty .out files

I have been unable to find out why this happens or what our changes did to cause it.
Please advise.

@zentol (Contributor) commented Nov 15, 2018

The test failure is likely just an instability in the Kafka test; feel free to ignore it for now.

@rmetzger (Contributor) commented Feb 13, 2019

Thanks a lot for this pull request.
I will try to help you get it into Flink soon.

Review thread on docs/dev/connectors/pubsub.md (outdated, resolved)
@rmetzger (Contributor) commented Feb 13, 2019

@zentol What's your take on the "Sink / Source" naming vs. "Consumer / Producer"?

@Xeli Xeli force-pushed the Xeli:master branch 2 times, most recently from c16fe05 to 635549a Feb 14, 2019
@rmetzger (Contributor) commented Feb 15, 2019

@Xeli and @nielsbasjes: Before we go really deep into reviewing this code, let's plan this contribution a bit.

I would really like this to go into Flink, ideally into the upcoming 1.8 release. I have the feeling that multiple people would like to use this connector, and maybe get the attention of Google and some Google Cloud users with this effort :)

But I also see a few problems with this effort:
A) I haven't worked on the Flink codebase for over a year, so my reviewing skills are a bit rusty. I will try to mitigate this by asking for additional reviewing help once I've gone over the code. But for you this means there will probably be multiple rounds of reviews (and that I might comment on things which are not actually an issue).
B) There is not much time left until the 1.8 release. I don't know how many things we will find in the code that need to be addressed, or how much time you have to address them.
C) I have already found some issues in the code with respect to exception handling, logging, and maybe performance. I also want to do some correctness testing (such as rescaling, failures, etc.). Have you done such tests already?

Despite these problems, I'm committed to getting this in. I'd be happy to support an approach where we decide to list "known issues" in the documentation. Once the release is out, we iteratively open small pull requests (maybe with help from others) to resolve the known issues (+ the feedback we get after the release).

What do you think about this?

@Xeli (Contributor, Author) commented Feb 15, 2019

Hi @rmetzger

We are still eager to get the code into the Flink repository; especially trying to get it into 1.8 sounds really good.

I am fine with putting some more effort into this as long as we can keep up the momentum and have the 1.8 release as the goal for now.

Regarding your points:

A) It would help us if you could make clear which points I can improve (such as the exception logging comment) and which points are more of a design-decision comment (source/sink vs. subscription/publisher) and should not be changed just yet. I am fine with multiple review rounds as long as we keep the pace. :)
B) I don't see too many problems here for now.
C) We have done extensive tests using our own use case as a base (a financial transaction processing system), so, for example, at-least-once guarantees under different failure scenarios have been tested quite extensively. This PR also has some changes in existing code to make sure we handle rescaling and such properly.

All in all, I think we're on the same page; let's try to get this into 1.8 :)

@rmetzger (Contributor) commented Feb 15, 2019

Great!
I will then do the following:

  1. I'll do a pass over the code, commenting on everything that seems suspicious.
  2. I'll run some more tests, which will probably produce more feedback.
  3. I'll wait until you've addressed all outstanding items, then we do a final round of review.
@rmetzger (Contributor) commented Feb 15, 2019

One thing to consider in the long run: Apache Beam / Google Cloud Dataflow's PubSub connector has a mode that manages subscriptions behind the scenes: https://cloud.google.com/dataflow/model/pubsub-io

@rmetzger (Contributor) left a comment

As promised, this is my first pass over the code.

I haven't reviewed the tests yet, as I'm not sure how much will change in the main code.
In particular, I'm a bit uncertain regarding the changes in MessageAcknowledgingSourceBase.

Review threads on docs/dev/connectors/pubsub.md and flink-connectors/flink-connector-pubsub/pom.xml (outdated, resolved)
public Subscriber getSubscriber(Credentials credentials, ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) {
	FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
		.setMaxOutstandingElementCount(10000L)
		.setMaxOutstandingRequestBytes(100000L)

@rmetzger (Contributor) commented Feb 15, 2019

Design question: I guess 99% of all users of the connector will use this default provider. Right now, these two parameters are hard-coded.
Does it make sense to make them configurable?
Maybe not through the builder of the Source itself, but through the constructors of the factory?
People would then have to instantiate the factory themselves, but at least they could pass custom parameters.

@rmetzger (Contributor) commented Feb 15, 2019

What happens if we receive a message > 100 KB? (Since maxOutstandingRequestBytes has been set to that value.)

@Xeli (Contributor, Author) commented Feb 17, 2019

final hostAndPort has been fixed.

@Xeli (Contributor, Author) commented Feb 17, 2019

In my own applications I change/tune 3 parameters:

  • setMaxOutstandingElementCount
  • setMaxOutstandingRequestBytes
  • setMaxAckExtensionPeriod

The first two go hand in hand: they are what you would tune when dealing with backpressure (see the PubSub section I added to the Flink docs for a bit of explanation).
The third indicates how long a message is kept unacknowledged; after this time PubSub might start sending the message to other consumers. This is relevant when the time between checkpoints is high.
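For context, a sketch of how these three settings are applied when building a Subscriber directly with the Google client library; the values are illustrative, and projectSubscriptionName / messageReceiver are the arguments from the getSubscriber method above:

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.Subscriber;
import org.threeten.bp.Duration;

FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder()
	.setMaxOutstandingElementCount(10000L)   // cap on buffered, not-yet-processed messages
	.setMaxOutstandingRequestBytes(100000L)  // cap on buffered bytes; whichever cap is hit first applies
	.build();

Subscriber subscriber = Subscriber.newBuilder(projectSubscriptionName, messageReceiver)
	.setFlowControlSettings(flowControlSettings)
	.setMaxAckExtensionPeriod(Duration.ofMinutes(10)) // keep messages leased across long checkpoint intervals
	.build();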

My thought was to not expose these through the Source builder because it's tricky to properly explain what they do, while in the context of a Subscriber the Google docs speak for themselves.

I do see your point though: it would increase usability a lot if people could just tune the numbers a bit without having to go through the hassle of figuring out how to create a Subscriber.

So I now added:

	/**
	 * Tune how many messages the connector will buffer when the Flink pipeline cannot keep up (backpressure).
	 *
	 * @param maxMessagesReceivedNotProcessed How many messages will be read and buffered when the Flink pipeline cannot process them fast enough.
	 * @param maxBytesReceivedNotProcessed How many bytes will be read and buffered. A good pick would be: maxBytesReceivedNotProcessed = maxMessagesReceivedNotProcessed * averageBytesSizePerMessage
	 */
	public PubSubSourceBuilder<OUT> withBackpressureParameters(long maxMessagesReceivedNotProcessed, long maxBytesReceivedNotProcessed) {
		this.maxMessagesReceivedNotProcessed = maxMessagesReceivedNotProcessed;
		this.maxBytesReceivedNotProcessed = maxBytesReceivedNotProcessed;
		return this;
	}
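Callers could then tune the buffering without building their own Subscriber, for example (illustrative values, assuming an average message size of about 1 KB):

PubSubSource.newBuilder(schema, project, subscription)
	.withBackpressureParameters(10000L, 10000L * 1024L)
	.build();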

Do you think this is clear enough?

@afedulov (Contributor) commented Feb 17, 2019

@Xeli, the proposition is to allow the modification of these parameters via the factory. That is, having a default factory which one can tweak. As mentioned below, this also allows hiding the testing options (hostAndPort) from the main API (the builder).

@rmetzger (Contributor) commented Feb 18, 2019

Thanks for updating the docs!

I think we should consider moving all DefaultPubSubSubscriberFactory-specific config parameters out of the PubSubSourceBuilder, as @afedulov said.

@Xeli (Contributor, Author) commented Feb 18, 2019

Hmm, so we would have 3 ways of initializing the connector, right?

Using all the default values:

PubSubSource.newBuilder(schema, project, subscription)
	.withCredentials(someCredentials())
	.build();

Using defaultFactory:

PubSubSubscriberFactory factory = DefaultPubSubSubscriberFactory.newBuilder()
	.withHostAndPort("localhost:1234")
	.withMaxOutstandingElementCount(1)
	.withMaxOutstandingRequestBytes(12)
	.build();
PubSubSource.newBuilder(schema, project, subscription)
	.withCredentials(someCredentials())
	.withPubSubSubscriberFactory(factory)
	.build();

If someone wants complete control:

PubSubSubscriberFactory factory = (credentials, projectSubscription, messageReceiver) -> {
	Subscriber.Builder builder = Subscriber
		/* etc. etc. */;
	return builder.build();
};
PubSubSource.newBuilder(schema, project, subscription)
	.withCredentials(someCredentials())
	.withPubSubSubscriberFactory(factory)
	.build();

Which basically means I remove the two builder methods from PubSubSource, add a builder to the default factory, and make the default factory public.

@rmetzger (Contributor) commented Feb 20, 2019

Yes, I think that would be a better separation.

@Xeli (Contributor, Author) commented Feb 21, 2019

Now that I have switched to the sync API, I created 2 PubSubSubscriberFactories. The options now are:

Using all the default values:

PubSubSource.newBuilder(schema, project, subscription)
	.withCredentials(someCredentials())
	.build();

Using the emulator:

PubSubSource.newBuilder(schema, project, subscription)
	.withCredentials(someCredentials())
	.withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator("localhost:1234"))
	.build();

If someone wants complete control:

PubSubSubscriberFactory factory = (credentials, projectSubscription, messageReceiver) -> {
	Subscriber.Builder builder = Subscriber
		/* etc. etc. */;
	return builder.build();
};
PubSubSource.newBuilder(schema, project, subscription)
	.withCredentials(someCredentials())
	.withPubSubSubscriberFactory(factory)
	.build();

nackOutstandingMessages();

@rmetzger (Contributor) commented Feb 15, 2019

Why do we notify the system to re-send messages?

@Xeli (Contributor, Author) commented Feb 17, 2019

The PubSub connector doesn't like to get shut down with unacknowledged messages.

So before I added this, whenever I cancelled a Flink job there would be unacknowledged messages. And while Flink would shut down the job and start unloading classes from the classloader, the PubSub SDK would still be doing work in the background and eventually start throwing ClassNotFound exceptions.

It also seems cleaner to nack messages when shutting down gracefully. Either way, if the nacks don't happen, PubSub will eventually resend the messages, so there is no risk of violating the at-least-once guarantee.
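For readers unfamiliar with the API: a "nack" in PubSub amounts to resetting the ack deadline to zero, which makes the service redeliver the message immediately. A sketch of what such a shutdown nack could look like with the gRPC stub (subscriptionName and outstandingAckIds are hypothetical field names, not the PR's exact code):

import com.google.pubsub.v1.ModifyAckDeadlineRequest;

private void nackOutstandingMessages() {
	ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
		.setSubscription(subscriptionName)
		.addAllAckIds(outstandingAckIds)
		.setAckDeadlineSeconds(0) // a deadline of 0 makes the messages immediately available for redelivery
		.build();
	stub.modifyAckDeadline(request);
}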

@rmetzger (Contributor) commented Feb 18, 2019

But isn't the shutdown being delayed (and thus recovery times going up) if we need to nack a lot of outstanding messages?
Isn't there a better solution to ensure that PubSub has been shut down before Flink starts unloading classes?
(If there are PubSub threads being leaked, that's something we should investigate and understand.)

@Xeli (Contributor, Author) commented Feb 18, 2019

Yes, this would slow down the shutdown. In one of our cases we saw ~20k messages that had to be nacked, and the slowdown was on the order of hundreds of milliseconds to a second, which sounds acceptable?

Regarding whether this is the best solution: I've asked the SDK owners this question here: googleapis/google-cloud-java#3752. They actually implemented this behavior not too long ago and don't intend to change it.

Threads are only leaked if we don't wait for the subscriber to shut down; subscriberWrapper.awaitTerminated() makes sure the subscriber is completely done and terminated.

@rmetzger (Contributor) commented Feb 20, 2019

Okay. Then let's keep this as is.
Can you maybe add a comment before the statement, explaining why the call is needed? I fear that other contributors will eventually delete it if they don't have the context.

@Xeli (Contributor, Author) commented Feb 21, 2019

Small note: I am struggling to get this shutdown working. It seems to either leak threads or still be shutting down threads even though awaitTerminated() and isTerminated() both say it should be all done.

To reproduce, simply stop or cancel the job and look at the output of a taskmanager; it will throw ClassNotFound exceptions.

@Xeli (Contributor, Author) commented Feb 25, 2019

I've created this issue over at the PubSub SDK GitHub: googleapis/google-cloud-java#4558

I do wonder if this isn't a bug in Flink as well. According to the docs (https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/debugging_classloading.html#unloading-of-dynamically-loaded-classes), classes should not be unloaded if there are still non-GC'd objects that use them.

@rmetzger (Contributor) commented Feb 25, 2019

AFAIK, the JVM should not unload classes if they are still referenced by something (in this case a running thread).
So even if Flink has lost its reference to the classloader, the classes should still be present until the thread has ceased to exist.
Which classes are not found in these shutdown scenarios?

@Xeli (Contributor, Author) commented Feb 25, 2019

Yeah, that's what I was thinking after reading the Flink docs. The exceptions I am seeing always seem to involve constructors or similar of different classes.

I think the following is happening: the background thread creates new objects after the job has finished (and thus class unloading has begun). These new objects' classes might not have been referenced at some point and thus have been unloaded.

In the Google issue there's a code sample to wait for the background threads to shut down; I'll give that a try.

@Xeli (Contributor, Author) commented Feb 26, 2019

Alright, I've got the shutdown working cleanly now (just pushed).

The last point I would like your advice on is this line: https://github.com/apache/flink/pull/6594/files#diff-5d1317f139d297f4e67ff13defa4d66cR208

I swallow the InterruptedException here because I see the following behavior:

When the job gets stopped (StoppableFunction), stop() gets called and everything shuts down gracefully. No problem here.

When the job gets cancelled, cancel() gets called and immediately afterwards the thread that executed run() gets an interrupt. If after 30 seconds the thread still has not shut down it will interrupt again, and after 3 attempts it will kill the taskmanager JVM.

Because the first interrupt happens immediately and we still want the threads to shut down cleanly, I opt to simply swallow the exception here, as we know the threads are shutting down at this point. Should we keep it as simple as this or add some logic to only ignore the first interrupt?
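For clarity, a sketch of the cancel/interrupt interplay being described, assuming subscriberWrapper is the connector's own wrapper whose awaitTerminated() blocks until all SDK threads are done (illustrative, not the exact diff):

@Override
public void cancel() {
	subscriberWrapper.stopAsync(); // begin graceful shutdown; Flink interrupts this thread right after
}

// at the end of run():
try {
	subscriberWrapper.awaitTerminated();
} catch (InterruptedException e) {
	// Expected: Flink interrupts the source thread immediately after calling cancel().
	// Shutdown is already in progress at this point, so the interrupt is swallowed
	// to let the SDK threads terminate cleanly.
}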

@rmetzger (Contributor) commented Jun 28, 2019

Thanks for the updates. Checkstyle is still failing:

04:05:19.022 [INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ flink-connector-gcp-pubsub_2.11 ---
04:05:19.240 [INFO] There are 4 errors reported by Checkstyle 8.12 with /tools/maven/checkstyle.xml ruleset.
04:05:19.242 [ERROR] src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java:[74] (regexp) RegexpSinglelineJava: Line has leading space characters; indentation should be performed with tabs only.
04:05:19.248 [ERROR] src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java:[75] (regexp) RegexpSinglelineJava: Line has leading space characters; indentation should be performed with tabs only.
04:05:19.248 [ERROR] src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java:[76] (regexp) RegexpSinglelineJava: Line has leading space characters; indentation should be performed with tabs only.
04:05:19.248 [ERROR] src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java:[77] (regexp) RegexpSinglelineJava: Line has leading space characters; indentation should be performed with tabs only.
04:05:19.249 [INFO] ------------------------------------------------------------------------
04:05:19.249 [INFO] Reactor Summary:
@Xeli (Contributor, Author) commented Jun 28, 2019

@rmetzger It took a while for Travis to run, but it just succeeded.

@becketqin (Contributor) commented Jul 2, 2019

@Xeli Just to let you know, I am reviewing the patch. Hopefully we can check this in before the 1.9 release (code freeze on Jul 5), so we will be able to get more feedback as users onboard.

@becketqin (Contributor) left a comment

@Xeli Thanks for updating the patch. I made a pass over the change. Due to limited time before the 1.9 feature freeze, I mainly focused on correctness and public API issues. There might be some performance concerns as well as missing Javadocs, but we can have follow-up patches to address them after the 1.9 feature freeze.

Review thread on docs/dev/connectors/pubsub.md (outdated, resolved)
.addAllAckIds(splittedAckIds.f0)
.build();

stub.withDeadlineAfter(60, SECONDS).acknowledge(acknowledgeRequest);

@becketqin (Contributor) commented Jul 3, 2019

Should this also use the timeout value instead of the hard-coded 60 seconds?

@Xeli (Contributor, Author) commented Jul 3, 2019

Well, the timeout value is set alongside the number of retries. So, for instance, in my project we set it to 5 seconds with 12 retries, resulting in 60 seconds total. If we were to use the timeout for acknowledgements, it would go from 60s to 5s without retries, which might be unexpected.

I think we should add retries to the acknowledgement and then (re)use this timeout value. I've created a new JIRA issue and added this there. Shall we leave it at 60 for now?
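What retried acknowledgements reusing the timeout value could look like in that follow-up (a sketch only; retries and timeoutSeconds are assumed configuration values, not this PR's code):

import io.grpc.StatusRuntimeException;
import static java.util.concurrent.TimeUnit.SECONDS;

for (int attempt = 1; attempt <= retries; attempt++) {
	try {
		// Each attempt gets its own deadline, so the worst case is retries * timeoutSeconds.
		stub.withDeadlineAfter(timeoutSeconds, SECONDS).acknowledge(acknowledgeRequest);
		break;
	} catch (StatusRuntimeException e) {
		if (attempt == retries) {
			throw e;
		}
	}
}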

@Override
public void close() throws Exception {
	channel.shutdownNow();
	channel.awaitTermination(20, SECONDS);

@becketqin (Contributor) commented Jul 3, 2019

Should this timeout also use the timeout value?

@Xeli (Contributor, Author) commented Jul 3, 2019

The timeout value relates to connection timeouts to pubsub.googleapis.com.

I chose 20s because Flink has a timeout of 30 seconds after it calls cancel() on the source function. I fear that if we set this to the timeout value, it might be set too low.

@Xeli (Contributor, Author) commented Jul 3, 2019

@becketqin Thanks for the review! I've just addressed all your points.

As mentioned in one of the review points, I have created https://issues.apache.org/jira/browse/FLINK-13083 for the follow-ups. Please don't hesitate to add any improvements to that JIRA issue.

@becketqin (Contributor) left a comment

@Xeli Thanks for the quick turn-around. I left a few more comments to answer your reply.

.setData(ByteString.copyFrom(serializationSchema.serialize(message)))
.build();

publisher.publish(pubsubMessage).get();

@becketqin (Contributor) commented Jul 4, 2019

While the synchronous publish is simple, I am not sure this is a performance compromise worth making. In a checkpoint, the typical logic is the following:

  1. Flush and wait for all the handlers to be fired (a simple counter would do the job).
  2. Check if any error has been reported by a handler (a flag set by the handler if some error occurs).
  3. Proceed if there is no error, or fail if there is one.

Even if performance is not a critical concern in this PR, I still think we are losing too much here, since such a synchronous write causes one round trip per message, which may become a usability problem.
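A sketch of the asynchronous pattern described above, assuming a counter for in-flight publishes and an error flag checked at checkpoint time (illustrative; the PR's actual fix lives in the PubSubSink flush logic):

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.atomic.AtomicInteger;

private final AtomicInteger outstandingPublishes = new AtomicInteger();
private volatile Throwable publishFailure;

public void publishAsync(PubsubMessage pubsubMessage) {
	outstandingPublishes.incrementAndGet();
	ApiFuture<String> future = publisher.publish(pubsubMessage);
	ApiFutures.addCallback(future, new ApiFutureCallback<String>() {
		@Override
		public void onSuccess(String messageId) {
			outstandingPublishes.decrementAndGet(); // handler fired without error
		}

		@Override
		public void onFailure(Throwable t) {
			publishFailure = t; // flag the error for the next checkpoint
			outstandingPublishes.decrementAndGet();
		}
	}, MoreExecutors.directExecutor());
}

public void flushOnCheckpoint() throws Exception {
	while (outstandingPublishes.get() > 0) {
		Thread.sleep(5); // flush: wait until every publish handler has fired
	}
	if (publishFailure != null) {
		throw new RuntimeException("A PubSub publish failed since the last checkpoint", publishFailure);
	}
}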

@Xeli (Contributor, Author) commented Jul 4, 2019

I agree. I've changed it so that we are now blocking/waiting in the snapshotState() function, and if any exception has been thrown we rethrow it.

… a StepBuilder pattern
@Xeli Xeli force-pushed the Xeli:master branch from 08dc32d to af632b3 Jul 4, 2019
…fore each checkpoint completes rather than a check per message
@becketqin (Contributor) commented Jul 4, 2019

@Xeli Thanks for updating the patch. I made some minor modifications to it and created a PR against your repo. Can you take a look? Otherwise I think we can merge the patch and address the remaining issues in follow-up PRs.

… PubSubSink flush logic
@Xeli Xeli force-pushed the Xeli:master branch from 0c5658d to d5e74d6 Jul 4, 2019
@Xeli (Contributor, Author) commented Jul 4, 2019

Great, I've merged your patch into this PR (+ added your builder improvement to the docs).

@becketqin (Contributor) commented Jul 5, 2019

@rmetzger I have reviewed the PR and I think the major issues have been addressed. Do you have time to also take a look? We are trying to get this checked in before the 1.9 feature freeze and do some minor clean-ups in a follow-up patch.

@rmetzger (Contributor) commented Jul 5, 2019

Thanks a lot for the review @becketqin
It seems that the change has passed the build in https://travis-ci.org/Xeli/flink/builds/554345672.

I will reduce the number of commits before merging.

rmetzger added a commit to rmetzger/flink that referenced this pull request Jul 5, 2019
@rmetzger (Contributor) commented Jul 5, 2019

I've prepared this branch for merging: master...rmetzger:pubsub
I'll merge it soon (after lunch to be precise :) )

@asfgit asfgit closed this in 0937601 Jul 5, 2019
@becketqin (Contributor) commented Jul 5, 2019

Really glad to see this get into Flink!
@Xeli Thanks a lot for your contribution and patience in addressing the comments.
@rmetzger Thanks for helping merge the patch.

@rmetzger (Contributor) commented Jul 5, 2019

Thank you both for the implementation and review work.
I'm sorry that it took so long.
