# Computación a Gran Escala
**Curso 2022/2023**

## Lab 6: Spark SQL - Tarea 2
**Autores:**  

Miguel García González - miguel.garcia02@estudiante.uam.es  
Belén Vivas García - belen.vivas@estudiante.uam.es

### Inicialización de Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as f
import os

# !pip install tabulate
from tabulate import tabulate

import numpy as np

In [2]:
spark = SparkSession.builder.appName('Lab6-T2').getOrCreate()
sc = spark.sparkContext



In [3]:
# Rutas de ficheros CSV
PHONES_ACCELEROMETER_PATH = 'data/datos_sensores/activity_recognition_exp/Phones_accelerometer.csv'
PHONES_GYROSCOPE_PATH = 'data/datos_sensores/activity_recognition_exp/Phones_gyroscope.csv'
WATCH_ACCELEROMETER_PATH = 'data/datos_sensores/activity_recognition_exp/Watch_accelerometer.csv'
WATCH_GYROSCOPE_PATH = 'data/datos_sensores/activity_recognition_exp/Watch_gyroscope.csv'

### 1. Formato Parquet

#### Funciones de gestión de ficheros

Estas funciones nos servirán para generar ficheros Parquet a partir de CSV y calcular y comparar sus distintos tamaños.

In [4]:
"""
Function: csv_to_parquet

Params:
    - csv_file: fichero CSV a convertir
    
Return:
    Ruta del fichero Parquet correspondiente al CSV

Esta funcion recibe un fichero CSV y genera un fichero Parquet a partir de este. Tiene en
cuenta la extension para cambiarla correctamente. Devuelve la ruta donde se ha generado el
nuevo fichero.
"""
def csv_to_parquet(csv_file):

    # Leemos el fichero CSV en un dataframe
    file_df = spark.read.csv(csv_file)
    
    # Creamos la ruta con la extension correcta
    filename = csv_file[:-4]
    parquet_filename = '{}.parquet'.format(filename)
    
    # Generamos el fichero Parquet
    file_df.write.mode('overwrite').parquet(parquet_filename)
    
    return parquet_filename


"""
Function: compare_files

Params:
    - pa_csv: ruta del fichero CSV de Phones accelerometer
    - pg_csv: ruta del fichero CSV de Phones gyroscope
    - wa_csv: ruta del fichero CSV de Watch accelerometer
    - wg_csv: ruta del fichero CSV de Watch gyroscope
    - pa_parquet: ruta del fichero Parquet de Phones accelerometer
    - pg_parquet: ruta del fichero Parquet de Phones gyroscope
    - wa_parquet: ruta del fichero Parquet de Watch accelerometer
    - wg_parquet: ruta del fichero Parquet de Watch gyroscope
    
Esta funcion calcula el tamano de cada uno de los ficheros CSV y Parquet y crea una tabla donde compara
los tamanos en Bytes.
"""
def compare_files(pa_csv, pg_csv, wa_csv, wg_csv, pa_parquet, pg_parquet, wa_parquet, wg_parquet):

    # Obtenemos los tamanos de los CSV
    pa_csv_size = os.path.getsize(pa_csv)
    pg_csv_size = os.path.getsize(pg_csv)
    wa_csv_size = os.path.getsize(wa_csv)
    wg_csv_size = os.path.getsize(wg_csv)
    
    # Obtenemos los tamanos de los Parquet
    pa_parquet_size = get_parquet_size(pa_parquet)
    pg_parquet_size = get_parquet_size(pg_parquet)
    wa_parquet_size = get_parquet_size(wa_parquet)
    wg_parquet_size = get_parquet_size(wg_parquet)
    
    # Generamos la tabla comparando los tamanos
    table_data = [["Phones accelerometer", pa_csv_size, pa_parquet_size],
                  ["Phones gyroscope", pg_csv_size, pg_parquet_size],
                  ["Watch accelerometer", wa_csv_size, wa_parquet_size],
                  ["Watch gyroscope", wg_csv_size, wg_parquet_size]]
    
    table_columns = ["File", "CSV Size (Bytes)", "Parquet Size (Bytes)"]
    
    table = tabulate(table_data, headers=table_columns, tablefmt="fancy_grid")
    print(table)


"""
Function: get_parquet_size

Params:
    - parquet_file: ruta del directorio
    
Return:
    Tamano de los ficheros del directorio

Esta funcion devuelve la suma del tamano de todos los ficheros del directorio pasado como argumento,
obteniendo asi su tamano real.
"""
def get_parquet_size(parquet_file):
    
    size = 0
    for f in os.scandir(parquet_file):
            size += os.path.getsize(f)
    
    return size

#### Comparación de tamaños

En primer lugar, generamos los cuatro ficheros Parquet correspondientes a los CSV:

In [5]:
PA_PARQUET_PATH = csv_to_parquet(PHONES_ACCELEROMETER_PATH)
PG_PARQUET_PATH = csv_to_parquet(PHONES_GYROSCOPE_PATH)
WA_PARQUET_PATH = csv_to_parquet(WATCH_ACCELEROMETER_PATH)
WG_PARQUET_PATH = csv_to_parquet(WATCH_GYROSCOPE_PATH)

Comparamos los tamaños de cada uno de ellos generando una tabla:

In [6]:
compare_files(PHONES_ACCELEROMETER_PATH,
             PHONES_GYROSCOPE_PATH,
             WATCH_ACCELEROMETER_PATH,
             WATCH_GYROSCOPE_PATH,
             PA_PARQUET_PATH,
             PG_PARQUET_PATH,
             WA_PARQUET_PATH,
             WG_PARQUET_PATH)

╒══════════════════════╤════════════════════╤════════════════════════╕
│ File                 │   CSV Size (Bytes) │   Parquet Size (Bytes) │
╞══════════════════════╪════════════════════╪════════════════════════╡
│ Phones accelerometer │         1291856327 │              312868407 │
├──────────────────────┼────────────────────┼────────────────────────┤
│ Phones gyroscope     │         1379145657 │              325737970 │
├──────────────────────┼────────────────────┼────────────────────────┤
│ Watch accelerometer  │          327168052 │               91203672 │
├──────────────────────┼────────────────────┼────────────────────────┤
│ Watch gyroscope      │          308337025 │               81702488 │
╘══════════════════════╧════════════════════╧════════════════════════╛


Como vemos, el tamaño de los ficheros Parquet es considerablemente menor que el de los ficheros CSV.

### 2. Tiempos de Ejecución

A continuación, compararemos los tiempos de ejecución de la Tarea 1 utilizando los cuatro métodos.

In [7]:
# Nombres de las columnas que nos serviran para los casos 3 y 4
column_names = ["index", "arrival_time", "creation_time", "x", "y", "z", "user", "model", "device", "gt"]

#### 2.1. RDD

In [8]:
%%time

"""
Function: create_RDD

Params:
    - filename: nombre del fichero
    
Return:
    RDD creado a partir del fichero de entrada

Esta funcion crea el RDD a partir del fichero CSV proporcionado
"""
def create_RDD(filename):
    
    rdd = sc.textFile(filename).map(lambda e: e.split(','))

    return rdd


"""
Function: process_RDD

Params:
    - rdd: RDD a procesar
    
Return:
    RDD procesado despues de aplicar las transformaciones y realizar los calculos necesarios

Esta funcion recibe el RDD creado y lo transforma de la siguiente manera:
1. Se queda con las columnas que necesitamos: [(User, Model, GT), x, y, z].
2. Realiza los calculos de x, y y z de forma separada, en diferentes RDD.
3. Une los resultados de las tres variables y los reordena correctamente.
"""
def process_RDD(rdd):
    
    # Nos quedamos con las columnas que nos interesan
    rdd_reduced = rdd.map(lambda e: ((e[6], e[7], e[9]), float(e[3]), float(e[4]), float(e[5])))
    
    # Agrupamos las x y hacemos los calculos
    rdd_x = rdd_reduced.map(lambda e: (e[0], e[1]))
    rdd_x_processed = calculate_RDD(rdd_x)
    
    # Agrupamos las y y hacemos los calculos
    rdd_y = rdd_reduced.map(lambda e: (e[0], e[2]))
    rdd_y_processed = calculate_RDD(rdd_y)
    
    # Agrupamos las z y hacemos los calculos
    rdd_z = rdd_reduced.map(lambda e: (e[0], e[3]))
    rdd_z_processed = calculate_RDD(rdd_z)
    
    # Unimos las tres variables
    rdd_joined = rdd_x_processed.join(rdd_y_processed).join(rdd_z_processed)
    rdd_processed = arrange_RDD(rdd_joined)
    
    return rdd_processed


