In [1]:
# Importing libraries
import os, time, numpy as np, pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import mean, stddev, min, max, collect_list, flatten, concat, col

In [2]:
# Creating spark enviroment
spark = pyspark.sql.SparkSession.builder.appName("CGE P6 T2").getOrCreate()

22/12/23 15:05:49 WARN Utils: Your hostname, martin resolves to a loopback address: 127.0.1.1; using 192.168.0.24 instead (on interface wlp1s0)
22/12/23 15:05:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/23 15:06:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/23 15:06:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Change this variable to decide where the dataset files will be obtained from
useRoot = False

# Dataset common path
common_path_root = "./"                             # Default dataset path
common_path_full = "../datasets/datos_sensores/"    # Custom dataset path for full data

# Dataset file names
phones_acc_name = "Phones_accelerometer.csv"
phones_gyr_name = "Phones_gyroscope.csv"
watch_acc_name = "Watch_accelerometer.csv"
watch_gyr_name = "Watch_gyroscope.csv"

# Dataset paths
path_full = (common_path_root if useRoot else common_path_full)
phones_acc_path = path_full + phones_acc_name
phones_gyr_path = path_full + phones_gyr_name
watch_acc_path = path_full + watch_acc_name
watch_gyr_path = path_full + watch_gyr_name

# Defining dataset schema
dataset_schema = StructType([
    StructField("Index", IntegerType(), True),
    StructField("Arrival_Time", StringType(), True),
    StructField("Creation_Time", StringType(), True),
    StructField("x", FloatType(), True),
    StructField("y", FloatType(), True),
    StructField("z", FloatType(), True),
    StructField("User", StringType(), True),
    StructField("Model", StringType(), True),
    StructField("Device", StringType(), True),
    StructField("gt", StringType(), True)
])

# Defining function to group Dataframe by User, Model, and class (gt) and performing required aggregates
def computeDFStats(df):
    df_grouped = df.groupBy("User", "Model", "gt").agg(
        mean("x").alias("mean_x"),
        mean("y").alias("mean_y"),
        mean("z").alias("mean_z"),
        stddev("x").alias("stddev_x"),
        stddev("y").alias("stddev_y"),
        stddev("z").alias("stddev_z"),
        max("x").alias("max_x"),
        max("y").alias("max_y"),
        min("z").alias("min_z"),
        min("x").alias("min_x"),
        min("y").alias("min_y"),
        max("z").alias("max_z")
    )

    return df_grouped

# Defining function to group Dataframe by User, Model, and class (gt) and performing required aggregates
def regroupJoinedDataframes(df):
    df_grouped = df.groupBy("User", "Model", "gt").agg(
        collect_list('mean_x').alias('mean_x_list'),
        collect_list('mean_y').alias('mean_y_list'),
        collect_list('mean_z').alias('mean_z_list'),
        collect_list('stddev_x').alias('stddev_x_list'),
        collect_list('stddev_y').alias('stddev_y_list'),
        collect_list('stddev_z').alias('stddev_z_list'),
        collect_list('max_x').alias('max_x_list'),
        collect_list('max_y').alias('max_y_list'),
        collect_list('max_z').alias('max_z_list'),
        collect_list('min_x').alias('min_x_list'),
        collect_list('min_y').alias('min_y_list'),
        collect_list('min_z').alias('min_z_list')
    )

    return df_grouped

In [4]:
# Importing each dataset into a Dataframe
df_acc_phones = spark.read.format("csv").schema(dataset_schema).load(phones_acc_path)
df_gyr_phones = spark.read.format("csv").schema(dataset_schema).load(phones_gyr_path)
df_acc_watches = spark.read.format("csv").schema(dataset_schema).load(watch_acc_path)
df_gyr_watches = spark.read.format("csv").schema(dataset_schema).load(watch_gyr_path)

# Print one Datafram as a sample
df_acc_phones.show()

                                                                                

