/
StandardDeviations.java
109 lines (95 loc) · 4.2 KB
/
StandardDeviations.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package com.tetsuyaodaka.hadoop.math.matrix;
import java.io.IOException;
import java.math.BigDecimal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * StandardDeviations class.
 *
 * Computes the (unbiased) standard deviation from the (already computed)
 * means and the observed values.
 */
public class StandardDeviations {
/**
 * Mapper over the full observation data.
 *
 * Writes every input line out unchanged as {@code Text}, keyed by the
 * variable (column) index. The index is taken from the first space-separated
 * token of the text that precedes the first tab of the line.
 */
public static class MapAll extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Everything in front of the first tab is the record's own key.
        final String recordKey = value.toString().split("\t")[0];
        // The leading token of that key is the column (variable) number.
        final String columnToken = recordKey.split(" ")[0];
        final IntWritable column = new IntWritable(Integer.parseInt(columnToken));
        context.write(column, value);
    }
}
/**
 * Mapper over the previously computed per-variable arithmetic means.
 *
 * Each input line is split on tabs: the first field is parsed as the
 * variable index and becomes the output key, the second field is the mean.
 * The text " mean" is appended to the mean so that the reducer can tell this
 * record apart from the observation records sharing the same key.
 */
public static class MapMean extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] fields = value.toString().split("\t");
        // Tag the mean value with the marker the reducer looks for.
        final Text taggedMean = new Text(fields[1] + " mean");
        final IntWritable variableIndex = new IntWritable(Integer.parseInt(fields[0]));
        context.write(variableIndex, taggedMean);
    }
}
/**
 * Reducer that computes the unbiased (n - 1) standard deviation of one variable.
 *
 * The values for a key are a mix of observation lines and a record carrying
 * the pre-computed mean, recognisable by its trailing " mean" marker. For
 * every observation line the second tab-separated field is parsed as the
 * observed value. The result is
 * {@code sqrt((sum(x^2) - n * mean^2) / (n - 1))}, rounded half-up to two
 * decimal places. The mean is only used after all values have been read, so
 * the position of the mean record among the values does not matter. If no
 * mean record is present the mean is taken to be 0.
 *
 * With fewer than two observations the unbiased standard deviation is
 * undefined and {@code NaN} is written for the key.
 */
public static class Reduce extends Reducer<IntWritable, Text, IntWritable, DoubleWritable> {

    /** Suffix that marks a value as the pre-computed mean. */
    private static final String MEAN_MARKER = " mean";

    /**
     * Accumulates the sum of squares and the count of the observations for
     * {@code key}, picks up the mean record, and writes the standard deviation.
     *
     * @param key     variable (column) index
     * @param values  observation lines and the marked mean record
     * @param context Hadoop context the result is written to
     * @throws IOException          if writing the result fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sumOfSquares = 0;
        long count = 0;
        double mean = 0;
        for (Text value : values) {
            final String line = value.toString();
            if (isMean(line)) {
                mean = getMean(line);
            } else {
                // Parse the observation once instead of once per factor.
                final double observation = Double.parseDouble(line.split("\t")[1]);
                sumOfSquares += observation * observation;
                count++;
            }
        }
        final double deviation = standardDeviation(sumOfSquares, count, mean);
        context.write(key, new DoubleWritable(deviation));
    }

    /**
     * Computes the rounded unbiased standard deviation from the accumulated sums.
     *
     * @return the standard deviation rounded half-up to two decimals;
     *         {@code NaN} when {@code count < 2}; a non-finite intermediate
     *         result is returned as is instead of being rounded
     */
    private static double standardDeviation(double sumOfSquares, long count, double mean) {
        if (count < 2) {
            // Dividing by (count - 1) would yield NaN/Infinity, which BigDecimal rejects.
            return Double.NaN;
        }
        final double variance = (sumOfSquares - count * mean * mean) / (count - 1);
        // Cancellation, or a mean that was rounded before being stored, can push
        // the variance marginally below zero; clamp so sqrt does not return NaN.
        final double deviation = Math.sqrt(Math.max(variance, 0.0));
        if (Double.isNaN(deviation) || Double.isInfinite(deviation)) {
            // new BigDecimal(double) throws NumberFormatException for these values.
            return deviation;
        }
        // RoundingMode is fully qualified so that no additional import is required.
        return new BigDecimal(deviation)
                .setScale(2, java.math.RoundingMode.HALF_UP)
                .doubleValue();
    }

    /** Returns true when {@code line} ends with the " mean" marker. */
    private boolean isMean(String line) {
        return line.endsWith(MEAN_MARKER);
    }

    /** Parses the mean from a marked record: the text before the first space. */
    private double getMean(String line) {
        return Double.parseDouble(line.split(" ")[0]);
    }
}
/**
 * Job driver: configures and runs the "StandardDeviations" MapReduce job.
 *
 * Prints the job's success flag to standard output. When the job fails the
 * JVM is terminated with exit status 1 so that calling scripts can detect the
 * failure; on success the method returns normally.
 *
 * @param args {@code args[0]} input path read by {@code MapAll},
 *             {@code args[1]} input path read by {@code MapMean},
 *             {@code args[2]} output path; further arguments are ignored
 * @throws IllegalArgumentException if fewer than three arguments are given
 * @throws IOException              if the job cannot be set up or submitted
 * @throws InterruptedException     if waiting for the job is interrupted
 * @throws ClassNotFoundException   if a job class cannot be found
 */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    if (args.length < 3) {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
        throw new IllegalArgumentException(
                "Usage: StandardDeviations <data path> <mean path> <output path>");
    }

    Job job = new Job(new Configuration(), "StandardDeviations");
    job.setJarByClass(StandardDeviations.class);
    job.setReducerClass(Reduce.class);

    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(DoubleWritable.class);

    // Each input path is read by its own mapper.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapAll.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapMean.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    boolean success = job.waitForCompletion(true);
    System.out.println(success);
    if (!success) {
        // Surface the failure through the process exit status.
        System.exit(1);
    }
}
}