# Trabajo Practico N°1 - Hadoop MapReduce

## Introducción

En el siguiente trabajo se busca desarrollar una serie de soluciones a problemas de Big Data utilizando el paradigma Hadoop MapReduce en un entorno simulado.


## Eje. 1

Implementamos una solución basada en dos JOBS, el primero se encarga de determinar el total para cada retador y cada retado y el segundo se encarga de determinar el máximo retador y el máximo retado. 


In [22]:
from MRE import Job

root_path = './' #Simplemente para poder usar rootpaths
input_path = root_path + "input/"
output_path = root_path + "output/"
input_dir = input_path
output_dir1 = output_path + "eje1a/"
output_dir2 = output_path + "eje1b/"

def fmap1 (key, value, context):
    data = value.split('\t')
    id_retador = key
    id_retado = data[0]
    context.write(("R",id_retador), 1)
    context.write(("Re", id_retado), 1)

def fcomb(key, value, context):
    c = 0
    for v in value: 
        c += v
    context.write(key, c)

def freduce1 (key, value, context):
    n = 0
    for v in value:
        n += v
    context.write(key, n)

def fmap2(key,value,context):
    data = value.split()
    
    context.write(key,(data[0],int(data[1])))

def freduce2(key,value,context):
    max_id = -1
    max_occurrence = 0
    for v in value:
        if (max_occurrence < v[1]):
            max_occurrence = v[1]
            max_id = v[0]
    context.write(max_id, max_occurrence)

job1 = Job(input_dir,output_dir1,fmap1,freduce1)
job1.setCombiner(fcomb)
job2 = Job(output_dir1,output_dir2,fmap2,freduce2)
job1.waitForCompletion()
job2.waitForCompletion()

    

True

## Eje. 2

Similar al ejercicio anterior implementamos una solución basada en dos Jobs, el primero se encarga de determinar para cada id los puntos en promedio y el segundo determina el máximo.

In [23]:
from MRE import Job

root_path = './' #Simplemente para poder usar rootpaths
input_path = root_path + "input/"
output_path = root_path + "output/"
input_dir = input_path
output_dir1 = output_path + "eje2a/"
output_dir2 = output_path + "eje2b/"

def fmap1 (key, value, context):
    data = value.split('\t')
    id_retado = data[0]
    points = data[1]
    context.write(key,(int(points),1))
    context.write(id_retado, (0,0))
    
    

def fcomb(key, value, context):
    p = 0
    c = 0
    for v in value: 
        p += v[0]
        c += v[1]
    context.write(key, (p,c))

def freduce1 (key, value, context):
    total_points = 0
    n = 0
    for v in value:
        total_points += v[0]
        n += v[1]
    context.write(key, (total_points + 1)/(n + 1))

def fmap2(key,value,context):
    context.write(1, (key, float(value)))

def freduce2(key,value,context):
    max_id = -1
    max_points = 0
    for v in value:
        if (max_points < v[1]):
            max_points = v[1]
            max_id = v[0]
    context.write(max_id, max_points)

job1 = Job(input_dir,output_dir1,fmap1,freduce1)
job1.setCombiner(fcomb)
job2 = Job(output_dir1,output_dir2,fmap2,freduce2)
job1.waitForCompletion()
job2.waitForCompletion()


True

## Eje. 3

Para esta consulta implementamos dos Jobs, el primero se encarga de filtrar las combinaciones retador-retado devolviendo una sola tupla por cada combinación diferente el segundo utiliza esto para detectar cuales tienen mas de H (valor que ingresa por parámetro) retados diferentes.

In [24]:
from MRE import Job

root_path = './' #Simplemente para poder usar rootpaths
input_path = root_path + "input/"
output_path = root_path + "output/"
input_dir = input_path
output_dir1 = output_path + "eje3a/"
output_dir2 = output_path + "eje3b/"

def fmap1 (key, value, context):
    data = value.split('\t')
    id_retado = data[0]
    context.write((key, id_retado), 1)
    

def freduce1 (key, value, context):
    for v in value:
       res = v
    context.write(key, res)

def fmap2(key,value,context):
    context.write(key, 1)

def freduce2(key,value,context):
    c = 0
    for v in value: 
        c += 1
    if (c >= context["h"]):
        context.write(key, c)

param = {"h": 12}
job1 = Job(input_dir,output_dir1,fmap1,freduce1)
job1.setCombiner(fcomb)
job2 = Job(output_dir1,output_dir2,fmap2,freduce2)
job2.setParams(param)
job1.waitForCompletion()
job2.waitForCompletion()


True

## Eje. 4

