Commit
Merge branch 'master' into spark12716
ajbozarth committed Jan 14, 2016
2 parents 4b5f7c7 + bcc7373 commit f1cd333
Showing 390 changed files with 9,413 additions and 7,485 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -60,7 +60,6 @@ dev/create-release/*final
spark-*-bin-*.tgz
unit-tests.log
/lib/
-ec2/lib/
rat-results.txt
scalastyle.txt
scalastyle-output.xml
2 changes: 1 addition & 1 deletion LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -130,6 +130,7 @@ exportMethods("%in%",
"count",
"countDistinct",
"crc32",
"hash",
"cume_dist",
"date_add",
"date_format",
20 changes: 20 additions & 0 deletions R/pkg/R/functions.R
@@ -340,6 +340,26 @@ setMethod("crc32",
column(jc)
})

+#' hash
+#'
+#' Calculates the hash code of given columns, and returns the result as an int column.
+#'
+#' @rdname hash
+#' @name hash
+#' @family misc_funcs
+#' @export
+#' @examples \dontrun{hash(df$c)}
+setMethod("hash",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "hash", jcols)
+            column(jc)
+          })

#' dayofmonth
#'
#' Extracts the day of the month as an integer from a given date/timestamp/string.
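As a hypothetical illustration (not part of this commit), the new hash wrapper above ultimately reaches the JVM static org.apache.spark.sql.functions.hash(Column...) through callJStatic. A minimal Java sketch of the equivalent direct call follows; the application class, input file, and column names are invented for the example.

// Hypothetical sketch -- not part of this commit. Shows the JVM-side static,
// org.apache.spark.sql.functions.hash(Column...), that the R wrapper invokes
// via callJStatic. Class name, file path, and column names are invented.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import static org.apache.spark.sql.functions.hash;

public class HashExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("HashExample").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

    // hash() accepts one or more columns and returns the result as an int column.
    DataFrame withHash = df.withColumn("h", hash(df.col("name"), df.col("age")));
    withHash.show();

    sc.stop();
  }
}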
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -736,6 +736,10 @@ setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct")
#' @export
setGeneric("crc32", function(x) { standardGeneric("crc32") })

+#' @rdname hash
+#' @export
+setGeneric("hash", function(x, ...) { standardGeneric("hash") })

#' @rdname cume_dist
#' @export
setGeneric("cume_dist", function(x) { standardGeneric("cume_dist") })
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -922,7 +922,7 @@ test_that("column functions", {
c <- column("a")
c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
-c3 <- cosh(c) + count(c) + crc32(c) + exp(c)
+c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c)
c6 <- log(c) + log10(c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c)
@@ -1173,7 +1173,7 @@ test_that("group by, agg functions", {

expect_equal(3, count(mean(gd)))
expect_equal(3, count(max(gd)))
-expect_equal(30, collect(max(gd))[1, 2])
+expect_equal(30, collect(max(gd))[2, 2])
expect_equal(1, collect(count(gd))[1, 2])

mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}",
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.9</version>
+<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
187 changes: 187 additions & 0 deletions core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java;

import java.io.Serializable;

import com.google.common.base.Preconditions;

/**
* <p>Like {@code java.util.Optional} in Java 8, {@code scala.Option} in Scala, and
* {@code com.google.common.base.Optional} in Google Guava, this class represents a
* value of a given type that may or may not exist. It is used in methods that wish
* to optionally return a value, in preference to returning {@code null}.</p>
*
* <p>In fact, the class here is a reimplementation of the essential API of both
* {@code java.util.Optional} and {@code com.google.common.base.Optional}. From
* {@code java.util.Optional}, it implements:</p>
*
* <ul>
* <li>{@link #empty()}</li>
* <li>{@link #of(Object)}</li>
* <li>{@link #ofNullable(Object)}</li>
* <li>{@link #get()}</li>
* <li>{@link #orElse(Object)}</li>
* <li>{@link #isPresent()}</li>
* </ul>
*
* <p>From {@code com.google.common.base.Optional} it implements:</p>
*
* <ul>
* <li>{@link #absent()}</li>
* <li>{@link #of(Object)}</li>
* <li>{@link #fromNullable(Object)}</li>
* <li>{@link #get()}</li>
* <li>{@link #or(Object)}</li>
* <li>{@link #orNull()}</li>
* <li>{@link #isPresent()}</li>
* </ul>
*
* <p>{@code java.util.Optional} itself is not used at this time because the
* project does not require Java 8. Using {@code com.google.common.base.Optional}
* has in the past caused serious library version conflicts with Guava that can't
* be resolved by shading. Hence this work-alike clone.</p>
*
* @param <T> type of value held inside
*/
public final class Optional<T> implements Serializable {

private static final Optional<?> EMPTY = new Optional<>();

private final T value;

private Optional() {
this.value = null;
}

private Optional(T value) {
Preconditions.checkNotNull(value);
this.value = value;
}

// java.util.Optional API (subset)

/**
* @return an empty {@code Optional}
*/
public static <T> Optional<T> empty() {
@SuppressWarnings("unchecked")
Optional<T> t = (Optional<T>) EMPTY;
return t;
}

/**
* @param value non-null value to wrap
* @return {@code Optional} wrapping this value
* @throws NullPointerException if value is null
*/
public static <T> Optional<T> of(T value) {
return new Optional<>(value);
}

/**
* @param value value to wrap, which may be null
* @return {@code Optional} wrapping this value, which may be empty
*/
public static <T> Optional<T> ofNullable(T value) {
if (value == null) {
return empty();
} else {
return of(value);
}
}

/**
* @return the value wrapped by this {@code Optional}
* @throws NullPointerException if this is empty (contains no value)
*/
public T get() {
Preconditions.checkNotNull(value);
return value;
}

/**
* @param other value to return if this is empty
* @return this {@code Optional}'s value if present, or else the given value
*/
public T orElse(T other) {
return value != null ? value : other;
}

/**
* @return true iff this {@code Optional} contains a value (non-empty)
*/
public boolean isPresent() {
return value != null;
}

// Guava API (subset)
// of(), get() and isPresent() are identically present in the Guava API

/**
* @return an empty {@code Optional}
*/
public static <T> Optional<T> absent() {
return empty();
}

/**
* @param value value to wrap, which may be null
* @return {@code Optional} wrapping this value, which may be empty
*/
public static <T> Optional<T> fromNullable(T value) {
return ofNullable(value);
}

/**
* @param other value to return if this is empty
* @return this {@code Optional}'s value if present, or else the given value
*/
public T or(T other) {
return value != null ? value : other;
}

/**
* @return this {@code Optional}'s value if present, or else null
*/
public T orNull() {
return value;
}

// Common methods

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Optional)) {
return false;
}
Optional<?> other = (Optional<?>) obj;
return value == null ? other.value == null : value.equals(other.value);
}

@Override
public int hashCode() {
return value == null ? 0 : value.hashCode();
}

@Override
public String toString() {
return value == null ? "Optional.empty" : String.format("Optional[%s]", value);
}

}
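A brief, hypothetical usage sketch (not part of this commit) exercising the API defined above; the class name and values are invented, and the comments note which calls come from the java.util-style half and which from the Guava-style half.

// Hypothetical usage sketch -- not part of this commit.
import org.apache.spark.api.java.Optional;

public class OptionalExample {
  public static void main(String[] args) {
    Optional<String> present = Optional.of("value");
    Optional<String> missing = Optional.empty();        // same as Optional.absent()

    System.out.println(present.get());                  // prints "value"
    System.out.println(present.isPresent());            // prints true
    System.out.println(missing.orElse("fallback"));     // java.util-style default
    System.out.println(missing.or("fallback"));         // Guava-style default
    System.out.println(missing.orNull());               // prints null

    // ofNullable()/fromNullable() map null to the empty Optional.
    System.out.println(Optional.ofNullable(null).isPresent()); // prints false
  }
}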
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -138,7 +138,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
-  blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
+  blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
@@ -185,16 +185,19 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
-  final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
-  boolean copyThrewException = true;
-  try {
-    lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
-    copyThrewException = false;
-  } finally {
-    Closeables.close(in, copyThrewException);
-  }
-  if (!partitionWriters[i].fileSegment().file().delete()) {
-    logger.error("Unable to delete file for partition {}", i);
+  final File file = partitionWriters[i].fileSegment().file();
+  if (file.exists()) {
+    final FileInputStream in = new FileInputStream(file);
+    boolean copyThrewException = true;
+    try {
+      lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+      copyThrewException = false;
+    } finally {
+      Closeables.close(in, copyThrewException);
+    }
+    if (!file.delete()) {
+      logger.error("Unable to delete file for partition {}", i);
+    }
}
}
threwException = false;
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -45,7 +45,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {

private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);

+@Nullable
private final PrefixComparator prefixComparator;
+@Nullable
private final RecordComparator recordComparator;
private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
@@ -431,7 +433,11 @@ class SpillableIterator extends UnsafeSorterIterator {

public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) {
this.upstream = inMemIterator;
-this.numRecords = inMemIterator.numRecordsLeft();
+this.numRecords = inMemIterator.getNumRecords();
}

+public int getNumRecords() {
+  return numRecords;
+}

public long spill() throws IOException {
@@ -558,13 +564,23 @@ class ChainedIterator extends UnsafeSorterIterator {

private final Queue<UnsafeSorterIterator> iterators;
private UnsafeSorterIterator current;
+private int numRecords;

public ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
assert iterators.size() > 0;
+this.numRecords = 0;
+for (UnsafeSorterIterator iter: iterators) {
+  this.numRecords += iter.getNumRecords();
+}
this.iterators = iterators;
this.current = iterators.remove();
}

+@Override
+public int getNumRecords() {
+  return numRecords;
+}

@Override
public boolean hasNext() {
while (!current.hasNext() && !iterators.isEmpty()) {
@@ -575,6 +591,9 @@ public boolean hasNext() {

@Override
public void loadNext() throws IOException {
+while (!current.hasNext() && !iterators.isEmpty()) {
+  current = iterators.remove();
+}
current.loadNext();
}

Some files were not shown because too many files changed in this diff.
