In [None]:
# Last amended: 20th Sep, 2022
# My folder: /home/ashok/Documents/spark/2.ml/1.demo
##   Objectives:
##  		i)  Usage of StringIndexer, OneHotEncoder
##              and VectorAssembler
##          ii) Usage of pipelining 
##          iii) Data Transformations
##
##
## Data: Save the following as: my.csv in 
##       /home/ashok/Documents/spark/2.ml/1.demo


"""
Sample of our datafile, my.csv:

c1,c2,c3,n1,n2,n3,f
a,x,1.0,0,2.1,3.2,y
a,y,3.1,1,4.2,2.4,n
b,y,1.1,0,1.4,2.5,y
b,n,2.0,0,1.3,6.7,n


"""

In [None]:
# Transfer my.csv to hadoop, as:


! hdfs dfs -rm hdfs://localhost:9000/user/ashok/my.csv
#! hdfs dfs -put /home/ashok/Documents/spark/2.ml/1.demo/my.csv hdfs://localhost:9000/user/ashok
! hdfs dfs -put /home/ashok/Downloads/my.csv  hdfs://localhost:9000/user/ashok
#! hdfs dfs -cat  hdfs://localhost:9000/user/ashok/my.csv
        

Broad Steps
1. Transform categorical data to integers (indices) using StringIndexer
2. Transform indices to OHE form
3. Transform target separately to integers (indices) using StringIndexer
4. Collect all numeric and OHE features in one place using VectorAssembler
5. Perform modeling

Small steps
1. Transform categorical data to integers (indices) using StringIndexer
> i) Create a list of categorical features<br>
>ii) Create a StringIndexer object<br>
>iii)Fit and transform using this object <br>

2. Transform indices to OHE form<br>
>i) Instantiate  an OHE object<br>
>ii)Fit and transform indices created as a result of 1(iii) above<br>

### Transfer files to hadoop
Start hadoop and issue the following three commands

```

hdfs dfs -rm hdfs://localhost:9000/user/ashok/my.csv
hdfs dfs -put /home/ashok/Documents/spark/2.ml/1.demo/my.csv hdfs://localhost:9000/user/ashok
hdfs dfs -cat  hdfs://localhost:9000/user/ashok/my.csv

```



### Call libraries

In [None]:
## 1.0 Call libraries
# 1.1   For transforming categorical data to integer and to dummy:
#       And for collecting all features at one place

from pyspark.ml.feature import  StringIndexer, OneHotEncoder, VectorAssembler

In [None]:
# 1.2   To execute all transformation operations as pipeline

from pyspark.ml import Pipeline

In [None]:
# 1.3 Logistic Regression modeling:

from pyspark.ml.classification import LogisticRegression

In [None]:
# 1.4 For evaluating results:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

### Read Data

In [None]:
# 2.0 Load the demo CSV from HDFS.
#     The first row supplies the column names and Spark is
#     asked to infer each column's type.

df_demo = spark.read.csv(
    "hdfs://localhost:9000/user/ashok/my.csv",
    inferSchema = True,
    header = True
)


In [None]:
# 2.1 Display the rows that were read:

df_demo.show()

In [None]:
# 2.1.1 Data type of every column:

df_demo.dtypes

In [None]:
# 2.2 Column groups:
#     cat_cols : columns fed to the StringIndexer
#     i_cols   : names given to the corresponding indexed columns
#     num_cols : the numeric columns

cat_cols = ['c1', 'c2', 'c3']
i_cols   = ['c11', 'c22', 'c33']
num_cols = ['n1', 'n2', 'n3']


### StringIndex cat columns

### [What is a StringIndexer](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html)
>StringIndexer is a label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we first cast it to string and then index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. 


In [None]:
# 2.3 Integer-index the string columns.

# 2.3.1 A single StringIndexer covers all of them: each column in
#       cat_cols is indexed into the same-position name in i_cols.

si = StringIndexer(inputCols = cat_cols, outputCols = i_cols)


In [None]:
# 2.3.2 Train the StringIndexer object on the data; the fitted
#       result is what gets used for the transform in 2.3.4:

model = si.fit(df_demo)

In [None]:
# 2.3.3 Row counts per level for columns c1 and c3, to see
#       which levels occur most often:

df_demo.groupBy('c1').count().show()
df_demo.groupBy('c3').count().show()

In [None]:
# 2.3.4 Transform data and observe.
#       df_demo is reassigned here, so from this cell onwards it
#       also carries the indexed columns named in i_cols.

df_demo = model.transform(df_demo)
df_demo.show()


### OneHotEncode

#### [What is OneHotEncoder](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html)
>A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].

In [None]:
# 3.0 One hot encoding of indexed columns.
#     Input names are taken from i_cols (the StringIndexer outputs)
#     instead of being typed out a second time. Each encoded column
#     is named after its input with a 'vec' suffix, which gives
#     c11vec, c22vec and c33vec.

