Skip to content
8 changes: 5 additions & 3 deletions docs/package.mill
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,11 @@ object `package` extends RootModule {
if (brokenLocalLinks().nonEmpty){
throw new Exception("Broken Local Links: " + upickle.default.write(brokenLocalLinks(), indent = 2))
}
if (brokenRemoteLinks().nonEmpty){
throw new Exception("Broken Rmote Links: " + upickle.default.write(brokenRemoteLinks(), indent = 2))
}
// This is flaky due to rate limits so ignore it for now

// if (brokenRemoteLinks().nonEmpty){
// throw new Exception("Broken Rmote Links: " + upickle.default.write(brokenRemoteLinks(), indent = 2))
// }
}

def brokenLocalLinks: T[Map[os.Path, Seq[(String, String)]]] = Task{
Expand Down
42 changes: 41 additions & 1 deletion main/api/src/mill/api/SystemStreams.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package mill.api

import java.io.{InputStream, OutputStream, PrintStream}
import mill.main.client.InputPumper
import mill.main.client.{DebugLog, InputPumper}

import scala.util.DynamicVariable

Expand Down Expand Up @@ -175,4 +175,44 @@ object SystemStreams {
override def transferTo(out: OutputStream): Long = delegate().transferTo(out)
}
}
private def debugPrintln(s: String) =
DebugLog.println(pprint.apply(s.toCharArray, width = 999).toString)
private[mill] class DebugDelegateStream(delegate0: SystemStreams) extends SystemStreams(
new PrintStream(new ThreadLocalStreams.ProxyOutputStream {
override def delegate(): OutputStream = delegate0.out

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
debugPrintln(new String(b, off, len))
super.write(b, off, len)
}

override def write(b: Array[Byte]): Unit = {
debugPrintln(new String(b))
super.write(b)
}

override def write(b: Int): Unit = {
debugPrintln(new String(Array(b.toByte)))
super.write(b)
}
}),
new PrintStream(new ThreadLocalStreams.ProxyOutputStream {
override def delegate(): OutputStream = delegate0.err
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
debugPrintln(new String(b, off, len))
super.write(b, off, len)
}

override def write(b: Array[Byte]): Unit = {
debugPrintln(new String(b))
super.write(b)
}

override def write(b: Int): Unit = {
debugPrintln(new String(Array(b.toByte)))
super.write(b)
}
}),
delegate0.in
)
}
31 changes: 21 additions & 10 deletions main/client/src/mill/main/client/ProxyStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,20 @@ public static class Pumper implements Runnable{
private InputStream src;
private OutputStream destOut;
private OutputStream destErr;
public Pumper(InputStream src, OutputStream destOut, OutputStream destErr){
private Object synchronizer;
public Pumper(InputStream src, OutputStream destOut, OutputStream destErr, Object synchronizer){
this.src = src;
this.destOut = destOut;
this.destErr = destErr;
this.synchronizer = synchronizer;
}
public Pumper(InputStream src, OutputStream destOut, OutputStream destErr){
this(src, destOut, destErr, new Object());
}

public void preRead(InputStream src){}

public void preWrite(){}
public void preWrite(byte[] buffer, int length){}

public void run() {

Expand Down Expand Up @@ -141,10 +146,12 @@ public void run() {
}

if (delta != -1) {
this.preWrite();
switch(stream){
case ProxyStream.OUT: destOut.write(buffer, 0, offset); break;
case ProxyStream.ERR: destErr.write(buffer, 0, offset); break;
synchronized (synchronizer) {
this.preWrite(buffer, offset);
switch(stream){
case ProxyStream.OUT: destOut.write(buffer, 0, offset); break;
case ProxyStream.ERR: destErr.write(buffer, 0, offset); break;
}
}
}
}
Expand All @@ -158,14 +165,18 @@ public void run() {
}

try {
destOut.flush();
destErr.flush();
synchronized (synchronizer) {
destOut.flush();
destErr.flush();
}
} catch(IOException e) {}
}

public void flush() throws IOException {
destOut.flush();
destErr.flush();
synchronized (synchronizer) {
destOut.flush();
destErr.flush();
}
}
}
}
17 changes: 8 additions & 9 deletions main/eval/src/mill/eval/EvaluatorCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
}
}
}

// Make sure we wait for all tasks from this batch to finish before starting the next
// one, so we don't mix up exclusive and non-exclusive tasks running at the same time
terminals.map(t => (t, Await.result(futures(t), duration.Duration.Inf)))
}

val tasks0 = terminals0.filter {
Expand All @@ -216,18 +220,13 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {

// Run all non-command tasks according to the threads
// given but run the commands in linear order
evaluateTerminals(
tasks,
ec,
exclusive = false
)
val nonExclusiveResults = evaluateTerminals(tasks, ec, exclusive = false)

evaluateTerminals(leafExclusiveCommands, ec, exclusive = true)
val exclusiveResults = evaluateTerminals(leafExclusiveCommands, ec, exclusive = true)

logger.clearPromptStatuses()
val finishedOptsMap = terminals0
.map(t => (t, Await.result(futures(t), duration.Duration.Inf)))
.toMap

val finishedOptsMap = (nonExclusiveResults ++ exclusiveResults).toMap

val results0: Vector[(Task[_], TaskResult[(Val, Int)])] = terminals0
.flatMap { t =>
Expand Down
Loading
Loading