<a href="https://colab.research.google.com/github/hkhong72/big_data/blob/main/221021_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

사용자 정의 카운터를 직접 사용할 수 있도록 해보자

com.adacho.counter 패키지생성

DelayCounter (Enum생성)

In [None]:
package com.adacho.counter;

/**
 * Counter names incremented by the delay-count mappers for records that are
 * not emitted as "delayed" output.
 *
 * The declaration order is kept as-is so ordinals do not change.
 */
public enum DelayCounter {
   /** Arrival-delay information is not available for the record. */
   not_available_arrival,
   /** Arrival delay equals zero. */
   scheduled_arrival,
   /** Arrival delay is negative. */
   early_arrival,
   /** Departure-delay information is not available for the record. */
   not_available_departure,
   /** Departure delay equals zero. */
   scheduled_departure,
   /** Departure delay is negative. */
   early_departure
}

com.adacho.mapper 패키지에

DelayCountMapperCounter 클래스생성

In [None]:
package com.adacho.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.adacho.common.AirlinePerformanceParser;
import com.adacho.counter.DelayCounter;

/**
 * Mapper that emits ("year,month", 1) for every delayed flight record and
 * tallies the non-delayed cases in {@link DelayCounter} counters.
 *
 * Which delay is inspected is chosen by the job property {@code workType}
 * ({@code departure} or {@code arrival}), e.g. {@code -D workType=departure}.
 * Any other non-null value makes the mapper emit nothing and touch no counter,
 * as before.
 */
public class DelayCountMapperCounter extends Mapper<LongWritable, Text, Text, IntWritable> {
	private final static IntWritable outputValue = new IntWritable(1);
	private Text outputKey = new Text();
	private String workType;

	/**
	 * Reads the {@code workType} job property once per mapper task.
	 *
	 * @throws IllegalArgumentException if {@code workType} is not set; without
	 *         this check the first call to map() would fail with a bare
	 *         NullPointerException that does not name the missing property
	 */
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		workType = context.getConfiguration().get("workType");
		if (workType == null) {
			throw new IllegalArgumentException(
					"Missing required job property 'workType'; "
							+ "pass -D workType=departure or -D workType=arrival");
		}
	}

	/**
	 * Wraps the input record in an AirlinePerformanceParser and dispatches on
	 * the configured work type.
	 *
	 * @param key     input key supplied by the input format (not used)
	 * @param value   one input record
	 * @param context task context used for output and counters
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

		if ("departure".equals(workType)) {
			countDeparture(parser, context);
		} else if ("arrival".equals(workType)) {
			countArrival(parser, context);
		}
	}

	/** Emits a delayed departure or increments the matching departure counter. */
	private void countDeparture(AirlinePerformanceParser parser, Context context)
			throws IOException, InterruptedException {
		if (!parser.isDepartureDelayAvailable()) {
			// Departure-delay value is missing from the record.
			context.getCounter(DelayCounter.not_available_departure).increment(1);
			return;
		}
		if (parser.getDepartureDelayTime() > 0) {
			// Positive delay: the flight departed late.
			outputKey.set(parser.getYear() + "," + parser.getMonth());
			context.write(outputKey, outputValue);
		} else if (parser.getDepartureDelayTime() == 0) {
			// Departed exactly on schedule.
			context.getCounter(DelayCounter.scheduled_departure).increment(1);
		} else if (parser.getDepartureDelayTime() < 0) {
			// Departed earlier than scheduled.
			context.getCounter(DelayCounter.early_departure).increment(1);
		}
	}

	/** Emits a delayed arrival or increments the matching arrival counter. */
	private void countArrival(AirlinePerformanceParser parser, Context context)
			throws IOException, InterruptedException {
		if (!parser.isArriveDelayAvailable()) {
			// Arrival-delay value is missing from the record.
			context.getCounter(DelayCounter.not_available_arrival).increment(1);
			return;
		}
		if (parser.getArriveDelayTime() > 0) {
			// Positive delay: the flight arrived late.
			outputKey.set(parser.getYear() + "," + parser.getMonth());
			context.write(outputKey, outputValue);
		} else if (parser.getArriveDelayTime() == 0) {
			// Arrived exactly on schedule.
			context.getCounter(DelayCounter.scheduled_arrival).increment(1);
		} else if (parser.getArriveDelayTime() < 0) {
			// Arrived earlier than scheduled.
			context.getCounter(DelayCounter.early_arrival).increment(1);
		}
	}
}


com.adacho.driver 패키지에

DelayCountCounter 클래스 생성

