Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache-github/master' into foreachBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Jun 15, 2018
2 parents e8073ea + e4fee39 commit 0763a44
Show file tree
Hide file tree
Showing 18 changed files with 313 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
// must be stored in the same memory page.
// (8 byte key length) (key) (value) (8 byte pointer to next value)
int uaoSize = UnsafeAlignedOffset.getUaoSize();
final long recordLength = (2 * uaoSize) + klen + vlen + 8;
final long recordLength = (2L * uaoSize) + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + uaoSize)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private[deploy] class DriverRunner(
// check if attempting another run
keepTrying = supervise && exitCode != 0 && !killed
if (keepTrying) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
waitSeconds = 1
}
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max(1,
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private[spark] class BlockManager(
case e: Exception if i < MAX_ATTEMPTS =>
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
Thread.sleep(SLEEP_TIME_SECS * 1000L)
case NonFatal(e) =>
throw new SparkException("Unable to register with external shuffle server due to : " +
e.getMessage, e)
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1918,7 +1918,7 @@ def assert_invalid_writer(self, writer, msg=None):
self.fail("invalid writer %s did not fail the query" % str(writer)) # not expected
except Exception as e:
if msg:
assert(msg in str(e), "%s not in %s" % (msg, str(e)))
self.assertTrue(msg in str(e), "%s not in %s" % (msg, str(e)))

finally:
self.stop_all()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static long calculateSizeOfUnderlyingByteArray(long numFields, int elemen
private long elementOffset;

private long getElementOffset(int ordinal, int elementSize) {
return elementOffset + ordinal * elementSize;
return elementOffset + ordinal * (long)elementSize;
}

public Object getBaseObject() { return baseObject; }
Expand Down Expand Up @@ -414,46 +414,46 @@ public byte[] toByteArray() {
public short[] toShortArray() {
short[] values = new short[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2);
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2L);
return values;
}

@Override
public int[] toIntArray() {
int[] values = new int[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4);
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4L);
return values;
}

@Override
public long[] toLongArray() {
long[] values = new long[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8);
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8L);
return values;
}

@Override
public float[] toFloatArray() {
float[] values = new float[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4);
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4L);
return values;
}

@Override
public double[] toDoubleArray() {
double[] values = new double[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8);
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8L);
return values;
}

private static UnsafeArrayData fromPrimitiveArray(
Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderPortionInBytes(length);
final long valueRegionInBytes = elementSize * length;
final long valueRegionInBytes = (long)elementSize * length;
final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
@Override
public UnsafeRow appendRow(Object kbase, long koff, int klen,
Object vbase, long voff, int vlen) {
final long recordLength = 8 + klen + vlen + 8;
final long recordLength = 8L + klen + vlen + 8;
// if run out of max supported rows or page size, return null
if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
return null;
Expand Down
Loading

0 comments on commit 0763a44

Please sign in to comment.