Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL Publisher/Subscriber #56

Merged
merged 56 commits into from
Jun 1, 2019
Merged

SQL Publisher/Subscriber #56

merged 56 commits into from
Jun 1, 2019

Conversation

maclav3
Copy link
Contributor

@maclav3 maclav3 commented Mar 4, 2019

Subscriber works by querying the messages table with SELECT in configurable intervals

)
require.NoError(t, err)

ctx, _ := context.WithTimeout(context.Background(), 15*time.Second)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak (from govet)

)
require.NoError(t, err)

ctx, _ := context.WithTimeout(context.Background(), 15*time.Second)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cancel function returned by context.WithTimeout should be called, not discarded, to avoid a context leak (from govet)

dev/events.sql Outdated
@@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS messages (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 74d84d8

Logger watermill.LoggerAdapter
}

func (s *DefaultSchema) EnsureTableForTopicQueries(topic string) []string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this name, maybe something more related to schema, intiialiation etc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 74d84d8
SchemaInitializingQueries

}

func (s *DefaultSchema) InsertQuery(topic string) string {
if s.Logger == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private getter will be nice to handle it universally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer applies, schema doesn't log anymore, only publisher/subscriber

"`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,",
"`payload` json DEFAULT NULL,",
"`metadata` json DEFAULT NULL,",
"`topic` varchar(255) NOT NULL",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed now (topic)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed 74d84d8


func (s *DefaultSchema) InsertArgs(topic string, msg *message.Message) (args []interface{}, err error) {
logger := s.Logger
defer func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should not return error and log it, and logging should be also done in "generic part"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to publisher/subscriber 74d84d8

`(uuid, payload, metadata, topic) VALUES (?,?,?,?)`,
}, " ")

s.Logger.Info("Preparing query to insert messages", watermill.LogFields{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't all logging be done in general implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to publisher/subscriber 74d84d8

}()

var uuid ulid.ULID
uuid, err = ulid.Parse(msg.UUID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think to make it configurable? because it can be different to everyone (and not much people are using ulid)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooor we may store uuid as string?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b29759f no longer requiring ulid, just string

}, nil
}

// Publish inserts the messages as rows into the MessagesTable.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe some info that transaction should be handled by the user of Publish (and examples in docs?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if c.ResendInterval == 0 {
c.ResendInterval = time.Second
}
if c.MessagesTable == "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it duplicated with SchemaAdapter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed from both subscriber and publisher


// sanitizeTopicName checks if the topic name contains any characters which could be unsuitable for the SQL Pub/Sub.
// Topics are translated into SQL tables and patched into some queries, so this is done to prevent injection as well.
func sanitizeTopicName(topic string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it validation, sanitization for me is more about changing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true dat, changed


err := s.query(ctx, topic, out, logger)
if err != nil {
logger.Error("Error querying for message", err, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe some configureable sleep, to not kill db? :P

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RetryInterval de25f18

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually used in 14532ac

// go on querying
}

err := s.query(ctx, topic, out, logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how it's working with reconnecting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about implementing the reconnect test later on

// it is finalized after the ACK is written
var tx *sql.Tx
tx, err = s.db.BeginTx(ctx, &sql.TxOptions{
//Isolation: sql.LevelSerializable,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

@maclav3 maclav3 merged commit ba87cfc into master Jun 1, 2019
@maclav3 maclav3 deleted the mysql branch June 1, 2019 17:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants