# Data Preprocessing

In [None]:
import findspark
findspark.init()
import pyspark #only run after findspark.init()
from pyspark.sql import SparkSession

# Build the session exactly once, with its application name. Calling
# getOrCreate() without a name first and then again with appName() hands back
# the session that already exists, so the name never reaches the SparkContext
# that is already running.
spark = SparkSession.builder.appName('Data Preprocessing').getOrCreate()

# Last expression of the cell: the notebook renders the session summary.
spark

from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.feature import StandardScaler, PCA
from pyspark.mllib.stat import Statistics

import numpy as np
import pandas as pd

In [None]:
# Load the orders CSV: the first line supplies the column names and Spark
# infers each column's type from the data.
csv_path = 'DailyDemandForecastingOrdersEditSJ.csv'
df = spark.read.options(header=True, inferSchema=True).csv(csv_path)

# Preview the loaded rows, then print the inferred schema.
df.show()
df.printSchema()

#### Shorten the feature names

#### Remove null value records

In [None]:
from pyspark.sql.functions import col, count, isnan, when

# NOTE(review): no assignment to `df1` is visible before this cell --
# presumably the (missing) cell under "Shorten the features name" derived it
# from `df` with renamed columns; confirm before running the notebook top-down.

# Discard only the records in which every column is null.
df1 = df1.na.drop(how='all')

# Report, per column, how many null entries are still present.
null_tallies = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df1.columns
]
df1.select(null_tallies).show()

In [None]:
from pyspark.ml.feature import Imputer

# Numeric columns whose missing entries are filled in. The output names equal
# the input names, so each column is overwritten rather than duplicated.
imputed_cols = ['UrgentOrd', 'FiscalSecOrd']

# Missing values are replaced by the mean of their column.
myImputer = Imputer(
    strategy='mean',
    inputCols=imputed_cols,
    outputCols=imputed_cols,
)

# Learn the per-column means from df1, then apply them to df1 itself.
imputer_model = myImputer.fit(df1)
df1 = imputer_model.transform(df1)

df1.show()

In [None]:
# Correlation coefficient between the UrgentOrd and FiscalSecOrd columns;
# 'pearson' is the method DataFrame.corr falls back to when none is given.
df1.corr('UrgentOrd', 'FiscalSecOrd', method='pearson')

In [None]:
# Feature set: every column of df1 except the 'Target' column.
features = df1.drop('Target')

In [None]:
# Turn every Row into a plain tuple of its values -- the per-record form that
# is handed to the RDD-based MLlib calls.
featuresRDD = features.rdd.map(tuple)

# Bring the records back to the driver so the notebook displays them.
featuresRDD.collect()

#### Statistics

In [None]:
# Column-wise summary statistics over the feature records.
summary = Statistics.colStats(featuresRDD)

# Print, one per line and in this order: mean, variance, count of non-zero
# entries and L1 norm of every column.
for statistic in (summary.mean, summary.variance, summary.numNonzeros, summary.normL1):
    print(statistic())

#### Checking correlation using the Pearson method

#### VectorAssembler

In [None]:
# Preview df1 before its feature columns are assembled into a single vector.
df1.show()

In [None]:
# Every column except 'Target' feeds the assembled feature vector.
features = df1.drop('Target')
assembler = VectorAssembler(outputCol='features', inputCols=features.columns)

# transform() appends the assembled 'features' column to df1's own columns.
output = assembler.transform(df1)

# Show the vector next to its target, without truncating the vector text.
assembled_view = output.select('features', 'Target')
assembled_view.show(truncate=False)

#### Standard Scaling

In [None]:
# Keep the 'Target' column on its own as the label DataFrame and preview it.
label = df1.select('Target')
label.show()

In [None]:
# Rebuild the feature set (df1 without 'Target') for the scaling steps.
features = df1.drop('Target')

In [None]:
# Remember the feature column names, then convert every Row into a plain
# tuple of its values so the records can be fed to the RDD-based scaler.
colNames = features.columns
featuresRDD = features.rdd.map(tuple)

In [None]:
# Bring the feature tuples back to the driver so the notebook displays them.
featuresRDD.collect()

In [None]:
# Fit the scaler on the feature records. The flags spell out StandardScaler's
# defaults: scale each column to unit standard deviation, without centring it
# on its mean.
scaler1 = StandardScaler(withMean=False, withStd=True).fit(featuresRDD)

In [None]:
# Apply the fitted scaler to the same records it was fitted on.
scaledFeatures = scaler1.transform(featuresRDD)

In [None]:
# Bring the scaled records back to the driver and print them one per line.
for scaled_vector in scaledFeatures.collect():
    print(scaled_vector)

#### PCA
Principal component analysis (PCA) is an unsupervised technique for preprocessing high-dimensional datasets: it reduces their dimensionality while preserving as much of the structure and relationships in the original data as possible, so that machine learning models can still learn from the reduced data and make accurate predictions.

In [None]:
# Fit PCA on the scaled features, keeping k=3 components.
# NOTE(review): confirm that `PCA` is really exported by pyspark.mllib.feature
# in the Spark version in use -- the RDD-based Python API reference does not
# appear to list it (it documents RowMatrix.computePrincipalComponents, and the
# DataFrame-based API offers pyspark.ml.feature.PCA). If it is missing, the
# import of this name fails before this cell is ever reached.
pca = PCA(k=3)
pcaModel = pca.fit(scaledFeatures)

In [None]:
# Project the scaled features with the fitted PCA model, then bring the
# projected records back to the driver so the notebook displays them.
result = pcaModel.transform(scaledFeatures)
result.collect()

In [None]:
# Store the PCA output vectors in a DataFrame: each vector is wrapped in a
# 1-tuple so that it becomes the single 'PCAFeatures' column.
pca_rows = result.map(lambda vector: (vector,))
df3 = pca_rows.toDF(['PCAFeatures'])

# Show the vectors in full rather than truncated.
df3.show(truncate=False)