"""
Function: calculate_RDD

Params:
    - rdd: RDD de x, y o z
    
Return:
    RDD con los calculos aplicados sobre la variable

Esta funcion recibe un RDD con forma [(User, Model, GT), variable], donde variable
es x, y o z, y calcula la media, desviacion tipica, maximo y minimo de la variable
por cada clave diferente.
"""
def calculate_RDD(rdd):
    
    rdd_grouped = rdd.groupByKey().map(lambda e: (e[0], list(e[1])))
    rdd_calculated = rdd_grouped.map(lambda e: (e[0], (np.mean(e[1]), np.std(e[1]), max(e[1]), min(e[1]))))

    return rdd_calculated
    

"""
Function: arrange_RDD

Params:
    - rdd: RDD a reordenar

Return:
    RDD reordenado
    
Esta funcion reordena el RDD resultante de la funcion calculate_RDD de la siguiente forma:
[(User, Model, GT), (mean(x), mean(y), mean(z), std(x), std(y), std(z),
max(x), max(y), max(z), min(x), min(y), min(z))]
"""
def arrange_RDD(rdd):
    
    rdd_arranged = rdd.map(lambda e: (e[0], 
                                      (e[1][0][0][0], e[1][0][1][0], e[1][1][0], 
                                       e[1][0][0][1], e[1][0][1][1], e[1][1][1], 
                                       e[1][0][0][2], e[1][0][1][2], e[1][1][2], 
                                       e[1][0][0][3], e[1][0][1][3], e[1][1][3])))
    
    return rdd_arranged


"""
Function: join_RDD

Params:
    - pa_rdd: RDD de Phones Accelerometer
    - pg_rdd: RDD de Phones Gyroscope
    - wa_rdd: RDD de Watch Accelerometer
    - wg_rdd: RDD de Watch Gyroscope

Return:
    RDD final

Esta funcion une los RDD de Phone por un lado y de Watch por otro mediante join, y los une en
un RDD mediante union.
"""
def join_RDD(pa_rdd, pg_rdd, wa_rdd, wg_rdd):

    # Join de Phones
    phone_rdd = pa_rdd.join(pg_rdd)

    # Join de Watch
    watch_rdd = wa_rdd.join(wg_rdd)

    # Union de Phones y Watch
    rdd = phone_rdd.union(watch_rdd)
    
    return rdd


# Creamos los RDD
pa_rdd = create_RDD(PHONES_ACCELEROMETER_PATH)
pg_rdd = create_RDD(PHONES_GYROSCOPE_PATH)
wa_rdd = create_RDD(WATCH_ACCELEROMETER_PATH)
wg_rdd = create_RDD(WATCH_GYROSCOPE_PATH)


# Procesamos los RDD
pa_processed = process_RDD(pa_rdd)
pg_processed = process_RDD(pg_rdd)
wa_processed = process_RDD(wa_rdd)
wg_processed = process_RDD(wg_rdd)

# Unimos los RDD
rdd = join_RDD(pa_processed, pg_processed, wa_processed, wg_processed)
print(rdd.collect())