En este ejercicio se implemento un lógica de joins para resolver la generación de parámetros necesarias para el calculo. Ademas el calculo de los máximos y del error también fueron implementados utilizando Jobs, ya que consideramos que también son problemas de Big Data, si bien el ejercicio copila y corre, los resultados obtenidos difieren de los enviados por la cátedra, no pudimos encontrar el problema que genera dicha diferencia.


In [None]:
from MRE import Job

root_path = './' 
input_path = root_path + "input/"
output_path = root_path + "output/"
initial_input_dir = input_path
pp_input_dir = output_path +"eje2a/"
ids_input_dir = output_path +"eje3a/"
ph_output_dir = output_path + "ph/"
ph_prev_output_dir = output_path + "ph_prev/"
join1_output_path = output_path + "join1/"
join2_output_path = output_path + "join2/"
join3_output_path = output_path + "join3/"
mse_output_path = output_path + "mse/"
max_output_path = output_path + "final/"

def fmap1(key, value, context):
    data = value.split('\t')
    id_retado = data[0]
    context.write(key,1)
    context.write(id_retado,1)

def fred1(key,value,context):
    context.write(key,context['ph'])

def fmapCopy(key, value, context):
    context.write(key, float(value))

def fredCopy(key, value, context):
    for v in value:
        context.write(key, v)

def fmap2Avg (key, value, context):
    context.write(('PP', key), value)

def fmap2Ids(key, value, context):
    data = value.split()
    id_retado = data[0]
    context.write(('IDs', key), id_retado)

def fshuffle(key1, key2):
    if (key1[1] == key2[1]):
        return 0
    elif (key1[1] < key2[1]):
        return -1
    else: 
        return 1

def fsort(key1,key2):
    if (key1[0] == key2[0]):
        return 0
    elif (key1[0] == 'PP'):
        return -1
    else:
        return 1

    
def fred2(key, value, context):
    pp = None
    id_retado = 0
    
    for v in value:
        if pp is None: 
            pp = v
        else:           
            id_retado = v
            context.write(key[1], (id_retado, pp))
    
    if id_retado == 0:
        context.write(key[1], (0, pp))

def fmap3Avg (key, value, context):
    context.write(('PP', key), (value,0))
    
def fmap3IDs(key, value, context):
    data = value.split()
    id_retado = data[0]
    pp1 = data[1]
    context.write(('IDs', id_retado), (key,pp1))

def fred3(key,value,context):
    pp2 = None
    id_retador = None
    pp1 = None
    
    for v in value:
        if pp2 is None:
            if key[1] == "0":
                pp2 = 1.0
                id_retador = v[0]
                pp1 = float(v[1])
                context.write(id_retador, (key[1], pp1, pp2))
            else:
                pp2 = float(v[0])
        else:
            id_retador = v[0]
            pp1 = float(v[1])
            context.write(id_retador, (key[1], pp1, pp2))
        print(key, v)

def fmap4Avg (key, value, context):
    context.write(('PP', key), value) 
    
def fmap4IDs(key, value, context):
    data = value.split()
    id_retado = data[0]
    pp1 = data[1]
    pp2 = data[2]
    context.write(('IDs', id_retado), (key,pp1,pp2))

def fred4(key,value,context):
    ph = None
    
    for v in value:
        if ph is None and key[1] != "0":  
            ph = float(v[0])
        elif ph is None:
            ph = 1.0
            id_retador = v[0]
            pp1 = float(v[1])
            pp2 = float(v[2])
            context.write(id_retador, (key[1],pp1,pp2,ph))
        else:
            id_retador = v[0]
            pp1 = float(v[1])
            pp2 = float(v[2])
            context.write(id_retador, (key[1],pp1,pp2,ph))
        
def fmapPh(key,value,context):
    id_retador = key
    data = value.split()
    pp1 = float(data[1])
    pp2 = float(data[2])
    pph = float(data[3])
    calc = pph * pp1/pp2
    context.write(id_retador, calc)

def fcomb(key,value,context):
    c = 0
    for v in value: 
        c += v
    context.write(key, c)

def fredPh(key,value,context):
    total_sum = 0
    alpha = context["alpha"]
    for v in value: 
        total_sum += v
    final_ph = alpha * total_sum + (1 - alpha)
    context.write(key, final_ph)

def fmap1MSE1(key, value, context):
    context.write(key, ('act', float(value)))

def fmap2MSE1(key, value, context):
    context.write(key,('ant', float(value)))

def fredMSE1(key,value,context):
    for v in value:
        if (v[0] == "act"):
            act = v[1]
        else:
            ant = v[1]
    context.write(key, (act - ant) ** 2)

