-
Notifications
You must be signed in to change notification settings - Fork 46
/
Partitioning.java
67 lines (39 loc) · 2.02 KB
/
Partitioning.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.packt.sfjd.ch7;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.RangePartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
/**
 * Small Spark demo showing how a pair RDD can be re-partitioned with a
 * {@link RangePartitioner} and how to inspect which partition each key lands in.
 *
 * <p>The program builds an eight-element pair RDD (keys 1..8) split into 3 slices,
 * prints its partition count, range-partitions it (requesting 4 partitions) and
 * finally prints one line per element of the form
 * {@code Partition number:<index>,key:<key>}.
 */
public class Partitioning {

    /**
     * Runs the partitioning demo on a local Spark master.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Points Hadoop at a local winutils directory.
        // NOTE(review): this path is machine-specific and presumably only relevant on Windows.
        System.setProperty("hadoop.home.dir", "C:\\softwares\\Winutils");

        SparkConf conf = new SparkConf().setMaster("local").setAppName("Partitioning");

        // JavaSparkContext is Closeable; try-with-resources guarantees the context is
        // stopped even if one of the Spark actions below throws.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            JavaPairRDD<Integer, String> pairRdd = jsc.parallelizePairs(
                    Arrays.asList(
                            new Tuple2<Integer, String>(1, "A"), new Tuple2<Integer, String>(2, "B"),
                            new Tuple2<Integer, String>(3, "C"), new Tuple2<Integer, String>(4, "D"),
                            new Tuple2<Integer, String>(5, "E"), new Tuple2<Integer, String>(6, "F"),
                            new Tuple2<Integer, String>(7, "G"), new Tuple2<Integer, String>(8, "H")),
                    3);

            // RangePartitioner works on the Scala RDD, so unwrap the Java wrapper.
            RDD<Tuple2<Integer, String>> rdd = JavaPairRDD.toRDD(pairRdd);

            System.out.println(pairRdd.getNumPartitions());

            // Alternative shown for comparison: hash partitioning into 2 partitions.
            // JavaPairRDD<Integer, String> hashPartitioned = pairRdd.partitionBy(new HashPartitioner(2));
            // System.out.println(hashPartitioned.getNumPartitions());

            // The raw type is deliberate: Scala's Ordering.Int is seen from Java as an
            // ordering over Object, which does not line up with Integer type arguments.
            @SuppressWarnings({"rawtypes", "unchecked"})
            RangePartitioner rangePartitioner = new RangePartitioner(
                    4,
                    rdd,
                    true, // ascending key order
                    scala.math.Ordering.Int$.MODULE$,
                    scala.reflect.ClassTag$.MODULE$.apply(Integer.class));

            JavaPairRDD<Integer, String> rangePartitioned = pairRdd.partitionBy(rangePartitioner);

            // Tag every key with the index of the partition that holds it.
            JavaRDD<String> mapPartitionsWithIndex = rangePartitioned.mapPartitionsWithIndex(
                    (index, tupleIterator) -> {
                        List<String> list = new ArrayList<>();
                        while (tupleIterator.hasNext()) {
                            list.add("Partition number:" + index + ",key:" + tupleIterator.next()._1());
                        }
                        return list.iterator();
                    },
                    true); // preservesPartitioning

            System.out.println(mapPartitionsWithIndex.collect());
        }
    }
}