## Hadoop Combiner

Используется для уменьшениы выходных данных маппера, тем самым уменьшает сетевую нагрузки между мапперами и редьюсерами. В основном применяется для неравномерно распределенных данных, и тех данных, у которых количество значений соотвествующих определенному ключу очень много, проядка 100000 или больше. И редьюсер реализует дистрибутивную функцию, например, вычисление максимума, минимума или суммирование значений. 

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [6]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="Hadoop Combiner")
hdfs = "hdfs://localhost:9000/"

#### Пример № 1. Реализация MapReduce-задачи в интерфейсе Hadoop Streaming. Вычисление средного значения количества пунктов в формуле патента  патентообладателей определенной страны.
Это пример неравномерного распределения данных, так как больше всего патентов за период 1960 - 2000 было поданно США. В маппере возмем поле стран за ключи, значения за 1. 

In [1]:
# average_by_attribute_mapper.py

import sys

contry_index = 4 # CONTRY - страна патентаобладателя
claims_index = 8 # CLAIMS - количество пунктов в формуле патента

for line in sys.stdin:
    fields = line.split(",")
    if fields[claims_index] and fields[claims_index].isdigit():
        print(fields[contry_index][1:-1] + '\t' + fields[claims_index])


In [7]:
out_data = sc.textFile(hdfs + "/user/askar/output")

In [8]:
for val in out_data.collect()[:32]:
    print(val)

AD	7
AD	14
AD	28
AD	12
AD	9
AE	4
AE	12
AE	24
AE	16
AE	11
AE	35
AE	16
AE	20
AE	10
AE	7
AE	23
AE	26
AE	11
AE	12
AE	4
AG	20
AG	7
AG	8
AG	12
AG	3
AG	24
AG	14
AG	18
AI	10
AM	18
AN	3
AN	26


Reducer считает среднее значение данных по опредделенному ключу
Combiner = Reducer (в качестве комбайнера будем использовать редьюсер)

In [9]:
# average_by_attribute_reducer.py

import sys

last_key, count, summ = None, 0, 0.0
for line in sys.stdin:
    (key, val) = line.split("\t")
    
    if last_key and last_key != key:
        print("{}\t{}".format(last_key, summ / count))
        summ, count = 0.0, 0
    
    last_key = key
    summ += float(val)
    count += 1

if last_key:
    print("{}\t{}".format(last_key, summ / count))

In [10]:
out_data = sc.textFile(hdfs + "/user/askar/output")

Результаты на выходе комбайнеров: 

In [12]:
for val in out_data.collect()[:50]:
    print(val)

AD	7.0
AD	15.75
AE	15.5
AE	15.2
AG	13.5
AG	13.166666666666666
AI	10.0
AM	18.0
AN	11.333333333333334
AN	4.5
AR	8.191489361702128
AR	10.258555133079849
AT	12.070684314277669
AT	9.46241627387745
AU	10.206333973128599
AU	14.1228402865571
AW	15.5
AZ	11.0
BB	13.0
BB	10.666666666666666
BE	11.14691943127962
BE	12.64612374663732
BG	5.68
BG	4.829721362229102
BH	5.0
BH	8.0
BM	10.8
BM	9.625
BN	9.0
BO	8.777777777777779
BO	20.666666666666668
BR	10.23339317773788
BR	7.894894894894895
BS	13.783333333333333
BS	18.5
BY	15.0
BZ	28.0
CA	10.516544655929723
CA	13.705782951558392
CC	9.0
CD	14.0
CD	6.0
CH	10.744842439356155
CH	12.919727088948788
CI	7.666666666666667
CK	8.0
CL	15.582089552238806
CL	11.4
CN	13.222222222222221
CN	10.862003780718336


Результаты на выходе редьюсеров после использования комбайнеров:

In [13]:
out_data = sc.textFile(hdfs + "/user/askar/output")

In [14]:
for val in out_data.collect()[:50]:
    print(val)

AD	11.375
AE	15.35
AG	13.333333333333332
AI	10.0
AM	18.0
AN	7.916666666666667
AR	9.225022247390989
AT	10.76655029407756
AU	12.16458712984285
AW	15.5
AZ	11.0
BB	11.833333333333332
BE	11.896521588958471
BG	5.25486068111455
BH	6.5
BM	10.2125
BN	9.0
BO	14.722222222222223
BR	9.064144036316389
BS	16.141666666666666
BY	15.0
BZ	28.0
CA	12.111163803744057
CC	9.0
CD	10.0
CH	11.832284764152472
CI	7.666666666666667
CK	8.0
CL	13.491044776119402
CN	12.042113001470279
CO	12.044117647058822
CR	11.616666666666667
CS	7.60668016194332
CU	9.37784090909091
CY	10.785714285714286
CZ	12.823529411764707
DE	11.131700257745276
DK	10.832334668499442
DO	10.642857142857142
DZ	14.0
EC	12.13888888888889
EE	17.0
EG	11.73970588235294
ES	8.6256708539627
FI	10.248721177385207
FO	8.0
FR	10.97657827839095
GB	11.503545243661414
GE	7.5
GF	10.0


К сожалению, ответ не соотвествует реальности, так как редьюсер реализует недистрибутивную функцию.

#### Пример № 1. Реализация MapReduce-задачи в интерфейсе Java API. Вычисление средного значения количества пунктов в формуле патента  патентообладателей определенной страны.
Это пример неравномерного распределения данных, так как больше всего патентов за период 1960 - 2000 было поданно США. В маппере возмем поле стран за ключи, значения за 1. 

In [5]:
sc.stop()