Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correctly set resume token when restarting streams #314

Merged
merged 4 commits into from Apr 26, 2021

Conversation

larkee
Copy link
Contributor

@larkee larkee commented Apr 22, 2021

Fixes #311

@larkee larkee requested a review from as a code owner Apr 22, 2021
@product-auto-label product-auto-label bot added the api: spanner label Apr 22, 2021
@google-cla google-cla bot added the cla: yes label Apr 22, 2021
@larkee larkee changed the title fix: correctly set resume token when restarting streams fix: correctly set resume token when restarting streams Apr 22, 2021
@larkee larkee added kokoro:force-run kokoro:run and removed kokoro:force-run labels Apr 23, 2021
@yoshi-kokoro yoshi-kokoro removed the kokoro:run label Apr 23, 2021
@hadim
Copy link

@hadim hadim commented Apr 24, 2021

I have tried this PR and it works well (it executes the query which is not the case using 3.3.0) but at some point, it hangs forever while the query is supposed to be finished (I know that by manually performing a count on the request). I need to add a restart mechanism to make it work.

Here is my snippet.

def delete_catalog(catalog_key: str):

    if catalog_key not in catalogs_to_delete:
        raise Exception(f"Is that a correct catalog name? {catalog_key}")

    logger.info(f"Start deleting {catalog_key}")

    spanner_client = spanner.Client(project=spanner_project_id)
    instance = spanner_client.instance(spanner_instance_id)
    database = instance.database(spanner_database_id)

    # `my_table` ins interleaved in a parent table (not shown here)
    # `my_column` is the second element of a composite primary key
    # AND a secondary index.

    query = f"""
    DELETE FROM my_table
    WHERE my_column = \"{catalog_key}\"
    """

    # The below function never finishes. So we restart it using
    # Python signals every 30 minutes.
    n_rows = database.execute_partitioned_dml(query)

    logger.info(f"Deleting done for {catalog_key} ({n_rows} rows deleted)")


def main():
    class TimeoutException(Exception):
        pass

    def timeout_handler(signum, frame):
        raise TimeoutException

    signal.signal(signal.SIGALRM, timeout_handler)

    timeout = 60 * 30

    while True:
        signal.alarm(timeout)
        try:
            delete_catalog("my_catalog")
        except TimeoutException:
            logger.info("Restart")

THe problem is that even when all the rows have been deleted, it keeps hanging forever.

@larkee
Copy link
Contributor Author

@larkee larkee commented Apr 26, 2021

@hadim Thank you very much for trying out this PR and giving this feedback!

I'll try to replicate this behavior on my end and figure out what the issue is 👍

@larkee
Copy link
Contributor Author

@larkee larkee commented Apr 26, 2021

@hadim My initial attempts to replicate this issue have been unsuccessful. Since this PR does fix the original bug, I'm going to merge this. However, if you could open a new issue with more details about the tables and workload, I would be happy to look into this further 👍

@larkee larkee merged commit 0fcfc23 into googleapis:master Apr 26, 2021
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: spanner cla: yes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants