In [None]:
#Install libraries
!pip install pyspark
!pip install findspark
 # Import Spark and FindSpark
import findspark
# findspark.init() runs before the pyspark imports below; keep this order.
# (presumably it is what makes the pip-installed pyspark importable here; confirm)
findspark.init()
from pyspark.sql import SparkSession
# Create (or reuse) a Spark session named 'Basics' with the master set to 'local[*]'
spark = SparkSession.builder.master('local[*]').appName('Basics').getOrCreate()
# Turn on eager evaluation in the REPL so DataFrames are rendered as tables
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
# Import the classes needed to obtain a SparkContext
from pyspark import SparkConf
from pyspark.context import SparkContext
# Get (or create) the SparkContext. The segmentation cells work on RDDs obtained
# through DataFrame.rdd; `sc` itself is not referenced in the cells shown here.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Mount Google Drive at /content/drive (Colab only); the input CSV and the
# exported feature files in later cells live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Load the raw sensor recording from the mounted Drive
path = "/content/drive/MyDrive/Thesis/Kopie van DogMoveData.csv"
# The first line of the file supplies the column names. No schema is given, so
# later cells convert the sensor fields with float() themselves.
csv_reader = spark.read.option("header", True)
data = csv_reader.csv(path)
# Print the first rows as a quick sanity check
data.show()

+-----+-------+-----+---------+--------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+-----------+---------------+-----------+-----------+-----------+
|DogID|TestNum|t_sec|  ABack_x| ABack_y|  ABack_z|  ANeck_x|  ANeck_y|  ANeck_z|   GBack_x|   GBack_y|   GBack_z|   GNeck_x|   GNeck_y|   GNeck_z|       Task|     Behavior_1| Behavior_2| Behavior_3| PointEvent|
+-----+-------+-----+---------+--------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+-----------+---------------+-----------+-----------+-----------+
|   16|      1|    0| 0.041504|0.938965|-0.015137|-0.067871|-0.510254| -0.93457|-17.639161|-22.766115|   7.44629| -7.934571|  6.347657| 13.427735|<undefined>|    <undefined>|<undefined>|<undefined>|<undefined>|
|   16|      1| 0.01| 0.041992|0.941895| -0.02002|-0.128906|-0.494141|-0.913086|-15.075685|-11.413575|  4.821778|  -3.90625|  4.394532| 16.540528|<undefined

In [None]:
from pyspark.sql.functions import concat, col

# Use only the features corresponding to the back sensors.
# The column order is significant: calculate_features reads the fields by position
# (3-8 = sensor axes, 9-11 = behaviors) and Groupkey is appended as field 12.
# "GBack_z" is spelled exactly as in the CSV header (it used to be "Gback_z", which
# only resolved because Spark matches column names case-insensitively by default).
data_back = data.select("DogID", "TestNum", "t_sec", "ABack_x", "ABack_y", "ABack_z", "GBack_x", "GBack_y", "GBack_z", "Behavior_1", "Behavior_2", "Behavior_3")
# Add a key that identifies the rows belonging to one particular dog and test number.
# NOTE(review): the two ids are concatenated without a separator, so e.g. dog 1 / test 11
# and dog 11 / test 1 would both produce "111". Confirm that the id ranges in the data
# rule this out, or switch to a separated key in the Back and Neck cells together.
data_back = data_back.withColumn("Groupkey", concat(col("DogID"), col("TestNum")))

In [None]:
# Mean of each sensor channel during pure standing; used later as the reference
# point for the positional-offset features.

