diff --git a/README.md b/README.md
index 948dc85..b8e38ee 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,4 @@
+[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/-hH64FG6)
 ## Lab: Implementing MapReduce for sales data analysis using Hadoop
 # Objective
diff --git a/comparing.md b/comparing.md
new file mode 100644
index 0000000..02b1e86
--- /dev/null
+++ b/comparing.md
@@ -0,0 +1,33 @@
+# Tested locally in Docker
+
+## Maximum split size
+As the maximum size into which the input data is split increases, the execution time decreases.
+This is most likely because the dataset is small and is processed faster without being broken into smaller chunks.
+```
+workers    split size    time (ms)
+1          512kb         9807
+1          1mb           8803
+1          2mb           7853
+1          4mb           6829
+1          16mb          6790
+1          64mb          6753
+```
+
+
+## Number of workers
+As the number of workers increases, the execution time does not improve, and beyond four workers it grows.
+This is most likely because the test runs locally on a single physical machine.
+```
+workers    split size    time (ms)
+1          64mb          6809
+2          64mb          6819
+4          64mb          6748
+6          64mb          8781
+8          64mb          8807
+16         64mb          10772
+```
+
+Increasing the number of workers while simultaneously decreasing the split size only
+increases the running time further.
+
+
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..6fb3650
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,29 @@
+version: '3'
+
+services:
+  namenode:
+    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
+    container_name: namenode
+    ports:
+      - "9870:9870" # Web UI
+      - "9000:9000" # RPC
+    environment:
+      - CLUSTER_NAME=hadoop_cluster
+      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
+      - HDFS_CONF_dfs_replication=1
+    volumes:
+      - hadoop_namenode:/hadoop/dfs/name
+
+  datanode:
+    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
+    container_name: datanode
+    depends_on:
+      - namenode
+    environment:
+      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
+    volumes:
+      - hadoop_datanode:/hadoop/dfs/data
+
+volumes:
+  hadoop_namenode:
+  hadoop_datanode:
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..2f71910
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>sales-analytics</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>3.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>3.2.1</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/result.txt b/result.txt
new file mode 100644
index 0000000..1aee7fb
--- /dev/null
+++ b/result.txt
@@ -0,0 +1,20 @@
+clothing 4560302171.99 911487
+video games 4560108307.50 913326
+baby products 4541435362.25 907186
+beauty products 4533874327.85 906417
+gardening tools 4531880837.74 905841
+automotive 4529861310.74 904962
+music instruments 4512294466.14 902389
+furniture 4503986763.16 900244
+electronics 4497526631.04 903266
+pet supplies 4488741730.38 896724
+stationery 4481794912.39 898265
+home appliances 4473888361.73 895815
+sports equipment 4469387812.34 894287
+groceries 4466915230.97 895470
+footwear 4465574983.36 894424
+jewelry 4463823670.79 893980
+office equipment 4463564947.38 892370
+toys 4462453654.12 892741
+books 4457620825.95 890948
+health & wellness 4454082892.49 890475
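A note on the split-size experiment in comparing.md: `mapreduce.input.fileinputformat.split.maxsize` only caps the split size; FileInputFormat also takes the minimum split size and the HDFS block size into account. The sketch below walks through that calculation; the input size and block size used here are illustrative assumptions, not values measured in this repository.

```java
// Sketch of how FileInputFormat picks the split size and how that translates
// into the number of map tasks. All concrete numbers are illustrative only.
public class SplitSizeEstimate {
    public static void main(String[] args) {
        long minSize = 1L;                    // mapreduce.input.fileinputformat.split.minsize (default 1)
        long maxSize = 64L * 1024 * 1024;     // mapreduce.input.fileinputformat.split.maxsize, e.g. 64 MB
        long blockSize = 128L * 1024 * 1024;  // assumed HDFS block size (128 MB is a common default)
        long inputBytes = 512L * 1024 * 1024; // assumed total input size (512 MB), for illustration

        // splitSize = max(minSize, min(maxSize, blockSize))
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        long mapTasks = (inputBytes + splitSize - 1) / splitSize; // roughly one map task per split

        System.out.println("split size = " + splitSize + " bytes, ~" + mapTasks + " map tasks");
    }
}
```

With a small dataset, a smaller maximum split size simply means more map tasks and more per-task startup overhead, which is consistent with the timings in the first table of comparing.md.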
diff --git a/src/main/java/org/example/SalesAnalysisJob.java b/src/main/java/org/example/SalesAnalysisJob.java
new file mode 100644
index 0000000..70128d6
--- /dev/null
+++ b/src/main/java/org/example/SalesAnalysisJob.java
@@ -0,0 +1,76 @@
+package org.example;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.example.calc.SalesData;
+import org.example.calc.SalesMapper;
+import org.example.calc.SalesReducer;
+import org.example.sort.ValueAsKeyData;
+import org.example.sort.ValueAsKeyMapper;
+import org.example.sort.ValueAsKeyReducer;
+
+
+public class SalesAnalysisJob {
+    public static void main(String[] args) throws Exception {
+        if (args.length != 4) {
+            System.err.println("Usage: hadoop jar /tmp/sales-analytics-1.0-SNAPSHOT.jar org.example.SalesAnalysisJob <input_dir> <output_dir> <reducers_count> <max_split_size_kb>");
+            System.exit(-1);
+        }
+
+        String inputDir = args[0];
+        String outputDir = args[1];
+        int reducersCount = Integer.parseInt(args[2]);
+        int splitMaxSizeBytes = Integer.parseInt(args[3]) * 1024; // split size argument is given in KB, converted to bytes
+        String intermediateResultDir = outputDir + "-intermediate";
+
+        long startTime = System.currentTimeMillis();
+        Configuration conf = new Configuration();
+        conf.set("mapreduce.input.fileinputformat.split.maxsize", Integer.toString(splitMaxSizeBytes));
+
+        // First job: aggregate revenue and quantity per category
+        Job salesAnalysisJob = Job.getInstance(conf, "sales analysis");
+        salesAnalysisJob.setNumReduceTasks(reducersCount); // number of reduce tasks
+        salesAnalysisJob.setJarByClass(SalesAnalysisJob.class);
+        salesAnalysisJob.setMapperClass(SalesMapper.class);
+        salesAnalysisJob.setReducerClass(SalesReducer.class);
+        salesAnalysisJob.setMapOutputKeyClass(Text.class);
+        salesAnalysisJob.setMapOutputValueClass(SalesData.class);
+        salesAnalysisJob.setOutputKeyClass(Text.class);
+        salesAnalysisJob.setOutputValueClass(Text.class);
+
+        FileInputFormat.addInputPath(salesAnalysisJob, new Path(inputDir));
+        Path intermediateOutput = new Path(intermediateResultDir);
+        FileOutputFormat.setOutputPath(salesAnalysisJob, intermediateOutput);
+
+        boolean success = salesAnalysisJob.waitForCompletion(true);
+
+        if (!success) {
+            System.exit(1);
+        }
+
+        // Second job: sort categories by total revenue (descending)
+        Job sortByValueJob = Job.getInstance(conf, "sorting by revenue");
+        sortByValueJob.setJarByClass(SalesAnalysisJob.class);
+        sortByValueJob.setMapperClass(ValueAsKeyMapper.class);
+        sortByValueJob.setReducerClass(ValueAsKeyReducer.class);
+
+        sortByValueJob.setMapOutputKeyClass(DoubleWritable.class);
+        sortByValueJob.setMapOutputValueClass(ValueAsKeyData.class);
+
+        sortByValueJob.setOutputKeyClass(Text.class);
+        sortByValueJob.setOutputValueClass(Text.class);
+
+        FileInputFormat.addInputPath(sortByValueJob, intermediateOutput);
+        FileOutputFormat.setOutputPath(sortByValueJob, new Path(outputDir));
+
+        boolean sortSuccess = sortByValueJob.waitForCompletion(true);
+
+        long endTime = System.currentTimeMillis();
+        System.out.println("Jobs completed in " + (endTime - startTime) + " milliseconds.");
+
+        System.exit(sortSuccess ? 0 : 1);
+    }
+}
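The driver leaves the `<output>-intermediate` directory in HDFS after the second job finishes. If that is not wanted, a cleanup step can run after the sort job completes; below is a minimal sketch using the standard `FileSystem` API. The helper class and its name are hypothetical and not part of the submitted code.

```java
package org.example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: removes the intermediate directory produced by the first
// job once the second job has finished. It could be called from main, e.g.
// IntermediateCleanup.deleteIfExists(conf, intermediateOutput);
public final class IntermediateCleanup {
    private IntermediateCleanup() {
    }

    public static void deleteIfExists(Configuration conf, Path dir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(dir)) {
            fs.delete(dir, true); // true = delete recursively
        }
    }
}
```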
diff --git a/src/main/java/org/example/calc/SalesData.java b/src/main/java/org/example/calc/SalesData.java
new file mode 100644
index 0000000..30b6379
--- /dev/null
+++ b/src/main/java/org/example/calc/SalesData.java
@@ -0,0 +1,39 @@
+package org.example.calc;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// Custom Writable carrying the revenue and quantity for a category.
+public class SalesData implements Writable {
+    private double revenue;
+    private int quantity;
+
+    public SalesData() {
+    }
+
+    public SalesData(double revenue, int quantity) {
+        this.revenue = revenue;
+        this.quantity = quantity;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeDouble(revenue);
+        out.writeInt(quantity);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        revenue = in.readDouble();
+        quantity = in.readInt();
+    }
+
+    public double getRevenue() {
+        return revenue;
+    }
+
+    public int getQuantity() {
+        return quantity;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/example/calc/SalesMapper.java b/src/main/java/org/example/calc/SalesMapper.java
new file mode 100644
index 0000000..7c07a59
--- /dev/null
+++ b/src/main/java/org/example/calc/SalesMapper.java
@@ -0,0 +1,21 @@
+package org.example.calc;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class SalesMapper extends Mapper<Object, Text, Text, SalesData> {
+    private Text categoryKey = new Text();
+
+    @Override
+    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+        // Expects 5 comma-separated fields with category, price and quantity in columns 2-4 (0-based); the header row is skipped.
+        String[] fields = value.toString().split(",");
+        if (fields.length == 5 && !fields[0].equals("transaction_id")) {
+            String category = fields[2];
+            double price = Double.parseDouble(fields[3]);
+            int quantity = Integer.parseInt(fields[4]);
+            categoryKey.set(category);
+            context.write(categoryKey, new SalesData(price * quantity, quantity));
+        }
+    }
+}
diff --git a/src/main/java/org/example/calc/SalesReducer.java b/src/main/java/org/example/calc/SalesReducer.java
new file mode 100644
index 0000000..ce85693
--- /dev/null
+++ b/src/main/java/org/example/calc/SalesReducer.java
@@ -0,0 +1,22 @@
+package org.example.calc;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class SalesReducer extends Reducer<Text, SalesData, Text, Text> {
+    @Override
+    protected void reduce(Text key, Iterable<SalesData> values, Context context) throws IOException, InterruptedException {
+        double totalRevenue = 0.0;
+        int totalQuantity = 0;
+
+        for (SalesData val : values) {
+            totalRevenue += val.getRevenue();
+            totalQuantity += val.getQuantity();
+        }
+
+        // Output line: category<TAB>totalRevenue<TAB>totalQuantity
+        context.write(key, new Text(String.format("%.2f\t%d", totalRevenue, totalQuantity)));
+    }
+}
+
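Because SalesReducer only sums the per-record values, the same aggregation could also run map-side as a combiner to cut shuffle traffic. Below is a sketch of such a combiner; it is not part of the submitted code, the class name SalesCombiner is hypothetical, and it would be registered with `salesAnalysisJob.setCombinerClass(SalesCombiner.class)` in the driver.

```java
package org.example.calc;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner (not in the submitted code): pre-aggregates SalesData on
// the map side so less data is shuffled to the reducers. A combiner must emit the
// same key/value types the mapper produces, so it outputs SalesData, not Text.
public class SalesCombiner extends Reducer<Text, SalesData, Text, SalesData> {
    @Override
    protected void reduce(Text key, Iterable<SalesData> values, Context context)
            throws IOException, InterruptedException {
        double totalRevenue = 0.0;
        int totalQuantity = 0;
        for (SalesData val : values) {
            totalRevenue += val.getRevenue();
            totalQuantity += val.getQuantity();
        }
        context.write(key, new SalesData(totalRevenue, totalQuantity));
    }
}
```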
diff --git a/src/main/java/org/example/sort/ValueAsKeyData.java b/src/main/java/org/example/sort/ValueAsKeyData.java
new file mode 100644
index 0000000..bc40eda
--- /dev/null
+++ b/src/main/java/org/example/sort/ValueAsKeyData.java
@@ -0,0 +1,40 @@
+package org.example.sort;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// Value type for the sort job: carries the category name and quantity, while the
+// revenue travels separately as the sort key.
+public class ValueAsKeyData implements Writable {
+    private String category;
+    private int quantity;
+
+    public ValueAsKeyData() {
+    }
+
+    public ValueAsKeyData(String category, int quantity) {
+        this.category = category;
+        this.quantity = quantity;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeUTF(category);
+        out.writeInt(quantity);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        category = in.readUTF();
+        quantity = in.readInt();
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public int getQuantity() {
+        return quantity;
+    }
+}
diff --git a/src/main/java/org/example/sort/ValueAsKeyMapper.java b/src/main/java/org/example/sort/ValueAsKeyMapper.java
new file mode 100644
index 0000000..1a56c27
--- /dev/null
+++ b/src/main/java/org/example/sort/ValueAsKeyMapper.java
@@ -0,0 +1,25 @@
+package org.example.sort;
+
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public class ValueAsKeyMapper extends Mapper<Object, Text, DoubleWritable, ValueAsKeyData> {
+    private DoubleWritable outKey = new DoubleWritable();
+
+    @Override
+    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
+        // Input lines come from the first job: category<TAB>revenue<TAB>quantity.
+        String[] fields = value.toString().split("\t");
+        if (fields.length == 3) {
+            String categoryKey = fields[0];
+            double val = Double.parseDouble(fields[1]);
+            int quantity = Integer.parseInt(fields[2]);
+            // Negate the revenue so the ascending key sort yields descending revenue order.
+            outKey.set(-1 * val);
+            context.write(outKey, new ValueAsKeyData(categoryKey, quantity));
+        }
+    }
+}
+
diff --git a/src/main/java/org/example/sort/ValueAsKeyReducer.java b/src/main/java/org/example/sort/ValueAsKeyReducer.java
new file mode 100644
index 0000000..f762d14
--- /dev/null
+++ b/src/main/java/org/example/sort/ValueAsKeyReducer.java
@@ -0,0 +1,19 @@
+package org.example.sort;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class ValueAsKeyReducer extends Reducer<DoubleWritable, ValueAsKeyData, Text, Text> {
+
+    @Override
+    protected void reduce(DoubleWritable key, Iterable<ValueAsKeyData> values, Context context) throws IOException, InterruptedException {
+        for (ValueAsKeyData value : values) {
+            Text category = new Text(value.getCategory());
+            // Negate the key back to restore the original (positive) revenue.
+            context.write(category, new Text(String.format("%.2f\t%d", -1 * key.get(), value.getQuantity())));
+        }
+    }
+}
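One caveat about result.txt: the output is globally ordered only because the sort job runs with a single reduce task (Hadoop's default when the reduce count is not set). If that should not rely on the default, the driver could pin it explicitly; a one-line sketch of the call that would be added to `main`:

```java
// Sketch: force a single reducer for the sort job so the final output stays one
// globally ordered file, regardless of the reducer count used by the first job.
sortByValueJob.setNumReduceTasks(1);
```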