+-----+-------------+-------------------+----------+----------+---------+----+------+--------+-----+
|Index| Arrival_Time|      Creation_Time|         x|         y|        z|User| Model|  Device|   gt|
+-----+-------------+-------------------+----------+----------+---------+----+------+--------+-----+
|    0|1424696633908|1424696631913248572| -5.958191| 0.6880646| 8.135345|   a|nexus4|nexus4_1|stand|
|    1|1424696633909|1424696631918283972|  -5.95224| 0.6702118| 8.136536|   a|nexus4|nexus4_1|stand|
|    2|1424696633918|1424696631923288855|-5.9950867| 0.6535492| 8.204376|   a|nexus4|nexus4_1|stand|
|    3|1424696633919|1424696631928385290|-5.9427185| 0.6761627| 8.128204|   a|nexus4|nexus4_1|stand|
|    4|1424696633929|1424696631933420691| -5.991516|0.64164734| 8.135345|   a|nexus4|nexus4_1|stand|
|    5|1424696633929|1424696631938456091| -5.965332| 0.6297455| 8.128204|   a|nexus4|nexus4_1|stand|
|    6|1424696633938|1424696631943522009| -5.991516| 0.6356964|  8.16272|   a|nexus4|nexus4

In [5]:
# Writing into parquet format
df_acc_phones.write.mode("overwrite").parquet(path_full + os.path.splitext(phones_acc_name)[0] + ".parquet")
df_gyr_phones.write.mode("overwrite").parquet(path_full + os.path.splitext(phones_gyr_name)[0] + ".parquet")
df_acc_watches.write.mode("overwrite").parquet(path_full + os.path.splitext(watch_acc_name)[0] + ".parquet")
df_gyr_watches.write.mode("overwrite").parquet(path_full + os.path.splitext(watch_gyr_name)[0] + ".parquet")

                                                                                

In [6]:
# This function returns the size of a given folder in bytes
def getFolderSize(folder_path):
    size = 0
    for path, dirs, files in os.walk(folder_path):
        for f in files:
            fp = os.path.join(path, f)
            size += os.path.getsize(fp)
    return size

# This function returns a tuple containing he name of the given file, the size of the csv file, and the size of the parquet file
# Sizes are in bytes
def getSizeData(file_path):
    return (
        os.path.basename(file_path),
        os.path.getsize(file_path + ".csv"),
        getFolderSize(file_path + ".parquet")
    )

In [7]:
# Column names
df_size_column_names = ['File_name', 'csv_size', 'parquet_size']

# List to convert into Dataframe
size_data = [
    getSizeData(path_full + os.path.splitext(phones_acc_name)[0]),
    getSizeData(path_full + os.path.splitext(phones_gyr_name)[0]),
    getSizeData(path_full + os.path.splitext(watch_acc_name)[0]),
    getSizeData(path_full + os.path.splitext(watch_gyr_name)[0])
]

# Creating Dataframe
df_sizes = spark.createDataFrame(data=size_data, schema=df_size_column_names)

# Showing Dataframe
df_sizes.show()

                                                                                

+--------------------+----------+------------+
|           File_name|  csv_size|parquet_size|
+--------------------+----------+------------+
|Phones_accelerometer|1291856327|   302564744|
|    Phones_gyroscope|1379145657|   317189065|
| Watch_accelerometer| 327168052|    85706745|
|     Watch_gyroscope| 308337025|    77764632|
+--------------------+----------+------------+



In [8]:
######################################## Useful functions for the following tasks ########################################
# This function parses the input string based RDD into tuples
def parseRawRDD(rawRDD):
    # Splitting row into an array with the separator, and adding User, Model, and action (gt) as key
    return rawRDD.map(lambda line: line.split(",")).map(lambda line: ((line[6], line[7], line[9]), (float(line[3]), float(line[4]), float(line[5]))))

# This function returns the given RDD with elements grouped by key and values as a list
def groupData(rdd):
    return rdd.groupByKey().mapValues(list)

# Defining funtion that will compute stats for each list of values
def computeRDDStats(values):
    # Calculating values
    mean = np.mean(values, axis=0)
    std = np.std(values, axis=0)
    max = np.max(values, axis=0)
    min = np.min(values, axis=0)

    return np.concatenate([mean, std, max, min]).ravel().tolist()

# Defining function to group RDD by User, Model, and class (gt) and performing required aggregates
def transformRDD(rdd):
    # Grouping RDD by key
    rdd = groupData(rdd)

    # Calculating values
    rdd = rdd.mapValues(computeRDDStats)

    # Returning RDD with the following shape for each line:
    # (User, Model, gt), [mean_x, mean_y, mean_z, std_x, std_y, std_z, max_x, max_y, max_z, min_x, min_y, min_z]
    return rdd

# This function creates an RDD given the path for a csv file
def getRawRDD(path):
    # Returing the dataset in tuples parsing the index, x, y, and z into the required data types
    return spark.sparkContext.textFile(path).map(lambda line: line.split(","))\
        .map(lambda line: (int(line[0]), line[1], line[2], float(line[3]), float(line[4]), float(line[5]), line[6], line[7], line[8], line[9]))

In [9]:
# Checking execution time for the whole process with RDDs from csv files
# Staring timer
start_time = time.time()

# Importing each dataset into an RDD
rdd_acc_phones = parseRawRDD(spark.sparkContext.textFile(phones_acc_path))
rdd_gyr_phones = parseRawRDD(spark.sparkContext.textFile(phones_gyr_path))
rdd_acc_watches = parseRawRDD(spark.sparkContext.textFile(watch_acc_path))
rdd_gyr_watches = parseRawRDD(spark.sparkContext.textFile(watch_gyr_path))

# Transforming each RDD given the required aggregate function
rdd_acc_phones = transformRDD(rdd_acc_phones)
rdd_gyr_phones = transformRDD(rdd_gyr_phones)
rdd_acc_watches = transformRDD(rdd_acc_watches)
rdd_gyr_watches = transformRDD(rdd_gyr_watches)

# Joining RDDs by phones and watches
rdd_phones = groupData(rdd_acc_phones.union(rdd_gyr_phones))
rdd_watches = groupData(rdd_acc_watches.union(rdd_gyr_watches))

# Joining phone and whatch RDDs into a single one 
rdd_full_data = groupData(rdd_phones.union(rdd_watches))

# Showing first row of result RDD to force evaluation
print(rdd_full_data.first())

# Stop timer and calculate elapsed time
rdd_execution_time = time.time() - start_time
print("Time elapsed in RDDs:", rdd_execution_time, "seconds")

                                                                                

(('h', 'nexus4', 'stand'), [[[-4.613476210915189, 1.1175838345734932, 8.75932358809759, 0.3091236810641607, 0.38506770577906013, 0.29730100460013736, -2.7230988, 3.0986328, 9.884917999999999, -6.1807555999999995, 0.2584076, 7.4829254], [-0.00015575966694099838, -0.0006526839680054187, -0.0007058933840264764, 0.06367217266889313, 0.048615001342265755, 0.09967831960056188, 0.42042542, 0.42703247, 0.7403411999999999, -0.41392517, -0.41036987, -0.59373474]]])
Time elapsed in RDDs: 234.3363811969757 seconds


In [10]:
# Checking execution time for the whole process with Dataframes from RDDs
# Staring timer
start_time = time.time()

# Importing each dataset into an RDD and converting into Dataframe
df_acc_phones = spark.createDataFrame(getRawRDD(phones_acc_path), schema=dataset_schema)
df_gyr_phones = spark.createDataFrame(getRawRDD(phones_gyr_path), schema=dataset_schema)
df_acc_watches = spark.createDataFrame(getRawRDD(watch_acc_path), schema=dataset_schema)
df_gyr_watches = spark.createDataFrame(getRawRDD(watch_gyr_path), schema=dataset_schema)

# Grouping Dataframes and calculating stats
df_acc_phones = computeDFStats(df_acc_phones)
df_gyr_phones = computeDFStats(df_gyr_phones)
df_acc_watches = computeDFStats(df_acc_watches)
df_gyr_watches = computeDFStats(df_gyr_watches)

# Joining Dataframes by phones and watches
df_phones = regroupJoinedDataframes(df_acc_phones.union(df_gyr_phones))
df_watches = regroupJoinedDataframes(df_acc_watches.union(df_gyr_watches))

# Joining phone and whatch Dataframes into a single one 
df_full_data = df_phones.union(df_watches).groupBy(['User', 'Model', 'gt']).agg(
    flatten(collect_list('mean_x_list')).alias('mean_x_list'),
    flatten(collect_list('mean_y_list')).alias('mean_y_list'),
    flatten(collect_list('mean_z_list')).alias('mean_z_list'),
    flatten(collect_list('stddev_x_list')).alias('stddev_x_list'),
    flatten(collect_list('stddev_y_list')).alias('stddev_y_list'),
    flatten(collect_list('stddev_z_list')).alias('stddev_z_list'),
    flatten(collect_list('max_x_list')).alias('max_x_list'),
    flatten(collect_list('max_y_list')).alias('max_y_list'),
    flatten(collect_list('min_z_list')).alias('min_z_list'),
    flatten(collect_list('min_x_list')).alias('min_x_list'),
    flatten(collect_list('min_y_list')).alias('min_y_list'),
    flatten(collect_list('max_z_list')).alias('max_z_list')
)

# Showing result Dataframe to force evaluation
df_full_data.show()

# Stop timer and calculate elapsed time
df_rdd_execution_time = time.time() - start_time
print("Time elapsed in Dataframes from RDDs:", df_rdd_execution_time, "seconds")