# Keep only rows labelled "Standing" with no secondary or tertiary behavior
df_only_standing = data.filter((data["Behavior_1"] == "Standing") & (data['Behavior_2'] == "<undefined>") & (data["Behavior_3"] == "<undefined>"))
# Summary statistics (including the mean) for every column
describe_statistics = df_only_standing.describe()
# Fetch the "mean" row once. Each collect() on the summary is a separate Spark action,
# so reading the twelve values from a single fetched row avoids eleven repeated jobs.
standing_mean_row = describe_statistics.filter(describe_statistics["summary"] == "mean").first()
# describe() reports its statistics as strings, hence the float() conversions
standing_mean_ABack_x = float(standing_mean_row["ABack_x"])
standing_mean_ABack_y = float(standing_mean_row["ABack_y"])
standing_mean_ABack_z = float(standing_mean_row["ABack_z"])
standing_mean_GBack_x = float(standing_mean_row["GBack_x"])
standing_mean_GBack_y = float(standing_mean_row["GBack_y"])
standing_mean_GBack_z = float(standing_mean_row["GBack_z"])
standing_mean_ANeck_x = float(standing_mean_row["ANeck_x"])
standing_mean_ANeck_y = float(standing_mean_row["ANeck_y"])
standing_mean_ANeck_z = float(standing_mean_row["ANeck_z"])
standing_mean_GNeck_x = float(standing_mean_row["GNeck_x"])
standing_mean_GNeck_y = float(standing_mean_row["GNeck_y"])
standing_mean_GNeck_z = float(standing_mean_row["GNeck_z"])


In [None]:
# All of the functions used in the segmentation of Back data

# Segmentation function
def segmentate(instances, window_size, step_size):
    """Cut the rows of one group into fixed-length, possibly overlapping windows.

    Args:
        instances: iterable with the rows of one group. It is materialised into a
            list first, so one-shot iterables such as generators are accepted too.
        window_size: number of consecutive rows in every window.
        step_size: number of rows between the starts of two consecutive windows.

    Returns:
        A list of windows, each a list of exactly ``window_size`` rows. A trailing
        remainder shorter than ``window_size`` is dropped, and an input shorter
        than one window yields an empty list.
    """
    instances_list = list(instances)
    # Measure the materialised list rather than the raw argument: an iterator has
    # no len(), and would already be exhausted by the list() call above.
    last_start = len(instances_list) - window_size
    return [
        instances_list[start:start + window_size]
        for start in range(0, last_start + 1, step_size)
    ]

# Function to extrapolate the cumulative distribution
def extrapolate(data_feature, p_values):
    """Read values off the empirical cumulative distribution of one feature.

    The samples are sorted and paired with the cumulative probabilities
    1/n, 2/n, ..., 1; a linear interpolator through those points (extended
    linearly outside that range) is then evaluated at ``p_values``.

    Args:
        data_feature: sequence of numeric samples of a single feature.
        p_values: probabilities at which to evaluate the interpolated curve.

    Returns:
        Array with one interpolated sample value per entry of ``p_values``.
    """
    ordered = np.sort(data_feature)
    n_samples = len(ordered)
    # Cumulative probability reached at each sorted sample
    probabilities = np.arange(1, n_samples + 1) / n_samples
    inverse_cdf = interp1d(probabilities, ordered, fill_value="extrapolate")
    return inverse_cdf(p_values)

# Function to calculate all of new features
def calculate_features(segment):
    """Compute the feature tuple for one window of back-sensor rows.

    Fields are read by position: 3-5 are the ABack x/y/z samples, 6-8 the GBack
    x/y/z samples and 9-11 the three behavior labels.

    Args:
        segment: list of rows making up one window.

    Returns:
        A flat tuple, in this order: the six per-axis means (ABack x/y/z, then
        GBack x/y/z), the ABack and GBack positional offsets, the ABack and GBack
        summed standard deviations, the ABack and GBack total mean crossings,
        seven ECDF values per axis (ABack x/y/z, then GBack x/y/z) and the
        dominant value of each of the three behavior columns.
    """
    def read_axis(position):
        # Sensor fields are converted explicitly, so string-typed rows are fine
        return [float(row[position]) for row in segment]

    def offset_from(reference, means):
        # Euclidean distance between the window mean and the standing reference
        dx, dy, dz = (mean - ref for mean, ref in zip(means, reference))
        return float(np.sqrt(dx ** 2 + dy ** 2 + dz ** 2))

    def summed_std(axes):
        std_x, std_y, std_z = (float(np.std(samples)) for samples in axes)
        return std_x + std_y + std_z

    def mean_crossings(samples):
        # Count the positions where the sign of (sample - mean) changes
        signs = np.sign(samples - np.mean(samples))
        return len(np.where(np.diff(signs))[0])

    def ecdf_points(samples):
        return [float(value) for value in extrapolate(samples, p_values)]

    labels = [[row[position] for row in segment] for position in (9, 10, 11)]
    accel = [read_axis(position) for position in (3, 4, 5)]
    gyro = [read_axis(position) for position in (6, 7, 8)]

    # Per-axis means
    accel_means = [float(np.mean(samples)) for samples in accel]
    gyro_means = [float(np.mean(samples)) for samples in gyro]

    # Positional offset relative to the standing behavior
    accel_offset = offset_from(
        (standing_mean_ABack_x, standing_mean_ABack_y, standing_mean_ABack_z), accel_means)
    gyro_offset = offset_from(
        (standing_mean_GBack_x, standing_mean_GBack_y, standing_mean_GBack_z), gyro_means)

    # Sum of the per-axis standard deviations
    accel_spread = summed_std(accel)
    gyro_spread = summed_std(gyro)

    # Sum of the per-axis mean crossings
    accel_crossings = sum(mean_crossings(samples) for samples in accel)
    gyro_crossings = sum(mean_crossings(samples) for samples in gyro)

    # ECDF: the 7 interior points of an even 9-point grid on [0, 1]
    p_values = np.linspace(0, 1, 9)[1:-1]
    accel_ecdf = [point for samples in accel for point in ecdf_points(samples)]
    gyro_ecdf = [point for samples in gyro for point in ecdf_points(samples)]

    # Dominant behavior: the label with the most occurrences in each column
    dominant = [max(set(column), key=column.count) for column in labels]

    return (
        *accel_means, *gyro_means,
        accel_offset, gyro_offset,
        accel_spread, gyro_spread,
        accel_crossings, gyro_crossings,
        *accel_ecdf, *gyro_ecdf,
        *dominant,
    )

# Function that combines all of the functions
def key_segments(group):
    """Turn one (key, rows) group into a list of keyed feature tuples.

    The rows are windowed with the module-level ``window_size`` and ``step_size``,
    the features of every window are calculated, and the group key is put in
    front of each feature tuple.

    Args:
        group: pair of (group key, iterable with the rows of that group).

    Returns:
        A list with one ``(key, feature_1, feature_2, ...)`` tuple per window.
    """
    key, instances = group
    windows = segmentate(instances, window_size, step_size)
    return [(key,) + calculate_features(window) for window in windows]

In [None]:
# Segmentation Back sensors

import numpy as np
from scipy.interpolate import interp1d

# Windows of 200 rows (2 seconds at the 0.01 s spacing of t_sec)
window_size = 200
# A new window starts every 100 rows, i.e. consecutive windows overlap by 1 second
step_size = 100

# Make use of RDD
rdd = data_back.rdd

# Group the data by key (DogID + TestNum). The key is looked up by field name rather
# than by position 12, so changing the selected columns cannot silently switch the
# grouping to another field.
# NOTE(review): the windows are cut in the order in which groupBy hands back the rows;
# confirm that this is the recording order, or sort each group by t_sec first.
grouped_rdd = rdd.groupBy(lambda row: row["Groupkey"])

# Window every group and calculate the features of each window
features_rdd = grouped_rdd.flatMap(key_segments)

