# Práctica 1

## Ejercicio 1: MapReduce.

### Ejercicio 1.1: Contador de clientes valorados por países

En primer lugar, se partirá de los ficheros "clientes.csv" y "countries.csv" ubicados en el directorio relativo "./datos". El objetivo de este ejercicio es la producción de un fichero "recuento_buenos.csv" con la estructura que contendrá: una columna con los los distintos países del fichero "countries.csv" y una segunda columna con un valor numérico que corresponderá con la cantidad de clientes "buenos" de ese país. 

Con el fin de conseguir el fichero mencionado, se modifica el código proporcionado en el notebook "mrjob-join.ipynb" visto en el curso de la manera oportuna.

In [None]:
! mkdir -p datos

In [None]:
import os
os.chdir("./datos")
os.environ["HADOOP_HOME"] = '/usr/lib/hadoop-3.3.6'
os.environ["PATH"] = os.environ["PATH"] + ":" + os.environ["HADOOP_HOME"] +"/bin"
os.environ["HADOOP_CONF_DIR"] = os.environ["HADOOP_HOME"] + "/etc/hadoop"

In [None]:
%%writefile mrjob-recuento.py
from mrjob.job import MRJob

class MRJoinModified(MRJob):

    # Realiza la ordenación secuandaria
    SORT_VALUES = True

    def mapper(self, _, line):
        splits = line.rstrip("\n").split(",")

        if len(splits) == 2:  # Datos de paises
            symbol = 'A'  # Ordena los datos de paises antes que los datos de personas
            country2digit = splits[1] # Valor con el que realiza el cruce
            country_name = splits[0]
            yield country2digit, [symbol, splits]
        else:  # Datos de personas
            symbol = 'B'
            country2digit = splits[2] # Valor con el que realiza el cruce
            yield country2digit, [symbol, splits]

    def reducer(self, key, values):
        countries = [] # Añade países
        good_ratings_count = 0 # Recuento de buenos clientes por país

        for value in values:
            if value[0] == 'A':
                countries.append([value[1][0]]) # Corchetes extra para adecuarse a la estructura especificada en el enunciado
            if value[0] == 'B' and value[1][1] == 'bueno':
                good_ratings_count += 1

        for country in countries:
            if good_ratings_count > 0: # No se añaden países sin clientes "buenos"
                yield country, good_ratings_count

if __name__ == '__main__':
    MRJoinModified.run()
    

Una vez modificado el código, se porcede a ejecutar en el entorno hadoop. Para ello, en primer lugar se suben los archivos al cluster. Finalmente se lanza la job que permite obtener el fichero resultado.

In [None]:
! hdfs dfs -mkdir /datos
! hdfs dfs -put ./countries.csv  /datos
! hdfs dfs -put ./clients.csv  /datos

In [None]:
! python3 mrjob-recuento.py hdfs:///datos -r hadoop --output-dir hdfs:///recuento_buenos

In [None]:
! hdfs dfs -tail /recuento_buenos/part-00000

### Ejercicio 1.2: País con mejores clientes.

En esta segunda parte, se querrá obtener algún país cuyo recuento de clientes buenos sea máximo. Para lograr el objetivo, se puede modificar el código anteriormente desarrollado incluyendo otro reducer que reciba el output del primero para que, ya teniendo todos la misma clave, se pueda calcular el máximo.

