Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea
target/
dependency-reduced-pom.xml
69 changes: 40 additions & 29 deletions pulsar-sql/pom.xml → pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar</artifactId>
<version>3.0.8-SNAPSHOT</version>
<version>4.0.0</version>
</parent>

<artifactId>pulsar-sql</artifactId>
Expand All @@ -37,6 +37,7 @@
<!-- use okio version that matches the okhttp3 version -->
<okio.version>1.17.2</okio.version>
<airlift.version>213</airlift.version>
<trino.version>368</trino.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -106,45 +107,55 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<groupId>org.commonjava.maven.plugins</groupId>
<artifactId>directory-maven-plugin</artifactId>
<version>${directory-maven-plugin.version}</version>
<executions>
<execution>
<id>checkstyle</id>
<phase>verify</phase>
<id>directories</id>
<goals>
<goal>check</goal>
<goal>directory-of</goal>
</goals>
<phase>initialize</phase>
<configuration>
<property>pulsar.basedir</property>
<project>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-sql</artifactId>
</project>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>${maven-checkstyle-plugin.version}</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>${puppycrawl.checkstyle.version}</version>
</dependency>
</dependencies>
<configuration>
<configLocation>${pulsar.basedir}/src/checkstyle.xml</configLocation>
<suppressionsLocation>${pulsar.basedir}/src/suppressions.xml</suppressionsLocation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<encoding>UTF-8</encoding>
<excludes>**/proto/*</excludes>
</configuration>
</plugin>
</plugins>
</build>

<modules>
<module>presto-pulsar</module>
<module>presto-pulsar-plugin</module>
<module>presto-distribution</module>
</modules>

<profiles>
<profile>
<id>main</id>
<activation>
<property>
<name>disableSqlMainProfile</name>
<!-- always active unless true is passed as a value -->
<value>!true</value>
</property>
</activation>
<modules>
<module>presto-pulsar</module>
<module>presto-pulsar-plugin</module>
<module>presto-distribution</module>
</modules>
</profile>
<profile>
<id>pulsar-sql-tests</id>
<modules>
<module>presto-pulsar</module>
<module>presto-pulsar-plugin</module>
<module>presto-distribution</module>
</modules>
</profile>
<!--
The only working way for OWASP dependency checker plugin
to exclude module when failBuildOnCVSS is used
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-sql</artifactId>
<version>3.0.8-SNAPSHOT</version>
<version>4.0.0</version>
</parent>

<artifactId>pulsar-presto-distribution</artifactId>
Expand Down Expand Up @@ -329,22 +329,6 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>${license-maven-plugin.version}</version>
<configuration>
<licenseSets>
<licenseSet>
<header>../../src/license-header.txt</header>
</licenseSet>
</licenseSets>
<mapping>
<java>SLASHSTAR_STYLE</java>
</mapping>
</configuration>
</plugin>
</plugins>
<extensions>
<extension>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-sql</artifactId>
<version>3.0.8-SNAPSHOT</version>
<version>4.0.0</version>
</parent>

<artifactId>pulsar-presto-connector</artifactId>
Expand Down
9 changes: 8 additions & 1 deletion pulsar-sql/presto-pulsar/pom.xml → presto-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-sql</artifactId>
<version>3.0.8-SNAPSHOT</version>
<version>4.0.0</version>
</parent>

<artifactId>pulsar-presto-connector-original</artifactId>
Expand Down Expand Up @@ -143,6 +143,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>testmocks</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.trino.decoder.FieldValueProviders.bytesValueProvider;
import static io.trino.decoder.FieldValueProviders.longValueProvider;
import static org.apache.bookkeeper.mledger.PositionFactory.LATEST;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -48,6 +49,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -58,7 +60,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -75,7 +76,6 @@
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.sql.presto.util.CacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NoStrictCacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NullCacheSizeAllocator;
Expand Down Expand Up @@ -123,8 +123,7 @@ public class PulsarRecordCursor implements RecordCursor {

PulsarDispatchingRowDecoderFactory decoderFactory;

protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
protected ConcurrentHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentHashMap<>();

private static final Logger log = Logger.get(PulsarRecordCursor.class);

Expand Down Expand Up @@ -366,7 +365,7 @@ public void accept(Entry entry) {
}

private boolean entryExceedSplitEndPosition(Entry entry) {
return ((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0;
return entry.getPosition().compareTo(pulsarSplit.getEndPosition()) >= 0;
}

@VisibleForTesting
Expand All @@ -383,7 +382,7 @@ public void run() {

if (outstandingReadsRequests.get() > 0) {
if (!cursor.hasMoreEntries()
|| (((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0
|| (cursor.getReadPosition().compareTo(pulsarSplit.getEndPosition()) >= 0
&& chunkedMessagesMap.isEmpty())) {
isDone = true;

Expand All @@ -401,7 +400,7 @@ public void run() {

long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries();
long entriesToSkip =
(numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
(numEntries - (cursor.getReadPosition()).getEntryId()) + 1;
cursor.skipEntries(Math.toIntExact((entriesToSkip)));

entriesProcessed += entriesToSkip;
Expand All @@ -413,7 +412,7 @@ public void run() {
// if the available size is invalid and the entry queue size is 0, read one entry
outstandingReadsRequests.decrementAndGet();
cursor.asyncReadEntries(batchSize, entryQueueCacheSizeAllocator.getAvailableCacheSize(),
this, System.nanoTime(), PositionImpl.LATEST);
this, System.nanoTime(), LATEST);
}

// stats for successful read request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand All @@ -58,8 +59,8 @@ public class PulsarSplit implements ConnectorSplit {
private final TupleDomain<ColumnHandle> tupleDomain;
private final SchemaInfo schemaInfo;

private final PositionImpl startPosition;
private final PositionImpl endPosition;
private final Position startPosition;
private final Position endPosition;
private final String schemaInfoProperties;

private final OffloadPoliciesImpl offloadPolicies;
Expand Down Expand Up @@ -95,8 +96,8 @@ public PulsarSplit(
this.startPositionLedgerId = startPositionLedgerId;
this.endPositionLedgerId = endPositionLedgerId;
this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
this.startPosition = PositionImpl.get(startPositionLedgerId, startPositionEntryId);
this.endPosition = PositionImpl.get(endPositionLedgerId, endPositionEntryId);
this.startPosition = new ImmutablePositionImpl(startPositionLedgerId, startPositionEntryId);
this.endPosition = new ImmutablePositionImpl(endPositionLedgerId, endPositionEntryId);
this.schemaInfoProperties = schemaInfoProperties;
this.offloadPolicies = offloadPolicies;

Expand Down Expand Up @@ -174,11 +175,11 @@ public TupleDomain<ColumnHandle> getTupleDomain() {
return tupleDomain;
}

public PositionImpl getStartPosition() {
public Position getStartPosition() {
return startPosition;
}

public PositionImpl getEndPosition() {
public Position getEndPosition() {
return endPosition;
}

Expand Down
Loading