Merge branch 'master' of git://git.apache.org/spark into fix-typo-in-javautils
sarutak committed Dec 24, 2014
2 parents 99f6f63 + 29fabb1 commit 82bc5d9
Showing 14 changed files with 45 additions and 32 deletions.
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -352,9 +352,9 @@
</execution>
</executions>
<configuration>
<tasks>
<target>
<unzip src="../python/lib/py4j-0.8.2.1-src.zip" dest="../python/build" />
</tasks>
</target>
</configuration>
</plugin>
<plugin>
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler

import java.nio.ByteBuffer

import scala.language.existentials
import scala.util.control.NonFatal

import org.apache.spark._
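
For context on the import added above: scala.language.existentials acknowledges the compiler's existential-types feature, which otherwise draws a warning under -feature when forSome types appear in source. A minimal, self-contained sketch, unrelated to the file in this diff:

    import scala.language.existentials

    // An existential type written with forSome: "an Array[T] for some unknown T".
    // The wildcard form Array[_] would not need the import; the forSome form does.
    val boxed: Array[T] forSome { type T } = Array(1, 2, 3)
    println(boxed.length)
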
4 changes: 3 additions & 1 deletion core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -184,6 +184,7 @@ public void sortByKey() {
Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
}

@SuppressWarnings("unchecked")
@Test
public void repartitionAndSortWithinPartitions() {
List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
@@ -491,6 +492,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(33, sum);
}

@SuppressWarnings("unchecked")
@Test
public void aggregateByKey() {
JavaPairRDD<Integer, Integer> pairs = sc.parallelizePairs(
@@ -1556,7 +1558,7 @@ static class Class2 {}
@Test
public void testRegisterKryoClasses() {
SparkConf conf = new SparkConf();
conf.registerKryoClasses(new Class[]{ Class1.class, Class2.class });
conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
Assert.assertEquals(
Class1.class.getName() + "," + Class2.class.getName(),
conf.get("spark.kryo.classesToRegister"));
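
The Class<?>[] fix above is specific to the Java test; for reference, the Scala side of the same API takes an Array[Class[_]]. A hedged sketch of typical usage (Point is a placeholder class, not part of this commit):

    import org.apache.spark.SparkConf

    case class Point(x: Double, y: Double)

    val conf = new SparkConf()
    // registerKryoClasses(Array[Class[_]]) populates spark.kryo.classesToRegister
    conf.registerKryoClasses(Array(classOf[Point], classOf[String]))
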
@@ -24,14 +24,14 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.Matchers

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}

import scala.collection.mutable.ArrayBuffer

class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with ShouldMatchers {
class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Matchers {
test("input metrics when reading text file with single split") {
val file = new File(getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(file))
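
ShouldMatchers comes from older ScalaTest releases; in ScalaTest 2.x the replacement trait is org.scalatest.Matchers, and the should-style assertions it mixes in are unchanged. A minimal sketch, assuming ScalaTest 2.x on the classpath:

    import org.scalatest.{FunSuite, Matchers}

    class ExampleSuite extends FunSuite with Matchers {
      test("should-style assertions still compile with Matchers") {
        val bytesRead = 128L
        bytesRead should be > 0L
      }
    }
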
@@ -739,7 +739,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F

test("accumulator not calculated for resubmitted result stage") {
//just for register
val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
val finalRdd = new MyRDD(sc, 1, Nil)
submit(finalRdd, Array(0))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
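
Both the old and the new reference above are AccumulatorParam[Int] implementations; the test now points at the object on AccumulatorParam's companion. In application code the implicit is usually resolved without naming either, as in this hedged Spark 1.x sketch:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accum-demo"))
    // sc.accumulator(0) picks up the Int AccumulatorParam implicitly
    val accum = sc.accumulator(0)
    sc.parallelize(1 to 4).foreach(x => accum += x)
    assert(accum.value == 10)
    sc.stop()
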
@@ -49,6 +49,7 @@ public void tearDown() {
public void tfIdf() {
// The tests are to check Java compatibility.
HashingTF tf = new HashingTF();
@SuppressWarnings("unchecked")
JavaRDD<ArrayList<String>> documents = sc.parallelize(Lists.newArrayList(
Lists.newArrayList("this is a sentence".split(" ")),
Lists.newArrayList("this is another sentence".split(" ")),
@@ -68,6 +69,7 @@ public void tfIdf() {
public void tfIdfMinimumDocumentFrequency() {
// The tests are to check Java compatibility.
HashingTF tf = new HashingTF();
@SuppressWarnings("unchecked")
JavaRDD<ArrayList<String>> documents = sc.parallelize(Lists.newArrayList(
Lists.newArrayList("this is a sentence".split(" ")),
Lists.newArrayList("this is another sentence".split(" ")),
@@ -35,6 +35,7 @@ protected UserDefinedType() { }
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@SuppressWarnings("unchecked")
UserDefinedType<UserType> that = (UserDefinedType<UserType>) o;
return this.sqlType().equals(that.sqlType());
}
@@ -24,8 +24,8 @@ import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
import parquet.hadoop.util.ContextUtil
@@ -458,7 +458,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
// ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is
// empty, thus normally the "_metadata" file is expected to be fairly small).
.orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
.map(ParquetFileReader.readFooter(conf, _))
.map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER))
.getOrElse(
throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
}
@@ -141,6 +141,7 @@ public void constructComplexRow() {
doubleValue, stringValue, timestampValue, null);

// Complex array
@SuppressWarnings("unchecked")
List<Map<String, Long>> arrayOfMaps = Arrays.asList(simpleMap);
List<Row> arrayOfRows = Arrays.asList(simpleStruct);

@@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.test.TestSQLContext._

import scala.language.postfixOps

class DslQuerySuite extends QueryTest {
import org.apache.spark.sql.TestData._

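
The scala.language.postfixOps import added above acknowledges postfix method calls, i.e. a no-argument method invoked without a dot, which otherwise warn under -feature. A generic sketch, not tied to this suite's data:

    import scala.language.postfixOps

    // Postfix call; without the language import this line draws a feature warning.
    val ordered = (List(3, 1, 2) sorted)
    println(ordered)
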
@@ -17,6 +17,8 @@

package org.apache.spark.sql.parquet

import scala.reflect.ClassTag

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
@@ -459,11 +461,17 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
}

test("make RecordFilter for simple predicates") {
def checkFilter[T <: FilterPredicate](predicate: Expression, defined: Boolean = true): Unit = {
def checkFilter[T <: FilterPredicate : ClassTag](
predicate: Expression,
defined: Boolean = true): Unit = {
val filter = ParquetFilters.createFilter(predicate)
if (defined) {
assert(filter.isDefined)
assert(filter.get.isInstanceOf[T])
val tClass = implicitly[ClassTag[T]].runtimeClass
val filterGet = filter.get
assert(
tClass.isInstance(filterGet),
s"$filterGet of type ${filterGet.getClass} is not an instance of $tClass")
} else {
assert(filter.isEmpty)
}
@@ -484,7 +492,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA

checkFilter[Operators.And]('a.int === 1 && 'a.int < 4)
checkFilter[Operators.Or]('a.int === 1 || 'a.int < 4)
checkFilter[Operators.Not](!('a.int === 1))
checkFilter[Operators.NotEq[Integer]](!('a.int === 1))

checkFilter('a.int > 'b.int, defined = false)
checkFilter(('a.int > 'b.int) && ('a.int > 'b.int), defined = false)
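
The ClassTag-based checkFilter above works around type erasure: for an abstract T, filter.get.isInstanceOf[T] is an unchecked test that effectively always succeeds, while a ClassTag keeps T's runtime class available for a real check. A standalone sketch of the same pattern (names here are illustrative):

    import scala.reflect.ClassTag

    def assertInstanceOf[T: ClassTag](value: Any): Unit = {
      // The ClassTag supplies T's runtime class, which erasure would otherwise lose.
      val expected = implicitly[ClassTag[T]].runtimeClass
      assert(expected.isInstance(value),
        s"$value of type ${value.getClass} is not an instance of $expected")
    }

    assertInstanceOf[String]("ok")       // passes
    // assertInstanceOf[Integer]("ok")   // fails with a descriptive message
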
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.Writable
* when "spark.sql.hive.convertMetastoreParquet" is set to true.
*/
@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
"placeholder in the Hive MetaStore")
"placeholder in the Hive MetaStore", "1.2.0")
class FakeParquetSerDe extends SerDe {
override def getObjectInspector: ObjectInspector = new ObjectInspector {
override def getCategory: Category = Category.PRIMITIVE
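
Scala's @deprecated annotation accepts both a message and the version in which the deprecation started; the change above supplies the missing version string. A one-line sketch of the two-argument form, with placeholder names:

    @deprecated("use NewThing instead", "1.2.0")
    class OldThing
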
@@ -23,25 +23,21 @@

public class UDFListListInt extends UDF {
/**
*
* @param obj
* SQL schema: array<struct<x: int, y: int, z: int>>
* Java Type: List<List<Integer>>
* @return
* SQL schema: array&lt;struct&lt;x: int, y: int, z: int&gt;&gt;
* Java Type: List&lt;List&lt;Integer&gt;&gt;
*/
@SuppressWarnings("unchecked")
public long evaluate(Object obj) {
if (obj == null) {
return 0l;
return 0L;
}
List<List> listList = (List<List>) obj;
List<List<?>> listList = (List<List<?>>) obj;
long retVal = 0;
for (List aList : listList) {
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) aList;
@SuppressWarnings("unchecked")
Integer someInt = (Integer) list.get(1);
for (List<?> aList : listList) {
Number someInt = (Number) aList.get(1);
try {
retVal += (long) (someInt.intValue());
retVal += someInt.longValue();
} catch (NullPointerException e) {
System.out.println(e);
}
@@ -57,7 +57,7 @@ public void equalIterable(Iterable<?> a, Iterable<?> b) {

@Test
public void testInitialization() {
Assert.assertNotNull(ssc.sc());
Assert.assertNotNull(ssc.sparkContext());
}

@SuppressWarnings("unchecked")
@@ -662,7 +662,7 @@ public void testStreamingContextTransform(){
listOfDStreams1,
new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
assert(listOfRDDs.size() == 2);
Assert.assertEquals(2, listOfRDDs.size());
return null;
}
}
@@ -675,7 +675,7 @@ public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
listOfDStreams2,
new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
assert(listOfRDDs.size() == 3);
Assert.assertEquals(3, listOfRDDs.size());
JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
@@ -969,7 +969,7 @@ public Integer call(Tuple2<String, Integer> in) throws Exception {
});

JavaTestUtils.attachTestOutputStream(reversed);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected, result);
}
@@ -1012,7 +1012,7 @@ public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected, result);
}
@@ -1163,9 +1163,9 @@ public void testGroupByKeyAndWindow() {
JavaTestUtils.attachTestOutputStream(groupWindowed);
List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);

assert(result.size() == expected.size());
Assert.assertEquals(expected.size(), result.size());
for (int i = 0; i < result.size(); i++) {
assert(convert(result.get(i)).equals(convert(expected.get(i))));
Assert.assertEquals(convert(expected.get(i)), convert(result.get(i)));
}
}

@@ -1383,7 +1383,7 @@ public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) thro
});

JavaTestUtils.attachTestOutputStream(sorted);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

Assert.assertEquals(expected, result);
}
