Skip to content

Commit

Permalink
#120 Microsoft SQL create integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Buhtoyarov committed Nov 4, 2023
1 parent 696d73e commit b1bcfd9
Show file tree
Hide file tree
Showing 8 changed files with 549 additions and 1 deletion.
Expand Up @@ -18,11 +18,16 @@
import io.github.bucket4j.util.ConsumptionScenario;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -472,4 +477,105 @@ public void testWithMapperAsync() throws Exception {
assertTrue(mappedBucket2.tryConsume(5).get());
assertFalse(unmappedBucket.tryConsume(1).get());
}

@Test
public void test_1000_tokens_consumption() throws InterruptedException {
int threadCount = 8;
int opsCount = 1_000;
int capacity = 2_000;

BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(limit -> limit.capacity(capacity).refillIntervally(1, Duration.ofDays(1)))
.build();

CountDownLatch startLatch = new CountDownLatch(threadCount);
CountDownLatch stopLatch = new CountDownLatch(threadCount);
AtomicInteger opsCounter = new AtomicInteger(opsCount);
ConcurrentHashMap<Integer, Integer> updatesByThread = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, Throwable> errors = new ConcurrentHashMap<>();
for (int i = 0; i < threadCount; i++) {
Bucket bucket = proxyManager.builder().build(key, () -> configuration);
final int threadId = i;
new Thread(() -> {
try {
startLatch.countDown();
startLatch.await();
while (opsCounter.decrementAndGet() >= 0) {
Integer currenValue = null;
if (bucket.tryConsume(1)) {
updatesByThread.compute(threadId, (key, current) -> current == null ? 1 : current + 1);
} else {
throw new IllegalStateException("Token should be consumed");
}
}
} catch (Throwable e) {
errors.put(threadId, e);
e.printStackTrace();
} finally {
stopLatch.countDown();
}
}, "Updater-thread-" + i).start();
}
stopLatch.await();

long availableTokens = proxyManager.builder().build(key, () -> configuration).getAvailableTokens();
System.out.println("availableTokens " + availableTokens);
System.out.println("Failed threads " + errors.keySet());
System.out.println("Updates by thread " + updatesByThread);
assertTrue(errors.isEmpty());
assertEquals(capacity - opsCount, availableTokens);
}

@Test
public void test_1000_tokens_consumption_async() throws InterruptedException {
if (!proxyManager.isAsyncModeSupported()) {
return;
}

int threadCount = 8;
int opsCount = 1_000;
int capacity = 2_000;

BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(limit -> limit.capacity(capacity).refillIntervally(1, Duration.ofDays(1)))
.build();

CountDownLatch startLatch = new CountDownLatch(threadCount);
CountDownLatch stopLatch = new CountDownLatch(threadCount);
AtomicInteger opsCounter = new AtomicInteger(opsCount);
ConcurrentHashMap<Integer, Integer> updatesByThread = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, Throwable> errors = new ConcurrentHashMap<>();
for (int i = 0; i < threadCount; i++) {
AsyncBucketProxy bucket = proxyManager.asAsync().builder().build(key, () -> CompletableFuture.completedFuture(configuration));
final int threadId = i;
new Thread(() -> {
try {
startLatch.countDown();
startLatch.await();
while (opsCounter.decrementAndGet() >= 0) {
Integer currenValue = null;
if (bucket.tryConsume(1).get()) {
updatesByThread.compute(threadId, (key, current) -> current == null ? 1 : current + 1);
} else {
throw new IllegalStateException("Token should be consumed");
}
}
} catch (Throwable e) {
errors.put(threadId, e);
e.printStackTrace();
} finally {
stopLatch.countDown();
}
}, "Updater-thread-" + i).start();
}
stopLatch.await();

long availableTokens = proxyManager.builder().build(key, () -> configuration).getAvailableTokens();
System.out.println("availableTokens " + availableTokens);
System.out.println("Failed threads " + errors.keySet());
System.out.println("Updates by thread " + updatesByThread);
assertTrue(errors.isEmpty());
assertEquals(capacity - opsCount, availableTokens);
}

}
62 changes: 62 additions & 0 deletions bucket4j-mssql/pom.xml
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-parent</artifactId>
<version>8.6.0</version>
<relativePath>../bucket4j-parent</relativePath>
</parent>

<artifactId>bucket4j-mssql</artifactId>
<name>bucket4j-mssql</name>

<properties>
<modular-name>mssql</modular-name>
</properties>

<repositories>
<repository>
<id>central</id>
<name>Maven Central</name>
<url>https://repo1.maven.org/maven2/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mssqlserver</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java6</artifactId>
<version>2.3.8</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,179 @@
/*-
* ========================LICENSE_START=================================
* Bucket4j
* %%
* Copyright (C) 2015 - 2022 Vladimir Bukhtoyarov
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/
package io.github.bucket4j.mssql;

import io.github.bucket4j.BucketExceptions;
import io.github.bucket4j.distributed.jdbc.SQLProxyConfiguration;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.AbstractSelectForUpdateBasedProxyManager;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.LockAndGetResult;
import io.github.bucket4j.distributed.proxy.generic.select_for_update.SelectForUpdateBasedTransaction;
import io.github.bucket4j.distributed.remote.RemoteBucketState;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.text.MessageFormat;
import java.util.Objects;

/**
* @author Vladimir Bukhtoyarov
*
* @param <K> type of primary key
*/
public class MSSQLSelectForUpdateBasedProxyManager<K> extends AbstractSelectForUpdateBasedProxyManager<K> {

private final DataSource dataSource;
private final SQLProxyConfiguration<K> configuration;
private final String removeSqlQuery;
private final String updateSqlQuery;
private final String insertSqlQuery;
private final String selectSqlQuery;

/**
*
* @param configuration {@link SQLProxyConfiguration} configuration.
*/
public MSSQLSelectForUpdateBasedProxyManager(SQLProxyConfiguration<K> configuration) {
super(configuration.getClientSideConfig());
this.dataSource = Objects.requireNonNull(configuration.getDataSource());
this.configuration = configuration;
this.removeSqlQuery = MessageFormat.format("DELETE FROM {0} WHERE {1} = ?", configuration.getTableName(), configuration.getIdName());
this.updateSqlQuery = MessageFormat.format("UPDATE {0} SET {1}=? WHERE {2}=?", configuration.getTableName(), configuration.getStateName(), configuration.getIdName());
this.insertSqlQuery = MessageFormat.format(
"INSERT INTO {0}({1},{2}) VALUES(?, null)",
configuration.getTableName(), configuration.getIdName(), configuration.getStateName());
this.selectSqlQuery = MessageFormat.format("SELECT {0} FROM {1} WITH(ROWLOCK, UPDLOCK) WHERE {2} = ?", configuration.getStateName(), configuration.getTableName(), configuration.getIdName());
}

@Override
protected SelectForUpdateBasedTransaction allocateTransaction(K key) {
Connection connection;
try {
connection = dataSource.getConnection();
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}

return new SelectForUpdateBasedTransaction() {
@Override
public void begin() {
try {
connection.setAutoCommit(false);
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

@Override
public void rollback() {
try {
connection.rollback();
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

@Override
public void commit() {
try {
connection.commit();
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

@Override
public LockAndGetResult tryLockAndGet() {
try (PreparedStatement selectStatement = connection.prepareStatement(selectSqlQuery)) {
configuration.getPrimaryKeyMapper().set(selectStatement, 1, key);
try (ResultSet rs = selectStatement.executeQuery()) {
if (rs.next()) {
byte[] data = rs.getBytes(configuration.getStateName());
return LockAndGetResult.locked(data);
} else {
return LockAndGetResult.notLocked();
}
}
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

@Override
public boolean tryInsertEmptyData() {
try (PreparedStatement insertStatement = connection.prepareStatement(insertSqlQuery)) {
configuration.getPrimaryKeyMapper().set(insertStatement, 1, key);
return insertStatement.executeUpdate() > 0;
} catch (SQLException e) {
if (e.getErrorCode() == 1205) {
// https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/mssqlserver-1205-database-engine-error?view=sql-server-ver16
// another transaction won the lock, initial bucket dada will be inserted by other actor
return false;
} else if (e.getErrorCode() == 2627) {
// duplicate key, another parallel transaction has inserted the data
return false;
} else {
throw new BucketExceptions.BucketExecutionException(e);
}
}
}

@Override
public void update(byte[] data, RemoteBucketState newState) {
try {
try (PreparedStatement updateStatement = connection.prepareStatement(updateSqlQuery)) {
updateStatement.setBytes(1, data);
configuration.getPrimaryKeyMapper().set(updateStatement, 2, key);
updateStatement.executeUpdate();
}
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

@Override
public void release() {
try {
connection.close();
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

};

}

@Override
public void removeProxy(K key) {
try (Connection connection = dataSource.getConnection()) {
try(PreparedStatement removeStatement = connection.prepareStatement(removeSqlQuery)) {
configuration.getPrimaryKeyMapper().set(removeStatement, 1, key);
removeStatement.executeUpdate();
}
} catch (SQLException e) {
throw new BucketExceptions.BucketExecutionException(e);
}
}

}

0 comments on commit b1bcfd9

Please sign in to comment.