# Column names, in the exact order of the tuples produced by key_segments:
# ID, 6 means, 2 offsets, 2 summed std devs, 2 crossing totals, 6 x 7 ECDF points
# and 3 dominant behaviors.
back_sensors = ("ABack", "GBack")
back_axes = ("x", "y", "z")
back_feature_columns = ["ID"]
back_feature_columns += [f"mean_{sensor}_{axis}" for sensor in back_sensors for axis in back_axes]
for statistic in ("positional_offset", "sum_std_dev", "total_mean_crossings"):
    back_feature_columns += [f"{statistic}_{sensor}" for sensor in back_sensors]
back_feature_columns += [
    f"ecdf_{sensor}_{axis}_{point}"
    for sensor in back_sensors
    for axis in back_axes
    for point in range(1, 8)
]
back_feature_columns += [f"dominant_behavior_{rank}" for rank in range(1, 4)]

# Create new DataFrame using the segmented data
segmented_data_back = spark.createDataFrame(features_rdd, schema=back_feature_columns)

segmented_data_back.show()

+---+--------------------+--------------------+-------------------+--------------------+--------------------+-------------------+-----------------------+-----------------------+--------------------+------------------+--------------------------+--------------------------+--------------+--------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-------------------+-------------------+-------------------+
| ID|        me

In [None]:
# Count the rows of the segmented Back data (one row per window).
# count() is a Spark action, so this runs the whole segmentation pipeline.
num_rows = segmented_data_back.count()
print(f"Number of rows: {num_rows}")

Number of rows: 99914


In [None]:
# Export the segmented Back features as CSV.
# coalesce(1) reduces the DataFrame to a single partition before writing.
segmented_data_back_1file = segmented_data_back.coalesce(1)
# NOTE(review): mode="append" adds new output to the directory on every run, and the
# Neck export further down writes to this same directory with a different set of
# columns. Consider one output directory per sensor and mode="overwrite".
segmented_data_back_1file.write.csv("/content/drive/MyDrive/data2", header=True, mode="append")

In [None]:
from pyspark.sql.functions import concat, col

# Use only the features corresponding to the Neck sensors (no behavior columns).
# The order is significant: the Neck feature functions read fields 3-8 by position
# and Groupkey is appended as field 9.
neck_columns = ["DogID", "TestNum", "t_sec", "ANeck_x", "ANeck_y", "ANeck_z", "GNeck_x", "GNeck_y", "GNeck_z"]
neck_group_key = concat(col("DogID"), col("TestNum"))
# Add a key that identifies the rows belonging to one particular dog and test number
data_neck = data.select(*neck_columns).withColumn("Groupkey", neck_group_key)

data_neck.show()

+-----+-------+-----+---------+---------+---------+----------+----------+----------+--------+
|DogID|TestNum|t_sec|  ANeck_x|  ANeck_y|  ANeck_z|   GNeck_x|   GNeck_y|   GNeck_z|Groupkey|
+-----+-------+-----+---------+---------+---------+----------+----------+----------+--------+
|   16|      1|    0|-0.067871|-0.510254| -0.93457| -7.934571|  6.347657| 13.427735|     161|
|   16|      1| 0.01|-0.128906|-0.494141|-0.913086|  -3.90625|  4.394532| 16.540528|     161|
|   16|      1| 0.02|-0.158691|-0.480469|-0.911133| -0.488281| -1.953125| 26.794435|     161|
|   16|      1| 0.03| -0.12207|-0.486816|-0.880371|  1.159668|  -5.67627|  38.08594|     161|
|   16|      1| 0.04|-0.053711|     -0.5|-0.807129|  4.577637|  4.089356| 41.503909|     161|
|   16|      1| 0.05| 0.019043|-0.463867|-0.749023| 12.939454|  20.81299|  32.53174|     161|
|   16|      1| 0.06| 0.045898|-0.376953| -0.78125|  22.70508| 29.968264| 14.038087|     161|
|   16|      1| 0.07| 0.028809|-0.307617|-0.941406| 23.92578

In [None]:
# All of the functions used in the segmentation of Neck Data