In [None]:
%%writefile mrjob-recuento-mejor.py
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRJoinModified(MRJob):

    # Realiza la ordenación secuandaria
    SORT_VALUES = True

    def mapper(self, _, line):
        splits = line.rstrip("\n").split(",")

        if len(splits) == 2:  # Datos de paises
            symbol = 'A'  # Ordena los datos de paises antes que los datos de personas
            country2digit = splits[1] # Valor con el que realiza el cruce
            country_name = splits[0]
            yield country2digit, [symbol, splits]
        else:  # Datos de personas
            symbol = 'B'
            country2digit = splits[2] # Valor con el que realiza el cruce
            yield country2digit, [symbol, splits]

    def reducer(self, key, values):
        countries = [] # Añade países
        good_ratings_count = 0 # Recuento de buenos clientes por país

        for value in values:
            if value[0] == 'A':
                countries.append([value[1][0]]) # Corchetes extra para adecuarse a la estructura especificada en el enunciado
            if value[0] == 'B' and value[1][1] == 'bueno':
                good_ratings_count += 1

        for country in countries:
            if good_ratings_count > 0: # No se añaden países sin clientes "buenos"
                yield None, [country, good_ratings_count]

    def reducer_max(self, _, values): # Segundo reducer sin clave para calcular el máximo
        max_value = max(values, key = lambda x: x[1])
        yield max_value[1], max_value[0]
    
    def steps(self):
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
            MRStep(reducer=self.reducer_max)
        ]

        
if __name__ == '__main__':
    MRJoinModified.run()
    

In [None]:
! hdfs dfs -rm -r /recuento_mejor

In [None]:
! python3 mrjob-recuento-mejor.py hdfs:///datos -r hadoop --output-dir hdfs:///recuento_mejor

In [None]:
! hdfs dfs -tail /recuento_mejor/part-00000

### Ejervivio 1.3: Mejorando el país con mejores clientes.

En este último apartado del ejercicio, se mejora el código anterio para que, en caso de haber más de un país con el número máximo de buenos clientes, se muestren todos. Nuevamente, se modificará el código del apartado anterior para lograr el objetivo.

In [None]:
%%writefile mrjob-recuento-mejores.py
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRJoinModified(MRJob):

    # Realiza la ordenación secuandaria
    SORT_VALUES = True

    def mapper(self, _, line):
        splits = line.rstrip("\n").split(",")

        if len(splits) == 2:  # Datos de paises
            symbol = 'A'  # Ordena los datos de paises antes que los datos de personas
            country2digit = splits[1] # Valor con el que realiza el cruce
            country_name = splits[0]
            yield country2digit, [symbol, splits]
        else:  # Datos de personas
            symbol = 'B'
            country2digit = splits[2] # Valor con el que realiza el cruce
            yield country2digit, [symbol, splits]

    def reducer(self, key, values):
        countries = [] # Añade países
        good_ratings_count = 0 # Recuento de buenos clientes por país

        for value in values:
            if value[0] == 'A':
                countries.append([value[1][0]]) # Corchetes extra para adecuarse a la estructura especificada en el enunciado
            if value[0] == 'B' and value[1][1] == 'bueno':
                good_ratings_count += 1

        for country in countries:
            if good_ratings_count > 0: # No se añaden países sin clientes "buenos"
                yield None, [country, good_ratings_count]

    def reducer_max(self, _, values): # Segundo reducer sin clave para calcular el máximo
        max_count = -1
        max_countries = []

        for country, count in values:
            if count > max_count:
                max_count = count
                max_countries = [country]
            elif count == max_count:
                max_countries.append(country)

        for country in max_countries:
            yield max_count, country

    def steps(self):
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
            MRStep(reducer=self.reducer_max)
        ]

        
if __name__ == '__main__':
    MRJoinModified.run()
    

In [None]:
! hdfs dfs -rm -r /recuento_mejores

In [None]:
! python3 mrjob-recuento-mejores.py hdfs:///datos -r hadoop --output-dir hdfs:///recuento_mejores

In [None]:
! hdfs dfs -tail /recuento_mejores/part-00000

## Ejercicio 2: Hive

### Ejercicios 2.1 y 2.2

En primer lugar, se crearán las bases de datos exigidas por el ejercicio. En este caso se opta por una base de datos interna para el fichero "API_SE.PRM.CMPT.FE.ZS_DS2_es_csv_v2_5641106.csv" puesto que al tratarse de un fichero que contiene, en esencia, series temporales, su rápido acceso y tratamiento es una prioridad. Por otra parte, para el fichero "Metadata_Country_API_SE.PRM.CMPT.FE.ZS_DS2_es_csv_v2_5641106.csv" se optará por una tabla externa por no ser una tabla que varíe su estructura (contiene datos estáticos sobre países y como mucho cambiará algún valor), ergo se prioriza la fácil actualización de la misma. 

