# Práctica 2: SparkSQL TAREA 2

1. El primer objetivo de esta tarea es comprobar que el formato parquet reduce los tamaños de los ficheros de forma considerable con respecto a ficheros de texto en formato csv, por ejemplo. Para ello, debes generar un fichero parquet para cada uno de los ficheros csv y entregar una tabla en la que aparezcan los tamaños de cada uno de los ficheros csv y de sus correspondientes ficheros parquet.

2. El segundo objetivo de esta tarea es medir el tiempo de ejecución de la tarea 1 cuando ésta se realiza de distintas formas. Consideraremos los siguientes casos:
    Caso 1. Se crean RDDs para cada uno de los ficheros csv (esto corresponde al notebook realizado en la Práctica 1)
    Caso 2. Se crean DataFrames a partir de los RDDs (esto corresponde a la Tarea 1 de la Práctica 2)
    Caso 3. Se crean DataFrames a partir de los ficheros parquet generados en el apartado 1
    Caso 4. Se crean DataFrames a partir de los ficheros csv originales. Para ello, puedes utilizar la función spark.read.csv

In [11]:
# imports necesarios
from pyspark.sql import Row
from pyspark.sql import functions as F
import os


# función que crea los ficheros parquet a partir de un Dataframe creado por cada fichero.
def crearParquet(elem, name) :
    parts_elem = elem.map(lambda l: l.split(","))
    parts_elem = parts_elem.map(lambda p: Row(Index=p[0], Arrival_Time=p[1], Creation_Time=p[2], x=p[3], y=p[4],\
                                              z=p[5], User=p[6], Model=p[7], Device=p[8], gt=p[9]))
    df = spark.createDataFrame(parts_elem)
    nombre = name+'_parquet'

    df.write.mode("overwrite").parquet(nombre)
   
    return None


# importar ficheros
watch_acc = sc.textFile('datos/Watch_accelerometer.csv')
watch_giros = sc.textFile('datos/Watch_gyroscope.csv')
movil_acc = sc.textFile('datos/Phones_accelerometer.csv')
movil_giros = sc.textFile('datos/Phones_gyroscope.csv')

# generamos ficheros parquet
crearParquet(watch_acc, 'Watch_accelerometer')
crearParquet(watch_giros, 'Watch_gyroscope')
crearParquet(movil_acc, 'Phones_accelerometer')
crearParquet(movil_giros, 'Phones_gyroscope')


In [12]:
# Vemos los tamaños de los ficheros parquet y de los ficheros csv
# función que obtiene la extensión de un fichero.
def obtener_extension(fichero):
    pos = fichero.rfind(".")
    ext = fichero[pos + 1:len(fichero):].lower()

    return ext


# función que calcula los tamaños de los ficheros parquet generados. Devuelve un diccionario con los ficheros
# y su tamaño.
def calcular_tamanios_parquet(direc1, direc2, direc3, direc4):
    list_dir = (direc1, direc2, direc3, direc4)
    dic = {}
    
    for i in list_dir :
        archivo = os.getcwd() + os.sep + i + os.sep
        listDir = os.listdir(i)
        
        for x in listDir :
            tamanio = 0
            if obtener_extension(x)=='parquet':
                file = archivo + x
                file_stat = os.stat(file)
                tamanio = file_stat.st_size
                dic[i] = (tamanio, 'parquet')
                   
    return dic


# función que calcula los tamaños de los ficheros csv. Devuelve un diccionario con los ficheros y su tamaño.
def calcular_tamanios_csv(direc):
    tamanio = 0
    dic = {}
    archivo = os.getcwd() + os.sep + direc + os.sep
    listDir = os.listdir(direc)
    
    for x in listDir :
        if obtener_extension(x)=='csv' and (x=='Phones_accelerometer.csv' or x=='Phones_gyroscope.csv' \
                                            or x=='Watch_accelerometer.csv'or x=='Watch_gyroscope.csv' ) :
            file = archivo + x
            file_stat = os.stat(file)
            tamanio = file_stat.st_size
            dic[x] = (tamanio, 'csv')
    
    return dic


# función que crea la tabla con los ficheros y sus tamaños.
def tabla_tamanios(dic_csv, dic_parquet) :
    data_csv = sc.parallelize([(k,)+(v,) for k,v in dic_csv.items()]).toDF(['Nombre Fichero','Tamaño, Tipo'])
    data_parquet = sc.parallelize([(k,)+(v,) for k,v in dic_parquet.items()]).toDF(['Nombre Fichero','Tamaño, Tipo'])
    
    df_final = data_csv.unionAll(data_parquet)
    
    return df_final