In [None]:
package com.adacho.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.adacho.mapper.DelayCountMapperCounter;
import com.adacho.reducer.DelayCountReducer;

/**
 * Driver for the counter-based delay count job.
 *
 * Usage: {@code DelayCountCounter [generic options] <input> <output>}; the
 * mapper additionally expects {@code -D workType=departure|arrival}.
 */
public class DelayCountCounter extends Configured implements Tool{
	/**
	 * Runs the tool through ToolRunner, prints the result code, and exits the
	 * JVM with that code so that shell callers can detect a failed job.
	 */
	public static void main(String[] args) throws Exception{
		int res = ToolRunner.run(new Configuration(), new DelayCountCounter(), args);
		System.out.println("MapReduce Result : " + res);
		System.exit(res);
	}

	/**
	 * Configures and runs the job synchronously.
	 *
	 * @param args command-line arguments; after generic options are removed
	 *             exactly two must remain: input path and output path
	 * @return 0 when the job succeeds, 1 on a usage error or a failed job
	 */
	@Override
	public int run(String[] args) throws Exception {
		String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: DelayCountCounter <input> <output>");
			// Return instead of System.exit so callers of run() keep control;
			// main() turns this into the process exit status.
			return 1;
		}

		Job job = Job.getInstance(getConf(), "DelayCountCounter");

		job.setJarByClass(DelayCountCounter.class); // driver class locates the job jar
		job.setMapperClass(DelayCountMapperCounter.class); // mapper class
		job.setReducerClass(DelayCountReducer.class); // reducer class

		// Both format classes come from org.apache.hadoop.mapreduce.lib.* (see imports).
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // more input paths could be added the same way
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		// Block until the job finishes and report its outcome to the caller.
		return job.waitForCompletion(true) ? 0 : 1;
	}

}


maven clean

maven install

윈도우 c java_workspace airperformance target jar확인하고 경로복사

터미널에서 cd 복사한경로

scp .\airPerformance-0.1.jar ubuntu:~/work/java

리눅스에서 cd ~/work/java
 
yarn jar ./airPerformance-0.1.jar com.adacho.driver.DelayCountCounter -D workType=departure air-input dep-delay-count-counter

com.adacho.counter.DelayCounter
                early_departure=3484826
                not_available_departure=75723
                scheduled_departure=367259

                확인

hdfs dfs -ls 로 dep-delay-count-counter확인

hdfs dfs -cat dep-delay-count-counter/part-r-00000








매퍼패키지에 DelayCountMapperMultiOut 클래스 생성

In [None]:
package com.adacho.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.adacho.common.AirlinePerformanceParser;
import com.adacho.counter.DelayCounter;

/**
 * Mapper that processes departure and arrival delays in a single pass.
 *
 * For every delayed record it emits a key of the form {@code "D,year,month"}
 * (departure) or {@code "A,year,month"} (arrival) with the value 1. The key
 * is comma-separated with a one-letter first field because the reducer splits
 * it on "," and tests the first field against "D". Non-delayed cases are
 * tallied in {@link DelayCounter} counters. No workType property is used
 * because both directions are always computed.
 */
public class DelayCountMapperMultiOut extends Mapper<LongWritable, Text, Text, IntWritable>{
	private final static IntWritable outputValue = new IntWritable(1);
	private Text outputKey = new Text();

	/**
	 * Wraps the record in an AirlinePerformanceParser and classifies its
	 * departure delay and then its arrival delay.
	 *
	 * @param key     input key supplied by the input format (not used)
	 * @param value   one input record
	 * @param context task context used for output and counters
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		AirlinePerformanceParser parser = new AirlinePerformanceParser(value);

		// Departure side.
		if (parser.isDepartureDelayAvailable()) {
			if (parser.getDepartureDelayTime() > 0) {
				// Positive delay: the flight departed late.
				outputKey.set("D," + parser.getYear() + "," + parser.getMonth());
				context.write(outputKey, outputValue);
			} else if (parser.getDepartureDelayTime() == 0) {
				// Departed exactly on schedule.
				context.getCounter(DelayCounter.scheduled_departure).increment(1);
			} else if (parser.getDepartureDelayTime() < 0) {
				// Departed earlier than scheduled.
				context.getCounter(DelayCounter.early_departure).increment(1);
			}
		} else {
			// Departure-delay value is missing from the record.
			context.getCounter(DelayCounter.not_available_departure).increment(1);
		}

		// Arrival side.
		if (parser.isArriveDelayAvailable()) {
			if (parser.getArriveDelayTime() > 0) {
				// Positive delay: the flight arrived late.
				outputKey.set("A," + parser.getYear() + "," + parser.getMonth());
				context.write(outputKey, outputValue);
			} else if (parser.getArriveDelayTime() == 0) {
				// Arrived exactly on schedule.
				context.getCounter(DelayCounter.scheduled_arrival).increment(1);
			} else if (parser.getArriveDelayTime() < 0) {
				// Arrived earlier than scheduled.
				context.getCounter(DelayCounter.early_arrival).increment(1);
			}
		} else {
			// Arrival-delay value is missing from the record.
			context.getCounter(DelayCounter.not_available_arrival).increment(1);
		}
	}

}


com.adacho.reducer 패키지에

DelayCountReducerMultiOut 클래스 생성

In [None]:
package com.adacho.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/**
 * Reducer that sums the counts of each key and routes the total to one of two
 * named outputs, "departure" or "arrival", through MultipleOutputs.
 *
 * The incoming key is split on ","; field 0 selects the output ("D" means
 * departure, anything else arrival) and fields 1 and 2 form the output key.
 */
public class DelayCountReducerMultiOut extends Reducer<Text, IntWritable, Text, IntWritable>{
	private MultipleOutputs<Text, IntWritable> mos;
	private Text outputKey = new Text();
	private IntWritable result = new IntWritable();

	/** Creates the MultipleOutputs writer bound to this task's context. */
	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		mos = new MultipleOutputs<Text, IntWritable>(context);
	}

	/**
	 * Adds up all values for the key and writes the sum to the named output
	 * selected by the key's first field.
	 *
	 * @param key     comma-separated key; fields 0, 1 and 2 are read
	 * @param values  partial counts to add up
	 * @param context task context (output goes through MultipleOutputs instead)
	 */
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		String[] parts = key.toString().split(",");

		// Output key is fields 1 and 2 joined by a comma, without the direction marker.
		outputKey.set(parts[1] + "," + parts[2]);

		int total = 0;
		for (IntWritable count : values) {
			total += count.get();
		}
		result.set(total);

		// "D" selects the departure output; every other marker goes to arrival.
		String namedOutput = "D".equals(parts[0]) ? "departure" : "arrival";
		mos.write(namedOutput, outputKey, result);
	}

	/** Closes the MultipleOutputs writer when the task is done. */
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		mos.close();
	}

}


드라이버 패키지에

DelayCountMultiOut 클래스생성



In [None]:
package com.adacho.driver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.adacho.mapper.DelayCountMapperMultiOut;
import com.adacho.reducer.DelayCountReducerMultiOut;

/**
 * Driver for the delay count job that writes departure and arrival results to
 * separate named outputs.
 *
 * Usage: {@code DelayCountMultiOut [generic options] <input> <output>}.
 */
public class DelayCountMultiOut extends Configured implements Tool{
	/**
	 * Runs the tool through ToolRunner, prints the result code, and exits the
	 * JVM with that code so that shell callers can detect a failed job.
	 */
	public static void main(String[] args) throws Exception{
		int res = ToolRunner.run(new Configuration(), new DelayCountMultiOut(), args);
		System.out.println("MapReduce Result : " + res);
		System.exit(res);
	}

	/**
	 * Configures and runs the job synchronously.
	 *
	 * @param args command-line arguments; after generic options are removed
	 *             exactly two must remain: input path and output path
	 * @return 0 when the job succeeds, 1 on a usage error or a failed job
	 */
	@Override
	public int run(String[] args) throws Exception {
		String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: DelayCountMultiOut <input> <output>");
			// Return instead of System.exit so callers of run() keep control;
			// main() turns this into the process exit status.
			return 1;
		}

		Job job = Job.getInstance(getConf(), "DelayCountMultiOut");

		job.setJarByClass(DelayCountMultiOut.class); // driver class locates the job jar
		job.setMapperClass(DelayCountMapperMultiOut.class); // mapper class
		job.setReducerClass(DelayCountReducerMultiOut.class); // reducer class

		// Both format classes come from org.apache.hadoop.mapreduce.lib.* (see imports).
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // more input paths could be added the same way
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		// Named outputs; the names must match those the reducer passes to MultipleOutputs.write.
		MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class);

		// Block until the job finishes and report its outcome to the caller.
		return job.waitForCompletion(true) ? 0 : 1;
	}

}


메이븐 클린 메이븐 인스톨

scp .\airPerformance-0.1.jar ubuntu:~/work/java

yarn jar ./airPerformance-0.1.jar com.adacho.driver.DelayCountMultiOut air-input delay-count-mos