Posteriormente se cargarán los datos desde los ficheros proporcionados (a los que se les ha hecho una ligera modificación para que solo contengan los datos pertinentes).

In [None]:
os.chdir("..")

In [None]:
! mkdir -p hive_datos

In [None]:
os.chdir("hive_datos")
os.environ["HIVE_HOME"] = '/usr/lib/apache-hive-3.1.3-bin/'
os.environ["PATH"] = os.environ["PATH"] + ":" + os.environ["HIVE_HOME"] +"/bin"
os.environ["HIVE_CONF_DIR"] = os.environ["HIVE_HOME"] + "/conf"

In [None]:
! hdfs dfs -mkdir /datos_hive
! hdfs dfs -put ./Metadata_Country_API_SE.PRM.CMPT.FE.ZS_DS2_es_csv_v2_5641106.csv /datos_hive
! hdfs dfs -put ./API_SE.PRM.CMPT.FE.ZS_DS2_es_csv_v2_5641106.csv /datos_hive

In [None]:
! hdfs dfs -ls /datos_hive

In [None]:
hiveql_script = """
CREATE database IF NOT EXISTS tablas_practica
COMMENT 'BD para la realización de TP1'
LOCATION '/datos_hive'
With dbproperties ('Creada por'='Daniel Rece','Creada el'='19-Nov-2023');

USE tablas_practica
CREATE TABLE IF NOT EXISTS tabla_interna (
    Country_Name STRING,
    Country_Code STRING,
    Indicator_Name STRING,
    Indicator_Code STRING,"""

for year in range(1960, 2023, 1):
    hiveql_script = hiveql_script + f"""
    {year} DOUBLE,"""

hiveql_script = hiveql_script[0:len(hiveql_script)-1] + f""") 
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';"""

with open('crear_tabla_interna.hql', 'w') as file:
    file.write(hiveql_script)


In [None]:
! beeline -u "jdbc:hive2:///" -f crear_tabla_interna.hql

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -e "load data inpath '/datos_hive/API_SE.PRM.CMPT.FE.ZS_DS2_es_csv_v2_5641106.csv' into table tabla_interna;"

In [None]:
hiveql_script = """
CREATE EXTERNAL TABLE IF NOT EXISTS tabla_externa (
    Country_Name STRING,
    Country_Code STRING,
    Region STRING,
    Income_Group STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/datos_hive/Metadata_Country_API_SE.PRM.CMPT.FE.ZS_DS2_es_csv_v2_5641106.csv';
"""
with open('crear_tabla_externa.hql', 'w') as file:
    file.write(hiveql_script)

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -f crear_tabla_externa.hql

### Ejercicio 2.3

Se crea la vista exigida en el enunciado.

In [None]:
hiveql_view = """
CREATE VIEW vista_pais_info AS
SELECT
    i.Country_Name,
    i.Country_Code,
    i.`2018` AS Rate_2018,
    e.Income_Group
FROM
    tabla_interna i
JOIN
    tabla_externa e
ON
    i.Country_Code = e.Country_Code;
"""

with open('crear_vista_pais_info.hql', 'w') as file:
    file.write(hiveql_view)

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -f crear_vista_pais_info.hql

### Ejercicio 2.4

Se crean y ejecutan consultas que respondan a las preguntas realizadas en el enunciado.

In [None]:
# Tasa en el año 2018 en España
tasa_2018_esp = """
SELECT Rate_2018
FROM vista_pais_info
WHERE Country_Name = 'Spain';
"""

# Media de las tasas del 2018 para países de ingreso bajo
media_tasa_2018_ingreso_bajo = """
SELECT AVG(Rate_2018) AS Avg_Rate_2018_LowIncome
FROM vista_pais_info
WHERE Income_Group = '"Países de ingreso bajo"';
"""