# llamamos a las funciones para obtener finalmente un DF con los ficheros y su tamaño. Como se puede apreciar 
# los ficheros parquet son aproximadamente 3 veces más pequeños que sus homólogos csv.
dic_csv = calcular_tamanios_csv('datos')
dic_parquet = calcular_tamanios_parquet('Watch_accelerometer_parquet','Watch_gyroscope_parquet',\
                              'Phones_accelerometer_parquet','Phones_gyroscope_parquet')
df_final = tabla_tamanios(dic_csv,dic_parquet)

df_final.show()


+--------------------+-----------------+
|      Nombre Fichero|     Tamaño, Tipo|
+--------------------+-----------------+
|Phones_accelerome...| [1291856327,csv]|
|Phones_gyroscope.csv| [1379145657,csv]|
| Watch_gyroscope.csv|  [308337025,csv]|
|Watch_acceleromet...|  [327168052,csv]|
|Watch_acceleromet...|[7048682,parquet]|
|Watch_gyroscope_p...|[1901802,parquet]|
|Phones_gyroscope_...| [862971,parquet]|
|Phones_accelerome...|[3105640,parquet]|
+--------------------+-----------------+



In [13]:
# otra forma de realizarlo
# función que crea los ficheros parquet a partir del csv
def crearParquetCSV(csvFile, name) :
    csvFile = csvFile.toDF('Index', 'Arrival_Time', 'Creation_Time', 'x', 'y',\
                                              'z', 'User', 'Model', 'Device', 'gt')
    nombre = name+'_parquet'
    
    csvFile.write.format("parquet").mode("overwrite").save(nombre)    
    
    return None


# importar ficheros
w_acc = spark.read.format("csv").option("header", "false").option("inferSchema", "true")\
                        .load("datos/Watch_accelerometer.csv")
w_giros = spark.read.format("csv").option("header", "false").option("inferSchema", "true")\
                        .load("datos/Watch_gyroscope.csv")
m_acc = spark.read.format("csv").option("header", "false").option("inferSchema", "true")\
                        .load("datos/Phones_accelerometer.csv")
m_giros = spark.read.format("csv").option("header", "false").option("inferSchema", "true")\
                        .load("datos/Phones_gyroscope.csv")

# generamos ficheros parquet
crearParquetCSV(w_acc, 'w_acc')
crearParquetCSV(w_giros, 'w_giros')
crearParquetCSV(m_acc, 'm_acc')
crearParquetCSV(m_acc, 'm_giros')


In [14]:
# Vemos los tamaños de los ficheros parquet y de los ficheros csv
# llamamos a las funciones para obtener finalmente un DF con los ficheros y su tamaño. Como se puede apreciar 
# los ficheros parquet son claramente más pequeños que sus homólogos csv.
dic_csv = calcular_tamanios_csv('datos')
dic_parquet = calcular_tamanios_parquet('w_acc_parquet','w_giros_parquet','m_acc_parquet','m_giros_parquet')
df_final = tabla_tamanios(dic_csv,dic_parquet)

df_final.show()


+--------------------+------------------+
|      Nombre Fichero|      Tamaño, Tipo|
+--------------------+------------------+
|Phones_accelerome...|  [1291856327,csv]|
|Phones_gyroscope.csv|  [1379145657,csv]|
| Watch_gyroscope.csv|   [308337025,csv]|
|Watch_acceleromet...|   [327168052,csv]|
|       m_acc_parquet|[17096676,parquet]|
|     m_giros_parquet|[17096676,parquet]|
|     w_giros_parquet| [9132076,parquet]|
|       w_acc_parquet|[13084180,parquet]|
+--------------------+------------------+



In [10]:
# Medir los tiempos para cada caso
# 1. Se crean RDDs para cada uno de los ficheros csv (esto corresponde al notebook realizado en la Práctica 1) 

import math

