In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('SparkSocket')
    .getOrCreate()
)
# Only log errors from here on so the cell outputs stay readable.
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/11 10:14:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read every CSV under projectData/ (first line is the header; all columns
# arrive as strings) and discard the 'label_source' column.
df = (
    spark.read
    .option("header", "true")
    .csv('projectData/*')
    .drop("label_source")
)

                                                                                

In [3]:
# Label columns carry a 'label:' prefix; keep the bare label names.
label_names = [column_name.replace('label:', '') for column_name in df.columns if 'label:' in column_name]
# Every other column except 'timestamp' is treated as a feature.
# 'label_source' has already been dropped when the CSVs were loaded, so the
# previous `[1:-1]` slice silently discarded the last real feature column.
# Excluding 'timestamp' by name avoids depending on column positions.
# NOTE(review): if the files contain any other non-feature column, the sensor
# mapping step will raise ValueError for it rather than silently including it.
feature_names = [
    column_name
    for column_name in df.columns
    if 'label:' not in column_name and column_name != 'timestamp'
]

In [4]:
from pyspark.sql.functions import col,when
from pyspark.sql.types import TimestampType
# Convert the string columns read from CSV to their working types in a single
# projection: timestamp -> integer, features -> double, labels -> integer
# (the literal string 'nan' in a label column becomes null).
# One select() is used instead of one withColumn() per column because chaining
# hundreds of withColumn calls produces a very deep query plan.
cast_exprs = {'timestamp': col('timestamp').cast("integer")}
for feat_name in feature_names:
    cast_exprs[feat_name] = col(feat_name).cast("double")
for label_column in ['label:' + name for name in label_names]:
    cast_exprs[label_column] = when(col(label_column) == 'nan', None).otherwise(col(label_column).cast("integer"))

# Keep the original column order and names; columns without a cast pass through unchanged.
df = df.select([
    cast_exprs[column_name].alias(column_name) if column_name in cast_exprs else col(column_name)
    for column_name in df.columns
])

In [5]:
import numpy as np
def get_sensor_names_from_features(feature_names):
    """Map each feature name to the short name of the sensor it belongs to.

    The sensor is determined by the prefix of the feature name.

    Args:
        feature_names: iterable of feature column names (strings).

    Returns:
        numpy array with one sensor short name per input feature, in the
        same order as ``feature_names``.

    Raises:
        ValueError: if a feature name starts with none of the known prefixes.
    """
    # (feature-name prefix, sensor short name), checked in order; first match wins.
    # 'location' also matches names starting with 'location_quick_features'
    # (both map to 'Loc'), so no separate entry is needed for the latter.
    prefix_to_sensor = (
        ('raw_acc', 'Acc'),
        ('proc_gyro', 'Gyro'),
        ('raw_magnet', 'Magnet'),
        ('watch_acceleration', 'WAcc'),
        ('watch_heading', 'Compass'),
        ('location', 'Loc'),
        ('audio_naive', 'Aud'),
        ('audio_properties', 'AP'),
        ('discrete', 'PS'),
        ('lf_measurements', 'LF'),
    )

    feat_sensor_names = np.array([None for _ in feature_names])
    for fi, feat in enumerate(feature_names):
        for prefix, sensor in prefix_to_sensor:
            if feat.startswith(prefix):
                feat_sensor_names[fi] = sensor
                break
        else:
            # No known prefix matched this feature name.
            raise ValueError("!!! Unsupported feature name: %s" % feat)

    return feat_sensor_names
# One sensor short name per entry of feature_names (same order); kept at
# notebook scope because get_selected_dataset reads it as a global.
feat_sensor_names = get_sensor_names_from_features(feature_names);

In [6]:
def get_selected_dataset(full_dataset, senesors_to_use, target_label):
    """Select the timestamp, the features of the chosen sensors and one label.

    Reads the notebook-level ``feature_names`` and ``feat_sensor_names``
    (parallel sequences: feature name and its sensor short name).

    Args:
        full_dataset: DataFrame to select the columns from.
        senesors_to_use: sensor short names (e.g. ``['Acc', 'WAcc']``) whose
            features are kept. The parameter name is misspelled; it is kept
            as-is so existing keyword callers continue to work.
        target_label: label name without the ``'label:'`` prefix.

    Returns:
        ``full_dataset`` restricted to ``'timestamp'``, the selected feature
        columns and ``'label:' + target_label``, in that order.
    """
    # Use the arguments, not notebook globals: the previous body read the
    # global `sensors_to_use`, selected from the global `df` and returned the
    # undefined name `full_columns`.
    is_from_sensor = np.isin(feat_sensor_names, senesors_to_use)
    features_to_use = np.array(feature_names)[is_from_sensor].tolist()
    label_col = 'label:' + target_label
    columns_to_select = ['timestamp', *features_to_use, label_col]
    return full_dataset.select(*columns_to_select)

In [9]:
# Keep the 'Acc' (raw_acc*) and 'WAcc' (watch_acceleration*) feature groups
# and use the FIX_walking label as the prediction target.
sensors_to_use = ["Acc", "WAcc"]
target_label = "FIX_walking"
df_selected = get_selected_dataset(df, sensors_to_use, target_label)

In [10]:
# Sanity check: number of selected rows whose timestamp is null.
df_selected.where(F.col('timestamp').isNull()).count()

                                                                                

0

In [12]:
from pyspark.ml.feature import Imputer, StandardScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Define everything this cell needs from df_selected / target_label so the
# notebook runs on a fresh kernel (df_full, label_col and features_to_use were
# previously only available as leftover kernel state).
label_col = 'label:' + target_label
# NOTE(review): 'timestamp' is kept as a model input because
# get_selected_dataset places it in its feature list; confirm this is intended.
features_to_use = [column_name for column_name in df_selected.columns if column_name != label_col]

# Drop rows where the label or the timestamp is null
df_full = df_selected.na.drop(subset=[label_col, 'timestamp'])

# Fill in missing feature values (default strategy: column mean)
# https://spark.apache.org/docs/latest/ml-features.html#imputer
imputer = Imputer(inputCols=features_to_use, outputCols=features_to_use)
# Assemble the features in a vector
assembler = VectorAssembler(inputCols=features_to_use, outputCol="feats")
# Standardize the data
# https://spark.apache.org/docs/latest/ml-features.html#standardscaler
scaler = StandardScaler(inputCol="feats", outputCol="feats_scaled")
# Logistic regression model
# https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
mlr = (
    LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
    .setFeaturesCol("feats_scaled")
    .setLabelCol(label_col)
)

# Stages run in order: impute -> assemble -> scale -> classify
pipeline = Pipeline(stages=[imputer, assembler, scaler, mlr])

In [13]:
# Random train/test split with weights 0.7 / 0.3 and a fixed seed.
train, test = df_full.randomSplit(weights=[0.7, 0.3], seed=42)

In [14]:
from pyspark.sql.functions import isnan
# Sanity check: number of training rows whose label is NaN.
train.where(isnan(col(label_col))).count()

                                                                                

0

In [15]:
# Register the training split as the temp view TRAIN so it can be queried with SQL.
train.createOrReplaceTempView('TRAIN')
# Count training rows per value of the FIX_walking label to inspect class balance.
spark.sql('''
SELECT `label:FIX_walking`, Count(*)
FROM TRAIN
GROUP BY `label:FIX_walking`
''').show()



+-----------------+--------+
|label:FIX_walking|count(1)|
+-----------------+--------+
|                1|   15548|
|                0|  199192|
+-----------------+--------+



                                                                                

In [16]:
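# The training set is heavily imbalanced (15,548 positive vs. 199,192 negative rows, about 7% positive).
# TODO: this is not handled yet; rebalance or weight the classes in a later step.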

In [19]:
# Fit the whole pipeline (imputer, assembler, scaler, logistic regression) on the training split.
model = pipeline.fit(train)

                                                                                

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Define parameter grid: candidate values for the logistic regression's maxIter
grid = ParamGridBuilder().addGrid(mlr.maxIter, [100, 75]).build()

# Define evaluator: area under the ROC curve against the target label
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol=label_col)

# Create CrossValidator over the whole pipeline so that imputation, assembly
# and scaling are fitted inside each fold. (A validator built directly on
# `mlr` used to be created here and was immediately overwritten; it has been
# removed as dead code.)
cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,  # fixed fold assignment, same seed as the train/test split
)

# Perform cross-validation
cv_model = cross_validator.fit(train)

# Get best model from cross-validation
best_model = cv_model.bestModel

                                                                                

In [26]:
# Persist the fitted cross-validation model. overwrite() lets the notebook be
# re-run: a plain save() raises once './spark-pipeline-model' already exists.
cv_model.write().overwrite().save('./spark-pipeline-model')

                                                                                

In [27]:
# Show the column names of the training split.
train.columns

['timestamp',
 'raw_acc:magnitude_stats:mean',
 'raw_acc:magnitude_stats:std',
 'raw_acc:magnitude_stats:moment3',
 'raw_acc:magnitude_stats:moment4',
 'raw_acc:magnitude_stats:percentile25',
 'raw_acc:magnitude_stats:percentile50',
 'raw_acc:magnitude_stats:percentile75',
 'raw_acc:magnitude_stats:value_entropy',
 'raw_acc:magnitude_stats:time_entropy',
 'raw_acc:magnitude_spectrum:log_energy_band0',
 'raw_acc:magnitude_spectrum:log_energy_band1',
 'raw_acc:magnitude_spectrum:log_energy_band2',
 'raw_acc:magnitude_spectrum:log_energy_band3',
 'raw_acc:magnitude_spectrum:log_energy_band4',
 'raw_acc:magnitude_spectrum:spectral_entropy',
 'raw_acc:magnitude_autocorrelation:period',
 'raw_acc:magnitude_autocorrelation:normalized_ac',
 'raw_acc:3d:mean_x',
 'raw_acc:3d:mean_y',
 'raw_acc:3d:mean_z',
 'raw_acc:3d:std_x',
 'raw_acc:3d:std_y',
 'raw_acc:3d:std_z',
 'raw_acc:3d:ro_xy',
 'raw_acc:3d:ro_xz',
 'raw_acc:3d:ro_yz',
 'watch_acceleration:magnitude_stats:mean',
 'watch_acceleration:m