# Cinco países con mayor tasa en 2020
top_paises_2020 = """
SELECT Country_Name, Rate_2020
FROM (
    SELECT Country_Name, Rate_2020,
    ROW_NUMBER() OVER (ORDER BY Rate_2020 DESC) AS rank
    FROM vista_pais_info
) ranked
WHERE rank <= 5;
"""

# Número de países de Oriente Medio y Norte de Ágfrica (excluido altos ingresos) en cada grupo de ingresos
paises_por_grupo_ingresos = """
SELECT Income_Group, COUNT(*) AS Num_Countries
FROM vista_pais_info
WHERE Region = '"Oriente Medio y Norte de África (excluido altos ingresos)"'
GROUP BY Income_Group;
"""


with open('consulta1.hql', 'w') as file:
    file.write(tasa_2018_esp)
with open('consulta2.hql', 'w') as file:
    file.write(media_tasa_2018_ingreso_bajo)
with open('consulta3.hql', 'w') as file:
    file.write(top_paises_2020)
with open('consulta4.hql', 'w') as file:
    file.write(paises_por_grupo_ingresos)

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -e consulta1.hql

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -e consulta2.hql

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -e consulta3.hql

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -e consulta4.hql

### Ejercicio 1.5

Para realizar este ejercicio, se emplea el dataset: https://www.kaggle.com/datasets/dillonmyrick/high-school-student-performance-and-demographics/data

In [None]:
! hdfs dfs -put ./student_math_clean.csv /datos_hive
! hdfs dfs -put ./student_portuguese_clean.csv /datos_hive

In [None]:
table_math = """
CREATE database IF NOT EXISTS students
COMMENT 'BD para la realización de TP1, ejercicio 2.5'
LOCATION '/datos_hive'
With dbproperties ('Creada por'='Daniel Rece','Creada el'='20-Nov-2023');

USE students
CREATE EXTERNAL TABLE IF NOT EXISTS student_math (
    school STRING,
    sex STRING,
    age INT,
    address_type STRING,
    family_size STRING,
    parent_status STRING,
    mother_education STRING,
    father_education STRING,
    mother_job STRING,
    father_job STRING,
    reason STRING,
    guardian STRING,
    travel_time STRING,
    study_time STRING,
    class_failures INT,
    school_support STRING,
    family_support STRING,
    extra_paid_classes STRING,
    activities STRING,
    nursery STRING,
    higher_ed STRING,
    internet STRING,
    romantic_relationship STRING,
    family_relationship INT,
    free_time INT,
    social INT,
    weekday_alcohol INT,
    weekend_alcohol INT,
    health INT,
    absences INT,
    grade_1 INT,
    grade_2 INT,
    final_grade INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/datos_hive/student_math_clean.csv';
"""
table_port = """
CREATE EXTERNAL TABLE IF NOT EXISTS student_portuguese (
    school STRING,
    sex STRING,
    age INT,
    address_type STRING,
    family_size STRING,
    parent_status STRING,
    mother_education STRING,
    father_education STRING,
    mother_job STRING,
    father_job STRING,
    reason STRING,
    guardian STRING,
    travel_time STRING,
    study_time STRING,
    class_failures INT,
    school_support STRING,
    family_support STRING,
    extra_paid_classes STRING,
    activities STRING,
    nursery STRING,
    higher_ed STRING,
    internet STRING,
    romantic_relationship STRING,
    family_relationship INT,
    free_time INT,
    social INT,
    weekday_alcohol INT,
    weekend_alcohol INT,
    health INT,
    absences INT,
    grade_1 INT,
    grade_2 INT,
    final_grade INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/datos_hive/student_portuguese_clean.csv';
"""

with open('tabla_estudiantes_matematicas.hql', 'w') as file:
    file.write(table_math)
with open('tabla_estudiantes_portugues.hql', 'w') as file:
    file.write(table_port)

In [None]:
! beeline -u "jdbc:hive2:///" -f tabla_estudiantes_matematicas.hql

In [None]:

! beeline -u "jdbc:hive2:///students" -f tabla_estudiantes_portugues.hql

In [None]:
! beeline -u "jdbc:hive2:///students" -e "load data inpath '/datos_hive/student_math_clean.csv' into table tabla_interna;"

