In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark

In [99]:
! pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_312
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
Revision de351e30a90dd988b133b3d00fa6218bfcaba8b8
Url https://github.com/apache/spark
Type --help for more information.


In [16]:
data_path = '/media/daniel/Seagate Basic/spark_data/sensores/' # en esta carpeta deben encontrarse los 4 ficheros

In [8]:
spark = SparkSession.builder.master("local[*]")\
          .appName("practica 2 Ecosistema Spark parte 1")\
          .getOrCreate()

21/12/19 11:11:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# 1. Creación de dataframes proporcionando el schema

Los cuatro conjuntos de datos comparten la misma estructura, asique se usa el mismo esquema para todos.

In [87]:
schema = StructType([StructField('index', IntegerType(), True),
                     StructField('arrival_time', LongType(), True),
                     StructField('creation_time', LongType(), True),
                     StructField('x', FloatType(), True),
                     StructField('y', FloatType(), True),
                     StructField('z', FloatType(), True),
                     StructField('user', StringType(), True),
                     StructField('model', StringType(), True),
                     StructField('device', StringType(), True),
                     StructField('gt', StringType(), True)])

In [88]:
df_phones_acc   = spark.read.csv(data_path+'Phones_accelerometer.csv', schema=schema)
df_phones_gyros = spark.read.csv(data_path+'Phones_gyroscope.csv', schema=schema)
df_watch_acc    = spark.read.csv(data_path+'Watch_accelerometer.csv', schema=schema)
df_watch_gyros  = spark.read.csv(data_path+'Watch_gyroscope.csv', schema=schema)

# 2. Resumen de cada dataframe

Se calcula la media, desviación típica, máximo y mínimo de las variables x, y, z agrupadas por usuario, modelo y acción.

In [None]:
def summarize_dataframe(spark_df, agg=['user', 'model', 'gt'], features=['x', 'y', 'z']):
    
    # descripcion
    #----------------
    
    #Calcula la media, std, min y max de las columnas argumento agregadas por las columnas seleccionadas.
    
    # argumentos:
    #----------------
    
    #<spark_df> | tipo: pyspark.sql.DataFrame
    #<agg> | tipo: list | contenido: str
    #<features> | tipo: list | contenido: str
    
    # resultado: 
    #----------------
    
    #<result> | tipo: pyspark.sql.DataFrame | descriptivo: dataframe de spark con los estadísticos calculados y las columnas de agregado
    
    features_dict = {} # diccionario auxiliar para calcular la desviacion tipica con la funcion pyspark.sql.GroupedData.agg
    for f in features:
        features_dict[f] = 'stddev'
    
    std = spark_df.select(agg+features).groupby(agg).agg(features_dict)
    maximum = spark_df.select(agg+features).groupby(agg).max()
    minimum = spark_df.select(agg+features).groupby(agg).min()
    mean = spark_df.select(agg+features).groupby(agg).mean()
    
    result = mean.join(std, on=agg, how='left')\
        .join(minimum, on=agg, how='left')\
        .join(maximum, on=agg, how='left')
    
    return result

In [92]:
summary_phones_acc = summarize_dataframe(df_phones_acc)
summary_phones_gyros = summarize_dataframe(df_phones_gyros)
summary_watch_acc = summarize_dataframe(df_watch_acc)
summary_watch_gyros = summarize_dataframe(df_watch_gyros)

In [90]:
summary_phones_acc.take(2)

                                                                                