[(('i', 's3', 'bike'), ((-0.8305826346212585, 0.074882657116146, 9.51892129518975, 1.1528066129881929, 0.8627064202538209, 1.0888751804312171, 3.4859576, 5.2002063, 19.258959, -9.586383999999999, -3.7924154000000003, -1.6950948000000001), (0.009889294178583957, -0.005613697093895955, -0.007020121538478878, 0.24356565441931763, 0.22319712782799161, 0.21100458412821368, 1.1701124, 2.129171, 1.1294898000000002, -1.0491611, -1.5415184, -1.1890492))), (('i', 's3mini', 'sit'), ((4.7803812322584704, 0.615286979184887, 8.172919329285557, 0.05898355009577621, 0.09144798136573146, 0.03319988576359514, 4.996699, 2.3032220000000003, 9.615114, 3.1136593999999995, 0.10055647, 7.878121000000001), (-0.012646493707418594, -0.010443389742527556, -0.009518636978439566, 0.03379875702999631, 0.032931960966928533, 0.024685389389252455, 1.5765933, 1.7068782, 1.7303002, -3.2258923, -1.557075, -1.8464342))), (('b', 'nexus4', 'stairsup'), ((-3.7862716931892564, 0.050938464237275445, 9.161005535057651, 2.1697384

Obtenemos un tiempo de ejecución (Wall time) de **1min 34s**

#### 2.2. DataFrames a partir de los RDD

In [9]:
%%time

"""
Function: create_DF

Params:
    - filename: fichero CSV sobre el que se va a crear un DataFrame

Return:
    DataFrame creado

Esta funcion recibe un fichero CSV y devuelve un DataFrame. Se encarga de crear la cabecera con
el nombre de los campos.
"""
def create_DF(filename):
    
    file = sc.textFile(filename).map(lambda e: e.split(","))
    file_header = file.map(lambda e: Row(index=e[0],
                                         arrival_time=e[1],
                                         creation_time=e[2],
                                         x=float(e[3]),
                                         y=float(e[4]),
                                         z=float(e[5]),
                                         user=e[6],
                                         model=e[7],
                                         device=e[8],
                                         gt=e[9]))
    df = spark.createDataFrame(file_header)
    
    return df


"""
Function: process_DF

Params:
    - df: DataFrame a procesar

Return:
    DataFrame procesado

Esta funcion transforma el DataFrame a la forma:
User | Model | GT | mean(x) | mean(y) | mean(z) | std(x) | std(y) | std(z) |
max(x) | max(y) | max(z) | min(x) | min(y) | min(z)
reduciendo primero el DataFrame unicamente con los campos necesarios, agrupando despues en
distintos DataFrames la clave primaria con la media, desviacion tipica, maximo y minimo y,
finlamente, uniendo cada uno de ellos de nuevo.
"""
def process_DF(df):
    
    # Nos quedamos con los campos que nos interesan
    reduced_df = df.select(df['user'], df['model'], df['gt'], df['x'], df['y'], df['z'])
    
    # Agrupamos por clave y hacemos cada uno de los calculos
    mean_df = reduced_df.groupBy('user', 'model', 'gt').agg(f.mean('x'), 
                                                            f.mean('y'), 
                                                            f.mean('z'))
    
    stdev_df = reduced_df.groupBy('user', 'model', 'gt').agg(f.stddev_pop('x'), 
                                                             f.stddev_pop('y'), 
                                                             f.stddev_pop('z'))
    
    max_df = reduced_df.groupBy('user', 'model', 'gt').agg(f.max('x'), 
                                                           f.max('y'), 
                                                           f.max('z'))
    
    min_df = reduced_df.groupBy('user', 'model', 'gt').agg(f.min('x'), 
                                                           f.min('y'),
                                                           f.min('z'))
    
    # Juntamos los resultados
    processed_df = mean_df.join(
        stdev_df, on=['user', 'model', 'gt']).join(
        max_df, on=['user', 'model', 'gt']).join(
        min_df, on=['user', 'model', 'gt'])

    return processed_df


"""
Function: join_DF

Params:
    - pa_df: DataFrame de Phones accelerometer
    - pg_df: DataFrame de Phones gyroscope
    - wa_df: DataFrame de Watch accelerometer
    - wg_df: DataFrame de Watch gyroscope

Return:
    DataFrame final

Esta funcion concatena los registros de los telefonos por un lado y de los relojes por otro.
Una vez hecho esto, los une en un DataFrame final.
"""
def join_DF(pa_df, pg_df, wa_df, wg_df):
    
    # Unimos phones por un lado y watches por otro mediante join
    phones_df = pa_df.join(pg_df, on=['user', 'model', 'gt'])
    watch_df = wa_df.join(wg_df, on=['user', 'model', 'gt'])

    # Unimos todos mediante union
    final_df = phones_df.union(watch_df)
    
    return final_df


# Creamos los DataFrames
pa_df = create_DF(PHONES_ACCELEROMETER_PATH)
pg_df = create_DF(PHONES_GYROSCOPE_PATH)
wa_df = create_DF(WATCH_ACCELEROMETER_PATH)
wg_df = create_DF(WATCH_GYROSCOPE_PATH)


# Procesamos los DataFrames
processed_pa_df = process_DF(pa_df)
processed_pg_df = process_DF(pg_df)
processed_wa_df = process_DF(wa_df)
processed_wg_df = process_DF(wg_df)


# Unimos los DataFrames
final_df = join_DF(processed_pa_df, processed_pg_df, processed_wa_df, processed_wg_df)
final_df.show()

+----+------+----------+--------------------+--------------------+------------------+-------------------+--------------------+-------------------+-------------------+------------------+------------------+-------------------+-------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+
|user| model|        gt|              avg(x)|              avg(y)|            avg(z)|      stddev_pop(x)|       stddev_pop(y)|      stddev_pop(z)|             max(x)|            max(y)|            max(z)|             min(x)|             min(y)|            min(z)|              avg(x)|              avg(y)|              avg(z)|       stddev_pop(x)|       stddev_pop(y)|       stddev_pop(z)|            max(x)|            max(y)|            max(z)|             min(x)|             min(y)|       

Obtenemos un tiempo de ejecución (Wall time) de **4min 20s**

#### 2.3. DataFrames a partir de los ficheros Parquet

In [10]:
%%time

# Leemos los ficheros Parquet generados en el apartado 1
pa_parquet_df = spark.read.parquet(PA_PARQUET_PATH)
pa_parquet_df = pa_parquet_df.toDF(*column_names)

pg_parquet_df = spark.read.parquet(PG_PARQUET_PATH)
pg_parquet_df = pg_parquet_df.toDF(*column_names)

wa_parquet_df = spark.read.parquet(WA_PARQUET_PATH)
wa_parquet_df = wa_parquet_df.toDF(*column_names)

wg_parquet_df = spark.read.parquet(WG_PARQUET_PATH)
wg_parquet_df = wg_parquet_df.toDF(*column_names)


# Una vez obtenidos los DataFrames generados con los ficheros Parquet,
# los procesamos con las funciones implementadas en el apartado 1
pa_parquet_processed_df = process_DF(pa_parquet_df)
pg_parquet_processed_df = process_DF(pg_parquet_df)
wa_parquet_processed_df = process_DF(wa_parquet_df)
wg_parquet_processed_df = process_DF(wg_parquet_df)

# Unimos los DataFrames
final_parquet_df = join_DF(pa_parquet_processed_df,
                          pg_parquet_processed_df,
                          wa_parquet_processed_df,
                          wg_parquet_processed_df)
final_parquet_df.show()

+----+------+----------+-------------------+--------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+-----------------+-------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+
|user| model|        gt|             avg(x)|              avg(y)|            avg(z)|      stddev_pop(x)|       stddev_pop(y)|      stddev_pop(z)|            max(x)|            max(y)|           max(z)|             min(x)|              min(y)|            min(z)|              avg(x)|              avg(y)|              avg(z)|       stddev_pop(x)|       stddev_pop(y)|       stddev_pop(z)|            max(x)|            max(y)|            max(z)|              min(x)|              min(y)|      

Obtenemos un tiempo de ejecución de **1min 1s**

#### 2.4. DataFrames a partir de los ficheros CSV

In [11]:
%%time

# Leemos los ficheros CSV
pa_csv_df = spark.read.csv(PHONES_ACCELEROMETER_PATH)
pa_csv_df = pa_csv_df.toDF(*column_names)

pg_csv_df = spark.read.csv(PHONES_GYROSCOPE_PATH)
pg_csv_df = pg_csv_df.toDF(*column_names)

wa_csv_df = spark.read.csv(WATCH_ACCELEROMETER_PATH)
wa_csv_df = wa_csv_df.toDF(*column_names)

wg_csv_df = spark.read.csv(WATCH_GYROSCOPE_PATH)
wg_csv_df = wg_csv_df.toDF(*column_names)


# Una vez obtenidos los DataFrames generados con los ficheros CSV
# los procesamos con las funciones implementadas en el apartado 1
pa_csv_processed_df = process_DF(pa_csv_df)
pg_csv_processed_df = process_DF(pg_csv_df)
wa_csv_processed_df = process_DF(wa_csv_df)
wg_csv_processed_df = process_DF(wg_csv_df)

# Unimos los DataFrames
final_csv_df = join_DF(pa_csv_processed_df,
                      pg_csv_processed_df,
                      wa_csv_processed_df,
                      wg_csv_processed_df)
final_csv_df.show()

+----+------+----------+-------------------+--------------------+------------------+-------------------+--------------------+-------------------+------------------+------------------+-----------------+-------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+
|user| model|        gt|             avg(x)|              avg(y)|            avg(z)|      stddev_pop(x)|       stddev_pop(y)|      stddev_pop(z)|            max(x)|            max(y)|           max(z)|             min(x)|              min(y)|            min(z)|              avg(x)|              avg(y)|              avg(z)|       stddev_pop(x)|       stddev_pop(y)|       stddev_pop(z)|            max(x)|            max(y)|            max(z)|              min(x)|              min(y)|      

Obtenemos un tiempo de ejecución de **1min 38s**

Como podemos ver, trabajar con ficheros Parquet es más rápido que trabajar con ficheros CSV.