# Computación a Gran Escala
**Máster Universitario en Ingeniería Informática (MUII) - Curso 2022/2023**

## Lab 5: Introducción a Spark

**Autores:**  
Miguel García González - miguel.garcia02@estudiante.uam.es  
Belén Vivas García - belen.vivas@estudiante.uam.es

In [1]:
import pyspark
import numpy as np

In [2]:
# Rutas de datos
PHONES_ACCELEROMETER_PATH = 'data/datos_sensores/activity_recognition_exp/Phones_accelerometer.csv'
PHONES_GYROSCOPE_PATH = 'data/datos_sensores/activity_recognition_exp/Phones_gyroscope.csv'
WATCH_ACCELEROMETER_PATH = 'data/datos_sensores/activity_recognition_exp/Watch_accelerometer.csv'
WATCH_GYRSOCOPE_PATH = 'data/datos_sensores/activity_recognition_exp/Watch_gyroscope.csv'

### Inicialización de PySpark

In [3]:
sc = pyspark.SparkContext()
sc



### Funciones para gestionar RDD

Esta lista de funciones nos servirán para crear, transformar y calcular los RDD.

In [4]:
"""
Function: create_RDD

Params:
    - filename: nombre del fichero
    
Return:
    RDD creado a partir del fichero de entrada

Esta función crea el RDD a partir del fichero CSV proporcionado
"""
def create_RDD(filename):
    
    rdd = sc.textFile(filename).map(lambda e: e.split(','))

    return rdd


"""
Function: process_RDD

Params:
    - rdd: RDD a procesar
    
Return:
    RDD procesado después de aplicar las transformaciones y realizar los cálculos necesarios

Esta función recibe el RDD creado y lo transforma de la siguiente manera:
1. Se queda con las columnas que necesitamos: [(User, Model, GT), x, y, z].
2. Realiza los cálculos de x, y y z de forma separada, en diferentes RDD.
3. Une los resultados de las tres variables y los reordena correctamente.
"""
def process_RDD(rdd):
    
    # Nos quedamos con las columnas que nos interesan
    rdd_reduced = rdd.map(lambda e: ((e[6], e[7], e[9]), float(e[3]), float(e[4]), float(e[5])))
    
    # Agrupamos las x y hacemos los calculos
    rdd_x = rdd_reduced.map(lambda e: (e[0], e[1]))
    rdd_x_processed = calculate_RDD(rdd_x)
    
    # Agrupamos las y y hacemos los calculos
    rdd_y = rdd_reduced.map(lambda e: (e[0], e[2]))
    rdd_y_processed = calculate_RDD(rdd_y)
    
    # Agrupamos las z y hacemos los calculos
    rdd_z = rdd_reduced.map(lambda e: (e[0], e[3]))
    rdd_z_processed = calculate_RDD(rdd_z)
    
    # Unimos las tres variables
    rdd_joined = rdd_x_processed.join(rdd_y_processed).join(rdd_z_processed)
    rdd_processed = arrange_RDD(rdd_joined)
    
    return rdd_processed


"""
Function: calculate_RDD

Params:
    - rdd: RDD de x, y o z
    
Return:
    RDD con los cálculos aplicados sobre la variable

Esta función recibe un RDD con forma [(User, Model, GT), variable], donde variable
es x, y o z, y calcula la media, desviación típica, máximo y mínimo de la variable
por cada clave diferente.
"""
def calculate_RDD(rdd):
    
    rdd_grouped = rdd.groupByKey().map(lambda e: (e[0], list(e[1])))
    rdd_calculated = rdd_grouped.map(lambda e: (e[0], (np.mean(e[1]), np.std(e[1]), max(e[1]), min(e[1]))))

    return rdd_calculated
    

"""
Function: arrange_RDD

Params:
    - rdd: RDD a reordenar

Return:
    RDD reordenado
    
Esta función reordena el RDD resultante de la función calculate_RDD de la siguiente forma:
[(User, Model, GT), (mean(x), mean(y), mean(z), std(x), std(y), std(z),
max(x), max(y), max(z), min(x), min(y), min(z))]
"""
def arrange_RDD(rdd):
    
    rdd_arranged = rdd.map(lambda e: (e[0], 
                                      (e[1][0][0][0], e[1][0][1][0], e[1][1][0], 
                                       e[1][0][0][1], e[1][0][1][1], e[1][1][1], 
                                       e[1][0][0][2], e[1][0][1][2], e[1][1][2], 
                                       e[1][0][0][3], e[1][0][1][3], e[1][1][3])))
    
    return rdd_arranged