def fmapMSE2(key,value,context):
    context.write(1,float(value))

def fredMSE2(key,value,context):
    total = 0 
    for v in value:
        total += v
    context.write(1,total)
    
def getSum(path):
    try:
        with open(path,"r") as f:
            for line in f:
                data = line.split()
                return float(data[1])
    except:
        return 1.0 

def fmapMax(key, value, context):
    context.write(1,(key, float(value)))

def freduceMax(key, value, context):
    top_10 = []
    
    for v in value:
        id_jugador,ph_value = v 
        
        if len(top_10) < 10:
            top_10.append((id_jugador, ph_value))
            top_10.sort(key=lambda x: x[1], reverse=True)
        else:
            if ph_value > top_10[0][1]: 
                top_10[0] = (id_jugador, ph_value)
                top_10.sort(key=lambda x: x[1], reverse=True)
    
    for id_jugador,ph_value in top_10:
        context.write(id_jugador,ph_value)

ph = Job(initial_input_dir,ph_output_dir,fmap1,fred1)
ph.setParams({'ph': 1.0})

copy_ph = Job(ph_output_dir, ph_prev_output_dir, fmapCopy, fredCopy)

join1 = Job(pp_input_dir,join1_output_path,fmap2Avg,fred2)
join1.addInputPath(ids_input_dir,fmap2Ids)
join1.setShuffleCmp(fshuffle)
join1.setSortCmp(fsort)

join2 = Job(pp_input_dir,join2_output_path,fmap3Avg,fred3)
join2.addInputPath(join1_output_path,fmap3IDs)
join2.setShuffleCmp(fshuffle)
join2.setSortCmp(fsort)

join3 = Job(ph_output_dir,join3_output_path,fmap4Avg,fred4) 
join3.addInputPath(join2_output_path,fmap4IDs)
join3.setShuffleCmp(fshuffle)
join3.setSortCmp(fsort)

calc_ph = Job(join3_output_path,ph_output_dir,fmapPh,fredPh)
calc_ph.setCombiner(fcomb)
calc_ph.setParams({'alpha': 0.1}) 

mse1 = Job(ph_output_dir,mse_output_path,fmap1MSE1,fredMSE1)
mse1.addInputPath(ph_prev_output_dir,fmap2MSE1)
mse2 = Job(mse_output_path, mse_output_path, fmapMSE2, fredMSE2)
mse2.setCombiner(fcomb)

max_ids = Job(ph_output_dir, max_output_path, fmapMax, freduceMax)

cota_error = 0.1
ph.waitForCompletion()
join1.waitForCompletion()
join2.waitForCompletion()
error = 1
i = 0
while(error >= cota_error):
    i += 1
    copy_ph.waitForCompletion()
    join3.waitForCompletion()
    calc_ph.waitForCompletion()
    mse1.waitForCompletion()
    mse2.waitForCompletion()
    error = getSum(mse_output_path + "output.txt")
    print(error)
print(f"Iteraciones completadas: {i}")
max_ids.waitForCompletion()

('IDs', '0') ('114', '1.0')
('IDs', '0') ('6', '1.0')
('IDs', '0') ('119', '1.0')
('IDs', '0') ('101', '1.0')
('IDs', '0') ('2', '1.0')
('IDs', '0') ('99', '1.0')
('IDs', '0') ('105', '1.0')
('IDs', '0') ('139', '1.0')
('IDs', '0') ('106', '1.0')
('IDs', '0') ('24', '1.0')
('IDs', '0') ('92', '1.0')
('IDs', '0') ('112', '1.0')
('IDs', '1') ('1818.2631578947369', 0)
('IDs', '1') ('40', '1812.6666666666667')
('IDs', '1') ('28', '1784.6666666666667')
('IDs', '1') ('37', '1731.5555555555557')
('IDs', '1') ('84', '1413.0')
('IDs', '1') ('13', '2894.25')
('IDs', '1') ('56', '1874.25')
('IDs', '1') ('67', '2682.875')
('IDs', '1') ('77', '1803.7142857142858')
('IDs', '1') ('59', '2227.769230769231')
('IDs', '1') ('89', '2845.8333333333335')
('IDs', '1') ('20', '2774.25')
('IDs', '1') ('29', '1716.5')
('IDs', '1') ('98', '2620.6')
('IDs', '1') ('25', '2261.5')
('IDs', '10') ('994.5', 0)
('IDs', '10') ('1', '1818.2631578947369')
('IDs', '10') ('58', '1884.1666666666667')
('IDs', '100') ('2265.0'

True