# Segmentation function
def segmentate(instances, window_size, step_size):
    """Cut the rows of one group into fixed-length, possibly overlapping windows.

    This re-defines the function of the same name from the Back-sensor section
    with the same behavior.

    Args:
        instances: iterable with the rows of one group. It is materialised into a
            list first, so one-shot iterables such as generators are accepted too.
        window_size: number of consecutive rows in every window.
        step_size: number of rows between the starts of two consecutive windows.

    Returns:
        A list of windows, each a list of exactly ``window_size`` rows. A trailing
        remainder shorter than ``window_size`` is dropped, and an input shorter
        than one window yields an empty list.
    """
    instances_list = list(instances)
    segments = []
    # Use the length of the materialised list: the raw argument may be an iterator,
    # which has no len() and is already consumed by the list() call above.
    start = 0
    while start + window_size <= len(instances_list):
        segments.append(instances_list[start:start + window_size])
        start += step_size
    return segments

# Function to extrapolate the cumulative distribution
def extrapolate(data_axis, p_values):
    """Evaluate the empirical cumulative distribution of one axis at given probabilities.

    The sorted samples are paired with the cumulative probabilities 1/n ... 1 and
    joined by a linear interpolator, which is extended linearly outside that range.

    Args:
        data_axis: sequence of numeric samples of a single sensor axis.
        p_values: probabilities at which the interpolated curve is evaluated.

    Returns:
        Array with one interpolated sample value per entry of ``p_values``.
    """
    # Sort the data from the feature
    ranked = np.sort(data_axis)
    total = len(ranked)
    # Cumulative probability reached at each sorted sample
    steps = np.arange(1, total + 1) / total
    return interp1d(steps, ranked, fill_value="extrapolate")(p_values)

# Function to calculate all of the new features
def calculate_features(segment):
    """Compute the feature tuple for one window of neck-sensor rows.

    Fields are read by position: 3-5 are the ANeck x/y/z samples and 6-8 the
    GNeck x/y/z samples.

    Args:
        segment: list of rows making up one window.

    Returns:
        A flat tuple, in this order: the six per-axis means (ANeck x/y/z, then
        GNeck x/y/z), the ANeck and GNeck positional offsets, the ANeck and GNeck
        summed standard deviations, the ANeck and GNeck total mean crossings and
        seven ECDF values per axis (ANeck x/y/z, then GNeck x/y/z).
    """
    def read_axis(position):
        # Sensor fields are converted explicitly, so string-typed rows are fine
        return [float(row[position]) for row in segment]

    def offset_from(reference, means):
        # Euclidean distance between the window mean and the standing reference
        dx, dy, dz = (mean - ref for mean, ref in zip(means, reference))
        return float(np.sqrt(dx ** 2 + dy ** 2 + dz ** 2))

    def summed_std(axes):
        std_x, std_y, std_z = (float(np.std(samples)) for samples in axes)
        return std_x + std_y + std_z

    def mean_crossings(samples):
        # Count the positions where the sign of (sample - mean) changes
        signs = np.sign(samples - np.mean(samples))
        return len(np.where(np.diff(signs))[0])

    def ecdf_points(samples):
        return [float(value) for value in extrapolate(samples, p_values)]

    accel = [read_axis(position) for position in (3, 4, 5)]
    gyro = [read_axis(position) for position in (6, 7, 8)]

    # Per-axis means
    accel_means = [float(np.mean(samples)) for samples in accel]
    gyro_means = [float(np.mean(samples)) for samples in gyro]

    # Positional offset relative to the standing behavior
    accel_offset = offset_from(
        (standing_mean_ANeck_x, standing_mean_ANeck_y, standing_mean_ANeck_z), accel_means)
    gyro_offset = offset_from(
        (standing_mean_GNeck_x, standing_mean_GNeck_y, standing_mean_GNeck_z), gyro_means)

    # Sum of the per-axis standard deviations
    accel_spread = summed_std(accel)
    gyro_spread = summed_std(gyro)

    # Sum of the per-axis mean crossings
    accel_crossings = sum(mean_crossings(samples) for samples in accel)
    gyro_crossings = sum(mean_crossings(samples) for samples in gyro)

    # ECDF: the 7 interior points of an even 9-point grid on [0, 1]
    p_values = np.linspace(0, 1, 9)[1:-1]
    accel_ecdf = [point for samples in accel for point in ecdf_points(samples)]
    gyro_ecdf = [point for samples in gyro for point in ecdf_points(samples)]

    return (
        *accel_means, *gyro_means,
        accel_offset, gyro_offset,
        accel_spread, gyro_spread,
        accel_crossings, gyro_crossings,
        *accel_ecdf, *gyro_ecdf,
    )