+----+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|User|  Model|        gt|         mean_x_list|         mean_y_list|         mean_z_list|       stddev_x_list|       stddev_y_list|       stddev_z_list|          max_x_list|          max_y_list|          min_z_list|          min_x_list|          min_y_list|          max_z_list|
+----+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   a|   gear|      bike|[-7.3919511411324...|[-0.8663799732158...|[6.09251652058661...|[1.41698038260643...|[1.41059091659425...|[1.09386294202730...|[0.12030864, 2.

                                                                                

In [11]:
# Checking execution time for the whole process with Dataframes from parquet file
# Staring timer
start_time = time.time()

# Importing each dataset into a Dataframe from a parquet file
df_acc_phones = spark.read.parquet(os.path.splitext(phones_acc_path)[0] + ".parquet")
df_gyr_phones = spark.read.parquet(os.path.splitext(phones_gyr_path)[0] + ".parquet")
df_acc_watches = spark.read.parquet(os.path.splitext(watch_acc_path)[0] + ".parquet")
df_gyr_watches = spark.read.parquet(os.path.splitext(watch_gyr_path)[0] + ".parquet")

# Grouping Dataframes and calculating stats
df_acc_phones = computeDFStats(df_acc_phones)
df_gyr_phones = computeDFStats(df_gyr_phones)
df_acc_watches = computeDFStats(df_acc_watches)
df_gyr_watches = computeDFStats(df_gyr_watches)

# Joining Dataframes by phones and watches
df_phones = regroupJoinedDataframes(df_acc_phones.union(df_gyr_phones))
df_watches = regroupJoinedDataframes(df_acc_watches.union(df_gyr_watches))

# Joining phone and whatch Dataframes into a single one 
df_full_data = df_phones.union(df_watches).groupBy(['User', 'Model', 'gt']).agg(
    flatten(collect_list('mean_x_list')).alias('mean_x_list'),
    flatten(collect_list('mean_y_list')).alias('mean_y_list'),
    flatten(collect_list('mean_z_list')).alias('mean_z_list'),
    flatten(collect_list('stddev_x_list')).alias('stddev_x_list'),
    flatten(collect_list('stddev_y_list')).alias('stddev_y_list'),
    flatten(collect_list('stddev_z_list')).alias('stddev_z_list'),
    flatten(collect_list('max_x_list')).alias('max_x_list'),
    flatten(collect_list('max_y_list')).alias('max_y_list'),
    flatten(collect_list('min_z_list')).alias('min_z_list'),
    flatten(collect_list('min_x_list')).alias('min_x_list'),
    flatten(collect_list('min_y_list')).alias('min_y_list'),
    flatten(collect_list('max_z_list')).alias('max_z_list')
)

# Showing result Dataframe to force evaluation
df_full_data.show()

# Stop timer and calculate elapsed time
df_parquet_execution_time = time.time() - start_time
print("Time elapsed in Dataframes from parquet files:", df_parquet_execution_time, "seconds")



+----+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|User|  Model|        gt|         mean_x_list|         mean_y_list|         mean_z_list|       stddev_x_list|       stddev_y_list|       stddev_z_list|          max_x_list|          max_y_list|          min_z_list|          min_x_list|          min_y_list|          max_z_list|
+----+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   a|   gear|      bike|[-7.3919511411324...|[-0.8663799732158...|[6.09251652058661...|[1.41698038260643...|[1.41059091659425...|[1.09386294202730...|[0.12030864, 2.

                                                                                

In [12]:
# Checking execution time for the whole process with Dataframes from CSV file
# Staring timer
start_time = time.time()

# Importing each dataset into a Dataframe from a CSV file
df_acc_phones = spark.read.format("csv").schema(dataset_schema).load(phones_acc_path)
df_gyr_phones = spark.read.format("csv").schema(dataset_schema).load(phones_gyr_path)
df_acc_watches = spark.read.format("csv").schema(dataset_schema).load(watch_acc_path)
df_gyr_watches = spark.read.format("csv").schema(dataset_schema).load(watch_gyr_path)

# Grouping Dataframes and calculating stats
df_acc_phones = computeDFStats(df_acc_phones)
df_gyr_phones = computeDFStats(df_gyr_phones)
df_acc_watches = computeDFStats(df_acc_watches)
df_gyr_watches = computeDFStats(df_gyr_watches)

# Joining Dataframes by phones and watches
df_phones = regroupJoinedDataframes(df_acc_phones.union(df_gyr_phones))
df_watches = regroupJoinedDataframes(df_acc_watches.union(df_gyr_watches))

# Joining phone and whatch Dataframes into a single one 
df_full_data = df_phones.union(df_watches).groupBy(['User', 'Model', 'gt']).agg(
    flatten(collect_list('mean_x_list')).alias('mean_x_list'),
    flatten(collect_list('mean_y_list')).alias('mean_y_list'),
    flatten(collect_list('mean_z_list')).alias('mean_z_list'),
    flatten(collect_list('stddev_x_list')).alias('stddev_x_list'),
    flatten(collect_list('stddev_y_list')).alias('stddev_y_list'),
    flatten(collect_list('stddev_z_list')).alias('stddev_z_list'),
    flatten(collect_list('max_x_list')).alias('max_x_list'),
    flatten(collect_list('max_y_list')).alias('max_y_list'),
    flatten(collect_list('min_z_list')).alias('min_z_list'),
    flatten(collect_list('min_x_list')).alias('min_x_list'),
    flatten(collect_list('min_y_list')).alias('min_y_list'),
    flatten(collect_list('max_z_list')).alias('max_z_list')
)

# Showing result Dataframe to force evaluation
df_full_data.show()

# Stop timer and calculate elapsed time
df_csv_execution_time = time.time() - start_time
print("Time elapsed in Dataframes from CSV files:", df_csv_execution_time, "seconds")



+----+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|User|  Model|        gt|         mean_x_list|         mean_y_list|         mean_z_list|       stddev_x_list|       stddev_y_list|       stddev_z_list|          max_x_list|          max_y_list|          min_z_list|          min_x_list|          min_y_list|          max_z_list|
+----+-------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   a|   gear|      bike|[-7.3919511411324...|[-0.8663799732158...|[6.09251652058661...|[1.41698038260643...|[1.41059091659425...|[1.09386294202730...|[0.12030864, 2.

                                                                                