In [72]:
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, StandardScaler, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

# Keras / Deep Learning
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras import optimizers, regularizers
from keras.optimizers import Adam

# Elephas for Deep Learning on Spark
from elephas.ml_model import ElephasEstimator

In [73]:
%%time
df = spark.read.csv("hdfs://master:9000/user/data/ATKH_Oplus_TWGKHHPSK1MSB04_memory_usage_2020_11.csv", inferSchema=True, header=True)
# inferSchema referring to the type of the column
df.show(5)

+----------+--------+---------------+-------+
|      Date|    time|Allocate memory|  %used|
+----------+--------+---------------+-------+
|2020/11/30|23:57:45|     1606751865|31.6945|
|2020/11/30|23:54:46|     1606751686|31.9328|
|2020/11/30|23:51:45|     1606751505|32.8929|
|2020/11/30|23:48:45|     1606751325|34.5618|
|2020/11/30|23:45:45|     1606751145|  32.14|
+----------+--------+---------------+-------+
only showing top 5 rows

CPU times: user 1.81 ms, sys: 2.72 ms, total: 4.53 ms
Wall time: 798 ms


In [74]:
# Show the first five rows of the loaded DataFrame again
df.show(n=5)

+----------+--------+---------------+-------+
|      Date|    time|Allocate memory|  %used|
+----------+--------+---------------+-------+
|2020/11/30|23:57:45|     1606751865|31.6945|
|2020/11/30|23:54:46|     1606751686|31.9328|
|2020/11/30|23:51:45|     1606751505|32.8929|
|2020/11/30|23:48:45|     1606751325|34.5618|
|2020/11/30|23:45:45|     1606751145|  32.14|
+----------+--------+---------------+-------+
only showing top 5 rows



In [79]:
# Helper function to select the feature columns that will be scaled
def select_features_to_scale(df=None, drop_cols=('Date', 'time', 'Allocate memory')):
    """Return the names of the columns of ``df`` that should be scaled.

    Every column that is not listed in ``drop_cols`` is selected, in the
    DataFrame's own column order. Only ``df.columns`` is inspected, so no
    rows are collected to the driver just to read the column names.

    Args:
        df: DataFrame-like object exposing a ``columns`` attribute. When
            omitted, the notebook-level ``df`` is looked up at call time
            (not at definition time), so a reloaded ``df`` is picked up
            without re-running this cell.
        drop_cols: A column name, or an iterable of column names, to exclude
            from the result (identifier / timestamp columns, etc.).

    Returns:
        list: Names of the remaining columns, in their original order.

    Raises:
        ValueError: If ``df`` is omitted and no notebook-level ``df`` exists.
        KeyError: If any entry of ``drop_cols`` is not a column of ``df``.
    """
    if df is None:
        # Late-bound fallback that keeps no-argument calls working.
        df = globals().get('df')
        if df is None:
            raise ValueError(
                "select_features_to_scale() needs a DataFrame: pass `df` "
                "explicitly or define a notebook-level `df` first"
            )

    # A bare string is a single column name, not an iterable of characters.
    if isinstance(drop_cols, str):
        drop_cols = [drop_cols]
    excluded = set(drop_cols)

    columns = list(df.columns)

    # Fail loudly on unknown names so a typo in `drop_cols` cannot silently
    # leave an unwanted column among the features to scale.
    missing = excluded.difference(columns)
    if missing:
        raise KeyError("{} not found in columns".format(sorted(missing)))

    return [column for column in columns if column not in excluded]

In [80]:
# Names of the columns that will be fed to the scaler
unscaled_features = select_features_to_scale(df=df)

# Pack the selected columns into one vector column for the scaler to consume
unscaled_assembler = VectorAssembler(
    inputCols=unscaled_features,
    outputCol="unscaled_features",
)

# Rescale the assembled vector to the [0.0, 1.0] range
scaler = MinMaxScaler(
    min=0.0,
    max=1.0,
    inputCol="unscaled_features",
    outputCol="scaled_features",
)

# Pipeline stages, in execution order: assemble first, then scale
stages = [unscaled_assembler, scaler]

In [81]:
# Display the scaler stage's repr to confirm it was created
scaler

MinMaxScaler_495b8494aa53

In [82]:
# Chain the prepared stages (vector assembly -> min-max scaling) into one Pipeline
pipeline = Pipeline().setStages(stages)

# Fit the pipeline's stages on the loaded data
pipeline_model = pipeline.fit(dataset=df)

# Apply the fitted stages; the result keeps the original columns and adds
# "unscaled_features" and "scaled_features"
df_transform = pipeline_model.transform(dataset=df)

In [84]:
# Bring only a bounded sample (at most 60 rows) to the driver for rich display
df_transform.limit(num=60).toPandas()

Unnamed: 0,Date,time,Allocate memory,%used,unscaled_features,scaled_features
0,2020/11/30,23:57:45,1606751865,31.6945,[31.6945],[0.20730249328605418]
1,2020/11/30,23:54:46,1606751686,31.9328,[31.9328],[0.21053631501739048]
2,2020/11/30,23:51:45,1606751505,32.8929,[32.8929],[0.2235652375698705]
3,2020/11/30,23:48:45,1606751325,34.5618,[34.5618],[0.24621284599382007]
4,2020/11/30,23:45:45,1606751145,32.14,[32.14],[0.21334809790758305]
5,2020/11/30,23:42:45,1606750965,33.7496,[33.7496],[0.23519098275340317]
6,2020/11/30,23:39:45,1606750785,31.7739,[31.7739],[0.2083799815171415]
7,2020/11/30,23:36:45,1606750605,31.8288,[31.8288],[0.20912499541999655]
8,2020/11/30,23:33:45,1606750425,31.315,[31.315],[0.2021525337936407]
9,2020/11/30,23:30:45,1606750245,31.2396,[31.2396],[0.20112932708553008]


In [85]:
# Keep only the scaled feature vector column for the downstream steps
df_transform_fin = df_transform.select(["scaled_features"])

DataFrame[scaled_features: vector]