# Function that combines all of the functions
def key_segments(group):
    """Turn one (key, rows) group into a list of keyed feature tuples.

    The rows are windowed with the module-level ``window_size`` and ``step_size``;
    the features of each window are calculated and prefixed with the group key.

    Args:
        group: pair of (group key, iterable with the rows of that group).

    Returns:
        A list with one ``(key, feature_1, feature_2, ...)`` tuple per window.
    """
    key, instances = group
    keyed_features = []
    for window in segmentate(instances, window_size, step_size):
        keyed_features.append((key,) + calculate_features(window))
    return keyed_features

In [None]:
# Segmentation Neck sensors

# Windows of 200 rows (2 seconds at the 0.01 s spacing of t_sec)
window_size = 200
# A new window starts every 100 rows, i.e. consecutive windows overlap by 1 second
step_size = 100

# Make use of RDD
rdd = data_neck.rdd

# Group the data by key (DogID + TestNum). The key is looked up by field name rather
# than by position 9, so changing the selected columns cannot silently switch the
# grouping to another field.
# NOTE(review): the windows are cut in the order in which groupBy hands back the rows;
# confirm that this is the recording order, or sort each group by t_sec first.
grouped_rdd = rdd.groupBy(lambda row: row["Groupkey"])

# Window every group and calculate the features of each window
features_rdd = grouped_rdd.flatMap(key_segments)

# Column names, in the exact order of the tuples produced by key_segments:
# ID, 6 means, 2 offsets, 2 summed std devs, 2 crossing totals and 6 x 7 ECDF points.
neck_sensors = ("ANeck", "GNeck")
neck_axes = ("x", "y", "z")
neck_feature_columns = ["ID"]
neck_feature_columns += [f"mean_{sensor}_{axis}" for sensor in neck_sensors for axis in neck_axes]
for statistic in ("positional_offset", "sum_std_dev", "total_mean_crossings"):
    neck_feature_columns += [f"{statistic}_{sensor}" for sensor in neck_sensors]
neck_feature_columns += [
    f"ecdf_{sensor}_{axis}_{point}"
    for sensor in neck_sensors
    for axis in neck_axes
    for point in range(1, 8)
]

# Create new DataFrame using the segmented data
segmented_data_Neck = spark.createDataFrame(features_rdd, schema=neck_feature_columns)

segmented_data_Neck.show()

+---+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-----------------------+-----------------------+--------------------+------------------+--------------------------+--------------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------------+--------------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
| ID|        mean_ANeck_x|        mean_ANeck_y|       mean_ANeck_z|        mean_GNeck

In [None]:
# Export the segmented Neck features as CSV.
# coalesce(1) reduces the DataFrame to a single partition before writing.
segmented_data_Neck_1file = segmented_data_Neck.coalesce(1)
# NOTE(review): this is the same directory the Back export writes to, and the two
# exports have different columns; mode="append" also adds new output on every run.
# Consider one output directory per sensor and mode="overwrite".
segmented_data_Neck_1file.write.csv("/content/drive/MyDrive/data2", header=True, mode="append")