File Output Format Counters Bytes Written=0인거 확인

hdfs dfs -ls delay-count-mos

delay-count-mos에 departure-r-00000, arrival-r-00000 에 내용이 들어있는 것 확인

driver클래스에 addNamedOutput메서드에 매개변수이름 준대로 들어감



리눅스 꿀팁

홈디렉터리에서

vi .bashrc

맨아래에

set -o vi 삽입

:wq 저장후나와서

. .bashrc 로 적용한상태에서

esc누르고

/원하는히스토리키워드

n으로 다음껄로넘길수있음

수정할땐 vi모드기 때문에 i로 삽입

컨트롤c로 나오면댐


hdfs dfs -cat delay-count-mos/arrival-r-00000

을 sort정렬하고싶긴함, 정렬방법 ->



airOT199101 03 06 07
airOT199902 03 10 11
새폴더에 넣고 거기경로복사해서
cd 복사한경로
scp ./air* ubuntu:~/work/air-data

vi airOT199101.csv ~ 이번에 올린 총 8개

dd로 헤더(첫 줄) 날리고 저장

하둡파일시스템에 air-input에 있는거 다 지우고 다시 12개넣기

hdfs dfs -rm air-input/*

hdfs dfs -put ./* air-input/

슬랙에 올려준 txt 받기

com.adacho.common 패키지에

DateKey 클래스 생성

In [None]:
package com.adacho.common;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/**
 * Composite key made of a year (String) and a month (Integer).
 *
 * Ordering is by year first (String comparison) and then by month.
 * {@code equals} and {@code hashCode} are defined over the same two fields so
 * that hash-based partitioning and collections treat keys with the same
 * year and month as the same key.
 */
public class DateKey implements WritableComparable<DateKey>{
	private String year;
	private Integer month;

	/** No-argument constructor; fields stay null until set or deserialized. */
	public DateKey() {}

	/**
	 * @param year  year component
	 * @param month month component
	 */
	public DateKey(String year, Integer month) {
		this.year = year;
		this.month = month;
	}

	public String getYear() {
		return year;
	}

	public void setYear(String year) {
		this.year = year;
	}

	public Integer getMonth() {
		return month;
	}

	public void setMonth(Integer month) {
		this.month = month;
	}

	/**
	 * Serializes the year as a string followed by the month as an int.
	 * The month must be non-null here because it is unboxed.
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		WritableUtils.writeString(out, year);
		out.writeInt(month);
	}

	/** Reads the fields back in the order written by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		year = WritableUtils.readString(in);
		month = in.readInt();
	}

	/**
	 * Compares by year, then by month when the years are equal.
	 *
	 * @return negative, zero or positive as this key sorts before, equal to
	 *         or after {@code o}
	 */
	@Override
	public int compareTo(DateKey o) {
		int result = year.compareTo(o.year);
		if(result == 0) {
			result = month.compareTo(o.month);
		}
		return result;
	}

	/** Two keys are equal when both year and month are equal (null-safe). */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof DateKey)) {
			return false;
		}
		DateKey other = (DateKey) obj;
		return java.util.Objects.equals(year, other.year)
				&& java.util.Objects.equals(month, other.month);
	}

	/** Hash over year and month, consistent with {@link #equals}. */
	@Override
	public int hashCode() {
		return java.util.Objects.hash(year, month);
	}

	/** Returns the key as "year,month". */
	@Override
	public String toString() {
		return new StringBuilder().append(year).append(",").append(month).toString();
	}

}


com.adacho.common 패키지에

DateKeyComparator 클래스생성


In [None]:
package com.adacho.common;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator for {@link DateKey} that orders by year and then by month.
 */
public class DateKeyComparator extends WritableComparator{
	/**
	 * Registers DateKey as the compared type. The second argument is passed
	 * through to WritableComparator (presumably "create key instances" —
	 * confirm against the Hadoop API).
	 */
	protected DateKeyComparator() {
		super(DateKey.class,true);
	}

	/**
	 * Compares two DateKey objects: the year decides first, and the month is
	 * only consulted when the years compare equal.
	 *
	 * @param a first key, cast to DateKey
	 * @param b second key, cast to DateKey
	 * @return negative, zero or positive as {@code a} sorts before, equal to
	 *         or after {@code b}
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		DateKey left = (DateKey) a;
		DateKey right = (DateKey) b;

		int byYear = left.getYear().compareTo(right.getYear());
		return byYear != 0 ? byYear : left.getMonth().compareTo(right.getMonth());
	}

}


common패키지에

GroupKeyPartitioner 클래스생성