# Initialization

In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql.functions import mean, col, desc, count, isnull, isnan, when, rank, sum
import pandas as pd
import matplotlib.pyplot as plt


# Build (or reuse) the notebook's SparkSession. Driver and executor memory are
# both set to 8g and the console progress bar is switched off.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.ui.showConsoleProgress", "false")
    # .config("spark.master", "spark://127.0.0.1:7077")
    .getOrCreate()
)

# Restrict Spark's log output to ERROR level.
spark.sparkContext.setLogLevel("ERROR")


In [25]:
# Spark ML classes used by the cells below.
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml.feature import StandardScaler, StandardScalerModel, VectorAssembler
from pyspark.ml.linalg import DenseVector, Vectors

# Seed handed to randomSplit so the train/test split is repeatable.
seed = 383
# NOTE(review): K is not referenced in the visible cells; presumably the
# KMeans cluster count - confirm against the rest of the notebook.
K = 4


# uber

In [26]:
# Read the Uber CSV (first line is the header, column types are inferred).
uber_file = "uber-raw-data-aug14.csv"
uber = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(uber_file)
)

# Seeded random split with 0.8 / 0.2 weights.
train, test = uber.randomSplit(weights=[0.8, 0.2], seed=seed)


# Compare Scaler

## uber

In [27]:
# Assemble Lat/Lon into one vector column and fit a StandardScaler on the full dataset.
#
# Derived columns are dropped first so this cell can be re-run: both
# VectorAssembler and StandardScalerModel refuse to transform a frame that
# already contains their output column. drop() ignores columns that are absent,
# so the first run is unaffected.
assembler = VectorAssembler(inputCols=['Lat', 'Lon'], outputCol="vector_features")
uber = assembler.transform(uber.drop("vector_features", "vector_features_std"))

scaler = StandardScaler(inputCol='vector_features', outputCol='vector_features_std')
scaler_model = scaler.fit(uber)
uber = scaler_model.transform(uber)

# Fitted statistics followed by the scaler settings. No withMean/withStd is
# passed above, so the last two prints show the library defaults.
print(scaler_model.mean)
print(scaler_model.std)
print(scaler_model.getWithMean())
print(scaler_model.getWithStd())


[40.73778073582314,-73.97016031316464]
[0.04362806084687051,0.06148272834516592]
False
True


## uber 20% split (test)

In [28]:
# Same pipeline as for the full dataset, but fitted on the 20% split only, so
# the resulting mean/std can be compared against the full-data scaler.
#
# Derived columns are dropped first so this cell can be re-run: both
# VectorAssembler and StandardScalerModel refuse to transform a frame that
# already contains their output column. drop() ignores columns that are absent.
assembler = VectorAssembler(inputCols=['Lat', 'Lon'], outputCol="vector_features")
test = assembler.transform(test.drop("vector_features", "vector_features_std"))

scaler = StandardScaler(inputCol='vector_features', outputCol='vector_features_std')
scaler_model = scaler.fit(test)
test = scaler_model.transform(test)

# Fitted statistics followed by the scaler settings.
print(scaler_model.mean)
print(scaler_model.std)
print(scaler_model.getWithMean())
print(scaler_model.getWithStd())


[40.73779190118132,-73.97022777754322]
[0.04344944241798461,0.06140209835630617]
False
True


## compare

In [29]:
# Pull the scaled vector of one specific pickup (same timestamp and coordinates)
# from the frame scaled with the full-data scaler ...
x = (
    uber
    .filter((col("Date/Time") == '8/1/2014 0:00:00') & (col("Lat") == 40.7424) & (col("Lon") == -74.0044))
    .select("vector_features_std")
    .collect()
)
print(x)

# ... and from the frame scaled with the scaler fitted on the 20% split.
y = (
    test
    .filter((col("Date/Time") == '8/1/2014 0:00:00') & (col("Lat") == 40.7424) & (col("Lon") == -74.0044))
    .select("vector_features_std")
    .collect()
)
print(y)


[Row(vector_features_std=DenseVector([933.8577, -1203.6616]))]
[Row(vector_features_std=DenseVector([937.6967, -1205.2422]))]


# Relearn Scaler

## uber

In [30]:
# Re-read the raw CSV so `uber` no longer carries the derived vector columns.
uber_file = "uber-raw-data-aug14.csv"
uber = spark.read.options(header=True, inferSchema=True).csv(uber_file)


## uber 80% split (train)

In [31]:
# Assemble Lat/Lon and fit the StandardScaler on the 80% split. The resulting
# `scaler_model` is the one the following cells apply to new rows.
#
# Derived columns are dropped first so this cell can be re-run: both
# VectorAssembler and StandardScalerModel refuse to transform a frame that
# already contains their output column. drop() ignores columns that are absent.
assembler = VectorAssembler(inputCols=['Lat', 'Lon'], outputCol="vector_features")
train = assembler.transform(train.drop("vector_features", "vector_features_std"))

scaler = StandardScaler(inputCol='vector_features', outputCol='vector_features_std')
scaler_model = scaler.fit(train)
train = scaler_model.transform(train)

# Fitted statistics followed by the scaler settings.
print(scaler_model.mean)
print(scaler_model.std)
print(scaler_model.getWithMean())
print(scaler_model.getWithStd())


[40.737777944820685,-73.97014344910451]
[0.043672628138981366,0.06150290156661267]
False
True


## new row

In [32]:
# One hand-built row with the same columns as the assembled Uber frame; the
# feature vector is written out explicitly instead of using the assembler.
df = spark.createDataFrame(
    data=[
        ('8/1/2014 0:00:00', 40.7424, -74.0044, 'B02598', Vectors.dense([40.7424, -74.0044])),
    ],
    schema=["Date/Time", "Lat", "Lon", "Base", "vector_features"],
)


## transform one row

In [33]:
# Scale the single new row with the already-fitted scaler (no refit).
# Any previous scaled column is dropped first so the cell can be re-run; the
# model rejects input that already has its output column, and drop() is a
# no-op when the column is absent.
df = scaler_model.transform(df.drop("vector_features_std"))

# Filter on the row's identifying columns before projecting the scaled vector.
a = (
    df
    .filter((col("Date/Time") == '8/1/2014 0:00:00') & (col("Lat") == 40.7424) & (col("Lon") == -74.0044))
    .select("vector_features_std")
    .collect()
)
print(a)


[Row(vector_features_std=DenseVector([932.9047, -1203.2668]))]


## add new row to the 80% split

In [34]:
# Append the new row to the training split. unionByName matches columns by
# name, whereas union() matches by position and would silently misalign values
# if the two frames ever listed their columns in a different order.
new_df = train.unionByName(df)


## transform without refitting the scaler

In [35]:
# Apply the existing scaler model to the extended frame. Only the input columns
# are selected so the old scaled column is not carried into the transform.
new_df = scaler_model.transform(
    new_df.select("Date/Time", "Lat", "Lon", "Base", "vector_features")
)

# Scaled vector of the appended row.
b = (
    new_df
    .filter((col("Date/Time") == '8/1/2014 0:00:00') & (col("Lat") == 40.7424) & (col("Lon") == -74.0044))
    .select("vector_features_std")
    .collect()
)
print(b)


[Row(vector_features_std=DenseVector([932.9047, -1203.2668]))]


## refit the scaler, then transform

In [36]:
# Fit a fresh scaler on the extended frame (training split plus the new row),
# then scale that same frame with the newly fitted model.
scaler = StandardScaler(inputCol='vector_features', outputCol='vector_features_std')
scaler_model = scaler.fit(
    new_df.select("Date/Time", "Lat", "Lon", "Base", "vector_features")
)
new_df = scaler_model.transform(
    new_df.select("Date/Time", "Lat", "Lon", "Base", "vector_features")
)

# Scaled vector of the appended row under the refitted scaler.
c = (
    new_df
    .filter((col("Date/Time") == '8/1/2014 0:00:00') & (col("Lat") == 40.7424) & (col("Lon") == -74.0044))
    .select("vector_features_std")
    .collect()
)
print(c)

# Refitted statistics followed by the scaler settings, one per line.
print(
    scaler_model.mean,
    scaler_model.std,
    scaler_model.getWithMean(),
    scaler_model.getWithStd(),
    sep="\n",
)


[Row(vector_features_std=DenseVector([932.9054, -1203.2674]))]
[40.73777795178752,-73.97014350073948]
[0.043672595593661455,0.06150286959490978]
False
True


# Practice Scaler

## create df

In [14]:
from pyspark.ml.linalg import Vectors, DenseVector

# Two-row toy frame with a single one-dimensional vector column "a".
df = spark.createDataFrame(
    [(Vectors.dense([0.0]),), (DenseVector([2.0]),)],
    ["a"],
)

# Estimator reading "a" and writing "scaled".
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")

model = standardScaler.fit(df)

model.getInputCol()
# The fitted model's output column is renamed, so transform() emits "output"
# rather than the estimator's "scaled".
model.setOutputCol("output")

print(model.mean, model.std, sep="\n")
x = model.transform(df)
# Scaled value of the second row.
print(x.collect()[1].output)
x.show()


[1.0]
[1.4142135623730951]
[1.414213562373095]
+-----+-------------------+
|    a|             output|
+-----+-------------------+
|[0.0]|              [0.0]|
|[2.0]|[1.414213562373095]|
+-----+-------------------+



## save and load the estimator

In [11]:
# Persist the (unfitted) StandardScaler estimator and load it back.
standardScalerPath = "_standard-scaler"
# overwrite() lets the cell be re-run; a plain save() fails when the path
# already exists from a previous run.
standardScaler.write().overwrite().save(standardScalerPath)
loadedStandardScaler = StandardScaler.load(standardScalerPath)
# Loading this path with StandardScalerModel.load was noted to raise an error:
# the path holds an estimator, not a fitted model.

# Both parameter checks are combined into the cell's displayed value, so
# neither comparison result is silently discarded.
(
    loadedStandardScaler.getWithMean() == standardScaler.getWithMean()
    and loadedStandardScaler.getWithStd() == standardScaler.getWithStd()
)


True

## save and load the fitted model

In [17]:
# Persist the fitted StandardScalerModel and load it back.
modelPath = "_standard-scaler-model"
# overwrite() lets the cell be re-run; a plain save() fails when the path
# already exists from a previous run.
model.write().overwrite().save(modelPath)
loadedModel = StandardScalerModel.load(modelPath)
# Loading this path with StandardScaler.load was noted to raise an error:
# the path holds a fitted model, not an estimator.

# Round-trip checks. As bare expressions these results were thrown away; as
# assertions a mismatch now stops the cell.
assert loadedModel.std == model.std
assert loadedModel.mean == model.mean
assert loadedModel.transform(df).take(1) == model.transform(df).take(1)

print(loadedModel.transform(df).take(1))


[Row(a=DenseVector([0.0]), output=DenseVector([0.0]))]