# función calcularRDDs.
# Se mapean las columnas deseadas, y se forma la clave con las 3 columnas requeridas. En base a esta clave se hace el 
# reduce y se obtienen los valores que se necesitan para realizar los distintos cálculos. 
# Luego se acaba formando un RDD con la información necesaria.
def calcularRDDs(rdd_inicial):
    rdd_calculos = rdd_inicial.map(lambda a: ((a[6], a[7], a[9]), float(a[3]), float(a[4]), float(a[5])))
    rdd_calculos = rdd_calculos.map(lambda a: (a[0],(1,a[1],a[1]*a[1],a[2],a[2]*a[2],a[3],a[3]*a[3],a[1],a[1],a[2],\
                                                     a[2],a[3],a[3])))
    rdd_calculos = rdd_calculos.reduceByKey(lambda a,b: ((a[0]+b[0]),(a[1]+b[1]),(a[2]+b[2]),(a[3]+b[3]),(a[4]+b[4]),\
                                                    (a[5]+b[5]),(a[6]+b[6]),max(a[7],b[7]),min(a[8],b[8]),\
                                                    max(a[9],b[9]),min(a[10],b[10]),max(a[11],b[11]),min(a[12],b[12])))

    rdd_calculos = rdd_calculos.map(lambda a: (a[0], (a[1][1]/a[1][0], a[1][3]/a[1][0], a[1][5]/a[1][0],\
                                    math.sqrt((a[1][2]/a[1][0] - (a[1][1]/a[1][0])**2)),\
                                    math.sqrt((a[1][4]/a[1][0] - (a[1][3]/a[1][0])**2)),\
                                    math.sqrt((a[1][6]/a[1][0] - (a[1][5]/a[1][0])**2)),\
                                    a[1][7], a[1][9], a[1][11], a[1][8], a[1][10], a[1][12]))) 
    
    return rdd_calculos


# función transformarRDDs.
# Se reciben los 4 RDDs, uno por fichero, y se llama a la función de calcular para cada uno, 
# y luego se realizan los join entre los del mismo tipo y el union final.
def transformarRDDs(rdd_1, rdd_2, rdd_3, rdd_4):
    rdd_watch_acc = calcularRDDs(rdd_1)
    rdd_watch_giros = calcularRDDs(rdd_2)
    rdd_phone_acc = calcularRDDs(rdd_3)
    rdd_phone_giros = calcularRDDs(rdd_4)
    
    rdd_watch = rdd_watch_acc.join(rdd_watch_giros)
    rdd_movil = rdd_phone_acc.join(rdd_phone_giros)
    
    rdd_union = rdd_watch.union(rdd_movil)
    
    return rdd_union


# cargar los ficheros
# Para hacer la prueba conjunta de todo y ver el tiempo que lleva
rdd_watch_acc = sc.textFile("datos/Watch_accelerometer.csv").map(lambda a: a.split(','))
rdd_watch_giros = sc.textFile("datos/Watch_gyroscope.csv").map(lambda a: a.split(','))
rdd_movil_acc = sc.textFile("datos/Phones_accelerometer.csv").map(lambda a: a.split(','))
rdd_movil_giros = sc.textFile("datos/Phones_gyroscope.csv").map(lambda a: a.split(','))

# llamamos a la función que engloba los cálculos y que hara los join y union para obtener el RDD final
rdd_final = transformarRDDs(rdd_watch_acc, rdd_watch_giros, rdd_movil_acc, rdd_movil_giros)

# mostramos el RDD final. Se puede ver como ha tardado 8min.
%time print(rdd_final.collect())