ohe      = OneHotEncoder(
                         inputCols = i_cols,
                         outputCols = [name + 'vec' for name in i_cols]
                         )


In [None]:
# 3.1 Fit the encoder on the indexed data
model_ohe = ohe.fit(df_demo)


In [None]:
# 3.2 Transform the data. df_demo is reassigned and now also
#     holds the encoder's output columns.
df_demo = model_ohe.transform(df_demo)
df_demo.show()


#### How to interpret vectors?

Consider the vector: <i>(48,[0, 1, 9],[14.1, 1.0, 1.0])</i>. This vector represents a vector of length 48, with three non-zero entries:

    i)   14.1    at the 0th position
    ii)  1.0     at the 1st position
    iii) 1.0     at the 9th position
    iv)  Rest all 45 positions would be 0.
    
 Refer [here](https://stackoverflow.com/a/38236452)   


### StringIndex target

In [None]:
# 4.0 The target column 'f' is indexed by its own StringIndexer.
#     The indexed target is called 'label', which is the
#     customary name for the target.

si_label = StringIndexer(inputCol = 'f', outputCol = 'label')


In [None]:
# 4.1 Fit the target indexer:

model_label = si_label.fit(df_demo)


In [None]:
# 4.2 Transform dataframe; df_demo is reassigned and gains
#     the 'label' column:

df_demo = model_label.transform(df_demo)
df_demo.show()


### Vector Assembling

In [None]:
# 5.0 Using VectorAssembler

# 5.1 Build the assembler.
#     It reads the three OHE columns followed by the three numeric
#     columns and writes them to one column, named 'features'
#     as is generally done.

vc = VectorAssembler(
    inputCols = ['c11vec', 'c22vec', 'c33vec', 'n1', 'n2', 'n3'],
    outputCol = 'features'
)


In [None]:
# 5.2 VectorAssembler (vc) does not have a 'fit' method;
#     only transform() is available.
#     So transform the data (df_demo gains the 'features' column):

df_demo = vc.transform(df_demo)


### Modeling

In [None]:
# 7.0 Create the logistic-regression estimator.
#     It reads predictors from 'features' and the target from
#     'label', with the iteration count capped at 10.

lr = LogisticRegression(
    featuresCol = "features",
    labelCol = "label",
    maxIter = 10
)


In [None]:
# 7.1 Fit the model on the assembled data
lr_model = lr.fit(df_demo)

### Predictions

In [None]:
# 8.0 Make predictions on df_demo itself (the same data the
#     model was fitted on) using the transform() method.
#     (There is no predict() method)

predictions = lr_model.transform(df_demo)


In [None]:
# 8.1 Which columns does the output contain?
predictions.columns

In [None]:
# 8.2 Keep just the target and the model-output columns,
#     and print them without truncation:

selected = predictions.select(
    "label",
    "prediction",
    "probability",
    "rawPrediction"
)
selected.show(truncate = False)


### Evaluation

In [None]:
# 9.  BinaryClassificationEvaluator is used to evaluate the model.
#     It expects two input columns (rawPrediction, label) and a
#     value for 'metricName'.
#     The -label- parameter defaults to 'label' and 'metricName'
#     defaults to "areaUnderROC".

# 9.1 Instantiate the evaluator class

evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")

# 9.2 Evaluate to return AUC
evaluator.evaluate(predictions)

In [None]:
#################################################################################################
######################################### Using Pipelining ######################################
#################################################################################################

### StringIndexing and OHE pipe

In [None]:
# 5. Assemble all processing objects in a pipe
#    and then use the pipe


In [None]:
# 5.1 Stages, in execution order: index the categorical columns,
#     one-hot encode them, then index the target.

ToDo = [si, ohe, si_label]


In [None]:
# 5.2 Build pipe from the list of stages:

pipe = Pipeline(stages=ToDo)

In [None]:
# 5.3 Train pipe:
#     By this point df_demo already carries the indexed, one-hot,
#     'label' and 'features' columns added step by step in sections
#     2.3 to 5.2. The pipe stages write to those same column names
#     and fail when an output column already exists, so first go
#     back to the original input columns only.

df_demo = df_demo.select(*cat_cols, *num_cols, 'f')
model_pipe = pipe.fit(df_demo)


In [None]:
# 5.4 Run the fitted pipe over the data and look at the result:

df_trans = model_pipe.transform(df_demo)
df_trans.show()

### Vector Assembling

In [None]:
# 6.0 Using VectorAssembler

# 6.1 Build the assembler.
#     It reads the three OHE columns followed by the three numeric
#     columns and writes them to one column, named 'features'
#     as is generally done.

vc = VectorAssembler(
    inputCols = ['c11vec', 'c22vec', 'c33vec', 'n1', 'n2', 'n3'],
    outputCol = 'features'
)


In [None]:
# 6.2 VectorAssembler (vc) does not have a 'fit' method;
#     only transform() is available.
#     The OHE columns it reads ('c11vec', ...) were produced by the
#     pipe in 5.4 and live in df_trans, so assemble df_trans and
#     leave df_demo as it is for the pipe fits that follow.

df_trans = vc.transform(df_trans)


### Extending pipe

In [None]:
# 6.3 Same stages as before, with the vc object appended

ToDo = [si, ohe, si_label, vc]


In [None]:
# 6.4 Instantiate Pipeline class with the extended stage list:

pipe = Pipeline(stages=ToDo)

In [None]:
# 6.5 Fit the extended pipe:

model_pipe = pipe.fit(df_demo)

In [None]:
# 6.6 Apply the fitted pipe to the data:

df_trans = model_pipe.transform(df_demo)

In [None]:
# 6.7 Show the transformed data without truncating cell contents:

df_trans.show(truncate = False)

### Modeling

In [None]:
# 7.0 Create the logistic-regression estimator.
#     It reads predictors from 'features' and the target from
#     'label', with the iteration count capped at 10.

lr = LogisticRegression(
    featuresCol = "features",
    labelCol = "label",
    maxIter = 10
)


### Final pipe with modeling
What is a pipeline? Refer [here](https://spark.apache.org/docs/latest/ml-pipeline.html)

In [None]:
# 7.1 Final pipe: all the earlier stages with the estimator lr
#     as the last one. Build it and fit it in one go.

ToDo = [si, ohe, si_label, vc, lr]
pipe = Pipeline(stages = ToDo)
lr_model = pipe.fit(df_demo)


### Predictions and evaluations

In [None]:
# 8.0 Make predictions on df_demo itself using the transform()
#     method of the fitted pipe from 7.1.
#     (There is no predict() method)

predictions = lr_model.transform(df_demo)


In [None]:
# 8.1 Column names of the pipe output
predictions.columns        # All columns + 3 more
                           # ['rawPrediction', 'probability', 'prediction']

In [None]:
# 8.2 Schema of the pipe output
predictions.printSchema()


In [None]:
# 8.3 Keep just the target and the model-output columns,
#     and print them without truncation:

selected = predictions.select(
    "label",
    "prediction",
    "probability",
    "rawPrediction"
)
selected.show(truncate = False)


In [None]:
# 9.  BinaryClassificationEvaluator is used to evaluate the model.
#     It expects two input columns (rawPrediction, label) and a
#     value for 'metricName'.
#     The -label- parameter defaults to 'label' and 'metricName'
#     defaults to "areaUnderROC".

# 9.1 Instantiate the evaluator class

evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")

# 9.2 Evaluate to return AUC
evaluator.evaluate(predictions)

In [None]:
# 9.3  Confirm which metric the evaluator reports. No metricName
#      was passed in 9.1, so this is the default one (areaUnderROC).

evaluator.getMetricName()


In [None]:
################ I am done ################

In [None]:


# 10.0 Basic statistical operations
#      Many Spark statistical functions are available only on
#      vector data, so the numerical columns are first assembled
#      into one vector column. Correlation is shown as an example.
#      Ref: Basic Statistics
#           https://spark.apache.org/docs/latest/ml-statistics.html#basic-statistics

from pyspark.ml.stat import Correlation

# Pack n1, n2, n3 into the vector column 'vectors'
vc_corr = VectorAssembler(inputCols = ['n1','n2','n3'], outputCol = 'vectors')
vec = vc_corr.transform(df_demo)

# head() returns the first row of the result; its first field is printed
r1 = Correlation.corr(vec, "vectors").head()
print("Pearson correlation matrix:\n" + str(r1[0]))


########################
# Creating polynomial or interaction features
########################
# Extracting, transforming and selecting features
#   Ref: https://spark.apache.org/docs/latest/ml-features#extracting-transforming-and-selecting-features
# 11.0
from pyspark.ml.feature import Interaction
from pyspark.sql.functions import col

# 11.1 VectorAssemble only numeric cols
vc_num = VectorAssembler(
                          inputCols = ['n1','n2','n3'],
                          outputCol = "vec"
                         )
# 11.2
df_trans = vc_num.transform(df_demo)

# 11.3 Create a similar column to vector column
df_trans = df_trans.withColumn("avec", col("vec"))

# 12.0 Instantiate Interaction class
#      The output column is named 'interactions' and not 'features':
#      df_demo gets a 'features' column in step 5.2, and df_trans is
#      derived from df_demo, so writing to 'features' again would
#      collide with that existing column.
poly= Interaction(
                  inputCols=['avec','vec'],
                  outputCol = 'interactions'
                 )

# 12.1 Transform and create the interaction features
df_trans = poly.transform(df_trans)
df_trans.select('vec','interactions').show(truncate = False)

########################
# 13.0 MinMaxScaling data
#      Scales the interaction column created in 12.1
########################
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="interactions", outputCol="scaledFeatures")
scalerModel = scaler.fit(df_trans)
df_trans = scalerModel.transform(df_trans)
df_trans.show()
########################

