Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.2.0
-----
* Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)
* Generated distribution artifacts fix (CASSANALYTICS-105)
* Fix SSTable descriptor mismatch preventing newly produced SSTables from being uploaded (CASSANALYTICS-98)
* Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -76,13 +77,15 @@ public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
public interface ScannerSupplier
{
/**
* @param partitionId arbitrary id uniquely identifying this partiton of the bulk read
* @param partitionKeyFilters list of partition key filters to push-down,
* @param columnFilter optional column filter to only read certain columns
* @param partitionId arbitrary id uniquely identifying this partiton of the bulk read
* @param partitionKeyFilters list of partition key filters to push-down,
* @param sstableTimeRangeFilter sstable time range filter to filter based on min and max timestamp
* @param columnFilter optional column filter to only read certain columns
* @return a StreamScanner to iterate over each cell of the data.g
*/
StreamScanner<RowData> get(int partitionId,
@NotNull List<PartitionKeyFilter> partitionKeyFilters,
@NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
@Nullable PruneColumnFilter columnFilter);
}

Expand All @@ -91,6 +94,7 @@ public CellIterator(int partitionId,
Stats stats,
TypeConverter typeConverter,
@NotNull List<PartitionKeyFilter> partitionKeyFilters,
@NotNull SSTableTimeRangeFilter sstableTimeRangeFilter,
Function<CqlTable, PruneColumnFilter> columnFilterSupplier,
ScannerSupplier scannerSupplier)
{
Expand All @@ -116,7 +120,7 @@ public CellIterator(int partitionId,
// Open compaction scanner
startTimeNanos = System.nanoTime();
previousTimeNanos = startTimeNanos;
scanner = scannerSupplier.get(partitionId, partitionKeyFilters, columnFilter);
scanner = scannerSupplier.get(partitionId, partitionKeyFilters, sstableTimeRangeFilter, columnFilter);
long openTimeNanos = System.nanoTime() - startTimeNanos;
LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
stats.openedCompactionScanner(openTimeNanos);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.spark.sparksql.filters;

import java.io.Serializable;
import java.util.Objects;

import com.google.common.collect.Range;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.jetbrains.annotations.NotNull;

/**
* {@link SSTableTimeRangeFilter} to filter out based on timestamp in microseconds.
* Uses Google Guava's Range internally for storing time range.
*/
public class SSTableTimeRangeFilter implements Serializable
{
public static final SSTableTimeRangeFilter ALL
= new SSTableTimeRangeFilter(Range.closed(0L, Long.MAX_VALUE));

/**
* {@code timeRange} is range of timestamp values represented in microseconds. Supports only closed range.
*/
private final Range<Long> timeRange;
private final int hashcode;

/**
* Creates a {@link SSTableTimeRangeFilter} with given time {@link Range} of timestamp values represented in
* microseconds.
*/
private SSTableTimeRangeFilter(Range<Long> timeRange)
{
this(timeRange, Objects.hash(timeRange));
}

// for serialization
private SSTableTimeRangeFilter(Range<Long> timeRange, int hashcode)
{
this.timeRange = timeRange;
this.hashcode = hashcode;
}

/**
* Returns the underlying Range.
*
* @return the time range of timestamp values in microseconds.
*/
@NotNull
public Range<Long> range()
{
return timeRange;
}

/**
* Determines if SSTable with min and max timestamp overlap with the filter. SSTable is included if it
* overlaps with filter time range.
*
* @param startMicros SSTable min timestamp in microseconds (inclusive)
* @param endMicros SSTable max timestamp in microseconds (inclusive)
* @return true if the SSTable should be included, false if it should be omitted.
*/
public boolean overlaps(long startMicros, long endMicros)
{
// Creates a closed range with startMicros and endMicros
Range<Long> sstableTimeRange = Range.closed(startMicros, endMicros);

// Check if ranges are connected (overlap or adjacent)
return timeRange.isConnected(sstableTimeRange);
}

@Override
public String toString()
{
return String.format("TimeRangeFilter%s", timeRange.toString());
}

@Override
public boolean equals(Object o)
{
if (this == o)
{
return true;
}
if (!(o instanceof SSTableTimeRangeFilter))
{
return false;
}
SSTableTimeRangeFilter that = (SSTableTimeRangeFilter) o;
return timeRange.equals(that.timeRange);
}

@Override
public int hashCode()
{
return hashcode;
}

/**
* Creates a {@link SSTableTimeRangeFilter} for a specific time range.
*
* @param startMicros the start timestamp in microseconds (inclusive)
* @param endMicros the end timestamp in microseconds (inclusive)
* @return {@link SSTableTimeRangeFilter} with both start and end timestamps
*/
@NotNull
public static SSTableTimeRangeFilter create(long startMicros, long endMicros)
{
return new SSTableTimeRangeFilter(Range.closed(startMicros, endMicros));
}

// Kryo

public static class Serializer extends com.esotericsoftware.kryo.Serializer<SSTableTimeRangeFilter>
{
public SSTableTimeRangeFilter read(Kryo kryo, Input in, Class<SSTableTimeRangeFilter> type)
{
return new SSTableTimeRangeFilter(Range.closed(in.readLong(), in.readLong()), in.readInt());
}

public void write(Kryo kryo, Output out, SSTableTimeRangeFilter object)
{
out.writeLong(object.timeRange.lowerEndpoint());
out.writeLong(object.timeRange.upperEndpoint());
out.writeInt(object.hashcode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class CqlUtils
private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+");
private static final Pattern NEWLINE_PATTERN = Pattern.compile("\n");
private static final Pattern ESCAPED_DOUBLE_BACKSLASH = Pattern.compile("\\\\");
private static final Pattern COMPACTION_STRATEGY_PATTERN = Pattern.compile("compaction\\s*=\\s*\\{\\s*'class'\\s*:\\s*'([^']+)'");

private CqlUtils()
{
Expand Down Expand Up @@ -147,7 +148,7 @@ public static Map<TableIdentifier, String> extractCdcTables(@NotNull final Strin
{
String keyspace = matcher.group(1);
String table = matcher.group(2);
createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table));
createStmts.put(TableIdentifier.of(keyspace, table), extractCleanedTableSchema(cleaned, keyspace, table, false));
}
return createStmts;
}
Expand Down Expand Up @@ -179,19 +180,20 @@ public static ReplicationFactor extractReplicationFactor(@NotNull String schemaS

public static String extractTableSchema(@NotNull String schemaStr, @NotNull String keyspace, @NotNull String table)
{
return extractCleanedTableSchema(cleanCql(schemaStr), keyspace, table);
return extractCleanedTableSchema(cleanCql(schemaStr), keyspace, table, false);
}

public static String extractCleanedTableSchema(@NotNull String createStatementToClean,
@NotNull String keyspace,
@NotNull String table)
@NotNull String table,
boolean withTableProps)
{
Pattern pattern = Pattern.compile(String.format("CREATE TABLE (IF NOT EXISTS)? ?\"?%s?\"?\\.{1}\"?%s\"?[^;]*;", keyspace, table));
Matcher matcher = pattern.matcher(createStatementToClean);
if (matcher.find())
{
String fullSchema = createStatementToClean.substring(matcher.start(0), matcher.end(0));
String redactedSchema = removeTableProps(fullSchema);
String redactedSchema = withTableProps ? fullSchema : removeTableProps(fullSchema);
String clustering = extractClustering(fullSchema);
String separator = " WITH ";
if (clustering != null)
Expand Down Expand Up @@ -268,4 +270,30 @@ public static int extractIndexCount(@NotNull String schemaStr, @NotNull String k
}
return indexCount;
}

/**
* Extracts the compaction strategy used from table schema.
*
* @param tableSchema table schema
* @return the compaction strategy, or null if not found
*/
public static String extractCompactionStrategy(@NotNull String tableSchema)
{
Matcher matcher = COMPACTION_STRATEGY_PATTERN.matcher(tableSchema);
if (matcher.find())
{
return matcher.group(1);
}
return null;
}

/**
* Time range filter is only supported for TimeWindowCompactionStrategy.
*
* @return true if the strategy is TimeWindowCompactionStrategy, false otherwise
*/
public static boolean isTimeRangeFilterSupported(String compactionStrategy)
{
return compactionStrategy == null || compactionStrategy.endsWith("TimeWindowCompactionStrategy");
}
}
Loading