In [1]:
## PYTHON

import pandas as pd
from numpy.random import *

# Generate the dataset
seed(42)

df = pd.DataFrame({
    'x': normal(2, 2, 50),
    'y': standard_cauchy(50),
    'z': uniform(5, 10, 50)
})

In [2]:
# Split the data into train/test samples
from sklearn.model_selection import train_test_split
train, test = train_test_split(df, test_size = 0.2)

In [3]:
# Instantiate a Scaler
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
scaler_sd = StandardScaler()
scaler_range = MinMaxScaler()

# Get scaling parameters with the train sample exclusively, using the Scaler.fit() function
scaler_sd.fit(train)
scaler_range.fit(train)

# Scale data using Scaler.transform()
df_train_scaled_sd = pd.DataFrame(scaler_sd.transform(train))
df_train_scaled_range = pd.DataFrame(scaler_range.transform(train))
df_test_scaled_sd = pd.DataFrame(scaler_sd.transform(test))
df_test_scaled_range = pd.DataFrame(scaler_range.transform(test))

In [4]:
# Check everything is ok
df_train_scaled_sd.describe()

Unnamed: 0,0,1,2
count,40.0,40.0,40.0
mean,-1.887379e-16,-3.556183e-17,-4.801715e-16
std,1.012739,1.012739,1.012739
min,-2.045427,-0.9372863,-1.609726
25%,-0.5368312,-0.2354869,-0.8752191
50%,-0.07175533,-0.1618996,0.2111035
75%,0.5543399,-0.05012685,0.795748
max,2.186865,6.080573,1.495703


In [5]:
df_train_scaled_range.describe()

Unnamed: 0,0,1,2
count,40.0,40.0,40.0
mean,0.483291,0.133557,0.518359
std,0.239289,0.144309,0.326119
min,0.0,0.0,0.0
25%,0.356449,0.100002,0.236523
50%,0.466336,0.110488,0.586337
75%,0.614269,0.126415,0.774603
max,1.0,1.0,1.0


The train sample was successfully scaled in both case.

In [6]:
df_test_scaled_sd.describe()

Unnamed: 0,0,1,2
count,10.0,10.0,10.0
mean,-0.600003,-0.392939,0.10028
std,1.042161,0.623066,0.8831
min,-1.993921,-2.141365,-1.107418
25%,-1.439699,-0.254438,-0.643848
50%,-0.586823,-0.213541,0.21575
75%,0.208046,-0.182735,0.561783
max,0.950234,0.039742,1.309156


In [7]:
df_test_scaled_range.describe()

Unnamed: 0,0,1,2
count,10.0,10.0,10.0
mean,0.341523,0.077566,0.55065
std,0.24624,0.088783,0.284373
min,0.01217,-0.171573,0.161751
25%,0.14312,0.097301,0.311029
50%,0.344637,0.103129,0.587834
75%,0.532447,0.107519,0.699262
max,0.70781,0.13922,0.939928


In [8]:
## SPARK

from pyspark.sql import SparkSession
# Specify the number of available cores in .master()
spark = SparkSession.builder.master('local[4]').appName('Scaling data with Spark').getOrCreate()

# Let us use the Pandas.DataFrame created above with NumPy/Pandas
df = spark.createDataFrame(df)

In [9]:
# Split the data into train/test samples
train, test = df.randomSplit([.8, .2], seed = 42)

In [10]:
# Gather the columns into one with a VectorAssembler, as usual in Spark
from pyspark.ml.feature import VectorAssembler
vector_assembler = VectorAssembler(inputCols=df.schema.names, outputCol="features")
train = vector_assembler.transform(train)
test = vector_assembler.transform(test)

Here we will just standardize the data. As in scikit-klearn, other scalers are available : MinMaxScaler, RobustScaler and more. More information [here](https://spark.apache.org/docs/latest/ml-features).

In [11]:
# Standardize the data using only the train sample
# This is very similar to scikit-learn preprocessing workflow
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)
scalerModel = scaler.fit(train)
train_scaled = scalerModel.transform(train)
test_scaled = scalerModel.transform(test)

In [12]:
# Check the results are consistent
from pyspark.ml.stat import Summarizer
summarizer = Summarizer.metrics("mean", 'std')
train_scaled.select(summarizer.summary(train_scaled.scaledFeatures)).show(truncate=False)

+----------------------------------------------------------------------------------+
|aggregate_metrics(scaledFeatures, 1.0)                                            |
+----------------------------------------------------------------------------------+
|[[-4.163336342344337E-17,0.0,3.677613769070831E-16], [1.0,0.9999999999999999,1.0]]|
+----------------------------------------------------------------------------------+



In [13]:
test_scaled.select(summarizer.summary(test_scaled.scaledFeatures)).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------+
|aggregate_metrics(scaledFeatures, 1.0)                                                                                     |
+---------------------------------------------------------------------------------------------------------------------------+
|[[0.24683132060177,-0.11688519638997828,-0.24137689651144545], [1.1048888799193564,0.10159371285827362,0.8946021481808285]]|
+---------------------------------------------------------------------------------------------------------------------------+

