1 change: 1 addition & 0 deletions README.md
@@ -1,3 +1,4 @@
[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/-hH64FG6)
## Lab: Implementing MapReduce for Sales Data Analysis Using Hadoop
# Objective

33 changes: 33 additions & 0 deletions comparing.md
@@ -0,0 +1,33 @@
# Tested on a local machine in Docker

## Maximum input split size
As the maximum split size increases, the execution time drops.
This is most likely because the dataset is small, so it is cheaper to process without being broken into smaller chunks.
```
workers    max split size    time (ms)
1          512 KB            9807
1          1 MB              8803
1          2 MB              7853
1          4 MB              6829
1          16 MB             6790
1          64 MB             6753
```
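
The cap above is what the driver passes to Hadoop's standard `mapreduce.input.fileinputformat.split.maxsize` property (see `SalesAnalysisJob`). A minimal sketch of the relevant configuration, using the 512 KB case as an example (the class name `SplitSizeExample` is illustrative):

```java
import org.apache.hadoop.conf.Configuration;

public class SplitSizeExample {
    public static void main(String[] args) {
        // Cap input splits at 512 KB: FileInputFormat will not create
        // larger splits, so a larger cap means fewer, bigger map tasks.
        Configuration conf = new Configuration();
        conf.set("mapreduce.input.fileinputformat.split.maxsize", Long.toString(512L * 1024));
        System.out.println(conf.get("mapreduce.input.fileinputformat.split.maxsize"));
    }
}
```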


## Number of workers
Execution time grows as the number of workers increases (noticeably beyond four).
Most likely this is because the test runs locally on a single physical machine, so additional workers only contend for the same resources.
```
workers    max split size    time (ms)
1          64 MB             6809
2          64 MB             6819
4          64 MB             6748
6          64 MB             8781
8          64 MB             8807
16         64 MB             10772
```
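
Here "workers" corresponds to the number of reduce tasks, which the driver sets per job. A minimal sketch (the class name `ReducerCountExample` is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sales analysis");
        // On a single physical machine, extra reducers mostly add
        // scheduling and shuffle overhead instead of real parallelism.
        job.setNumReduceTasks(4);
    }
}
```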

Increasing the number of workers while simultaneously decreasing the split size only drives the run time up even further.
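
For context, a run with the parameters from the tables above would use the job's documented CLI (the input and output paths here are illustrative):

```
hadoop jar /tmp/sales-analytics-1.0-SNAPSHOT.jar org.example.SalesAnalysisJob \
    /input/sales /output/sales 4 65536   # 4 reducers, 64 MB max split (given in KB)
```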


29 changes: 29 additions & 0 deletions docker-compose.yaml
@@ -0,0 +1,29 @@
version: '3'

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
    container_name: namenode
    ports:
      - "9870:9870" # Web UI
      - "9000:9000" # RPC
    environment:
      - CLUSTER_NAME=hadoop_cluster
      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
      - HDFS_CONF_dfs_replication=1
    volumes:
      - hadoop_namenode:/hadoop/dfs/name

  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
    container_name: datanode
    depends_on:
      - namenode
    environment:
      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
    volumes:
      - hadoop_datanode:/hadoop/dfs/data

volumes:
  hadoop_namenode:
  hadoop_datanode:
32 changes: 32 additions & 0 deletions pom.xml
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sales-analytics</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
</project>
20 changes: 20 additions & 0 deletions result.txt
@@ -0,0 +1,20 @@
clothing 4560302171.99 911487
video games 4560108307.50 913326
baby products 4541435362.25 907186
beauty products 4533874327.85 906417
gardening tools 4531880837.74 905841
automotive 4529861310.74 904962
music instruments 4512294466.14 902389
furniture 4503986763.16 900244
electronics 4497526631.04 903266
pet supplies 4488741730.38 896724
stationery 4481794912.39 898265
home appliances 4473888361.73 895815
sports equipment 4469387812.34 894287
groceries 4466915230.97 895470
footwear 4465574983.36 894424
jewelry 4463823670.79 893980
office equipment 4463564947.38 892370
toys 4462453654.12 892741
books 4457620825.95 890948
health & wellness 4454082892.49 890475
76 changes: 76 additions & 0 deletions src/main/java/org/example/SalesAnalysisJob.java
@@ -0,0 +1,76 @@
package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.example.calc.SalesData;
import org.example.calc.SalesMapper;
import org.example.calc.SalesReducer;
import org.example.sort.ValueAsKeyData;
import org.example.sort.ValueAsKeyMapper;
import org.example.sort.ValueAsKeyReducer;


public class SalesAnalysisJob {
    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.err.println("Usage: hadoop jar /tmp/sales-analytics-1.0-SNAPSHOT.jar org.example.SalesAnalysisJob <input path> <final output path> <REDUCERS_COUNT=1> <DATABLOCK_SIZE_KB=1>");
            System.exit(-1);
        }

        String inputDir = args[0];
        String outputDir = args[1];
        int reducersCount = Integer.parseInt(args[2]);
        long splitMaxSizeBytes = Long.parseLong(args[3]) * 1024L; // the argument is given in KB
        String intermediateResultDir = outputDir + "-intermediate";

        long startTime = System.currentTimeMillis();
        Configuration conf = new Configuration();
        conf.set("mapreduce.input.fileinputformat.split.maxsize", Long.toString(splitMaxSizeBytes));

        // Job 1: aggregate revenue and quantity per category
        Job salesAnalysisJob = Job.getInstance(conf, "sales analysis");
        salesAnalysisJob.setNumReduceTasks(reducersCount); // number of reduce tasks ("workers")
        salesAnalysisJob.setJarByClass(SalesAnalysisJob.class);
        salesAnalysisJob.setMapperClass(SalesMapper.class);
        salesAnalysisJob.setReducerClass(SalesReducer.class);
        salesAnalysisJob.setMapOutputKeyClass(Text.class);
        salesAnalysisJob.setMapOutputValueClass(SalesData.class);
        salesAnalysisJob.setOutputKeyClass(Text.class);
        salesAnalysisJob.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(salesAnalysisJob, new Path(inputDir));
        Path intermediateOutput = new Path(intermediateResultDir);
        FileOutputFormat.setOutputPath(salesAnalysisJob, intermediateOutput);

        boolean success = salesAnalysisJob.waitForCompletion(true);

        if (!success) {
            System.exit(1);
        }

        // Job 2: sort the aggregated categories by revenue, descending
        Job sortByValueJob = Job.getInstance(conf, "sorting by revenue");
        sortByValueJob.setJarByClass(SalesAnalysisJob.class);
        sortByValueJob.setMapperClass(ValueAsKeyMapper.class);
        sortByValueJob.setReducerClass(ValueAsKeyReducer.class);

        sortByValueJob.setMapOutputKeyClass(DoubleWritable.class);
        sortByValueJob.setMapOutputValueClass(ValueAsKeyData.class);

        // The reducer emits (Text, Text) pairs, so declare matching output types
        sortByValueJob.setOutputKeyClass(Text.class);
        sortByValueJob.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(sortByValueJob, intermediateOutput);
        FileOutputFormat.setOutputPath(sortByValueJob, new Path(outputDir));

        boolean sortSuccess = sortByValueJob.waitForCompletion(true);

        // Stop the timer only after both jobs have actually finished
        long endTime = System.currentTimeMillis();
        System.out.println("Jobs completed in " + (endTime - startTime) + " milliseconds.");

        System.exit(sortSuccess ? 0 : 1);
    }
}
39 changes: 39 additions & 0 deletions src/main/java/org/example/calc/SalesData.java
@@ -0,0 +1,39 @@
package org.example.calc;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/** Writable carrying the revenue and item quantity of a single sale record. */
public class SalesData implements Writable {
    private double revenue;
    private int quantity;

    public SalesData() {
        // required no-arg constructor for Hadoop deserialization
    }

    public SalesData(double revenue, int quantity) {
        this.revenue = revenue;
        this.quantity = quantity;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(revenue);
        out.writeInt(quantity);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        revenue = in.readDouble();
        quantity = in.readInt();
    }

    public double getRevenue() {
        return revenue;
    }

    public int getQuantity() {
        return quantity;
    }
}
21 changes: 21 additions & 0 deletions src/main/java/org/example/calc/SalesMapper.java
@@ -0,0 +1,21 @@
package org.example.calc;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalesMapper extends Mapper<Object, Text, Text, SalesData> {
    private Text categoryKey = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Input is CSV with five fields; fields[2] is the category,
        // fields[3] the unit price, fields[4] the quantity.
        // The header row (starting with "transaction_id") is skipped.
        String[] fields = value.toString().split(",");
        if (fields.length == 5 && !fields[0].equals("transaction_id")) {
            String category = fields[2];
            double price = Double.parseDouble(fields[3]);
            int quantity = Integer.parseInt(fields[4]);
            categoryKey.set(category);
            context.write(categoryKey, new SalesData(price * quantity, quantity));
        }
    }
}
22 changes: 22 additions & 0 deletions src/main/java/org/example/calc/SalesReducer.java
@@ -0,0 +1,22 @@
package org.example.calc;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/** Sums revenue and total quantity for every category. */
public class SalesReducer extends Reducer<Text, SalesData, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<SalesData> values, Context context) throws IOException, InterruptedException {
        double totalRevenue = 0.0;
        int totalQuantity = 0;

        for (SalesData val : values) {
            totalRevenue += val.getRevenue();
            totalQuantity += val.getQuantity();
        }

        // Emit "category \t revenue \t quantity" so the sort job can re-parse it
        context.write(key, new Text(String.format("%.2f\t%d", totalRevenue, totalQuantity)));
    }
}

40 changes: 40 additions & 0 deletions src/main/java/org/example/sort/ValueAsKeyData.java
@@ -0,0 +1,40 @@
package org.example.sort;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Value type for the sort job: carries the category name and quantity,
 * while the (negated) revenue travels as the map output key.
 */
public class ValueAsKeyData implements Writable {
    private String category;
    private int quantity;

    public ValueAsKeyData() {
        // required no-arg constructor for Hadoop deserialization
    }

    public ValueAsKeyData(String category, int quantity) {
        this.category = category;
        this.quantity = quantity;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(category);
        out.writeInt(quantity);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        category = in.readUTF();
        quantity = in.readInt();
    }

    public String getCategory() {
        return category;
    }

    public int getQuantity() {
        return quantity;
    }
}
25 changes: 25 additions & 0 deletions src/main/java/org/example/sort/ValueAsKeyMapper.java
@@ -0,0 +1,25 @@
package org.example.sort;


import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/** Turns "category \t revenue \t quantity" lines into (-revenue, data) pairs. */
public class ValueAsKeyMapper extends Mapper<Object, Text, DoubleWritable, ValueAsKeyData> {
    private DoubleWritable outKey = new DoubleWritable();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length == 3) {
            String categoryKey = fields[0];
            double val = Double.parseDouble(fields[1]);
            int quantity = Integer.parseInt(fields[2]);
            // Negate the revenue so Hadoop's ascending sort by key yields
            // descending revenue order; the reducer negates it back.
            outKey.set(-1 * val);
            context.write(outKey, new ValueAsKeyData(categoryKey, quantity));
        }
    }
}

19 changes: 19 additions & 0 deletions src/main/java/org/example/sort/ValueAsKeyReducer.java
@@ -0,0 +1,19 @@
package org.example.sort;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ValueAsKeyReducer extends Reducer<DoubleWritable, ValueAsKeyData, Text, Text> {

    @Override
    protected void reduce(DoubleWritable key, Iterable<ValueAsKeyData> values, Context context) throws IOException, InterruptedException {
        for (ValueAsKeyData value : values) {
            Text category = new Text(value.getCategory());
            // Negate the key back to the original (positive) revenue
            context.write(category, new Text(String.format("%.2f\t%d", -1 * key.get(), value.getQuantity())));
        }
    }
}