# Run Spark session with PySpark interface

The MLlib documentation is available [here](https://spark.apache.org/docs/latest/ml-guide.html); the most relevant part is the "MLlib: Main Guide" section.

## Environment initialization

* Install OpenJDK 8, because Spark runs on a JVM
* Download the Spark distribution prebuilt for Hadoop and unpack it

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

### Install findspark (PyPI library)

* Defines the environment variable ``PYSPARK_PYTHON``
* Adds the PySpark search paths to the Python interpreter's ``sys.path``

In [None]:
!pip install -q findspark

### Add environment variables

* Add ``JAVA_HOME``: where OpenJDK 8 is installed
* Add ``SPARK_HOME``: where the Spark archive was unpacked

In [None]:
import sys, os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
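
Both variables should point at directories that actually exist; otherwise the later `pyspark` import is likely to fail. A minimal sketch of a sanity check in plain Python follows. The helper name `missing_dirs` is hypothetical and not part of findspark or Spark.

```python
import os

def missing_dirs(env, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variable names that are unset or do not point to a directory."""
    return [name for name in names
            if not os.path.isdir(env.get(name, ""))]

# Check the environment of the current process; an empty list means both are usable
print(missing_dirs(os.environ))
```

Passing the environment as an argument keeps the check easy to try on a plain dictionary before touching `os.environ`.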

### Configure and import PySpark

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

**Check that ``PYSPARK_PYTHON`` is set and that the Python ``sys.path`` list includes the PySpark paths**

In [None]:
print(sys.path)
print(os.getenv('PYSPARK_PYTHON'))

['/content/spark-2.4.7-bin-hadoop2.7/python', '/content/spark-2.4.7-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip', '', '/content', '/env/python', '/usr/lib/python37.zip', '/usr/lib/python3.7', '/usr/lib/python3.7/lib-dynload', '/usr/local/lib/python3.7/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.7/dist-packages/IPython/extensions', '/root/.ipython']
/usr/bin/python3


## Start Spark Session

In [None]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("default_payment") \
                    .getOrCreate()

## Import the dataset into a Spark DataFrame

In [None]:
df = spark.read.csv('/content/drive/My Drive/Colab Notebooks/Cours BD/loan_default_payment.csv', header=True, sep=",", inferSchema=True)

**Display a sample**

In [None]:
df.show(5)

+---+---+------+-------+------+-------+---------+--------+-------+------------------+-------------------+-----------------+
|age| ed|employ|address|income|debtinc| creddebt| othdebt|default|          preddef1|           preddef2|         preddef3|
+---+---+------+-------+------+-------+---------+--------+-------+------------------+-------------------+-----------------+
| 41|  3|    17|     12|   176|    9.3|11.359392|5.008608|      1| 0.808394327359702|  0.788640431821437|0.213043376128119|
| 27|  1|    10|      6|    31|   17.3| 1.362202|4.000798|      0| 0.198297476159104|  0.128445387038174|0.436903005506046|
| 40|  1|    15|     14|    55|    5.5| 0.856075|2.168925|      0|0.0100361080990023|0.00298677834821412|0.141022623460993|
| 41|  1|    15|     14|   120|    2.9|  2.65872| 0.82128|      0|0.0221382837594812| 0.0102732648580009|0.104422211934525|
| 24|  2|     2|      0|    28|   17.3| 1.787436|3.056564|      1| 0.781588314152869|  0.737884820480045|0.436903005506046|
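+---+---+------+-------+------+-------+---------+--------+-------+------------------+-------------------+-----------------+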

**Display data type**

In [None]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- ed: integer (nullable = true)
 |-- employ: integer (nullable = true)
 |-- address: integer (nullable = true)
 |-- income: integer (nullable = true)
 |-- debtinc: double (nullable = true)
 |-- creddebt: double (nullable = true)
 |-- othdebt: double (nullable = true)
 |-- default: string (nullable = true)
 |-- preddef1: double (nullable = true)
 |-- preddef2: double (nullable = true)
 |-- preddef3: double (nullable = true)



## Descriptive statistics

Although PySpark offers statistics and modeling functionality, it is much less rich than what the Python or R ecosystems provide.  
**Use Spark whenever possible, because it distributes and parallelizes processing steps efficiently (cleaning, transformation, aggregation...); for specialized statistics or modeling, however, Python or R are often preferred to Spark.**  

**All variables**

In [None]:
df.summary().show(truncate=True)

+-------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|summary|               age|                ed|           employ|          address|            income|          debtinc|          creddebt|           othdebt|            default|           preddef1|           preddef2|           preddef3|
+-------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|  count|               850|               850|              850|              850|               850|              850|               850|               850|                850|                850|                850|                850|
|   mean|35.029411764705884|1.71058823529411

**Select one variable**

In [None]:
df.select(df.age).summary().show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|               850|
|   mean|35.029411764705884|
| stddev| 8.041431634948793|
|    min|                20|
|    25%|                29|
|    50%|                34|
|    75%|                41|
|    max|                56|
+-------+------------------+



**Aggregation on one variable**

In [None]:
# Proportion of rows whose income is above 72 (56 + 16)
df.filter(df.income > 56+16).count() / df.count()

0.14941176470588236

In [None]:
from pyspark.sql.functions import mean

df.select(mean(df.age)).show()

+------------------+
|          avg(age)|
+------------------+
|35.029411764705884|
+------------------+



In [None]:
df.select(mean("age")).show()

+------------------+
|          avg(age)|
+------------------+
|35.029411764705884|
+------------------+



In [None]:
df.select(df.employ, df.age).groupBy('employ').agg({'age':'mean'}).show()

+------+------------------+
|employ|          avg(age)|
+------+------------------+
|    31|48.333333333333336|
|    28|              48.0|
|    26|              47.0|
|    27|48.333333333333336|
|    12| 35.71052631578947|
|    22| 43.84615384615385|
|     1| 29.35593220338983|
|    13|          37.84375|
|     6|33.679245283018865|
|    16| 40.63636363636363|
|     3|             31.76|
|    20|              38.0|
|     5|  32.6530612244898|
|    19|43.705882352941174|
|    15| 40.17391304347826|
|    17| 42.57142857142857|
|     9| 37.11538461538461|
|     4|31.859649122807017|
|     8|33.526315789473685|
|    23|              42.5|
+------+------------------+
only showing top 20 rows



In [None]:
df.groupBy('employ').mean('age').show()

+------+------------------+
|employ|          avg(age)|
+------+------------------+
|    31|48.333333333333336|
|    28|              48.0|
|    26|              47.0|
|    27|48.333333333333336|
|    12| 35.71052631578947|
|    22| 43.84615384615385|
|     1| 29.35593220338983|
|    13|          37.84375|
|     6|33.679245283018865|
|    16| 40.63636363636363|
|     3|             31.76|
|    20|              38.0|
|     5|  32.6530612244898|
|    19|43.705882352941174|
|    15| 40.17391304347826|
|    17| 42.57142857142857|
|     9| 37.11538461538461|
|     4|31.859649122807017|
|     8|33.526315789473685|
|    23|              42.5|
+------+------------------+
only showing top 20 rows



#### Count null, blank or distinct values for each column

**Null values**

In [None]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+---+------+-------+------+-------+--------+-------+-------+--------+--------+--------+
|age| ed|employ|address|income|debtinc|creddebt|othdebt|default|preddef1|preddef2|preddef3|
+---+---+------+-------+------+-------+--------+-------+-------+--------+--------+--------+
|  0|  0|     0|      0|     0|      0|       0|      0|      0|       0|       0|       0|
+---+---+------+-------+------+-------+--------+-------+-------+--------+--------+--------+



**Blank values**

In [None]:
from pyspark.sql.types import IntegerType, StringType

df.select([count(when(col(c) == " ", c)).alias(c) for c in df.columns if df.schema[c].dataType == StringType()]).show()

+-------+
|default|
+-------+
|    150|
+-------+



**Distinct values**

In [None]:
from pyspark.sql.functions import countDistinct

df.select([countDistinct(col(c)).alias(c) for c in df.columns]).show()

+---+---+------+-------+------+-------+--------+-------+-------+--------+--------+--------+
|age| ed|employ|address|income|debtinc|creddebt|othdebt|default|preddef1|preddef2|preddef3|
+---+---+------+-------+------+-------+--------+-------+-------+--------+--------+--------+
| 37|  5|    33|     32|   129|    245|     842|    848|      3|     850|     850|     245|
+---+---+------+-------+------+-------+--------+-------+-------+--------+--------+--------+
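
The three checks above can be pictured on a single column with plain Python. This is only a sketch on made-up rows, not the Spark implementation; the helper name `column_profile` is hypothetical. Unlike the Spark filter, which matches exactly one space, the sketch treats any whitespace-only string as blank.

```python
def column_profile(rows, column):
    """Count missing, blank and distinct values of one column in a list of dicts."""
    values = [row.get(column) for row in rows]
    return {
        "null": sum(v is None for v in values),
        "blank": sum(isinstance(v, str) and v.strip() == "" for v in values),
        # Missing values are ignored; a blank string still counts as a distinct value
        "distinct": len({v for v in values if v is not None}),
    }

# Toy rows (made up for illustration, not taken from the dataset)
rows = [{"default": "1"}, {"default": "0"}, {"default": " "}, {"default": None}]
print(column_profile(rows, "default"))
```

The blank string counts as a distinct value here, which is consistent with `default` showing 3 distinct values before the blanks are filtered out.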



#### Apply a filter

Here we remove the rows with blank values in the `default` column

In [None]:
print(df.count())
df = df.filter(df.default != " ")
print(df.count())

850
700


## Correlation matrix

To compute statistics or train machine learning models, Spark needs a **vector column** that contains only numeric values; it cannot work directly on the raw feature columns.  
So you have to create a vector column:  

|age|income|features_vector|
|--|--|--|
|25|35000|[25.0, 35000.0]|
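
Conceptually, the vector column is just the selected numeric fields of each row concatenated as floats. A minimal plain-Python sketch of that idea follows; the helper name `assemble` is hypothetical and this is not how Spark implements it.

```python
def assemble(row, input_cols):
    """Concatenate the selected numeric fields of a row into one list of floats."""
    return [float(row[c]) for c in input_cols]

row = {"age": 25, "income": 35000}
row["features_vector"] = assemble(row, ["age", "income"])
print(row["features_vector"])  # [25.0, 35000.0]
```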

**Convert one variable**

If you have alphanumeric values, you need to convert them, either by _casting_ the column or by using _StringIndexer_ (the counterpart of scikit-learn's LabelEncoder) and _OneHotEncoderEstimator_ (the counterpart of scikit-learn's OneHotEncoder)

In [None]:
from pyspark.sql.types import IntegerType, StringType

df = df.withColumn('default', df['default'].cast(IntegerType()))
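
For columns that cannot simply be cast, the index-then-one-hot route mentioned above can be pictured in plain Python. This is a conceptual sketch on toy labels, with hypothetical helper names. It assumes the most frequent label gets index 0, which is the default ordering of `StringIndexer`; breaking ties alphabetically is an assumption of this sketch. The sketch also keeps one position per category, whereas Spark's encoder drops the last category by default.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an integer, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot(index, size):
    """Return a list of `size` zeros with a single 1.0 at position `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]

labels = ["yes", "no", "no", "maybe", "no", "yes"]  # toy values
mapping = fit_string_index(labels)
print(mapping)
print([one_hot(mapping[label], len(mapping)) for label in labels[:2]])
```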

**Create vector column**

In [None]:
from pyspark.ml.feature import VectorAssembler

df = VectorAssembler(inputCols=df.drop('default').columns, outputCol="features").transform(df)

In [None]:
df.show(5, False)

+---+---+------+-------+------+-------+---------+--------+-------+------------------+-------------------+-----------------+--------------------------------------------------------------------------------------------------------+
|age|ed |employ|address|income|debtinc|creddebt |othdebt |default|preddef1          |preddef2           |preddef3         |features                                                                                                |
+---+---+------+-------+------+-------+---------+--------+-------+------------------+-------------------+-----------------+--------------------------------------------------------------------------------------------------------+
|41 |3  |17    |12     |176   |9.3    |11.359392|5.008608|1      |0.808394327359702 |0.788640431821437  |0.213043376128119|[41.0,3.0,17.0,12.0,176.0,9.3,11.359392,5.008608,0.808394327359702,0.788640431821437,0.213043376128119] |
|27 |1  |10    |6      |31    |17.3   |1.362202 |4.000798|0      |0.198297476159104 

**Matrix construction**

`Correlation.corr` supports the Pearson (default) and Spearman methods

In [None]:
from pyspark.ml.stat import Correlation

corrM = Correlation.corr(df, "features", "pearson").head()
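
To make the difference between the two methods concrete, here is a plain-Python sketch for a single pair of variables: Pearson measures linear association, while Spearman is Pearson applied to the ranks. The function names and the toy data are assumptions made for illustration; this is not Spark's implementation.

```python
import math

def pearson(x, y):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)

def average_ranks(values):
    """Rank values from 1 to n, giving tied values the mean of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    start = 0
    while start < len(order):
        end = start
        while end + 1 < len(order) and values[order[end + 1]] == values[order[start]]:
            end += 1
        mean_rank = (start + end) / 2 + 1
        for k in range(start, end + 1):
            ranks[order[k]] = mean_rank
        start = end + 1
    return ranks

def spearman(x, y):
    """Spearman correlation: Pearson correlation computed on the ranks."""
    return pearson(average_ranks(x), average_ranks(y))

x = [1, 2, 3, 4, 5]
y = [1, 4, 9, 16, 25]  # monotonic but not linear in x
print(round(pearson(x, y), 4), round(spearman(x, y), 4))
```

On this toy data Spearman reaches 1 because the relation is perfectly monotonic, while Pearson stays slightly below 1 because it is not linear.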

**Display the matrix**

In [None]:
print("Pearson correlation matrix:\n" + str(corrM[0]))

Pearson correlation matrix:
DenseMatrix([[ 1.        ,  0.022325  ,  0.53649678,  0.59759074,  0.47870987,
               0.01639808,  0.29520667,  0.34021695, -0.31440563, -0.22845968,
               0.01310296],
             [ 0.022325  ,  1.        , -0.15362077,  0.05691913,  0.2351905 ,
               0.00883843,  0.08827406,  0.16545872,  0.1655189 ,  0.19031884,
               0.00723091],
             [ 0.53649678, -0.15362077,  1.        ,  0.3223343 ,  0.61968132,
              -0.03118221,  0.4036937 ,  0.40609121, -0.48956778, -0.46963907,
              -0.02515862],
             [ 0.59759074,  0.05691913,  0.3223343 ,  1.        ,  0.31624514,
               0.01132298,  0.20843505,  0.22651449, -0.28450931, -0.27292787,
               0.00774846],
             [ 0.47870987,  0.2351905 ,  0.61968132,  0.31624514,  1.        ,
              -0.02677729,  0.57019866,  0.61065941, -0.12011129, -0.11778328,
              -0.02616525],
             [ 0.01639808,  0.00883843, -0

## Chi-squared / Cramér's V

**Contingency table**

In [None]:
df.crosstab('age', 'default').show(20)

+-----------+---+---+
|age_default|  0|  1|
+-----------+---+---+
|         56|  1|  0|
|         42| 15|  1|
|         24| 12| 12|
|         37| 15|  7|
|         25| 11|  9|
|         52|  5|  2|
|         20|  1|  1|
|         46| 10|  3|
|         29| 30| 14|
|         28| 23| 14|
|         38| 19|  4|
|         21|  7|  3|
|         33| 20|  5|
|         53|  2|  4|
|         41| 26|  8|
|         32| 16|  9|
|         34| 28|  5|
|         45| 15|  1|
|         22|  7|  5|
|         44|  7|  2|
+-----------+---+---+
only showing top 20 rows



**Degrees of freedom**

$(ncol - 1) \times (nrow - 1)$

In [None]:
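# Build the contingency table once and reuse it
ct = df.crosstab('age', 'default')

# Number of age categories minus 1
nb_row = ct.count() - 1
# Number of default categories minus 1: -2 because the first column holds the age labels
nb_col = len(ct.columns) - 2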

print(nb_row)
print(nb_col)

36
1
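
As a worked example of the formula, the contingency table has 37 age rows and 2 `default` columns, so the two factors are the 36 and 1 printed above. The helper name `degrees_of_freedom` is hypothetical.

```python
def degrees_of_freedom(n_rows, n_cols):
    """Degrees of freedom of a chi-squared test on an n_rows x n_cols contingency table."""
    return (n_rows - 1) * (n_cols - 1)

# 37 distinct ages (rows) and 2 values of default (columns)
print(degrees_of_freedom(37, 2))  # 36
```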


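**Number of observations**

In [None]:
# groupby().sum() keeps only the numeric count columns; add them all up to get the table total
sum(df.crosstab('age', 'default').groupby().sum().collect()[0])

700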

**Chi-squared statistic**

The result includes the p-values and the degrees of freedom.   
`ChiSquareTest` takes a vector column and a label column as input, and tests each component of the vector against the label.

In [None]:
from pyspark.ml.stat import ChiSquareTest

chi2 = ChiSquareTest.test(df, "features", "default").head()
print("pValues: " + str(chi2.pValues))
print("degreesOfFreedom: " + str(chi2.degreesOfFreedom))
print("statistics: " + str(chi2.statistics))

pValues: [0.011724738038353943,0.021553958525744155,1.7962590415088187e-08,0.00021927360430318288,0.2859868908910044,1.2620978023791807e-06,0.45649023463368876,0.4991773829791907,0.48222831559789603,0.4822283155977797,1.2620978023791807e-06]
degreesOfFreedom: [36, 4, 31, 30, 113, 230, 694, 698, 699, 699, 230]
statistics: [57.917070932256046,11.492341703403657,95.37120351869588,65.0011769899331,121.0149460283939,345.371280529983,697.4104491021116,697.410449102112,699.9999999999972,699.999999999997,345.3712805299831]
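
For a single feature, the statistic compares each observed cell of the contingency table with the count expected under independence. The sketch below computes it in plain Python on a made-up 2 x 2 table; the function name is hypothetical and this is not Spark's implementation.

```python
def chi_square_statistic(table):
    """Pearson chi-squared statistic of a contingency table given as a list of rows."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)
    statistic = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            # Count expected in this cell if rows and columns were independent
            expected = row_totals[i] * col_totals[j] / total
            statistic += (observed - expected) ** 2 / expected
    return statistic

# Toy 2 x 2 table (made-up counts)
table = [[10, 20],
         [20, 10]]
print(round(chi_square_statistic(table), 4))  # 6.6667
```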


**Cramér's V**

$\sqrt{\frac{\chi^2}{nbObs \times (\min(ncol, nrow) - 1)}}$

In [None]:
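import numpy as np

# n = 700 observations (all rows kept after filtering); min(ncol, nrow) - 1 = min(2, 37) - 1 = 1
round(float(np.sqrt(chi2.statistics[0] / (df.count() * 1))), 3)

0.288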
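
The formula can be wrapped in a small helper so that the table dimensions are explicit instead of hard-coded. This is a sketch with toy values; the function name is hypothetical, and `n_obs` stands for the grand total over all cells of the contingency table.

```python
import math

def cramers_v(chi2_statistic, n_obs, n_rows, n_cols):
    """Cramér's V from a chi-squared statistic and the contingency table dimensions."""
    return math.sqrt(chi2_statistic / (n_obs * (min(n_rows, n_cols) - 1)))

# Toy values: chi-squared of 6.0 on a 3 x 2 table built from 60 observations
print(round(cramers_v(6.0, 60, 3, 2), 4))  # 0.3162
```

The result lies between 0 (no association) and 1 (perfect association), which makes it comparable across tables of different sizes.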

## Modeling

**Balance of the target variable**

In [None]:
df.groupby(df.default) \
  .count().show()

+-------+-----+
|default|count|
+-------+-----+
|      1|  183|
|      0|  517|
+-------+-----+



**Split train / test dataset**

In [None]:
train, test = df.randomSplit([0.8, 0.2], seed = 235)
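
As far as I understand it, `randomSplit` treats the weights as probabilities rather than exact sizes, so the 80/20 proportions are only approximate, and the `seed` makes the split reproducible. The plain-Python sketch below illustrates that behaviour on 700 toy rows; it is an assumption-laden illustration with a hypothetical function name, not Spark's algorithm.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number against the cumulative weights."""
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:
        running += w / total  # weights are normalized so that they sum to 1
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding on the final bound
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(row)
                break
    return splits

train_rows, test_rows = random_split(list(range(700)), [0.8, 0.2], seed=235)
print(len(train_rows), len(test_rows))
```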

In [None]:
from pyspark.ml.classification import RandomForestClassifier

ranf = RandomForestClassifier(labelCol='default', numTrees=600, maxDepth=15, minInstancesPerNode=1, impurity="gini", 
                              featuresCol="features", seed=235, maxBins=20)

**Train the model**

In [None]:
ranf = ranf.fit(train)

**Model evaluation**

_My score:_
* AUC (area under the ROC curve): 0.855
* AUP (area under the precision-recall curve): 0.715

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

pred = ranf.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="default")

AUC = evaluator.evaluate(pred, {evaluator.metricName: "areaUnderROC"})
AUP = evaluator.evaluate(pred, {evaluator.metricName: "areaUnderPR"})

print("AUC : {0}".format(round(AUC, 3)))
print("AUP : {0}".format(round(AUP, 3)))

AUC : 0.854
AUP : 0.714
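
One way to read the area under the ROC curve is as the share of (defaulter, non-defaulter) pairs in which the defaulter receives the higher score. The plain-Python sketch below computes that pairwise form on toy scores; the function name is hypothetical. It is an equivalent definition for illustration, not the procedure `BinaryClassificationEvaluator` follows internally.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count one half."""
    positives = [score for label, score in zip(labels, scores) if label == 1]
    negatives = [score for label, score in zip(labels, scores) if label == 0]
    correct = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                correct += 1.0
            elif p == n:
                correct += 0.5
    return correct / (len(positives) * len(negatives))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]  # toy scores
print(area_under_roc(labels, scores))  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.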


In [None]:
# Average number of nodes per tree (the forest was configured with numTrees=600)
ranf.totalNumNodes/600

155.13

In [None]:
ranf.toDebugString.split('\n')

['RandomForestClassificationModel (uid=RandomForestClassifier_eb2aa5b220c4) with 300 trees',
 '  Tree 0 (weight 1.0):',
 '    If (feature 9 <= 0.376159808205076)',
 '     If (feature 5 <= 19.75)',
 '      If (feature 9 <= 0.2899952530697285)',
 '       If (feature 2 <= 14.5)',
 '        If (feature 10 <= 0.46459320544218596)',
 '         If (feature 4 <= 18.5)',
 '          If (feature 7 <= 0.46839)',
 '           If (feature 6 <= 0.1946145)',
 '            Predict: 0.0',
 '           Else (feature 6 > 0.1946145)',
 '            Predict: 1.0',
 '          Else (feature 7 > 0.46839)',
 '           If (feature 4 <= 16.5)',
 '            If (feature 1 <= 2.5)',
 '             Predict: 0.0',
 '            Else (feature 1 > 2.5)',
 '             Predict: 1.0',
 '           Else (feature 4 > 16.5)',
 '            Predict: 0.0',
 '         Else (feature 4 > 18.5)',
 '          If (feature 9 <= 0.2339925679002455)',
 '           If (feature 5 <= 8.55)',
 '            If (feature 5 <= 3.55)',
 