"""
Function: join_RDD

Params:
    - pa_rdd: RDD de Phones Accelerometer
    - pg_rdd: RDD de Phones Gyroscope
    - wa_rdd: RDD de Watch Accelerometer
    - wg_rdd: RDD de Watch Gyroscope

Return:
    RDD final

Esta función une los RDD de Phone por un lado y de Watch por otro mediante join, y los une en
un RDD mediante union.
"""
def join_RDD(pa_rdd, pg_rdd, wa_rdd, wg_rdd):

    # Join de Phones
    phone_rdd = pa_rdd.join(pg_rdd)
    
    # Join de Watch
    watch_rdd = wa_rdd.join(wg_rdd)

    # Union de Phones y Watch
    rdd = phone_rdd.union(watch_rdd)
    
    return rdd

### Lectura de ficheros y creación de RDD

En primer lugar, obtenemos los RDD correspondientes a cada uno de los ficheros:  
- Phones accelerometer
- Phones gyroscope
- Watch accelerometer
- Watch gyroscope

In [5]:
pa_rdd = create_RDD(PHONES_ACCELEROMETER_PATH)
pg_rdd = create_RDD(PHONES_GYROSCOPE_PATH)
wa_rdd = create_RDD(WATCH_ACCELEROMETER_PATH)
wg_rdd = create_RDD(WATCH_GYRSOCOPE_PATH)

### Procesamiento de RDD

Una vez creados los RDD, los procesamos para obtener el RDD resultante de extraer de cada uno su clave primaria (User, Model, GT) junto con la media, desviación típica, máximo y mínimo de cada una de sus variables x, y y z.

In [6]:
# Procesamos los RDD
pa_processed = process_RDD(pa_rdd)
pg_processed = process_RDD(pg_rdd)
wa_processed = process_RDD(wa_rdd)
wg_processed = process_RDD(wg_rdd)

Como ejemplo, mostramos un resultado de cada uno de los cuatro ficheros con la forma:  
(User, Model, GT), (mean(x), mean(y), mean(z), std(x), std(y), std(z), max(x), max(y), max(z), min(x), min(y), min(z))

**Phones accelerometer**

In [7]:
print(pa_processed.take(1))

[(('c', 'samsungold', 'null'), (0.23917620702809797, -0.3349388667009023, 8.114421251815715, 5.834342661102344, 1.5177564524227478, 2.0131729101124827, 17.621105, 9.653300999999999, 19.45983, -18.846922, -9.500074, -0.153227))]


**Phones gyroscope**

In [8]:
print(pg_processed.take(1))

[(('f', 's3', 'walk'), (0.01842778726676459, -0.012660344126209897, -0.003222167901645703, 0.29382231255328833, 0.2630793962304212, 0.4430401480519867, 1.8900169999999998, 1.5717562, 2.1303926, -1.9104811000000002, -1.1539245, -3.3166927999999998))]


**Watch accelerometer**

In [9]:
print(wa_processed.take(1))

[(('c', 'lgwatch', 'null'), (4.92440607109189, -5.1472237238757765, 0.8241633636352657, 5.834276641040468, 3.202149495247256, 3.754576172258605, 19.32988, 11.912689, 19.927063, -19.591232, -19.7034, -19.274078))]


**Watch gyroscope**

In [10]:
print(wg_processed.take(1))

[(('c', 'lgwatch', 'null'), (-0.0031122214191367237, 0.01574602648442476, 0.018811255957697556, 1.7130154430471274, 0.8776247151651124, 1.0021970692411626, 17.910233, 5.9364624, 5.6636047, -15.238663, -5.51947, -10.028076))]


### Construcción del RDD final

Finalmente, unimos los RDD de los teléfonos por un lado y de los relojes por otro mediante join, y unimos ambos en un RDD final mediante union.  
Como ejemplo, mostramos los primeros registros del RDD final, donde podemos observar la unión de todos los elementos.

In [11]:
rdd = join_RDD(pa_processed, pg_processed, wa_processed, wg_processed)
print(rdd.collect()[0:4])

[(('i', 's3', 'bike'), ((-0.8305826346212585, 0.074882657116146, 9.51892129518975, 1.1528066129881929, 0.8627064202538209, 1.0888751804312171, 3.4859576, 5.2002063, 19.258959, -9.586383999999999, -3.7924154000000003, -1.6950948000000001), (0.009889294178583957, -0.005613697093895955, -0.007020121538478878, 0.24356565441931763, 0.22319712782799161, 0.21100458412821368, 1.1701124, 2.129171, 1.1294898000000002, -1.0491611, -1.5415184, -1.1890492))), (('i', 's3mini', 'sit'), ((4.7803812322584704, 0.615286979184887, 8.172919329285557, 0.05898355009577621, 0.09144798136573146, 0.03319988576359514, 4.996699, 2.3032220000000003, 9.615114, 3.1136593999999995, 0.10055647, 7.878121000000001), (-0.012646493707418594, -0.010443389742527556, -0.009518636978439566, 0.03379875702999631, 0.032931960966928533, 0.024685389389252455, 1.5765933, 1.7068782, 1.7303002, -3.2258923, -1.557075, -1.8464342))), (('b', 'nexus4', 'stairsup'), ((-3.7862716931892564, 0.050938464237275445, 9.161005535057651, 2.1697384