In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Tarea 2") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import *

In [4]:
#crear el esquema mediante StructType, definir la estructura de un campo (columna) en un esquema con StructField y definir tipo de dato 
fields = [StructField("a", StringType(),True), StructField("b", StringType(),True),StructField("c", StringType(),True),
         StructField("X", DoubleType(),True),StructField("Y", DoubleType(),True),StructField("Z", DoubleType(),True),
         StructField("user", StringType(),True),StructField("model", StringType(),True),StructField("d", StringType(),True),
         StructField("action", StringType(),True)]
schema = StructType(fields)

In [5]:
# leer cada csv y aplicar el schema
df_phone_acceler = spark.read.csv("Phones_accelerometer1.csv", schema=schema, header = False, nullValue="null")
df_phone_gyros = spark.read.csv("Phones_gyroscope1.csv", schema=schema, header = False, nullValue="null")
df_watch_acceler = spark.read.csv("Watch_accelerometer1.csv", schema=schema, header = False, nullValue="null")
df_watch_gyros =  spark.read.csv("Watch_gyroscope1.csv", schema=schema, header = False, nullValue="null")

In [6]:
# eliminar las columnas que no nos sirven con drop
df_phone_acceler = df_phone_acceler.drop("a", "b", "c", "d")
df_phone_gyros = df_phone_gyros.drop("a", "b", "c", "d")
df_watch_acceler = df_watch_acceler.drop("a", "b", "c", "d")
df_watch_gyros = df_watch_gyros.drop("a", "b", "c", "d")

In [7]:
#df_phone_acceler.show()

In [8]:
#función para las agregaciones: media, desviación, min y máx, lo agrupamos por user, model y action
def calcular_resultados(df):
    return df.groupBy("user", "model", "action").agg(
        format_number(mean("X"), 2).alias("mean_X"),
        format_number(stddev("X"), 2).alias("stddev_X"),
        format_number(min("X"), 2).alias("min_X"),
        format_number(max("X"), 2).alias("max_X"),
        format_number(mean("Y"), 2).alias("mean_Y"),
        format_number(stddev("Y"), 2).alias("stddev_Y"),
        format_number(min("Y"), 2).alias("min_Y"),
        format_number(max("Y"), 2).alias("max_Y"),
        format_number(mean("Z"), 2).alias("mean_Z"),
        format_number(stddev("Z"), 2).alias("stddev_Z"),
        format_number(min("Z"), 2).alias("min_Z"),
        format_number(max("Z"), 2).alias("max_Z")
    )
# usar la función con diferentes conjuntos de datos
resultados_phone_acceler = calcular_resultados(df_phone_acceler)
resultados_phone_gyros = calcular_resultados(df_phone_gyros)
resultados_watch_acceler = calcular_resultados(df_watch_acceler)
resultados_watch_gyros = calcular_resultados(df_watch_gyros)

In [9]:
resultados_phone_acceler.show()
resultados_phone_gyros.show()
resultados_watch_acceler.show()
resultados_watch_gyros.show()

+----+------+------+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+
|user| model|action|mean_X|stddev_X|min_X|max_X|mean_Y|stddev_Y|min_Y|max_Y|mean_Z|stddev_Z|min_Z|max_Z|
+----+------+------+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+
|   a|nexus4| stand| -6.03|    0.18|-7.04|-5.52|  0.93|    0.24|-0.84| 1.95|  8.01|    0.18| 7.15| 8.64|
+----+------+------+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+

+----+------+------+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+
|user| model|action|mean_X|stddev_X|min_X|max_X|mean_Y|stddev_Y|min_Y|max_Y|mean_Z|stddev_Z|min_Z|max_Z|
+----+------+------+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+
|   a|nexus4| stand|  0.00|    0.04|-0.17| 0.63|  0.00|    0.03|-0.16| 0.35|  0.00|    0.05|-0.60| 0.45|
+----+------+------+------+--------+-----+-----+------

In [10]:
# dataframes reloj y telefono
reloj = resultados_watch_acceler.join(resultados_watch_gyros, ["user", "model", "action"])
telefono = resultados_phone_acceler.join(resultados_phone_gyros, ["user", "model", "action"])

# unir dataFrames de reloj y teléfono
df_final = reloj.union(telefono)

# dataFrame final
df_final.show()

+----+------+------+------+--------+------+-----+------+--------+------+-----+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+
|user| model|action|mean_X|stddev_X| min_X|max_X|mean_Y|stddev_Y| min_Y|max_Y|mean_Z|stddev_Z|min_Z|max_Z|mean_X|stddev_X|min_X|max_X|mean_Y|stddev_Y|min_Y|max_Y|mean_Z|stddev_Z|min_Z|max_Z|
+----+------+------+------+--------+------+-----+------+--------+------+-----+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+------+--------+-----+-----+
|   a|  gear| stand| -9.29|    0.41|-12.60|-0.57| -3.14|    1.06|-11.08|-0.58| -1.07|    0.62|-2.26| 1.02|  0.02|    0.07|-2.04| 0.81| -0.03|    0.06|-0.55| 0.35| -0.07|    0.05|-1.23| 1.15|
|   a|  gear|   sit| -7.60|    0.19|-10.82|-6.66| -5.53|    0.27| -6.92|-0.83|  2.64|    0.39|-3.67| 3.56|  0.02|    0.06|-0.70| 0.40| -0.04|    0.06|-0.79| 0.40| -0.07|    0.05|-0.23| 0.78|
|   a|nexus4| stand| -6.03|    0.18| -7.04|-5