[(('a', 'gear', 'walk'), ((-8.209435252657098, -0.6317477729848083, -2.5582866559249933, 3.978320463306628, 5.756819162944247, 2.5094203133216815, 5.0571527, 19.612701, 14.097658, -19.6133, -15.020623, -19.6133), (-0.06254020985950065, 0.04367344551857416, 0.009326976410292849, 1.2294450098207483, 0.7702535005849915, 1.6777683394502751, 5.064001, 6.2592278, 5.1100736, -8.6808405, -6.125271, -8.714663))), (('b', 'gear', 'sit'), ((-0.5438519885986548, -1.3366468831126068, 8.995904502108408, 1.1188140878018438, 2.1284080239323724, 0.8940432140906635, 9.387066, 5.8801594, 14.768035, -8.981249, -11.721413, -1.8722657), (0.003987934040125752, -0.03229710183932541, -0.07759407987452868, 0.47275066475550376, 0.2601034526048903, 0.4182781124210057, 4.36359, 4.0069923, 3.6842172, -5.8730693, -2.2261364, -4.023504))), (('c', 'lgwatch', 'null'), ((4.924406071091834, -5.147223723875772, 0.8241633636352341, 5.83427664104053, 3.202149495247289, 3.7545761722586217, 19.32988, 11.912689, 19.927063, -19.

In [15]:
# Se crean DataFrames a partir de los RDDs (esto corresponde a la Tarea 1 de la Práctica 2) 

# funciones 
# Función que crea el DF de cada fichero cargado
def crearDataframe(elem) :
    parts_elem = elem.map(lambda l: l.split(","))
    parts_elem = parts_elem.map(lambda p: Row(user=p[6], model=p[7], gt=p[9], x=p[3], y=p[4], z=p[5]))
    df = spark.createDataFrame(parts_elem)
    
    return df


# Función que para cada DF creado realiza los cálculos de máximo, mínimo, media y desviación de las variables x,
# y, z, agrupando por las columnas user, model y gt. Devuelve un DF con todos los cálculos.
def calculosDataframe(df_inicial) :
    df_max = df_inicial.groupBy('user', 'model', 'gt').agg(F.max('x'), F.max('y'), F.max('z'))
    df_min = df_inicial.groupBy('user', 'model', 'gt').agg(F.min('x'), F.min('y'), F.min('z'))
    df_avg = df_inicial.groupBy('user', 'model', 'gt').agg(F.avg('x'), F.avg('y'), F.avg('z'))
    df_std = df_inicial.groupBy('user', 'model', 'gt').agg(F.stddev('x'), F.stddev('y'), F.stddev('z'))

    df_calculos = df_max.join(df_min, on=['user', 'model', 'gt'], how='inner')\
             .join(df_avg, on=['user', 'model', 'gt'], how='inner')\
             .join(df_std, on=['user', 'model', 'gt'], how='inner') 

    return df_calculos


# Función que recibe los 4 DFs de cálculos, primero hace un join para los dos DFs de watch y luego otro join para
# los dos DFs de movil, para finalmente unir ambos DFs y obtener sólo un DF que contenga todos los registros calculados.
def unirDataframes(df_watch_acc, df_watch_giros, df_movil_acc, df_movil_giros) :
    df_watch = df_watch_acc.join(df_watch_giros, on=['user', 'model', 'gt'], how='full')
    df_movil = df_movil_acc.join(df_movil_giros, on=['user', 'model', 'gt'], how='full')
    
    df_final = df_watch.unionAll(df_movil)
    
    return df_final


# cargamos los ficheros
watch_acc_g = sc.textFile('datos/Watch_accelerometer.csv')
watch_giros_g = sc.textFile('datos/Watch_gyroscope.csv')
movil_acc_g = sc.textFile('datos/Phones_accelerometer.csv')
movil_giros_g = sc.textFile('datos/Phones_gyroscope.csv')

# creamos los dataframes para cada uno
parts_w_acc_g = crearDataframe(watch_acc_g)
parts_w_giros_g = crearDataframe(watch_giros_g)
parts_m_acc_g = crearDataframe(movil_acc_g)
parts_m_giros_g = crearDataframe(movil_giros_g)

# se realizan los cálculos sobre cada DF
df_watch_acc_g = calculosDataframe(parts_w_acc_g)
df_watch_giros_g = calculosDataframe(parts_w_giros_g)
df_movil_acc_g = calculosDataframe(parts_m_acc_g)
df_movil_giros_g = calculosDataframe(parts_m_giros_g)

# se obtiene el DF final con los cálculos, join y union.
df_final = unirDataframes(df_watch_acc_g, df_watch_giros_g, df_movil_acc_g, df_movil_giros_g)

# Con %time se muestra al final el tiempo transcurrido, en este caso 51 minutos.
%time df_final.show()


+----+-------+----------+---------+---------+---------+-------------+--------------+-------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+---------+-------------+---------+--------------+--------------+--------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+
|user|  model|        gt|   max(x)|   max(y)|   max(z)|       min(x)|        min(y)|       min(z)|             avg(x)|             avg(y)|              avg(z)|    stddev_samp(x)|    stddev_samp(y)|     stddev_samp(z)|   max(x)|       max(y)|   max(z)|        min(x)|        min(y)|        min(z)|              avg(x)|              avg(y)|              avg(z)|     stddev_samp(x)|     stddev_samp(y)|     stddev_samp(z)|
+----+-------+----------+---------+---------+---------+-------------+--------------+-------------+-------------------+-------------------+--------------------+-

In [1]:
# Se crean DataFrames a partir de los ficheros parquet generados en el apartado 1 
df_w_acc_parquet = spark.read.parquet("Watch_accelerometer_parquet")
df_w_giros_parquet = spark.read.parquet("Watch_gyroscope_parquet")
df_p_acc_parquet = spark.read.parquet("Phones_accelerometer_parquet")
df_p_giros_parquet = spark.read.parquet("Phones_gyroscope_parquet")

# una vez tenemos los DF ya llamamos a las funciones de hacer los DFs con cálculos y luego terminar uniéndolos,
# que utilizamos para el anterior apartado. Sólo que aquí los DFs que pasamos a la función que realiza los cálculos
# los hemos obtenido de los ficheros parquet.
df_watch_acc_p = calculosDataframe(df_w_acc_parquet)
df_watch_giros_p = calculosDataframe(df_w_giros_parquet)
df_phones_acc_p = calculosDataframe(df_p_acc_parquet)
df_phones_giros_p = calculosDataframe(df_p_giros_parquet)

# llamamos a la función que realiza los join y el union para obtener el DF final.
df_final_p = unirDataframes(df_watch_acc_p, df_watch_giros_p, df_phones_acc_p, df_phones_giros_p)

# Con %time se muestra al final el tiempo transcurrido, en este caso 7 minutos.
%time df_final_p.show()


+----+-------+----------+---------+---------+---------+-------------+--------------+-------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+---------+-------------+---------+--------------+--------------+--------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+
|user|  model|        gt|   max(x)|   max(y)|   max(z)|       min(x)|        min(y)|       min(z)|             avg(x)|             avg(y)|              avg(z)|    stddev_samp(x)|    stddev_samp(y)|     stddev_samp(z)|   max(x)|       max(y)|   max(z)|        min(x)|        min(y)|        min(z)|              avg(x)|              avg(y)|              avg(z)|     stddev_samp(x)|     stddev_samp(y)|     stddev_samp(z)|
+----+-------+----------+---------+---------+---------+-------------+--------------+-------------+-------------------+-------------------+--------------------+-

In [2]:
# Se crean DataFrames a partir de los ficheros csv originales. Para ello, puedes utilizar la función spark.read.csv
# Creamos un DF a partir de cada fichero csv directamente. Cambiamos los nombres de las columnas para que sean los
# mismos que los ficheros de origen.
df_watch_acc = spark.read.format("csv").option("header", "false").load("datos/Watch_accelerometer.csv")
df_watch_acc = df_watch_acc.selectExpr('_c0 as Index', '_c1 as Arrival_Time', '_c2 as Creation_Time', '_c3 as x',\
                                       '_c4 as y', '_c5 as z', '_c6 as user', '_c7 as model', '_c8 as Device',\
                                       '_c9 as gt')

df_watch_giros = spark.read.format("csv").option("header", "false").load("datos/Watch_gyroscope.csv")
df_watch_giros = df_watch_giros.selectExpr('_c0 as Index', '_c1 as Arrival_Time', '_c2 as Creation_Time', '_c3 as x',\
                                       '_c4 as y', '_c5 as z', '_c6 as user', '_c7 as model', '_c8 as Device',\
                                       '_c9 as gt')

df_phones_acc = spark.read.format("csv").option("header", "false").load("datos/Phones_accelerometer.csv")
df_phones_acc = df_phones_acc.selectExpr('_c0 as Index', '_c1 as Arrival_Time', '_c2 as Creation_Time', '_c3 as x',\
                                       '_c4 as y', '_c5 as z', '_c6 as user', '_c7 as model', '_c8 as Device',\
                                       '_c9 as gt')

df_phones_giros = spark.read.format("csv").option("header", "false").load("datos/Phones_gyroscope.csv")
df_phones_giros = df_phones_giros.selectExpr('_c0 as Index', '_c1 as Arrival_Time', '_c2 as Creation_Time',\
                                             '_c3 as x', '_c4 as y', '_c5 as z', '_c6 as user', '_c7 as model',\
                                             '_c8 as Device', '_c9 as gt')

# una vez tenemos los DF ya llamamos a las funciones de hacer los DFs con cálculos y luego terminar uniéndolos,
# que utilizamos para el anterior apartado.
df_watch_acc_g = calculosDataframe(df_watch_acc)
df_watch_giros_g = calculosDataframe(df_watch_giros)
df_phones_acc_g = calculosDataframe(df_phones_acc)
df_phones_giros_g = calculosDataframe(df_phones_giros)

df_final_g = unirDataframes(df_watch_acc_g, df_watch_giros_g, df_phones_acc_g, df_phones_giros_g)

# Con %time se muestra al final el tiempo transcurrido, en este caso 15 minutos.
%time df_final_g.show()


+----+-------+----------+---------+---------+---------+-------------+--------------+-------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+---------+-------------+---------+--------------+--------------+--------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+
|user|  model|        gt|   max(x)|   max(y)|   max(z)|       min(x)|        min(y)|       min(z)|             avg(x)|             avg(y)|              avg(z)|    stddev_samp(x)|    stddev_samp(y)|     stddev_samp(z)|   max(x)|       max(y)|   max(z)|        min(x)|        min(y)|        min(z)|              avg(x)|              avg(y)|              avg(z)|     stddev_samp(x)|     stddev_samp(y)|     stddev_samp(z)|
+----+-------+----------+---------+---------+---------+-------------+--------------+-------------+-------------------+-------------------+--------------------+-