Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/gradle_jdk11.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# This workflow will build a Java project with Gradle
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-gradle

name: JDK 11

on:
push:
branches: [ 3.x ]
pull_request:
branches: [ 3.x ]

jobs:
build:

runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
- name: Cache Gradle packages
uses: actions/cache@v2
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
restore-keys: ${{ runner.os }}-gradle
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build PR
run: ./gradlew -PreleaseMode=pr build --stacktrace
#- name: Upload to Codecov
# uses: codecov/codecov-action@v1
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,24 @@ public final void setFuture(Future<?> future) {
public Runnable getWrappedRunnable() {
return runnable;
}

@Override
public String toString() {
String status;
Future<?> f = get();
if (f == FINISHED) {
status = "Finished";
} else if (f == DISPOSED) {
status = "Disposed";
} else {
Thread r = runner;
if (r != null) {
status = "Running on " + runner;
} else {
status = "Waiting";
}
}

return getClass().getSimpleName() + "[" + status + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void run() {
runner = null;
} catch (Throwable ex) {
// Exceptions.throwIfFatal(ex); nowhere to go
runner = null;
dispose();
runner = null;
RxJavaPlugins.onError(ex);
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public void run() {
throw e;
}
} finally {
lazySet(THREAD_INDEX, null);
Object o = get(PARENT_INDEX);
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
((DisposableContainer)o).delete(this);
Expand All @@ -81,6 +80,7 @@ public void run() {
break;
}
}
lazySet(THREAD_INDEX, null);
}
}

Expand Down Expand Up @@ -137,4 +137,26 @@ public boolean isDisposed() {
Object o = get(PARENT_INDEX);
return o == PARENT_DISPOSED || o == DONE;
}

@Override
public String toString() {
String state;
Object o = get(FUTURE_INDEX);
if (o == DONE) {
state = "Finished";
} else if (o == SYNC_DISPOSED) {
state = "Disposed(Sync)";
} else if (o == ASYNC_DISPOSED) {
state = "Disposed(Async)";
} else {
o = get(THREAD_INDEX);
if (o == null) {
state = "Waiting";
} else {
state = "Running on " + o;
}
}

return getClass().getSimpleName() + "[" + state + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,31 @@ public void run() {
TestHelper.race(r1, r2);
}
}

static class TestDirectTask extends AbstractDirectTask {
private static final long serialVersionUID = 587679821055711738L;

TestDirectTask() {
super(Functions.EMPTY_RUNNABLE);
}
}

@Test
public void toStringStates() {
TestDirectTask task = new TestDirectTask();

assertEquals("TestDirectTask[Waiting]", task.toString());

task.runner = Thread.currentThread();

assertEquals("TestDirectTask[Running on " + Thread.currentThread() + "]", task.toString());

task.dispose();

assertEquals("TestDirectTask[Disposed]", task.toString());

task.set(AbstractDirectTask.FINISHED);

assertEquals("TestDirectTask[Finished]", task.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,29 @@ public void withParentIsDisposed() {

assertFalse(set.remove(run));
}

@Test
public void toStringStates() {
CompositeDisposable set = new CompositeDisposable();
ScheduledRunnable task = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set);

assertEquals("ScheduledRunnable[Waiting]", task.toString());

task.set(ScheduledRunnable.THREAD_INDEX, Thread.currentThread());

assertEquals("ScheduledRunnable[Running on " + Thread.currentThread() + "]", task.toString());

task.dispose();

assertEquals("ScheduledRunnable[Disposed(Sync)]", task.toString());

task.set(ScheduledRunnable.FUTURE_INDEX, ScheduledRunnable.DONE);

assertEquals("ScheduledRunnable[Finished]", task.toString());

task = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set);
task.dispose();

assertEquals("ScheduledRunnable[Disposed(Async)]", task.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
Expand Down Expand Up @@ -771,4 +772,51 @@ public void schedulePeriodicallyDirectNullRunnable() {
assertEquals("run is null", npe.getMessage());
}
}

void schedulePrint(Function<Runnable, Disposable> onSchedule) {
CountDownLatch waitForBody = new CountDownLatch(1);
CountDownLatch waitForPrint = new CountDownLatch(1);

try {
Disposable d = onSchedule.apply(() -> {
waitForBody.countDown();
try {
waitForPrint.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});

waitForBody.await();

assertNotEquals("", d.toString());
} catch (Throwable ex) {
throw new AssertionError(ex);
} finally {
waitForPrint.countDown();
}
}

@Test
public void scheduleDirectPrint() {
if (getScheduler() instanceof TrampolineScheduler) {
// no concurrency with Trampoline
return;
}
schedulePrint(r -> getScheduler().scheduleDirect(r));
}

@Test
public void schedulePrint() {
if (getScheduler() instanceof TrampolineScheduler) {
// no concurrency with Trampoline
return;
}
Worker worker = getScheduler().createWorker();
try {
schedulePrint(worker::schedule);
} finally {
worker.dispose();
}
}
}