In [None]:
! beeline -u "jdbc:hive2:///tablas_practica" -e "load data inpath '/datos_hive/student_portuguese_clean.csv' into table student_portuguese;"

Con estos comandos, ya se habría creado y cargado el dataset. A continuación, se procederá a hacer las consultas.

In [None]:
# Esta consulta permite comprobar si las notas de matemáticas y portugués de los distintos alumnos están inversamente correlacionadas en casos no extremos.

consulta1 = """
CREATE TEMPORARY TABLE if not exists i_values_temp AS
SELECT i_value
FROM (
    SELECT 5 AS i_value UNION ALL
    SELECT 6 UNION ALL
    SELECT 7 UNION ALL
    SELECT 8 UNION ALL
    SELECT 9 UNION ALL
    SELECT 10 UNION ALL
    SELECT 11 UNION ALL
    SELECT 12 UNION ALL
    SELECT 13 UNION ALL
    SELECT 14 UNION ALL
    SELECT 15
) i_values;

CREATE TEMPORARY TABLE if not exists j_values_temp AS
SELECT j_value
FROM (
    SELECT 5 AS j_value UNION ALL
    SELECT 6 UNION ALL
    SELECT 7 UNION ALL
    SELECT 8 UNION ALL
    SELECT 9 UNION ALL
    SELECT 10 UNION ALL
    SELECT 11 UNION ALL
    SELECT 12 UNION ALL
    SELECT 13 UNION ALL
    SELECT 14 UNION ALL
    SELECT 15
) j_values;

SELECT i_value, j_value, COUNT(*) AS student_count
FROM i_values_temp i
JOIN j_values_temp j ON ABS(i.i_value - j.j_value) <= 10
JOIN student_math m 
JOIN student_portuguese p 
ON (m.final_grade > i.i_value) AND (p.final_grade < j.j_value)
GROUP BY i.i_value, j.j_value
ORDER BY student_count DESC
LIMIT 10;
"""

with open('consulta_ej2_5_1.hql', 'w') as file:
    file.write(consulta1)

In [None]:
! beeline -u "jdbc:hive2:///students" -e consulta1_ej2_5_1.hql

In [None]:
#Se calcula el promedio de notas finales por diferentes grupos de edad y el nivel de tiempo de estudio. El cruce se realiza por distintos campos al no disponer de un identificador unívoco.
consulta2 = """
SELECT 
    CASE 
        WHEN m.age BETWEEN 15 AND 17 THEN '15-17' 
        WHEN m.age BETWEEN 18 AND 20 THEN '18-20' 
        ELSE '21+'
    END AS age_group,
    CASE 
        WHEN m.study_time = '2 to 5 hours' THEN 'Moderate'
        WHEN m.study_time = '5 to 10 hours' THEN 'Above Average'
        WHEN m.study_time = '>10 hours' THEN 'High'
        ELSE 'Low'
    END AS study_time_level,
    AVG(m.final_grade) AS avg_final_grade_math
    AVG(p.final_grade) AS avg_final_grade_por
FROM student_math m 
JOIN student_portuguese p 
ON m.school = p.school AND m.sex = p.sex AND m.age = p.age AND m.address_type = p.address_type AND m.family_size = p.family_size AND m.parent_status = p.parent_status 
GROUP BY 
    CASE 
        WHEN m.age BETWEEN 15 AND 17 THEN '15-17' 
        WHEN m.age BETWEEN 18 AND 20 THEN '18-20' 
        ELSE '21+'
    END,
    CASE 
        WHEN m.study_time = '2 to 5 hours' THEN 'Moderate'
        WHEN m.study_time = '5 to 10 hours' THEN 'Above Average'
        WHEN m.study_time = '>10 hours' THEN 'High'
        ELSE 'Low'
    END
ORDER BY avg_final_grade DESC
LIMIT 10;
"""

with open('consulta_ej2_5_2.hql', 'w') as file:
    file.write(consulta2)

In [None]:
! beeline -u "jdbc:hive2:///students" -e consulta1_ej2_5_2.hql