Skip to content

Commit

Permalink
Upgrade checkerframework to 3.42.0
Browse files Browse the repository at this point in the history
This includes improvements in checker's safety. Files that needed adjustment
were adjusted, rather than suppressing the warning.
  • Loading branch information
kennknowles committed Jan 11, 2024
1 parent a3e0653 commit 6e74ddb
Show file tree
Hide file tree
Showing 18 changed files with 164 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ class BeamModulePlugin implements Plugin<Project> {
def aws_java_sdk2_version = "2.20.47"
def cassandra_driver_version = "3.10.2"
def cdap_version = "6.5.1"
def checkerframework_version = "3.27.0"
def checkerframework_version = "3.42.0"
def classgraph_version = "4.8.162"
def dbcp2_version = "2.9.0"
def errorprone_version = "2.10.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.it.common.PipelineLauncher.JobState.PENDING_STATES;
import static org.apache.beam.it.common.logging.LogStrings.formatForLogging;
import static org.apache.beam.it.common.utils.RetryUtil.clientRetryPolicy;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.api.client.util.ArrayMap;
import com.google.api.services.dataflow.Dataflow;
Expand Down Expand Up @@ -215,10 +216,18 @@ public Map<String, Double> getMetrics(String project, String region, String jobI
// currently, reporting distribution metrics as 4 separate scalar metrics
@SuppressWarnings("rawtypes")
ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution();
result.put(metricName + "_COUNT", ((Number) distributionMap.get("count")).doubleValue());
result.put(metricName + "_MIN", ((Number) distributionMap.get("min")).doubleValue());
result.put(metricName + "_MAX", ((Number) distributionMap.get("max")).doubleValue());
result.put(metricName + "_SUM", ((Number) distributionMap.get("sum")).doubleValue());
result.put(
metricName + "_COUNT",
checkStateNotNull(((Number) distributionMap.get("count"))).doubleValue());
result.put(
metricName + "_MIN",
checkStateNotNull(((Number) distributionMap.get("min"))).doubleValue());
result.put(
metricName + "_MAX",
checkStateNotNull(((Number) distributionMap.get("max"))).doubleValue());
result.put(
metricName + "_SUM",
checkStateNotNull(((Number) distributionMap.get("sum"))).doubleValue());
} else if (metricUpdate.getGauge() != null) {
LOG.warn("Gauge metric {} cannot be handled.", metricName);
// not sure how to handle gauge metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ synchronized void start(ExecutorService executor) {
}

public synchronized void stop() {
final Future<Void> executionSamplerFuture = this.executionSamplerFuture;
if (executionSamplerFuture == null) {
return;
}
Expand All @@ -142,7 +143,7 @@ public synchronized void stop() {
} catch (ExecutionException e) {
throw new RuntimeException("Exception in state sampler", e);
} finally {
executionSamplerFuture = null;
this.executionSamplerFuture = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.metrics;

import java.util.Objects;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
Expand All @@ -30,7 +31,7 @@ public static String toString(Metric metric) {
if (metric instanceof Counter) {
return Long.toString(((Counter) metric).getCount());
} else if (metric instanceof Gauge) {
return ((Gauge) metric).getValue().toString();
return Objects.toString(((Gauge<?>) metric).getValue());
} else if (metric instanceof Meter) {
return Double.toString(((Meter) metric).getRate());
} else if (metric instanceof Histogram) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
return DistributionResult.IDENTITY_ELEMENT;
}
ArrayMap distributionMap = (ArrayMap) metricUpdate.getDistribution();
long count = ((Number) distributionMap.get("count")).longValue();
long min = ((Number) distributionMap.get("min")).longValue();
long max = ((Number) distributionMap.get("max")).longValue();
long sum = ((Number) distributionMap.get("sum")).longValue();
long count = checkArgumentNotNull(((Number) distributionMap.get("count"))).longValue();
long min = checkArgumentNotNull(((Number) distributionMap.get("min"))).longValue();
long max = checkArgumentNotNull(((Number) distributionMap.get("max"))).longValue();
long sum = checkArgumentNotNull(((Number) distributionMap.get("sum"))).longValue();
return DistributionResult.create(sum, count, min, max);
}

Expand Down
1 change: 1 addition & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk',
classesTriggerCheckerBugs: [
'DoFnTester': 'https://github.com/typetools/checker-framework/issues/3776',
'FileIO': 'https://github.com/typetools/checker-framework/issues/6388',
'MergingActiveWindowSetTest': 'https://github.com/typetools/checker-framework/issues/3776',
'WindowFnTestUtils': 'https://github.com/typetools/checker-framework/issues/3776',
],
Expand Down
131 changes: 73 additions & 58 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListener;
import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.common.value.qual.ArrayLen;
import org.checkerframework.dataflow.qual.Pure;
Expand Down Expand Up @@ -351,34 +352,7 @@ public boolean tryClaim(TimestampedValue<T>[] position) {
return false;
}
try {
if (currentReader == null) {
currentReader = initialRestriction.createReader(pipelineOptions);
if (!currentReader.start()) {
claimedAll = true;
try {
currentReader.close();
} finally {
currentReader = null;
}
return false;
}
position[0] =
TimestampedValue.of(
currentReader.getCurrent(), currentReader.getCurrentTimestamp());
return true;
}
if (!currentReader.advance()) {
claimedAll = true;
try {
currentReader.close();
} finally {
currentReader = null;
}
return false;
}
position[0] =
TimestampedValue.of(currentReader.getCurrent(), currentReader.getCurrentTimestamp());
return true;
return tryClaimOrThrow(position);
} catch (IOException e) {
if (currentReader != null) {
try {
Expand All @@ -393,6 +367,37 @@ public boolean tryClaim(TimestampedValue<T>[] position) {
}
}

private boolean tryClaimOrThrow(TimestampedValue<T>[] position) throws IOException {
BoundedSource.BoundedReader<T> currentReader = this.currentReader;
if (currentReader == null) {
BoundedSource.BoundedReader<T> newReader =
initialRestriction.createReader(pipelineOptions);
if (!newReader.start()) {
claimedAll = true;
newReader.close();
return false;
}
position[0] =
TimestampedValue.of(newReader.getCurrent(), newReader.getCurrentTimestamp());
this.currentReader = newReader;
return true;
}

if (!currentReader.advance()) {
claimedAll = true;
try {
currentReader.close();
} finally {
this.currentReader = null;
}
return false;
}

position[0] =
TimestampedValue.of(currentReader.getCurrent(), currentReader.getCurrentTimestamp());
return true;
}

@Override
protected void finalize() throws Throwable {
if (currentReader != null) {
Expand All @@ -415,6 +420,7 @@ public BoundedSourceT currentRestriction() {

@Override
public @Nullable SplitResult<BoundedSourceT> trySplit(double fractionOfRemainder) {
BoundedSource.BoundedReader<T> currentReader = this.currentReader;
if (currentReader == null) {
return null;
}
Expand Down Expand Up @@ -859,13 +865,15 @@ private Object createCacheKey(
source, checkpoint, BoundedWindow.TIMESTAMP_MIN_VALUE));
}

@EnsuresNonNull("currentReader")
private void initializeCurrentReader() throws IOException {
checkState(currentReader == null);
Object cacheKey =
createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
currentReader = cachedReaders.getIfPresent(cacheKey);
if (currentReader == null) {
currentReader =
UnboundedReader<OutputT> cachedReader = cachedReaders.getIfPresent(cacheKey);

if (cachedReader == null) {
this.currentReader =
initialRestriction
.getSource()
.createReader(pipelineOptions, initialRestriction.getCheckpoint());
Expand All @@ -874,6 +882,7 @@ private void initializeCurrentReader() throws IOException {
// We also remove this cache entry to avoid eviction.
readerHasBeenStarted = true;
cachedReaders.invalidate(cacheKey);
this.currentReader = cachedReader;
}
}

Expand All @@ -891,50 +900,56 @@ private void cacheCurrentReader(
@Override
public boolean tryClaim(@Nullable UnboundedSourceValue<OutputT> @ArrayLen(1) [] position) {
try {
if (currentReader == null) {
initializeCurrentReader();
}
checkStateNotNull(currentReader, "currentReader null after initialization");
if (currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader) {
return false;
}
if (!readerHasBeenStarted) {
readerHasBeenStarted = true;
if (!currentReader.start()) {
position[0] = null;
return true;
}
} else if (!currentReader.advance()) {
position[0] = null;
return true;
}
position[0] =
UnboundedSourceValue.create(
currentReader.getCurrentRecordId(),
currentReader.getCurrent(),
currentReader.getCurrentTimestamp(),
currentReader.getWatermark());
return true;
return tryClaimOrThrow(position);
} catch (IOException e) {
if (currentReader != null) {
if (this.currentReader != null) {
try {
currentReader.close();
} catch (IOException closeException) {
e.addSuppressed(closeException);
} finally {
currentReader = null;
this.currentReader = null;
}
}
throw new RuntimeException(e);
}
}

private boolean tryClaimOrThrow(
@Nullable UnboundedSourceValue<OutputT> @ArrayLen(1) [] position) throws IOException {
if (this.currentReader == null) {
initializeCurrentReader();
}
UnboundedSource.UnboundedReader<OutputT> currentReader = this.currentReader;
if (currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader) {
return false;
}
if (!readerHasBeenStarted) {
readerHasBeenStarted = true;
if (!currentReader.start()) {
position[0] = null;
return true;
}
} else if (!currentReader.advance()) {
position[0] = null;
return true;
}
position[0] =
UnboundedSourceValue.create(
currentReader.getCurrentRecordId(),
currentReader.getCurrent(),
currentReader.getCurrentTimestamp(),
currentReader.getWatermark());
return true;
}

/** The value is invalid if {@link #tryClaim} has ever thrown an exception. */
@Override
public UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction() {
if (currentReader == null) {
return initialRestriction;
}
UnboundedReader<OutputT> currentReader = this.currentReader;
Instant watermark = ensureTimestampWithinBounds(currentReader.getWatermark());
// We convert the reader to the empty reader to mark that we are done.
if (!(currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader)
Expand All @@ -945,7 +960,7 @@ public UnboundedSourceRestriction<OutputT, CheckpointT> currentRestriction() {
} catch (IOException e) {
LOG.warn("Failed to close UnboundedReader.", e);
} finally {
currentReader =
this.currentReader =
EmptyUnboundedSource.INSTANCE.createReader(
PipelineOptionsFactory.create(), checkpointT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
*/
package org.apache.beam.sdk.values;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.Factory;
import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand All @@ -41,7 +45,7 @@
public class RowWithGetters extends Row {
private final Object getterTarget;
private final List<FieldValueGetter> getters;
private @Nullable Map<Integer, Object> cache = null;
private @Nullable Map<Integer, @Nullable Object> cache = null;

RowWithGetters(
Schema schema, Factory<List<FieldValueGetter>> getterFactory, Object getterTarget) {
Expand All @@ -65,7 +69,20 @@ public class RowWithGetters extends Row {
if (cache == null) {
cache = new TreeMap<>();
}
fieldValue = cache.computeIfAbsent(fieldIdx, idx -> getters.get(idx).get(getterTarget));
fieldValue =
cache.computeIfAbsent(
fieldIdx,
new Function<Integer, Object>() {
@Override
public Object apply(Integer idx) {
FieldValueGetter getter = getters.get(idx);
checkStateNotNull(getter);
@SuppressWarnings("nullness")
@NonNull
Object value = getter.get(getterTarget);
return value;
}
});
} else {
fieldValue = getters.get(fieldIdx).get(getterTarget);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.protobuf;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.protobuf.Descriptors;
Expand Down Expand Up @@ -385,10 +386,10 @@ private static Schema.Options.Builder getOptions(
case ITERABLE:
Field field = Field.of("OPTION", fieldType);
ProtoDynamicMessageSchema schema = ProtoDynamicMessageSchema.forSchema(Schema.of(field));
optionsBuilder.setOption(
prefix + fieldDescriptor.getFullName(),
fieldType,
schema.createConverter(field).convertFromProtoValue(entry.getValue()));
@SuppressWarnings("rawtypes")
ProtoDynamicMessageSchema.Convert convert = schema.createConverter(field);
Object value = checkArgumentNotNull(convert.convertFromProtoValue(entry.getValue()));
optionsBuilder.setOption(prefix + fieldDescriptor.getFullName(), fieldType, value);
break;
case MAP:
case DATETIME:
Expand Down

0 comments on commit 6e74ddb

Please sign in to comment.