[Row(user='c', model='gear', gt='stand', avg(x)=0.03652780587438986, avg(y)=-0.042153270222705125, avg(z)=-0.03179906613070246, stddev(x)=0.48909112975469576, stddev(y)=0.21413389782911277, stddev(z)=0.2305604712729737, min(x)=-5.2245893478393555, min(y)=-2.2833943367004395, min(z)=-1.9624834060668945, max(x)=2.66369366645813, max(y)=2.443450450897217, max(z)=1.830923318862915),
 Row(user='b', model='gear', gt='stand', avg(x)=0.009942532104274503, avg(y)=-0.05661034300205209, avg(z)=-0.046038979898989404, stddev(x)=0.6784552716547866, stddev(y)=0.2222310138318165, stddev(z)=0.529926204603212, min(x)=-5.694371223449707, min(y)=-1.3688647747039795, min(z)=-3.473294734954834, max(x)=8.47364616394043, max(y)=1.1811119318008423, max(z)=3.3752903938293457)]

# 3. Join de cada dataframe del mismo dispositivo

In [100]:
summary_phones = summary_phones_acc.join(summary_phones_gyros, on=['user', 'model', 'gt'], how='left')
summary_watch = summary_watch_acc.join(summary_watch_gyros, on=['user', 'model', 'gt'], how='left')

In [101]:
summary_phones.take(2)

21/12/19 14:10:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

[Row(user='d', model='s3mini', gt='stairsdown', avg(x)=-4.463443999159753, avg(y)=0.24306410762203068, avg(z)=8.472466408398889, stddev(x)=2.240649971846288, stddev(y)=1.9391851564647065, stddev(z)=2.9196563251323107, min(x)=-15.545551300048828, min(y)=-7.756016254425049, min(z)=0.424970805644989, max(x)=3.9408559799194336, max(y)=8.825027465820312, max(z)=19.612701416015625, avg(x)=-0.2228422161491789, avg(y)=-0.03428942505599068, avg(z)=0.4503586104449553, stddev(x)=0.36662374555262106, stddev(y)=0.733085709661103, stddev(z)=0.7681403850578339, min(x)=-1.9054771661758423, min(y)=-3.5413575172424316, min(z)=-2.4095382690429688, max(x)=1.1589009761810303, max(y)=6.058246612548828, max(z)=4.878363132476807),
 Row(user='h', model='samsungold', gt='stairsdown', avg(x)=-5.635658786490508, avg(y)=0.23615755468668323, avg(z)=8.060744384923018, stddev(x)=2.5809602029984866, stddev(y)=2.0189003788579716, stddev(z)=3.542944712247185, min(x)=-18.387239456176758, min(y)=-9.040392875671387, min(z)

In [105]:
summary_phones.count(), summary_watch.count() 

                                                                                

(252, 118)

# 4. Unión de los dataframe de ambos dispositivos

In [103]:
summary = summary_phones.union(summary_watch)

In [104]:
summary.take(2)

                                                                                

[Row(user='d', model='s3mini', gt='stairsdown', avg(x)=-4.463443999159753, avg(y)=0.24306410762203068, avg(z)=8.472466408398889, stddev(x)=2.240649971846288, stddev(y)=1.9391851564647065, stddev(z)=2.9196563251323107, min(x)=-15.545551300048828, min(y)=-7.756016254425049, min(z)=0.424970805644989, max(x)=3.9408559799194336, max(y)=8.825027465820312, max(z)=19.612701416015625, avg(x)=-0.2228422161491789, avg(y)=-0.03428942505599068, avg(z)=0.4503586104449553, stddev(x)=0.36662374555262106, stddev(y)=0.733085709661103, stddev(z)=0.7681403850578339, min(x)=-1.9054771661758423, min(y)=-3.5413575172424316, min(z)=-2.4095382690429688, max(x)=1.1589009761810303, max(y)=6.058246612548828, max(z)=4.878363132476807),
 Row(user='h', model='samsungold', gt='stairsdown', avg(x)=-5.635658786490508, avg(y)=0.23615755468668323, avg(z)=8.060744384923018, stddev(x)=2.5809602029984866, stddev(y)=2.0189003788579716, stddev(z)=3.542944712247185, min(x)=-18.387239456176758, min(y)=-9.040392875671387, min(z)

In [106]:
summary.count() # la dimension del dataframe final debe coincidir con las dimensiones de cada uno por separado

                                                                                

370