Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cdc iterator #22

Open
wants to merge 48 commits into
base: main
Choose a base branch
from
Open

Add cdc iterator #22

wants to merge 48 commits into from

Conversation

alarbada
Copy link
Collaborator

Description

Adds cdc iterator, with tests. Stacked from #21

@alarbada alarbada mentioned this pull request Jul 12, 2024
@alarbada alarbada marked this pull request as ready for review July 15, 2024 23:29
@alarbada alarbada requested a review from a team as a code owner July 15, 2024 23:29
@alarbada alarbada changed the base branch from main to setup-tests July 16, 2024 15:02
Base automatically changed from setup-tests to main July 18, 2024 19:13
Copy link
Contributor

@lovromazgon lovromazgon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's figure out the proper places to listen to the context. Right now we do it in 3 places, which can all happen concurrently, and based on which goroutine detects it first we might see different outcomes. We should try to make this behavior deterministic.

return nil, fmt.Errorf("failed to get start position: %w", err)
}

iterator.canalTomb.Go(func() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use tomb to manage a single goroutine, it feels like this could be simplified.

Comment on lines +120 to +121
c.canal.Close()
return fmt.Errorf("context cancelled from runCanal: %w", ctx.Err())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be nice if we waited for canal to stop and give us back an error or nil? This seems racy.

case data, ok := <-c.data:
if !ok { // closed
if err := c.canalTomb.Err(); err != nil {
return sdk.Record{}, fmt.Errorf("canal exited unexpectedly: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really always unexpected? Can't there be a graceful stop of the canal, e.g. when the context gets cancelled? This also feels racy, based on which goroutine first detects the closing context we might get different errors returned from this function.

for i, col := range columns {
val := rows[i]
if s, ok := val.(string); ok {
// I don't know why exactly, but "github.com/go-mysql-org/go-mysql/canal"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, strange. Ideally we would check the table columns and only execute this on columns that we know have a timestamp. Otherwise, we might be parsing dates when that's not intended.

return payload
}

func tryParseCanalStrDate(s string) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, so we parse the time and then format it? What's the purpose of this? 🧐

}

func (c *cdcIterator) buildRecord(e *canal.RowsEvent) (sdk.Record, error) {
pos, err := c.canal.GetMasterPos()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The position handling here feels sketchy. Are we sure this is correct? How do we know that the name in this position corresponds to the one that e.Header.LogPos points to? 🤔

cdc_iterator.go Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants