<h1>Práctica 1 PySpark</h1>  

<h5>Autor: Guillermo García López<h5>

Procesado de un fichero de datos de sensores
Vamos a trabajar con el conjunto de datos del Heterogeneity Dataset for Human Activity Recognition
(HHAR) que contiene información de los sensores de movimientos de teléfonos y relojes. El enlace a los
datos es: https://archive.ics.uci.edu/ml/datasets/Heterogeneity+Activity+Recognition+Data+Set .

Los datos contienen mediciones de sensores de movimiento mientras los usuarios realizaban determinadas
acciones. El objetivo del conjunto de datos es el de reconocer las acciones que los usuarios realizan.
Aunque en esta práctica nos limitaremos a procesar el fichero, que sería en cualquier caso el paso previo
necesario al reconocimiento. Las posibles acciones que contiene el fichero son: ‘Biking’, ‘Sitting’,
‘Standing’, ‘Walking’, ‘Stair Up’ y ‘Stair down’. Los sensores medidos son: giróscopo y acelerómetro.
Los tipos de dispositivos son teléfonos y relojes.

Los fichero que vamos a utilizar son: Phones_accelerometer.csv, Phones_gyroscope.csv,
Watch_accelerometer.csv y Watch_gyroscope.csv. Las columnas de los ficheros son: 'Index',
'Arrival_Time', 'Creation_Time', 'x', 'y', 'z', 'User', 'Model', 'Device', 'gt'. El contenido de las distintas
columnas es:

- Index: El identificador del registro.

- Arrival_Time: el tiempo de la medición cuando la medida llega a la aplicación.

- Creation_Time: Timestamp dado por el SO.

- X,y,z: Valores de la medición dados por en los ejes: x,y,z.

- User: Identificador del usuario que realiza la acción con valores de ‘a’ a ‘i’.

- Model: Modelo del teléfono/reloj.

- Device: El aparato concreto que toma las mediciones. Para un mismo modelo pueden tener varios
aparatos.

- Gt: Actividad que el usuario está realizando de entre: bike sit, stand, walk, stairsup, stairsdown
and null.




Para cada ejecución de una acción por parte de un usuario, los ficheros contienen una serie de filas
(mediciones) que describen el movimiento. El objetivo será el agregar usando como clave primaria la terna
usuario (User), modelo (Model) y movimiento ejecutado (gt). En concreto, hay que crear un RDD (por
cada fichero) con un registro por cada usuario, modelo y clase con la media, desviación estándar y valor
máximo y mínimo de la secuencia del movimiento ejecutado. Una vez hecho esto, se deberá concatenar
mediante join los registros de giróscopo y acelerómetro de los relojes por un lado y de los teléfonos por
otro. Finalmente se creará un RDD único (mediante union) con los RDDs de teléfonos y relojes.

Ejemplo del fichero:
Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

Ejemplo de juguete:

0,1424696633908,1424696631913248572,-1.0,0.6,8.2,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.0,0.8,8.2,a,nexus4,nexus4_1,stand

RDD de salida posible tras procesar el fichero (pueden tener un formato distinto pero debe incluir esta
información):

User,Model,gt,media(x,y,z),desviacion(x,y,z),max(x,y,z),min(x,y,z)

a, nexus4,stand,-3.0,0.7,8.2,2.8,0.14,0.0,-1.0,0.8,8.2,-5.0,0.6,8.2

Analiza los tiempos de ejecución variando el número de cores a utilizar.

<h6>Resolución:</h6>

In [None]:
# Librerías Spark:
from pyspark import SparkContext   
from pyspark import SparkConf
from statistics import mean, stdev # media, stdev de librería statistics
import time as t                   # medida de tiempo para cálculo de tiempos de ejecución

Se "detiene" el SparkContext antes de resetear la config para cambiar número de cores (este comando no debería
ejecutarse en la primera ejecución al no haber creada una SparkSession):

In [None]:
sc.stop()

Una vez realizada la ejecución con un número determinado de cores, se cambia la configuración para modificar 
dicho número:

In [None]:
#cores = 4
#sc = SparkContext(conf=SparkConf().set("spark.executor.cores", cores)
#                                  .set("spark.driver.cores", cores))
sc = SparkContext(conf=SparkConf())

Tiempo inicial de ejecución:

In [None]:
init_time = t.time()

In [None]:
'''Se crea una función que crea un RDD mediante la importación de un fichero csv, y devuelve los datos en tupla con
el siguiente orden: User | Model | Gt | Index | Arrival_Time | Creation_Time | x | y | z | Device , casteando
los datos según el tipo de dato.

Input  -> nombre de fichero .csv
Output -> rdd con formato User | Model | Gt | Index | Arrival_Time | Creation_Time | x | y | z | Device

'''

def creaRDD(file):
    
    RDD = sc.textFile(file) \
            .map(lambda el: el.split(',')) \
            .map(lambda el: (el[6], el[7], el[9], int(el[0]), int(el[1]), float(el[2]), \
                             float(el[3]), float(el[4]), float(el[5]), el[8]) ) 
            
    return RDD

In [None]:
aux1 = creaRDD("./small_data/Phones_accelerometer.csv")
#print(aux1.collect()[0])

In [None]:
'''Función a la cual se le pasa un RDD y una variable tipo string x,y ó z y retorna otro RDD con (clave 3 campos), 
media(col), std(col), max(col), min(col). Se usan las funciones mean, std de la librería statistics.

Input: rdd a tratar; variable string a seleccionar
Output: rdd con ((clave), media, std, max, min)
'''

def calc_estad(rdd, var):
    if var == 'x':
        alt = rdd.map(lambda el: ( (el[0], el[1], el[2]) , el[6]) )
    elif var == 'y':
        alt = rdd.map(lambda el: ( (el[0], el[1], el[2]) , el[7]) )
    else:
        alt = rdd.map(lambda el: ( (el[0], el[1], el[2]) , el[8]) )
        
    rdd_est = alt.groupByKey() \
                 .map(lambda el: (el[0], list(el[1]) ) ) \
                 .map(lambda el: (el[0], (mean(el[1]), stdev(el[1]), max(el[1]), min(el[1])) ) ) 
            
            
    return rdd_est

In [None]:
# Para tener en un mismo RDD todos los datos de las tres variables x, y, z, se fusionan mediante join's:
rddPhAc = calc_estad(aux1, 'x').join(calc_estad(aux1, 'y')).join(calc_estad(aux1, 'z'))
#print(rddPhAc.collect())

In [None]:
# Se repite el tratamiento para el fichero de Phones_gyroscope.csv:
aux2 = creaRDD("./small_data/Phones_gyroscope.csv")
#print(aux2.collect()[0])

In [None]:
#print(calc_estad(aux2, 'x').collect())

In [None]:
rddPhGyr = calc_estad(aux2, 'x').join(calc_estad(aux2, 'y')).join(calc_estad(aux2, 'z'))
#print(rddPhGyr.collect())

In [None]:
# Una vez que tenemos los RDDs resultantes de los ficheros Phones_accelerometer.csv y Phones_gyroscope.csv
# realizamos join para fusionar ambos:
rddPhones = rddPhAc.join(rddPhGyr)

In [None]:
#print(rddPhones.collect())

In [None]:
# Repetimos el tratamiento para ambos ficheros de relojes Watch_accelerometer.csv y Watch_gyroscope.csv:
# Watch_accelerometer.csv
aux3 = creaRDD("./small_data/Watch_accelerometer.csv")
#print(aux3.collect()[0])

In [None]:
#print(calc_estad(aux3, 'x').collect())

In [None]:
rddWatAc = calc_estad(aux3, 'x').join(calc_estad(aux3, 'y')).join(calc_estad(aux3, 'z'))
#print(rddWatAc.collect())

In [None]:
# Watch_gyroscope.csv
aux4 = creaRDD("./small_data/Watch_gyroscope.csv")
#print(aux4.collect()[0])

In [None]:
#print(calc_estad(aux4, 'x').collect())

In [None]:
rddWatGyr = calc_estad(aux4, 'x').join(calc_estad(aux4, 'y')).join(calc_estad(aux4, 'z'))
#print(rddWatGyr.collect())

In [None]:
# Se fusionan ambos con join's:
rddWatches = rddWatAc.join(rddWatGyr)
#print(rddWatches.collect())

In [None]:
# Finalmente se unen los RDD's de teléfonos y relojes:
_rdd = rddPhones.union(rddWatches)

In [None]:
# Y se muestran los resultados:
#print(_rdd.collect())

Nótese que el formato del fichero resultante tras el tratamiento y concatenación de los 4 ficheros de sensores no es exactamente el pedido, si bien contiene toda la información requerida en el enuncciado 
con clave común para los ficheros y valor de cada fichero y en el formato siguiente:

[(clave de campos User + Model + Gt), valores_x_fich1, valores_y_fich1, valores_z_fich1, 
                                      valores_x_fich2, valores_y_fich2, valores_z_fich2,
                                      ..................................................]

Tiempo final de ejecución:

In [None]:
end_time = t.time()

In [None]:
total_time = end_time - init_time
#print(total_time)

<h6>Análisis de tiempos variando cores:</h6>
Los resultados de ejecuciones tras variar el número de cores son los siguientes:

- Tiempo de ejecución con 1 core:  4.4723 s

- Tiempo de ejecución con 4 cores: 4.9908 s

- Tiempo de ejecución con 8 cores: 4.3156 s

es decir, con estas medidas no pueden sacarse conclusiones acerca de cómo afecta el número de cores al rendimiento.



In [None]:
# Añadido para la ejecución desde otro notebook de la P2:
def foo():
    print(total_time)
foo()