Skip to content

Commit

Permalink
MySQL too
Browse files Browse the repository at this point in the history
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Sep 30, 2023
1 parent 0272c3b commit 9c6c9e1
Showing 1 changed file with 34 additions and 88 deletions.
122 changes: 34 additions & 88 deletions state/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,35 +389,6 @@ func (m *MySQL) ensureStateTable(ctx context.Context, schemaName, stateTableName
}
}

// Create the DaprSaveFirstWriteV1 stored procedure
_, err = m.db.ExecContext(ctx, `CREATE PROCEDURE IF NOT EXISTS DaprSaveFirstWriteV1(tableName VARCHAR(255), id VARCHAR(255), value JSON, etag VARCHAR(36), isbinary BOOLEAN, expiredateToken TEXT)
LANGUAGE SQL
MODIFIES SQL DATA
BEGIN
SET @id = id;
SET @value = value;
SET @etag = etag;
SET @isbinary = isbinary;
SET @selectQuery = concat('SELECT COUNT(id) INTO @count FROM ', tableName ,' WHERE id = ? AND (expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)');
PREPARE select_stmt FROM @selectQuery;
EXECUTE select_stmt USING @id;
DEALLOCATE PREPARE select_stmt;
IF @count < 1 THEN
SET @upsertQuery = concat('INSERT INTO ', tableName, ' SET id=?, value=?, eTag=?, isbinary=?, expiredate=', expiredateToken, ' ON DUPLICATE KEY UPDATE value=?, eTag=?, isbinary=?, expiredate=', expiredateToken);
PREPARE upsert_stmt FROM @upsertQuery;
EXECUTE upsert_stmt USING @id, @value, @etag, @isbinary, @value, @etag, @isbinary;
DEALLOCATE PREPARE upsert_stmt;
ELSE
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Row already exists';
END IF;
END`)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -596,7 +567,6 @@ func (m *MySQL) setValue(parentCtx context.Context, querier querier, req *state.
ttlQuery string
params []any
result sql.Result
maxRows int64 = 1
)

var v any
Expand Down Expand Up @@ -624,10 +594,7 @@ func (m *MySQL) setValue(parentCtx context.Context, querier querier, req *state.
ttlQuery = "NULL"
}

mustCommit := false
hasEtag := req.ETag != nil && *req.ETag != ""

if hasEtag {
if req.HasETag() {
// When an eTag is provided do an update - not insert
query = `UPDATE ` + m.tableName + `
SET value = ?, eTag = ?, isbinary = ?, expiredate = ` + ttlQuery + `
Expand All @@ -636,73 +603,52 @@ func (m *MySQL) setValue(parentCtx context.Context, querier querier, req *state.
AND (expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)`
params = []any{enc, eTag, isBinary, req.Key, *req.ETag}
} else if req.Options.Concurrency == state.FirstWrite {
// If we're not in a transaction already, start one as we need to ensure consistency
if querier == m.db {
querier, err = m.db.BeginTx(parentCtx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer querier.(*sql.Tx).Rollback()
mustCommit = true
}

// With first-write-wins and no etag, we can insert the row only if it doesn't exist
// Things get a bit tricky when the row exists but it is expired, so it just hasn't been garbage-collected yet
// What we can do in that case is to first check if the row doesn't exist or has expired, and then perform an upsert
// To do that, we use a stored procedure
query = "CALL DaprSaveFirstWriteV1(?, ?, ?, ?, ?, ?)"
params = []any{m.tableName, req.Key, enc, eTag, isBinary, ttlQuery}
// If the operation uses first-write concurrency, we need to handle the special case of a row that has expired but hasn't been garbage collected yet
// In this case, the row should be considered as if it were deleted
query = `REPLACE INTO ` + m.tableName + `
WITH a AS (
SELECT
? AS id,
? AS value,
? AS isbinary,
CURRENT_TIMESTAMP AS insertDate,
CURRENT_TIMESTAMP AS updateDate,
? AS eTag,
` + ttlQuery + ` AS expiredate
FROM ` + m.tableName + `
WHERE NOT EXISTS (
SELECT 1
FROM ` + m.tableName + `
WHERE id = ?
AND (expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)
)
)
SELECT * FROM a`
params = []any{req.Key, enc, isBinary, eTag, req.Key}
} else {
// If this is a duplicate MySQL returns that two rows affected
maxRows = 2
query = `INSERT INTO ` + m.tableName + ` (id, value, eTag, isbinary, expiredate)
VALUES (?, ?, ?, ?, ` + ttlQuery + `)
ON DUPLICATE KEY UPDATE
value=?, eTag=?, isbinary=?, expiredate=` + ttlQuery
params = []any{req.Key, enc, eTag, isBinary, enc, eTag, isBinary}
query = `REPLACE INTO ` + m.tableName + ` (id, value, eTag, isbinary, expiredate)
VALUES (?, ?, ?, ?, ` + ttlQuery + `)`
params = []any{req.Key, enc, eTag, isBinary}
}

ctx, cancel := context.WithTimeout(parentCtx, m.timeout)
defer cancel()
result, err = querier.ExecContext(ctx, query, params...)

if err != nil {
if hasEtag {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

// Do not count affected rows when using first-write
// Conflicts are handled separately
if hasEtag || req.Options.Concurrency != state.FirstWrite {
var rows int64
rows, err = result.RowsAffected()
if err != nil {
return err
}

if rows == 0 {
err = errors.New("rows affected error: no rows match given key and eTag")
err = state.NewETagError(state.ETagMismatch, err)
m.logger.Error(err)
return err
}

if rows > maxRows {
err = fmt.Errorf("rows affected error: more than %d row affected; actual %d", maxRows, rows)
m.logger.Error(err)
return err
}
rows, err := result.RowsAffected()
if err != nil {
return err
}

// Commit the transaction if needed
if mustCommit {
err = querier.(*sql.Tx).Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
if rows == 0 && (req.HasETag() || req.Options.Concurrency == state.FirstWrite) {
err = errors.New("rows affected error: no rows match given key and eTag")
err = state.NewETagError(state.ETagMismatch, err)
m.logger.Error(err)
return err
}

return nil
Expand Down

0 comments on commit 9c6c9e1

Please sign in to comment.