From 1d42a8e5fdcb6c8ac2147a2796f3fe159c4a650b Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Mon, 9 Dec 2024 11:20:05 +0000 Subject: [PATCH 1/9] Setting up GitHub Classroom Feedback From 9bf10f879f6a8f515577bf32124c2b1e4198fcc3 Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Mon, 9 Dec 2024 11:20:08 +0000 Subject: [PATCH 2/9] add deadline --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 948dc85..b8e38ee 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/-hH64FG6) ## Лабораторная работа: Реализация MapReduce для анализа данных о продажах с ипользованием HADOOP!!! # Цель работы From 45bb01c1c963c464974c782fcfe1daf3ec69e681 Mon Sep 17 00:00:00 2001 From: neevin Date: Wed, 11 Dec 2024 01:19:50 +0300 Subject: [PATCH 3/9] works --- docker-compose.yaml | 29 +++++++++++++ pom.xml | 32 ++++++++++++++ .../java/org/example/SalesAnalysisJob.java | 27 ++++++++++++ src/main/java/org/example/SalesData.java | 43 +++++++++++++++++++ src/main/java/org/example/SalesMapper.java | 22 ++++++++++ src/main/java/org/example/SalesReducer.java | 21 +++++++++ 6 files changed, 174 insertions(+) create mode 100644 docker-compose.yaml create mode 100644 pom.xml create mode 100644 src/main/java/org/example/SalesAnalysisJob.java create mode 100644 src/main/java/org/example/SalesData.java create mode 100644 src/main/java/org/example/SalesMapper.java create mode 100644 src/main/java/org/example/SalesReducer.java diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..6fb3650 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,29 @@ +version: '3' + +services: + namenode: + image: 
bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 + container_name: namenode + ports: + - "9870:9870" # Web UI + - "9000:9000" # RPC + environment: + - CLUSTER_NAME=hadoop_cluster + - CORE_CONF_fs_defaultFS=hdfs://namenode:9000 + - HDFS_CONF_dfs_replication=1 + volumes: + - hadoop_namenode:/hadoop/dfs/name + + datanode: + image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8 + container_name: datanode + depends_on: + - namenode + environment: + - CORE_CONF_fs_defaultFS=hdfs://namenode:9000 + volumes: + - hadoop_datanode:/hadoop/dfs/data + +volumes: + hadoop_namenode: + hadoop_datanode: diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2f71910 --- /dev/null +++ b/pom.xml @@ -0,0 +1,32 @@ + + + 4.0.0 + + org.example + sales-analytics + 1.0-SNAPSHOT + + + 8 + 8 + UTF-8 + + + + + + org.apache.hadoop + hadoop-common + 3.2.1 + + + org.apache.hadoop + hadoop-mapreduce-client-core + 3.2.1 + + + + + \ No newline at end of file diff --git a/src/main/java/org/example/SalesAnalysisJob.java b/src/main/java/org/example/SalesAnalysisJob.java new file mode 100644 index 0000000..139d8c0 --- /dev/null +++ b/src/main/java/org/example/SalesAnalysisJob.java @@ -0,0 +1,27 @@ +package org.example; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +public class SalesAnalysisJob { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + Job job = Job.getInstance(conf, "sales analysis"); + job.setJarByClass(SalesAnalysisJob.class); + job.setMapperClass(SalesMapper.class); + job.setReducerClass(SalesReducer.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(SalesData.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + 
FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + System.exit(job.waitForCompletion(true) ? 0 : 1); + } +} + diff --git a/src/main/java/org/example/SalesData.java b/src/main/java/org/example/SalesData.java new file mode 100644 index 0000000..ef20164 --- /dev/null +++ b/src/main/java/org/example/SalesData.java @@ -0,0 +1,43 @@ +package org.example; + +import org.apache.hadoop.io.Writable; + +import java.io.IOException; + +public class SalesData implements Writable { + private double revenue; + private int quantity; + + public SalesData() { + } + + public SalesData(double revenue, int quantity) { + this.revenue = revenue; + this.quantity = quantity; + } + + @Override + public void write(java.io.DataOutput out) throws IOException { + out.writeDouble(revenue); + out.writeInt(quantity); + } + + @Override + public void readFields(java.io.DataInput in) throws IOException { + revenue = in.readDouble(); + quantity = in.readInt(); + } + + public double getRevenue() { + return revenue; + } + + public int getQuantity() { + return quantity; + } + + public void add(SalesData other) { + this.revenue += other.revenue; + this.quantity += other.quantity; + } +} \ No newline at end of file diff --git a/src/main/java/org/example/SalesMapper.java b/src/main/java/org/example/SalesMapper.java new file mode 100644 index 0000000..c26a7c6 --- /dev/null +++ b/src/main/java/org/example/SalesMapper.java @@ -0,0 +1,22 @@ +package org.example; + +import java.io.IOException; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +public class SalesMapper extends Mapper { + private Text categoryKey = new Text(); + + @Override + protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { + String[] fields = value.toString().split(","); + if (fields.length == 5 && !fields[0].equals("transaction_id")) { + String category = 
fields[2]; + double price = Double.parseDouble(fields[3]); + int quantity = Integer.parseInt(fields[4]); + categoryKey.set(category); + context.write(categoryKey, new SalesData(price * quantity, quantity)); + } + } +} diff --git a/src/main/java/org/example/SalesReducer.java b/src/main/java/org/example/SalesReducer.java new file mode 100644 index 0000000..75dc4e4 --- /dev/null +++ b/src/main/java/org/example/SalesReducer.java @@ -0,0 +1,21 @@ +package org.example; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import java.io.IOException; + +public class SalesReducer extends Reducer { + @Override + protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + double totalRevenue = 0.0; + int totalQuantity = 0; + + for (SalesData val : values) { + totalRevenue += val.getRevenue(); + totalQuantity += val.getQuantity(); + } + + context.write(key, new Text(String.format("%.2f\t%d", totalRevenue, totalQuantity))); + } +} + From 62b57424cd4951041f4ca08d1f3d79b16449e6aa Mon Sep 17 00:00:00 2001 From: neevin Date: Wed, 11 Dec 2024 22:35:58 +0300 Subject: [PATCH 4/9] added second map-reduce --- .../java/org/example/CustomKeyComparator.java | 23 +++++++ .../org/example/CustomTextOutputFormat.java | 45 ++++++++++++++ .../org/example/DescendingIntComparator.java | 19 ++++++ .../java/org/example/SalesAnalysisJob.java | 60 +++++++++++++++---- .../java/org/example/ValueAsKeyMapper.java | 25 ++++++++ .../java/org/example/ValueAsKeyReducer.java | 20 +++++++ 6 files changed, 180 insertions(+), 12 deletions(-) create mode 100644 src/main/java/org/example/CustomKeyComparator.java create mode 100644 src/main/java/org/example/CustomTextOutputFormat.java create mode 100644 src/main/java/org/example/DescendingIntComparator.java create mode 100644 src/main/java/org/example/ValueAsKeyMapper.java create mode 100644 src/main/java/org/example/ValueAsKeyReducer.java diff --git 
a/src/main/java/org/example/CustomKeyComparator.java b/src/main/java/org/example/CustomKeyComparator.java new file mode 100644 index 0000000..9359aa3 --- /dev/null +++ b/src/main/java/org/example/CustomKeyComparator.java @@ -0,0 +1,23 @@ +package org.example; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; + + +public class CustomKeyComparator extends WritableComparator { + + protected CustomKeyComparator() { + super(Text.class, true); + } + + @Override + public int compare(Object a, Object b) { + Text key1 = (Text) a; + Text key2 = (Text) b; + // Ваше кастомное сравнение ключей + return key1.toString().compareTo(key2.toString()); + } +} + + + diff --git a/src/main/java/org/example/CustomTextOutputFormat.java b/src/main/java/org/example/CustomTextOutputFormat.java new file mode 100644 index 0000000..98f6927 --- /dev/null +++ b/src/main/java/org/example/CustomTextOutputFormat.java @@ -0,0 +1,45 @@ +package org.example; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +public class CustomTextOutputFormat extends TextOutputFormat { + @Override + public RecordWriter getRecordWriter(TaskAttemptContext job) + throws IOException, InterruptedException { + + Path file = getDefaultWorkFile(job, ".txt"); + FileSystem fs = file.getFileSystem(job.getConfiguration()); + FSDataOutputStream fileOut = fs.create(file, false); + + return new CustomRecordWriter(fileOut); + } + + public static class CustomRecordWriter extends RecordWriter { + private DataOutputStream out; + + public CustomRecordWriter(DataOutputStream out) { + this.out = out; + } + + @Override + public void write(Text key, Text value) 
throws IOException { + // Ваш кастомный формат, например, разбиение табуляцией или другим символом + out.writeBytes(key.toString() + ": " + value.toString() + "\n"); + } + + @Override + public void close(TaskAttemptContext context) throws IOException { + out.close(); + } + } +} + diff --git a/src/main/java/org/example/DescendingIntComparator.java b/src/main/java/org/example/DescendingIntComparator.java new file mode 100644 index 0000000..7c04b85 --- /dev/null +++ b/src/main/java/org/example/DescendingIntComparator.java @@ -0,0 +1,19 @@ +package org.example; + + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.WritableComparator; + +public class DescendingIntComparator extends WritableComparator { + protected DescendingIntComparator() { + super(IntWritable.class, true); + } + + @Override + public int compare(Object a, Object b) { + IntWritable key1 = (IntWritable) a; + IntWritable key2 = (IntWritable) b; + return -1 * key1.compareTo(key2); + } +} + diff --git a/src/main/java/org/example/SalesAnalysisJob.java b/src/main/java/org/example/SalesAnalysisJob.java index 139d8c0..f647121 100644 --- a/src/main/java/org/example/SalesAnalysisJob.java +++ b/src/main/java/org/example/SalesAnalysisJob.java @@ -2,26 +2,62 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +import org.apache.hadoop.io.IntWritable; + public class SalesAnalysisJob { public static void main(String[] args) throws Exception { + if (args.length != 3) { + System.err.println("Usage: SalesPipelineJob "); + System.exit(-1); + } + Configuration conf = new Configuration(); - Job job = Job.getInstance(conf, "sales analysis"); - job.setJarByClass(SalesAnalysisJob.class); - job.setMapperClass(SalesMapper.class); - 
job.setReducerClass(SalesReducer.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(SalesData.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - - FileInputFormat.addInputPath(job, new Path(args[0])); - FileOutputFormat.setOutputPath(job, new Path(args[1])); - System.exit(job.waitForCompletion(true) ? 0 : 1); + + // Первая задача: Анализ продаж + Job salesAnalysisJob = Job.getInstance(conf, "sales analysis"); + salesAnalysisJob.setJarByClass(SalesAnalysisJob.class); + salesAnalysisJob.setMapperClass(SalesMapper.class); + salesAnalysisJob.setReducerClass(SalesReducer.class); + salesAnalysisJob.setMapOutputKeyClass(Text.class); + salesAnalysisJob.setMapOutputValueClass(SalesData.class); + salesAnalysisJob.setOutputKeyClass(Text.class); + salesAnalysisJob.setOutputValueClass(Text.class); +// salesAnalysisJob.setSortComparatorClass(CustomKeyComparator.class); +// salesAnalysisJob.setOutputFormatClass(CustomTextOutputFormat.class); + + FileInputFormat.addInputPath(salesAnalysisJob, new Path(args[0])); + Path intermediateOutput = new Path(args[1]); + FileOutputFormat.setOutputPath(salesAnalysisJob, intermediateOutput); + + boolean success = salesAnalysisJob.waitForCompletion(true); + + if (!success) { + System.exit(1); + } + + // Вторая задача: Сортировка по значениям + Job sortByValueJob = Job.getInstance(conf, "sort by value"); + sortByValueJob.setJarByClass(SalesAnalysisJob.class); + sortByValueJob.setMapperClass(ValueAsKeyMapper.class); + sortByValueJob.setReducerClass(ValueAsKeyReducer.class); + sortByValueJob.setMapOutputKeyClass(DoubleWritable.class); + sortByValueJob.setMapOutputValueClass(Text.class); + sortByValueJob.setOutputKeyClass(Text.class); + sortByValueJob.setOutputValueClass(DoubleWritable.class); + sortByValueJob.setSortComparatorClass(DescendingIntComparator.class); + + FileInputFormat.addInputPath(sortByValueJob, intermediateOutput); + FileOutputFormat.setOutputPath(sortByValueJob, new 
Path(args[2])); + + System.exit(sortByValueJob.waitForCompletion(true) ? 0 : 1); } } + diff --git a/src/main/java/org/example/ValueAsKeyMapper.java b/src/main/java/org/example/ValueAsKeyMapper.java new file mode 100644 index 0000000..ef99d49 --- /dev/null +++ b/src/main/java/org/example/ValueAsKeyMapper.java @@ -0,0 +1,25 @@ +package org.example; + + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +import java.io.IOException; + +public class ValueAsKeyMapper extends Mapper { + private DoubleWritable outKey = new DoubleWritable(); + + @Override + protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { + String[] fields = value.toString().split("\t"); + if (fields.length == 3) { + String originalKey = fields[0]; + double val = Double.parseDouble(fields[1]); + outKey.set(val); + context.write(outKey, new Text(originalKey)); + } + } +} + diff --git a/src/main/java/org/example/ValueAsKeyReducer.java b/src/main/java/org/example/ValueAsKeyReducer.java new file mode 100644 index 0000000..445b99c --- /dev/null +++ b/src/main/java/org/example/ValueAsKeyReducer.java @@ -0,0 +1,20 @@ +package org.example; + + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; + +public class ValueAsKeyReducer extends Reducer { + + @Override + protected void reduce(DoubleWritable key, Iterable values, Context context) throws IOException, InterruptedException { + for (Text value : values) { + context.write(value, key); + } + } +} + From 2443a9a8c875326b3c6788984a91af5f59ba8ce2 Mon Sep 17 00:00:00 2001 From: neevin Date: Wed, 11 Dec 2024 22:53:58 +0300 Subject: [PATCH 5/9] fixed sorting --- src/main/java/org/example/SalesAnalysisJob.java | 3 ++- 
src/main/java/org/example/ValueAsKeyMapper.java | 3 +-- src/main/java/org/example/ValueAsKeyReducer.java | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/example/SalesAnalysisJob.java b/src/main/java/org/example/SalesAnalysisJob.java index f647121..7e7be04 100644 --- a/src/main/java/org/example/SalesAnalysisJob.java +++ b/src/main/java/org/example/SalesAnalysisJob.java @@ -49,9 +49,10 @@ public static void main(String[] args) throws Exception { sortByValueJob.setReducerClass(ValueAsKeyReducer.class); sortByValueJob.setMapOutputKeyClass(DoubleWritable.class); sortByValueJob.setMapOutputValueClass(Text.class); +// sortByValueJob.setSortComparatorClass(DescendingDoubleComparator.class); + sortByValueJob.setOutputKeyClass(Text.class); sortByValueJob.setOutputValueClass(DoubleWritable.class); - sortByValueJob.setSortComparatorClass(DescendingIntComparator.class); FileInputFormat.addInputPath(sortByValueJob, intermediateOutput); FileOutputFormat.setOutputPath(sortByValueJob, new Path(args[2])); diff --git a/src/main/java/org/example/ValueAsKeyMapper.java b/src/main/java/org/example/ValueAsKeyMapper.java index ef99d49..6564d04 100644 --- a/src/main/java/org/example/ValueAsKeyMapper.java +++ b/src/main/java/org/example/ValueAsKeyMapper.java @@ -2,7 +2,6 @@ import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; @@ -17,7 +16,7 @@ protected void map(Object key, Text value, Context context) throws IOException, if (fields.length == 3) { String originalKey = fields[0]; double val = Double.parseDouble(fields[1]); - outKey.set(val); + outKey.set(-1 * val); context.write(outKey, new Text(originalKey)); } } diff --git a/src/main/java/org/example/ValueAsKeyReducer.java b/src/main/java/org/example/ValueAsKeyReducer.java index 445b99c..e8fd832 100644 --- a/src/main/java/org/example/ValueAsKeyReducer.java +++ 
b/src/main/java/org/example/ValueAsKeyReducer.java @@ -13,7 +13,8 @@ public class ValueAsKeyReducer extends Reducer values, Context context) throws IOException, InterruptedException { for (Text value : values) { - context.write(value, key); + DoubleWritable dw = new DoubleWritable(-1 * key.get()); + context.write(value, dw); } } } From 5fd765446a04de2ae446b9f14cec75e72a5ca010 Mon Sep 17 00:00:00 2001 From: neevin Date: Wed, 11 Dec 2024 22:54:14 +0300 Subject: [PATCH 6/9] fixed sorting --- .../org/example/DescendingIntComparator.java | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 src/main/java/org/example/DescendingIntComparator.java diff --git a/src/main/java/org/example/DescendingIntComparator.java b/src/main/java/org/example/DescendingIntComparator.java deleted file mode 100644 index 7c04b85..0000000 --- a/src/main/java/org/example/DescendingIntComparator.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.example; - - -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.WritableComparator; - -public class DescendingIntComparator extends WritableComparator { - protected DescendingIntComparator() { - super(IntWritable.class, true); - } - - @Override - public int compare(Object a, Object b) { - IntWritable key1 = (IntWritable) a; - IntWritable key2 = (IntWritable) b; - return -1 * key1.compareTo(key2); - } -} - From f43acf05e5b1502761dd25f8908c12530092b5dd Mon Sep 17 00:00:00 2001 From: neevin Date: Wed, 11 Dec 2024 23:30:25 +0300 Subject: [PATCH 7/9] refactor --- .../java/org/example/CustomKeyComparator.java | 23 ---------- .../org/example/CustomTextOutputFormat.java | 45 ------------------- .../java/org/example/SalesAnalysisJob.java | 34 ++++++++------ .../java/org/example/ValueAsKeyReducer.java | 21 --------- .../org/example/{ => calc}/SalesData.java | 10 ++--- .../org/example/{ => calc}/SalesMapper.java | 3 +- .../org/example/{ => calc}/SalesReducer.java | 3 +- .../java/org/example/sort/ValueAsKeyData.java | 40 
+++++++++++++++++ .../example/{ => sort}/ValueAsKeyMapper.java | 9 ++-- .../org/example/sort/ValueAsKeyReducer.java | 19 ++++++++ 10 files changed, 90 insertions(+), 117 deletions(-) delete mode 100644 src/main/java/org/example/CustomKeyComparator.java delete mode 100644 src/main/java/org/example/CustomTextOutputFormat.java delete mode 100644 src/main/java/org/example/ValueAsKeyReducer.java rename src/main/java/org/example/{ => calc}/SalesData.java (76%) rename src/main/java/org/example/{ => calc}/SalesMapper.java (92%) rename src/main/java/org/example/{ => calc}/SalesReducer.java (95%) create mode 100644 src/main/java/org/example/sort/ValueAsKeyData.java rename src/main/java/org/example/{ => sort}/ValueAsKeyMapper.java (71%) create mode 100644 src/main/java/org/example/sort/ValueAsKeyReducer.java diff --git a/src/main/java/org/example/CustomKeyComparator.java b/src/main/java/org/example/CustomKeyComparator.java deleted file mode 100644 index 9359aa3..0000000 --- a/src/main/java/org/example/CustomKeyComparator.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.example; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparator; - - -public class CustomKeyComparator extends WritableComparator { - - protected CustomKeyComparator() { - super(Text.class, true); - } - - @Override - public int compare(Object a, Object b) { - Text key1 = (Text) a; - Text key2 = (Text) b; - // Ваше кастомное сравнение ключей - return key1.toString().compareTo(key2.toString()); - } -} - - - diff --git a/src/main/java/org/example/CustomTextOutputFormat.java b/src/main/java/org/example/CustomTextOutputFormat.java deleted file mode 100644 index 98f6927..0000000 --- a/src/main/java/org/example/CustomTextOutputFormat.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.example; - -import java.io.DataOutputStream; -import java.io.IOException; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; - -public class CustomTextOutputFormat extends TextOutputFormat { - @Override - public RecordWriter getRecordWriter(TaskAttemptContext job) - throws IOException, InterruptedException { - - Path file = getDefaultWorkFile(job, ".txt"); - FileSystem fs = file.getFileSystem(job.getConfiguration()); - FSDataOutputStream fileOut = fs.create(file, false); - - return new CustomRecordWriter(fileOut); - } - - public static class CustomRecordWriter extends RecordWriter { - private DataOutputStream out; - - public CustomRecordWriter(DataOutputStream out) { - this.out = out; - } - - @Override - public void write(Text key, Text value) throws IOException { - // Ваш кастомный формат, например, разбиение табуляцией или другим символом - out.writeBytes(key.toString() + ": " + value.toString() + "\n"); - } - - @Override - public void close(TaskAttemptContext context) throws IOException { - out.close(); - } - } -} - diff --git a/src/main/java/org/example/SalesAnalysisJob.java b/src/main/java/org/example/SalesAnalysisJob.java index 7e7be04..bcef2c5 100644 --- a/src/main/java/org/example/SalesAnalysisJob.java +++ b/src/main/java/org/example/SalesAnalysisJob.java @@ -7,10 +7,14 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.example.calc.SalesData; +import org.example.calc.SalesMapper; +import org.example.calc.SalesReducer; +import org.example.sort.ValueAsKeyData; +import org.example.sort.ValueAsKeyMapper; +import org.example.sort.ValueAsKeyReducer; -import org.apache.hadoop.io.IntWritable; - public class SalesAnalysisJob { public static void main(String[] args) throws Exception { if (args.length != 3) { @@ -18,9 +22,13 @@ public static void main(String[] args) 
throws Exception { System.exit(-1); } + String inputDir = args[0]; + String intermediateResultDir = args[1]; + String outputDir = args[2]; + Configuration conf = new Configuration(); - // Первая задача: Анализ продаж + // Анализ продаж Job salesAnalysisJob = Job.getInstance(conf, "sales analysis"); salesAnalysisJob.setJarByClass(SalesAnalysisJob.class); salesAnalysisJob.setMapperClass(SalesMapper.class); @@ -29,11 +37,9 @@ public static void main(String[] args) throws Exception { salesAnalysisJob.setMapOutputValueClass(SalesData.class); salesAnalysisJob.setOutputKeyClass(Text.class); salesAnalysisJob.setOutputValueClass(Text.class); -// salesAnalysisJob.setSortComparatorClass(CustomKeyComparator.class); -// salesAnalysisJob.setOutputFormatClass(CustomTextOutputFormat.class); - FileInputFormat.addInputPath(salesAnalysisJob, new Path(args[0])); - Path intermediateOutput = new Path(args[1]); + FileInputFormat.addInputPath(salesAnalysisJob, new Path(inputDir)); + Path intermediateOutput = new Path(intermediateResultDir); FileOutputFormat.setOutputPath(salesAnalysisJob, intermediateOutput); boolean success = salesAnalysisJob.waitForCompletion(true); @@ -42,20 +48,20 @@ public static void main(String[] args) throws Exception { System.exit(1); } - // Вторая задача: Сортировка по значениям - Job sortByValueJob = Job.getInstance(conf, "sort by value"); + // Сортировка + Job sortByValueJob = Job.getInstance(conf, "sorting by revenue"); sortByValueJob.setJarByClass(SalesAnalysisJob.class); sortByValueJob.setMapperClass(ValueAsKeyMapper.class); sortByValueJob.setReducerClass(ValueAsKeyReducer.class); + sortByValueJob.setMapOutputKeyClass(DoubleWritable.class); - sortByValueJob.setMapOutputValueClass(Text.class); -// sortByValueJob.setSortComparatorClass(DescendingDoubleComparator.class); + sortByValueJob.setMapOutputValueClass(ValueAsKeyData.class); - sortByValueJob.setOutputKeyClass(Text.class); - sortByValueJob.setOutputValueClass(DoubleWritable.class); + 
sortByValueJob.setOutputKeyClass(ValueAsKeyData.class); + sortByValueJob.setOutputValueClass(Text.class); FileInputFormat.addInputPath(sortByValueJob, intermediateOutput); - FileOutputFormat.setOutputPath(sortByValueJob, new Path(args[2])); + FileOutputFormat.setOutputPath(sortByValueJob, new Path(outputDir)); System.exit(sortByValueJob.waitForCompletion(true) ? 0 : 1); } diff --git a/src/main/java/org/example/ValueAsKeyReducer.java b/src/main/java/org/example/ValueAsKeyReducer.java deleted file mode 100644 index e8fd832..0000000 --- a/src/main/java/org/example/ValueAsKeyReducer.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.example; - - -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Reducer; - -import java.io.IOException; - -public class ValueAsKeyReducer extends Reducer { - - @Override - protected void reduce(DoubleWritable key, Iterable values, Context context) throws IOException, InterruptedException { - for (Text value : values) { - DoubleWritable dw = new DoubleWritable(-1 * key.get()); - context.write(value, dw); - } - } -} - diff --git a/src/main/java/org/example/SalesData.java b/src/main/java/org/example/calc/SalesData.java similarity index 76% rename from src/main/java/org/example/SalesData.java rename to src/main/java/org/example/calc/SalesData.java index ef20164..30b6379 100644 --- a/src/main/java/org/example/SalesData.java +++ b/src/main/java/org/example/calc/SalesData.java @@ -1,7 +1,8 @@ -package org.example; +package org.example.calc; import org.apache.hadoop.io.Writable; +import java.io.DataOutput; import java.io.IOException; public class SalesData implements Writable { @@ -17,7 +18,7 @@ public SalesData(double revenue, int quantity) { } @Override - public void write(java.io.DataOutput out) throws IOException { + public void write(DataOutput out) throws IOException { out.writeDouble(revenue); out.writeInt(quantity); } @@ -35,9 +36,4 @@ 
public double getRevenue() { public int getQuantity() { return quantity; } - - public void add(SalesData other) { - this.revenue += other.revenue; - this.quantity += other.quantity; - } } \ No newline at end of file diff --git a/src/main/java/org/example/SalesMapper.java b/src/main/java/org/example/calc/SalesMapper.java similarity index 92% rename from src/main/java/org/example/SalesMapper.java rename to src/main/java/org/example/calc/SalesMapper.java index c26a7c6..7c07a59 100644 --- a/src/main/java/org/example/SalesMapper.java +++ b/src/main/java/org/example/calc/SalesMapper.java @@ -1,7 +1,6 @@ -package org.example; +package org.example.calc; import java.io.IOException; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; diff --git a/src/main/java/org/example/SalesReducer.java b/src/main/java/org/example/calc/SalesReducer.java similarity index 95% rename from src/main/java/org/example/SalesReducer.java rename to src/main/java/org/example/calc/SalesReducer.java index 75dc4e4..ce85693 100644 --- a/src/main/java/org/example/SalesReducer.java +++ b/src/main/java/org/example/calc/SalesReducer.java @@ -1,7 +1,8 @@ -package org.example; +package org.example.calc; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; + import java.io.IOException; public class SalesReducer extends Reducer { diff --git a/src/main/java/org/example/sort/ValueAsKeyData.java b/src/main/java/org/example/sort/ValueAsKeyData.java new file mode 100644 index 0000000..bc40eda --- /dev/null +++ b/src/main/java/org/example/sort/ValueAsKeyData.java @@ -0,0 +1,40 @@ +package org.example.sort; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class ValueAsKeyData implements Writable { + private String category; + private int quantity; + + public ValueAsKeyData() { + } + + public ValueAsKeyData(String category, int quantity) { + 
this.category = category; + this.quantity = quantity; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(category); + out.writeInt(quantity); + } + + @Override + public void readFields(DataInput in) throws IOException { + category = in.readUTF(); + quantity = in.readInt(); + } + + public String getCategory() { + return category; + } + + public int getQuantity() { + return quantity; + } +} diff --git a/src/main/java/org/example/ValueAsKeyMapper.java b/src/main/java/org/example/sort/ValueAsKeyMapper.java similarity index 71% rename from src/main/java/org/example/ValueAsKeyMapper.java rename to src/main/java/org/example/sort/ValueAsKeyMapper.java index 6564d04..1a56c27 100644 --- a/src/main/java/org/example/ValueAsKeyMapper.java +++ b/src/main/java/org/example/sort/ValueAsKeyMapper.java @@ -1,4 +1,4 @@ -package org.example; +package org.example.sort; import org.apache.hadoop.io.DoubleWritable; @@ -7,17 +7,18 @@ import java.io.IOException; -public class ValueAsKeyMapper extends Mapper { +public class ValueAsKeyMapper extends Mapper { private DoubleWritable outKey = new DoubleWritable(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); if (fields.length == 3) { - String originalKey = fields[0]; + String categoryKey = fields[0]; double val = Double.parseDouble(fields[1]); + int quantity = Integer.parseInt(fields[2]); outKey.set(-1 * val); - context.write(outKey, new Text(originalKey)); + context.write(outKey, new ValueAsKeyData(categoryKey, quantity)); } } } diff --git a/src/main/java/org/example/sort/ValueAsKeyReducer.java b/src/main/java/org/example/sort/ValueAsKeyReducer.java new file mode 100644 index 0000000..f762d14 --- /dev/null +++ b/src/main/java/org/example/sort/ValueAsKeyReducer.java @@ -0,0 +1,19 @@ +package org.example.sort; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; 
+import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; + +public class ValueAsKeyReducer extends Reducer { + + @Override + protected void reduce(DoubleWritable key, Iterable values, Context context) throws IOException, InterruptedException { + for (ValueAsKeyData value : values) { + Text category = new Text(value.getCategory()); + context.write(category, new Text(String.format("%.2f\t%d", -1 * key.get(), value.getQuantity()))); + } + } +} + From 28ba8c77e5a3a9de8bc6e0c0d47186e0d17271a5 Mon Sep 17 00:00:00 2001 From: neevin Date: Thu, 12 Dec 2024 00:03:02 +0300 Subject: [PATCH 8/9] final --- src/main/java/org/example/SalesAnalysisJob.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/example/SalesAnalysisJob.java b/src/main/java/org/example/SalesAnalysisJob.java index bcef2c5..05583de 100644 --- a/src/main/java/org/example/SalesAnalysisJob.java +++ b/src/main/java/org/example/SalesAnalysisJob.java @@ -17,15 +17,16 @@ public class SalesAnalysisJob { public static void main(String[] args) throws Exception { - if (args.length != 3) { - System.err.println("Usage: SalesPipelineJob "); + if (args.length != 2) { + System.err.println("Usage: hadoop jar /tmp/sales-analytics-1.0-SNAPSHOT.jar org.example.SalesAnalysisJob "); System.exit(-1); } String inputDir = args[0]; - String intermediateResultDir = args[1]; - String outputDir = args[2]; + String outputDir = args[1]; + String intermediateResultDir = outputDir + "-intermediate"; + long startTime = System.currentTimeMillis(); Configuration conf = new Configuration(); // Анализ продаж @@ -63,8 +64,9 @@ public static void main(String[] args) throws Exception { FileInputFormat.addInputPath(sortByValueJob, intermediateOutput); FileOutputFormat.setOutputPath(sortByValueJob, new Path(outputDir)); + long endTime = System.currentTimeMillis(); + System.out.println("Jobs completed in " + (endTime - startTime) + " milliseconds."); + 
System.exit(sortByValueJob.waitForCompletion(true) ? 0 : 1); } } - - From e7b0d8b7c2711d396464d7ffa1e43b17791eeb50 Mon Sep 17 00:00:00 2001 From: neevin Date: Thu, 12 Dec 2024 00:49:50 +0300 Subject: [PATCH 9/9] final --- comparing.md | 33 +++++++++++++++++++ result.txt | 20 +++++++++++ .../java/org/example/SalesAnalysisJob.java | 8 +++-- 3 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 comparing.md create mode 100644 result.txt diff --git a/comparing.md b/comparing.md new file mode 100644 index 0000000..02b1e86 --- /dev/null +++ b/comparing.md @@ -0,0 +1,33 @@ +# Тестировалось на локальном компьютере в докере +
+## Максимальный размер, на который разбиваются данные
+С увеличением максимального размера, на который разбиваются данные, время выполнения падает.
+Это, наверное, связано с тем, что данные небольшие, и их проще обработать без разбиения на более мелкие кусочки.
+```
+кол-во воркеров размер данных время (мс)
+1 512kb 9807
+1 1mb 8803
+1 2mb 7853
+1 4mb 6829
+1 16mb 6790
+1 64mb 6753
+```
+
+
+## Количество воркеров
+С увеличением количества воркеров время выполнения в целом растёт.
+Скорее всего, это потому, что тест происходит локально на одном физическом компьютере.
+```
+кол-во воркеров размер данных время (мс)
+1 64mb 6809
+2 64mb 6819
+4 64mb 6748
+6 64mb 8781
+8 64mb 8807
+16 64mb 10772
+```
+
+Одновременное увеличение кол-ва воркеров и уменьшение размера данных приводит только
+к ещё большему увеличению времени работы.
+ + diff --git a/result.txt b/result.txt new file mode 100644 index 0000000..1aee7fb --- /dev/null +++ b/result.txt @@ -0,0 +1,20 @@ +clothing 4560302171.99 911487 +video games 4560108307.50 913326 +baby products 4541435362.25 907186 +beauty products 4533874327.85 906417 +gardening tools 4531880837.74 905841 +automotive 4529861310.74 904962 +music instruments 4512294466.14 902389 +furniture 4503986763.16 900244 +electronics 4497526631.04 903266 +pet supplies 4488741730.38 896724 +stationery 4481794912.39 898265 +home appliances 4473888361.73 895815 +sports equipment 4469387812.34 894287 +groceries 4466915230.97 895470 +footwear 4465574983.36 894424 +jewelry 4463823670.79 893980 +office equipment 4463564947.38 892370 +toys 4462453654.12 892741 +books 4457620825.95 890948 +health & wellness 4454082892.49 890475 diff --git a/src/main/java/org/example/SalesAnalysisJob.java b/src/main/java/org/example/SalesAnalysisJob.java index 05583de..70128d6 100644 --- a/src/main/java/org/example/SalesAnalysisJob.java +++ b/src/main/java/org/example/SalesAnalysisJob.java @@ -17,20 +17,24 @@ public class SalesAnalysisJob { public static void main(String[] args) throws Exception { - if (args.length != 2) { - System.err.println("Usage: hadoop jar /tmp/sales-analytics-1.0-SNAPSHOT.jar org.example.SalesAnalysisJob "); + if (args.length != 4) { + System.err.println("Usage: hadoop jar /tmp/sales-analytics-1.0-SNAPSHOT.jar org.example.SalesAnalysisJob "); System.exit(-1); } String inputDir = args[0]; String outputDir = args[1]; + int reducersCount = Integer.parseInt(args[2]); + int datablockSizeMb = Integer.parseInt(args[3]) * ((int) Math.pow(2, 10)); // * 1 kb String intermediateResultDir = outputDir + "-intermediate"; long startTime = System.currentTimeMillis(); Configuration conf = new Configuration(); + conf.set("mapreduce.input.fileinputformat.split.maxsize", Integer.toString(datablockSizeMb)); // Анализ продаж Job salesAnalysisJob = Job.getInstance(conf, "sales analysis"); + 
salesAnalysisJob.setNumReduceTasks(reducersCount); // Количество reduce задач salesAnalysisJob.setJarByClass(SalesAnalysisJob.class); salesAnalysisJob.setMapperClass(SalesMapper.class); salesAnalysisJob.setReducerClass(SalesReducer.class);