Skip to content

Commit

Permalink
feat: added locking mechanism. (#25)
Browse files Browse the repository at this point in the history
Added sync lock mechanism this way it prevents race conditions or mismatch data update due to multiple device syncing at the same time.
  • Loading branch information
kaiserbh committed Jan 6, 2024
1 parent 7cd5838 commit e18dea8
Show file tree
Hide file tree
Showing 6 changed files with 430 additions and 2 deletions.
38 changes: 37 additions & 1 deletion internal/database/postgres_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,25 @@ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT manga_sync_pkey PRIMARY KEY (id)
);
/*Keeps track of lock files when syncing.*/
CREATE TABLE sync_lock
(
id SERIAL UNIQUE ,
user_api_key TEXT UNIQUE,
acquired_by TEXT UNIQUE,
last_sync TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
status TEXT NOT NULL DEFAULT 'unknown',
retry_count INT NOT NULL DEFAULT 0,
acquired_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT sync_lock_pkey PRIMARY KEY (id)
);
ALTER TABLE manga_data ADD FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE manga_sync ADD FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE sync_lock ADD FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE ON UPDATE CASCADE;
`

var postgresMigrations = []string{
Expand Down Expand Up @@ -97,5 +114,24 @@ var postgresMigrations = []string{
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`,
`,
`
CREATE TABLE sync_lock
(
id SERIAL UNIQUE ,
user_api_key TEXT UNIQUE,
acquired_by TEXT UNIQUE,
last_sync TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
status TEXT NOT NULL DEFAULT 'unknown',
retry_count INT NOT NULL DEFAULT 0,
acquired_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT sync_lock_pkey PRIMARY KEY (id)
);
`,
`
ALTER TABLE sync_lock ADD FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE ON UPDATE CASCADE;
`,
}
31 changes: 31 additions & 0 deletions internal/database/sqlite_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ CREATE TABLE manga_sync
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
);
CREATE TABLE sync_lock
(
id INTEGER PRIMARY KEY,
user_api_key TEXT UNIQUE,
acquired_by TEXT UNIQUE,
last_sync TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
status TEXT NOT NULL DEFAULT 'unknown',
retry_count INT NOT NULL DEFAULT 0,
acquired_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
);
`

