In [1]:
import pickle
import numpy as np
import pandas as pd
import time

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Reuse the active SparkContext if one exists; otherwise create one.
sc = SparkContext.getOrCreate()
# Likewise obtain (or create) the SparkSession used for DataFrame work.
ss = SparkSession.builder.getOrCreate()

In [3]:
# Path to the pickled recording that the following cells load.
# NOTE(review): absolute, machine-specific path -- consider deriving it from a
# configurable data directory so the notebook runs on other machines.
fname = "/Users/alaa/data/WESAD/S2/S2.pkl"

In [4]:
# Load the pickled recording into `raw_data`.
# encoding='latin1' is what the pickle documentation prescribes for pickles
# written by Python 2 that contain NumPy arrays; presumably that is why it is
# needed for this file. The public pickle.load() API is used rather than the
# private pickle._Unpickler class.
# SECURITY: unpickling can execute arbitrary code -- only load files obtained
# from a trusted source.
with open(fname, 'rb') as f:
    raw_data = pickle.load(f, encoding='latin1')

In [5]:
# Number of samples in the chest ACC signal; used below as the row count.
num_rows = len(raw_data['signal']['chest']['ACC'])

In [6]:
# Display the subject identifier stored in the recording.
raw_data['subject']

'S2'

In [7]:
# Display the top-level keys of the loaded recording.
list(raw_data.keys())

['signal', 'label', 'subject']

In [8]:
# Top-level keys of the recording.
keys = list(raw_data)
# First-level keys under 'signal' (one per modality).
modal_keys = list(raw_data['signal'])
# Map every modality to the list of signal names stored under it.
signal_keys = {
    name: list(raw_data['signal'][name])
    for name in modal_keys
}

In [9]:
# Display the signal names found under each modality.
signal_keys

{'chest': ['ACC', 'ECG', 'EMG', 'EDA', 'Temp', 'Resp'],
 'wrist': ['ACC', 'BVP', 'EDA', 'TEMP']}

In [10]:
# Numeric subject id parsed from the recording's own 'subject' field
# (e.g. 'S2' -> 2) rather than hard-coded, so the column stays correct when
# `fname` points at a different subject's file. An unexpected format raises
# ValueError instead of silently mislabelling the rows.
subject_id = int(str(raw_data['subject']).lstrip('S'))
# float64 array holding the subject id once per row.
subject = np.full(num_rows, float(subject_id))
labels = raw_data['label']
# Column names: two metadata columns followed by one '<modal>-<signal>' entry
# for every signal of every modality.
columns = ['subject', 'label'] + [
    modal + '-' + signal
    for modal in modal_keys
    for signal in signal_keys[modal]
]
print(columns)

['subject', 'label', 'chest-ACC', 'chest-ECG', 'chest-EMG', 'chest-EDA', 'chest-Temp', 'chest-Resp', 'wrist-ACC', 'wrist-BVP', 'wrist-EDA', 'wrist-TEMP']


In [11]:
# Collect the columns of the table: subject id, labels, then the chest signals.
chest_signals = raw_data['signal']['chest']
data = {'subject': subject, 'labels': labels}
for key in signal_keys['chest']:
    values = chest_signals[key]
    if key != 'ACC':
        # Every signal other than ACC is stored under its own name, unchanged.
        data[key] = values
        continue
    # ACC is indexed by column: columns 0..2 become ACC_1..ACC_3.
    for axis in range(3):
        data[key + '_' + str(axis + 1)] = values[:, axis]

In [12]:
# Build the pandas DataFrame and report how long the construction takes.
start = time.time()
# Start from an empty frame whose index is 0 .. num_rows-1.
pd_df = pd.DataFrame(index=np.arange(num_rows))
# Add the columns one at a time, in the insertion order of `data`.
for colname, col in data.items():
    pd_df[colname] = col
# Elapsed wall-clock time in seconds.
print('time:', time.time() - start)

time: 0.1420290470123291


In [13]:
# Preview the first five rows of the pandas DataFrame.
pd_df.head(5)

Unnamed: 0,subject,labels,ACC_1,ACC_2,ACC_3,ECG,EMG,EDA,Temp,Resp
0,2.0,0,0.9554,-0.222,-0.558,0.021423,-0.00444,5.250549,30.120758,-1.148987
1,2.0,0,0.9258,-0.2216,-0.5538,0.020325,0.004349,5.267334,30.129517,-1.124573
2,2.0,0,0.9082,-0.2196,-0.5392,0.016525,0.005173,5.243301,30.138214,-1.152039
3,2.0,0,0.8974,-0.2102,-0.5122,0.016708,0.007187,5.249405,30.129517,-1.158142
4,2.0,0,0.8882,-0.2036,-0.4824,0.011673,-0.015152,5.286407,30.130951,-1.161194


In [14]:
# Enable Arrow-based columnar data transfers to speed up the exchange of data
# between the Python and JVM processes (used when converting pandas <-> Spark).
# NOTE(review): Spark 3.0+ deprecates this key in favour of
# "spark.sql.execution.arrow.pyspark.enabled" -- confirm the Spark version in use.
ss.conf.set("spark.sql.execution.arrow.enabled", "true")

In [15]:
# Create a Spark DataFrame from the pandas DataFrame. No explicit schema is
# passed, so Spark infers the column types from the pandas data.
spark_df = ss.createDataFrame(pd_df)

In [16]:
# Print the column names, types and nullability of the Spark DataFrame.
spark_df.printSchema()

root
 |-- subject: double (nullable = true)
 |-- labels: integer (nullable = true)
 |-- ACC_1: double (nullable = true)
 |-- ACC_2: double (nullable = true)
 |-- ACC_3: double (nullable = true)
 |-- ECG: double (nullable = true)
 |-- EMG: double (nullable = true)
 |-- EDA: double (nullable = true)
 |-- Temp: float (nullable = true)
 |-- Resp: double (nullable = true)



In [17]:
# Print the leading rows of the Spark DataFrame as a text table.
spark_df.show()

+-------+------+------------------+--------------------+--------------------+-------------------+-------------------+------------------+---------+-----------------+
|subject|labels|             ACC_1|               ACC_2|               ACC_3|                ECG|                EMG|               EDA|     Temp|             Resp|
+-------+------+------------------+--------------------+--------------------+-------------------+-------------------+------------------+---------+-----------------+
|    2.0|     0|0.9553999900817871|-0.22200000286102295| -0.5579999685287476|   0.02142333984375|-0.0044403076171875|  5.25054931640625|30.120758|-1.14898681640625|
|    2.0|     0|0.9257999658584595| -0.2215999960899353| -0.5537999868392944|   0.02032470703125| 0.0043487548828125|    5.267333984375|30.129517|-1.12457275390625|
|    2.0|     0|0.9082000255584717|-0.21960002183914185| -0.5392000079154968| 0.0165252685546875| 0.0051727294921875|5.2433013916015625|30.138214|-1.15203857421875|
|    2.0| 