var sqliteMigrations = []string{
Expand Down Expand Up @@ -90,4 +105,20 @@ var sqliteMigrations = []string{
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`,
`
CREATE TABLE sync_lock
(
id INTEGER PRIMARY KEY,
user_api_key TEXT UNIQUE,
acquired_by TEXT UNIQUE,
last_sync TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
status TEXT NOT NULL DEFAULT 'unknown',
retry_count INT NOT NULL DEFAULT 0,
acquired_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_api_key) REFERENCES api_key (key) ON DELETE CASCADE
);
`,
}
148 changes: 147 additions & 1 deletion internal/database/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,157 @@ func (r SyncRepo) GetSyncByApiKey(ctx context.Context, apiKey string) (*domain.S
pq.Array(&mangaSync.UserApiKey.Scopes),
&mangaSync.UserApiKey.CreatedAt,
); err != nil {
if err == sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
return &domain.Sync{}, errors.Wrap(err, "error executing query")
}
return nil, errors.Wrap(err, "error executing query")
}

return &mangaSync, nil
}

func (r SyncRepo) GetSyncLockFile(ctx context.Context, apiKey string) (*domain.SyncLockFile, error) {
queryBuilder := r.db.squirrel.
Select(
"id",
"user_api_key",
"acquired_by",
"last_sync",
"status",
"retry_count",
"acquired_at",
"expires_at",
).
From("sync_lock").
Where(sq.Eq{"user_api_key": apiKey})

query, args, err := queryBuilder.ToSql()
if err != nil {
return nil, errors.Wrap(err, "error building query")
}

var syncLockFile domain.SyncLockFile

if err := r.db.handler.QueryRowContext(ctx, query, args...).Scan(
&syncLockFile.ID,
&syncLockFile.UserApiKey,
&syncLockFile.AcquiredBy,
&syncLockFile.LastSynced,
&syncLockFile.Status,
&syncLockFile.RetryCount,
&syncLockFile.AcquiredAt,
&syncLockFile.ExpiresAt,
); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return &domain.SyncLockFile{}, errors.Wrap(err, "error executing query")
}
return nil, errors.Wrap(err, "error executing query")
}

return &syncLockFile, nil
}

func (r SyncRepo) CreateSyncLockFile(ctx context.Context, apiKey string, acquiredBy string) (*domain.SyncLockFile, error) {
queryBuilder := r.db.squirrel.
Insert("sync_lock").
Columns(
"user_api_key",
"acquired_by",
"last_sync",
"status",
"retry_count",
"acquired_at",
"expires_at",
).
Values(
apiKey,
acquiredBy,
time.Now(),
domain.SyncStatusPending,
0,
time.Now(),
time.Now().Add(time.Minute*5),
).
Suffix("RETURNING id, created_at, updated_at").RunWith(r.db.handler)

var id int
var createdAt time.Time
var updatedAt time.Time

if err := queryBuilder.QueryRowContext(ctx).Scan(&id, &createdAt, &updatedAt); err != nil {
return nil, errors.Wrap(err, "error executing query")
}

syncLockFile := &domain.SyncLockFile{
ID: id,
UserApiKey: apiKey,
AcquiredBy: acquiredBy,
LastSynced: &createdAt,
Status: domain.SyncStatusPending,
RetryCount: 0,
AcquiredAt: &createdAt,
ExpiresAt: &updatedAt,
}

return syncLockFile, nil
}

func (r SyncRepo) UpdateSyncLockFile(ctx context.Context, syncLockFile *domain.SyncLockFile) (*domain.SyncLockFile, error) {
// Start building the query.
queryBuilder := r.db.squirrel.
Update("sync_lock").
Where(sq.Eq{"user_api_key": syncLockFile.UserApiKey})

// Dynamically add fields that are present.
if syncLockFile.AcquiredBy != "" {
queryBuilder = queryBuilder.Set("acquired_by", syncLockFile.AcquiredBy)
}
if syncLockFile.LastSynced != nil {
queryBuilder = queryBuilder.Set("last_sync", syncLockFile.LastSynced)
}
if syncLockFile.Status != "" {
queryBuilder = queryBuilder.Set("status", syncLockFile.Status)
}
if syncLockFile.RetryCount != 0 {
queryBuilder = queryBuilder.Set("retry_count", syncLockFile.RetryCount)
}
if syncLockFile.AcquiredAt != nil {
queryBuilder = queryBuilder.Set("acquired_at", syncLockFile.AcquiredAt)
}
if syncLockFile.ExpiresAt != nil {
queryBuilder = queryBuilder.Set("expires_at", syncLockFile.ExpiresAt)
}

queryBuilder = queryBuilder.Suffix("RETURNING updated_at").RunWith(r.db.handler)

var updatedAt time.Time
if err := queryBuilder.QueryRowContext(ctx).Scan(&updatedAt); err != nil {
return nil, errors.Wrap(err, "error executing query")
}

syncLockFile.UpdatedAt = &updatedAt

return syncLockFile, nil
}

func (r SyncRepo) DeleteSyncLockFile(ctx context.Context, apiKey string) bool {
queryBuilder := r.db.squirrel.
Delete("sync_lock").
Where(sq.Eq{"user_api_key": apiKey})

query, args, err := queryBuilder.ToSql()
if err != nil {
r.db.log.Error().Err(err).Msgf("error building query")
return false
}

_, err = r.db.handler.ExecContext(ctx, query, args...)
if err != nil {
r.db.log.Error().Err(err).Msgf("error executing query")
return false
}

r.db.log.Debug().Msgf("Sync lock file deleted: %v", apiKey)

return true
}
16 changes: 16 additions & 0 deletions internal/domain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type SyncRepo interface {
Update(ctx context.Context, sync *Sync) (*Sync, error)
ListSyncs(ctx context.Context, apiKey string) ([]Sync, error)
GetSyncByApiKey(ctx context.Context, apiKey string) (*Sync, error)
CreateSyncLockFile(ctx context.Context, apiKey string, acquiredBy string) (*SyncLockFile, error)
GetSyncLockFile(ctx context.Context, apiKey string) (*SyncLockFile, error)
UpdateSyncLockFile(ctx context.Context, syncLockFile *SyncLockFile) (*SyncLockFile, error)
DeleteSyncLockFile(ctx context.Context, apiKey string) bool
}

type Sync struct {
Expand All @@ -36,3 +40,15 @@ type SyncData struct {
Sync *Sync `json:"sync,omitempty"`
Data *MangaData `json:"backup,omitempty"`
}

type SyncLockFile struct {
ID int `json:"id,omitempty"`
UserApiKey string `json:"user_api_key,omitempty"`
AcquiredBy string `json:"acquired_by,omitempty"`
LastSynced *time.Time `json:"last_synced,omitempty"`
Status SyncStatus `json:"status,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
AcquiredAt *time.Time `json:"acquired_at,omitempty"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
UpdatedAt *time.Time `json:"updated_at,omitempty"`
}

0 comments on commit e18dea8

Please sign in to comment.