In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf /content/spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
import sys
# Tell PySpark where the JDK and the unpacked Spark distribution live, and ask
# spark-submit to fetch the spark-xml package when the JVM starts.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.4-bin-hadoop2.7",
    "PYSPARK_SUBMIT_ARGS": "--packages com.databricks:spark-xml_2.11:0.5.0  pyspark-shell",
})

In [0]:
import findspark

# findspark.init() has to run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local session using every available core; reuses an existing session if any.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [9]:
spark

## Machine learning using Spark
### Churn prediction
Objective: Build a machine learning classifier using Spark ML to predict churn

## Data description:
The data is available in two formats, XML and JSON, in the files customer_profile_data.xml and customer_billing_data.json.

## Data description for customer_profile_data.xml file
* customerID: Customer ID
* gender: Whether the customer is a male or a female
* SeniorCitizen: Whether the customer is a senior citizen or not (1, 0)
* Partner: Whether the customer has a partner or not (Yes, No)
* Dependents: Whether the customer has dependents or not (Yes, No)
* tenure: Number of months the customer has stayed with the company
* PhoneService: Whether the customer has a phone service or not (Yes, No)
* MultipleLines: Whether the customer has multiple lines or not (Yes, No, No phone service)
* InternetService: Customer’s internet service provider (DSL, Fiber optic, No)
* OnlineSecurity: Whether the customer has online security or not (Yes, No, No internet service)
* OnlineBackup: Whether the customer has online backup or not (Yes, No, No internet service)
* DeviceProtection: Whether the customer has device protection or not (Yes, No, No internet service)
* TechSupport: Whether the customer has tech support or not (Yes, No, No internet service)
* StreamingTV: Whether the customer has streaming TV or not (Yes, No, No internet service)
* StreamingMovies: Whether the customer has streaming movies or not (Yes, No, No internet service)
* Contract: The contract term of the customer (Month-to-month, One year, Two year)
* PaperlessBilling: Whether the customer has paperless billing or not (Yes, No)
* Churn: Whether the customer churned or not (Yes or No)

## Data description for customer_billing_data.json file
* customerID: Customer ID
* PaymentMethod: The customer’s payment method (Electronic check, Mailed check, Bank transfer (automatic), Credit card (automatic))
* MonthlyCharges: The amount charged to the customer monthly
* TotalCharges: The total amount charged to the customer

In [0]:
# Billing records (payment method, monthly and total charges) from JSON.
billing_df = (
    spark.read
    .format('json')
    .load('customer_billing_data.json')
)

In [8]:
billing_df.show()

+--------------+--------------------+------------+----------+
|MonthlyCharges|       PaymentMethod|TotalCharges|customerID|
+--------------+--------------------+------------+----------+
|          65.6|        Mailed check|       593.3|0002-ORFBO|
|          59.9|        Mailed check|       542.4|0003-MKNFE|
|          73.9|    Electronic check|      280.85|0004-TLHLJ|
|          98.0|    Electronic check|     1237.85|0011-IGKFF|
|          83.9|        Mailed check|       267.4|0013-EXCHZ|
|          69.4|Credit card (auto...|      571.45|0013-MHZWF|
|         109.7|Bank transfer (au...|     7904.25|0013-SMEOE|
|         84.65|Credit card (auto...|      5377.8|0014-BMAQU|
|          48.2|    Electronic check|      340.35|0015-UOCOJ|
|         90.45|        Mailed check|      5957.9|0016-QLJIS|
|          45.2|Credit card (auto...|     2460.55|0017-DINOC|
|         116.8|Credit card (auto...|     8456.75|0017-IUDMW|
|         101.3|Bank transfer (au...|     7261.25|0019-EFAEP|
|       

In [0]:
# Customer profiles from XML; each <customer_profile> element becomes one row.
profile_df = (
    spark.read
    .format('xml')
    .option('rowTag', 'customer_profile')
    .load('customer_profile_data.xml')
)

In [6]:
profile_df.show()

+-----+--------------+----------+-------------------+---------------+----------------+-------------------+-------------------+----------------+-------+------------+-------------+-------------------+-------------------+-------------------+----------+------+------+
|Churn|      Contract|Dependents|   DeviceProtection|InternetService|   MultipleLines|       OnlineBackup|     OnlineSecurity|PaperlessBilling|Partner|PhoneService|SeniorCitizen|    StreamingMovies|        StreamingTV|        TechSupport|customerID|gender|tenure|
+-----+--------------+----------+-------------------+---------------+----------------+-------------------+-------------------+----------------+-------+------------+-------------+-------------------+-------------------+-------------------+----------+------+------+
|   No|      One year|       Yes|                 No|            DSL|              No|                Yes|                 No|             Yes|    Yes|         Yes|            0|                 No|          

Verify the row counts of the two DataFrames and check whether the counts are matching or not

In [0]:
# Both sources should describe the same set of customers.
counts_match = profile_df.count() == billing_df.count()
print('row counts matched in both dataframes' if counts_match
      else 'row counts not matched')

row counts matched in both dataframes


In [0]:
print('row counts :', profile_df.count(), billing_df.count())

row counts : 6311 6311


Verify columns

In [0]:
print(profile_df.columns)

['Churn', 'Contract', 'Dependents', 'DeviceProtection', 'InternetService', 'MultipleLines', 'OnlineBackup', 'OnlineSecurity', 'PaperlessBilling', 'Partner', 'PhoneService', 'SeniorCitizen', 'StreamingMovies', 'StreamingTV', 'TechSupport', 'customerID', 'gender', 'tenure']


In [0]:
print(billing_df.columns)

['MonthlyCharges', 'PaymentMethod', 'TotalCharges', 'customerID']


# Common column name in two dataframes

In [0]:
profile_df_cols = profile_df.columns
billing_df_cols = billing_df.columns

# Column names present in both frames, listed in profile_df order.
[name for name in profile_df_cols if name in billing_df_cols]

['customerID']

## Combine two dataframes

In [0]:
# Inner join on the shared key; customers missing from either side are dropped.
combined_df = profile_df.join(billing_df, on='customerID', how='inner')

## Verify dataframe

In [0]:
print('row counts: ', combined_df.count())

row counts:  6311


In [0]:
combined_df.show(4)

+----------+-----+--------------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+----------------+------------+
|customerID|Churn|      Contract|Dependents|DeviceProtection|InternetService|MultipleLines|OnlineBackup|OnlineSecurity|PaperlessBilling|Partner|PhoneService|SeniorCitizen|StreamingMovies|StreamingTV|TechSupport|gender|tenure|MonthlyCharges|   PaymentMethod|TotalCharges|
+----------+-----+--------------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+----------------+------------+
|0002-ORFBO|   No|      One year|       Yes|              No|            DSL|           No|         Yes|            No|             Yes|    Yes|         Yes|            0|             No|

In [0]:
import pyspark.sql.functions as F

In [0]:
# Per-column count of missing values (NULL or NaN).
# The flagged rows are counted via a literal 1 rather than the column itself:
# F.count() skips NULLs, so returning the (NULL) column value from F.when()
# would make every NULL row invisible to the count.
combined_df.select([
    F.count(F.when(F.isnan(F.col(c)) | F.col(c).isNull(), 1)).alias(c)
    for c in combined_df.columns
]).show()

+----------+-----+--------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+-------------+------------+
|customerID|Churn|Contract|Dependents|DeviceProtection|InternetService|MultipleLines|OnlineBackup|OnlineSecurity|PaperlessBilling|Partner|PhoneService|SeniorCitizen|StreamingMovies|StreamingTV|TechSupport|gender|tenure|MonthlyCharges|PaymentMethod|TotalCharges|
+----------+-----+--------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+-------------+------------+
|         0|    0|       0|         0|               0|              0|            0|           0|             0|               0|      0|           0|            0|              0|          0|          0|     0|  

In [0]:
# Per-column count of cells that hold a single space instead of a real value.
blank_counts = [
    F.count(F.when(F.col(name) == ' ', F.col(name))).alias(name)
    for name in combined_df.columns
]
combined_df.select(blank_counts).show()

+----------+-----+--------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+-------------+------------+
|customerID|Churn|Contract|Dependents|DeviceProtection|InternetService|MultipleLines|OnlineBackup|OnlineSecurity|PaperlessBilling|Partner|PhoneService|SeniorCitizen|StreamingMovies|StreamingTV|TechSupport|gender|tenure|MonthlyCharges|PaymentMethod|TotalCharges|
+----------+-----+--------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+-------------+------------+
|         0|    0|       0|         0|               0|              0|            0|           0|             0|               0|      0|           0|            0|              0|          0|          0|     0|  

In [0]:
# Keep only rows whose TotalCharges is not the single-space placeholder.
combined_df = combined_df.where(F.col('TotalCharges') != ' ')

In [0]:
combined_df.count()

6300

In [0]:
# Re-run the single-space check after filtering to confirm nothing is left.
remaining_blank_counts = [
    F.count(F.when(F.col(name) == ' ', F.col(name))).alias(name)
    for name in combined_df.columns
]
combined_df.select(remaining_blank_counts).show()

+----------+-----+--------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+-------------+------------+
|customerID|Churn|Contract|Dependents|DeviceProtection|InternetService|MultipleLines|OnlineBackup|OnlineSecurity|PaperlessBilling|Partner|PhoneService|SeniorCitizen|StreamingMovies|StreamingTV|TechSupport|gender|tenure|MonthlyCharges|PaymentMethod|TotalCharges|
+----------+-----+--------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+-------------+------------+
|         0|    0|       0|         0|               0|              0|            0|           0|             0|               0|      0|           0|            0|              0|          0|          0|     0|  

# Verify the type of the ‘TotalCharges’ column, change it to Double if not

In [0]:
combined_df.dtypes

[('customerID', 'string'),
 ('Churn', 'string'),
 ('Contract', 'string'),
 ('Dependents', 'string'),
 ('DeviceProtection', 'string'),
 ('InternetService', 'string'),
 ('MultipleLines', 'string'),
 ('OnlineBackup', 'string'),
 ('OnlineSecurity', 'string'),
 ('PaperlessBilling', 'string'),
 ('Partner', 'string'),
 ('PhoneService', 'string'),
 ('SeniorCitizen', 'bigint'),
 ('StreamingMovies', 'string'),
 ('StreamingTV', 'string'),
 ('TechSupport', 'string'),
 ('gender', 'string'),
 ('tenure', 'bigint'),
 ('MonthlyCharges', 'double'),
 ('PaymentMethod', 'string'),
 ('TotalCharges', 'string')]

In [0]:
# TotalCharges was read as a string; convert it to a numeric column in place.
total_charges_as_double = F.col('TotalCharges').cast('double')
combined_df = combined_df.withColumn('TotalCharges', total_charges_as_double)

In [0]:
combined_df.dtypes

[('customerID', 'string'),
 ('Churn', 'string'),
 ('Contract', 'string'),
 ('Dependents', 'string'),
 ('DeviceProtection', 'string'),
 ('InternetService', 'string'),
 ('MultipleLines', 'string'),
 ('OnlineBackup', 'string'),
 ('OnlineSecurity', 'string'),
 ('PaperlessBilling', 'string'),
 ('Partner', 'string'),
 ('PhoneService', 'string'),
 ('SeniorCitizen', 'bigint'),
 ('StreamingMovies', 'string'),
 ('StreamingTV', 'string'),
 ('TechSupport', 'string'),
 ('gender', 'string'),
 ('tenure', 'bigint'),
 ('MonthlyCharges', 'double'),
 ('PaymentMethod', 'string'),
 ('TotalCharges', 'double')]

# Replace the values in the ‘SeniorCitizen’ column: 0 as No, 1 as Yes

In [0]:
combined_df.select('SeniorCitizen').distinct().show()

+-------------+
|SeniorCitizen|
+-------------+
|            0|
|            1|
+-------------+



In [0]:
# Recode the 0/1 flag as 'No'/'Yes' so it is treated like the other
# categorical columns. Anything that is not 0 becomes 'Yes'.
# NOTE(review): the column is overwritten, so this cell is not safe to re-run
# on its own output - confirm before executing it twice.
is_not_senior = F.col('SeniorCitizen') == 0
combined_df = combined_df.withColumn(
    'SeniorCitizen', F.when(is_not_senior, 'No').otherwise('Yes'))

In [0]:
combined_df.select('SeniorCitizen').distinct().show()

+-------------+
|SeniorCitizen|
+-------------+
|           No|
|          Yes|
+-------------+



In [0]:
## Create View for the new dataframe
# createOrReplaceTempView supersedes registerTempTable, which has been
# deprecated since Spark 2.0; the view name used by the SQL below is unchanged.
combined_df.createOrReplaceTempView("combined_df_table")

In [0]:
# Class balance of the target, via the DataFrame API.
churn_groups = combined_df.groupBy('Churn')
result_df = churn_groups.count()
result_df.show()

+-----+-----+
|Churn|count|
+-----+-----+
|   No| 4641|
|  Yes| 1659|
+-----+-----+



In [0]:
## Same churn count as above, expressed as SQL against the registered view
churn_count_query = """
SELECT Churn, COUNT(*) AS Count
FROM combined_df_table
GROUP BY Churn
"""

result_df = spark.sql(churn_count_query)
result_df.show()

+-----+-----+
|Churn|Count|
+-----+-----+
|   No| 4641|
|  Yes| 1659|
+-----+-----+



# Bin the data in the tenure column
# Use bucketizer available in pyspark.ml.feature

Bin the data into 7 bins as follows

* Split-1 for tenure <= 10
* Split-2 for tenure > 10 and <= 20
* Split-3 for tenure > 20 and <= 30
* Split-4 for tenure > 30 and <= 40
* Split-5 for tenure > 40 and <= 50
* Split-6 for tenure > 50 and <= 60
* Split-7 for tenure > 60

In [0]:
from pyspark.ml.feature import Bucketizer

# Interior edges sit just above each multiple of ten so that a whole-number
# tenure of exactly 10, 20, ... falls in the lower bin, matching the
# "<= 10", "<= 20", ... rules described above.
interior_edges = [10.01, 20.01, 30.01, 40.01, 50.01, 60.01]
splits = [-float('inf')] + interior_edges + [float('inf')]

bucketizer = Bucketizer(
    splits=splits,
    inputCol='tenure',
    outputCol='bucketized_tenure',
)

# Adds the bucket index (0.0 .. 6.0) as a new column.
combined_df = bucketizer.transform(combined_df)

bucket_count = len(bucketizer.getSplits()) - 1
print("Bucketizer output with %d buckets" % bucket_count)
combined_df.select("tenure", "bucketized_tenure").show(10)

Bucketizer output with 7 buckets
+------+-----------------+
|tenure|bucketized_tenure|
+------+-----------------+
|     9|              0.0|
|     9|              0.0|
|     4|              0.0|
|    13|              1.0|
|     3|              0.0|
|     9|              0.0|
|    71|              6.0|
|    63|              6.0|
|     7|              0.0|
|    65|              6.0|
+------+-----------------+
only showing top 10 rows



In [0]:
combined_df.show(4)

+----------+-----+--------------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+----------------+------------+-----------------+
|customerID|Churn|      Contract|Dependents|DeviceProtection|InternetService|MultipleLines|OnlineBackup|OnlineSecurity|PaperlessBilling|Partner|PhoneService|SeniorCitizen|StreamingMovies|StreamingTV|TechSupport|gender|tenure|MonthlyCharges|   PaymentMethod|TotalCharges|bucketized_tenure|
+----------+-----+--------------+----------+----------------+---------------+-------------+------------+--------------+----------------+-------+------------+-------------+---------------+-----------+-----------+------+------+--------------+----------------+------------+-----------------+
|0002-ORFBO|   No|      One year|       Yes|              No|            DSL|           No|         Yes|            No|             Y

In [0]:
combined_df.select('bucketized_tenure').distinct().show()

+-----------------+
|bucketized_tenure|
+-----------------+
|              0.0|
|              1.0|
|              4.0|
|              3.0|
|              2.0|
|              6.0|
|              5.0|
+-----------------+



### Add/Derive a new column ‘tenure_group’ from the above ‘bucketized_tenure’ column as below, using either broadcast join or udf.

Map the 7 values 0.0 to 6.0 as below
* 0.0 as tenure_0-10
* 1.0 as tenure_10-20
* 2.0 as tenure_20-30
* 3.0 as tenure_30-40
* 4.0 as tenure_40-50
* 5.0 as tenure_50-60
* 6.0 as tenure_60-and-above

**Using broadcast join**

In [0]:
# Lookup table from bucket index to a human-readable tenure label.
# The list position of each label is its bucket index.
tenure_labels = [
    'tenure_0-10',
    'tenure_10-20',
    'tenure_20-30',
    'tenure_30-40',
    'tenure_40-50',
    'tenure_50-60',
    'tenure_60-and-above',
]
lookup_rows = [(float(index), label) for index, label in enumerate(tenure_labels)]

replacing_df = spark.createDataFrame(
    lookup_rows, ['bucketized_tenure', 'tenure_group'])

replacing_df.show()

+-----------------+-------------------+
|bucketized_tenure|       tenure_group|
+-----------------+-------------------+
|              0.0|        tenure_0-10|
|              1.0|       tenure_10-20|
|              2.0|       tenure_20-30|
|              3.0|       tenure_30-40|
|              4.0|       tenure_40-50|
|              5.0|       tenure_50-60|
|              6.0|tenure_60-and-above|
+-----------------+-------------------+



In [0]:
# Mark the small lookup table for broadcast, then join on the bucket index.
broadcast_lookup = F.broadcast(replacing_df)
combined_df1 = combined_df.join(broadcast_lookup, "bucketized_tenure")

In [0]:
combined_df1.select('tenure', 'bucketized_tenure', 'tenure_group').show(4)

+------+-----------------+------------+
|tenure|bucketized_tenure|tenure_group|
+------+-----------------+------------+
|     9|              0.0| tenure_0-10|
|     9|              0.0| tenure_0-10|
|     4|              0.0| tenure_0-10|
|    13|              1.0|tenure_10-20|
+------+-----------------+------------+
only showing top 4 rows



In [0]:
from pyspark.sql.types import *

def replace_udf(column_value):
    """Translate a tenure bucket index into its tenure-group label.

    Parameters
    ----------
    column_value : float or None
        Bucket index taken from the 'bucketized_tenure' column (0.0 to 6.0).

    Returns
    -------
    str or None
        The matching 'tenure_*' label, or None when the value is missing or
        is not one of the seven known bucket indices. Unknown values are
        deliberately NOT folded into 'tenure_60-and-above', so bad input
        surfaces as a null instead of being silently mislabelled.
    """
    labels_by_bucket = {
        0.0: 'tenure_0-10',
        1.0: 'tenure_10-20',
        2.0: 'tenure_20-30',
        3.0: 'tenure_30-40',
        4.0: 'tenure_40-50',
        5.0: 'tenure_50-60',
        6.0: 'tenure_60-and-above',
    }
    return labels_by_bucket.get(column_value)
    
from pyspark.sql.functions import udf
    
# Wrap the plain Python function as a Spark UDF; no returnType is passed here,
# so PySpark's default return type applies.
python_udf = udf(replace_udf)

In [0]:
# UDF alternative to the broadcast join: derive the label row by row.
tenure_group_col = python_udf('bucketized_tenure')
combined_df = combined_df.withColumn('tenure_group', tenure_group_col)

In [0]:
combined_df.show()

+----------+-----+--------------+----------+-------------------+---------------+----------------+-------------------+-------------------+----------------+-------+------------+-------------+-------------------+-------------------+-------------------+------+------+--------------+--------------------+------------+-----------------+-------------------+
|customerID|Churn|      Contract|Dependents|   DeviceProtection|InternetService|   MultipleLines|       OnlineBackup|     OnlineSecurity|PaperlessBilling|Partner|PhoneService|SeniorCitizen|    StreamingMovies|        StreamingTV|        TechSupport|gender|tenure|MonthlyCharges|       PaymentMethod|TotalCharges|bucketized_tenure|       tenure_group|
+----------+-----+--------------+----------+-------------------+---------------+----------------+-------------------+-------------------+----------------+-------+------------+-------------+-------------------+-------------------+-------------------+------+------+--------------+--------------------

In [0]:
# The numeric bucket index is dropped now that 'tenure_group' carries the label.
combined_df = combined_df.drop('bucketized_tenure')

In [0]:
combined_df.columns

['customerID',
 'Churn',
 'Contract',
 'Dependents',
 'DeviceProtection',
 'InternetService',
 'MultipleLines',
 'OnlineBackup',
 'OnlineSecurity',
 'PaperlessBilling',
 'Partner',
 'PhoneService',
 'SeniorCitizen',
 'StreamingMovies',
 'StreamingTV',
 'TechSupport',
 'gender',
 'tenure',
 'MonthlyCharges',
 'PaymentMethod',
 'TotalCharges',
 'tenure_group']

In [0]:
# Contingency table: churn outcome counts for each tenure group.
tenure_by_churn = combined_df.crosstab('tenure_group', 'Churn')
result_df = tenure_by_churn
result_df.show()

+-------------------+----+---+
| tenure_group_Churn|  No|Yes|
+-------------------+----+---+
|       tenure_10-20| 552|253|
|       tenure_20-30| 529|160|
|       tenure_30-40| 461|124|
|       tenure_50-60| 545| 91|
|tenure_60-and-above|1187| 83|
|       tenure_40-50| 480|103|
|        tenure_0-10| 887|845|
+-------------------+----+---+



## Identify the categorical and numerical columns.
* Note.
    * customerID is unique identifier of each customer.
    * Churn – Whether the customer churned or not (target attribute).

In [0]:
# Columns excluded from the feature lists below: the row identifier and the target.
id_col = ['customerID']
target_col = ['Churn']

In [0]:
# (column name, Spark type name) pairs; used below to pick out the string columns.
dtypes_list = combined_df.dtypes
dtypes_list

[('customerID', 'string'),
 ('Churn', 'string'),
 ('Contract', 'string'),
 ('Dependents', 'string'),
 ('DeviceProtection', 'string'),
 ('InternetService', 'string'),
 ('MultipleLines', 'string'),
 ('OnlineBackup', 'string'),
 ('OnlineSecurity', 'string'),
 ('PaperlessBilling', 'string'),
 ('Partner', 'string'),
 ('PhoneService', 'string'),
 ('SeniorCitizen', 'string'),
 ('StreamingMovies', 'string'),
 ('StreamingTV', 'string'),
 ('TechSupport', 'string'),
 ('gender', 'string'),
 ('tenure', 'bigint'),
 ('MonthlyCharges', 'double'),
 ('PaymentMethod', 'string'),
 ('TotalCharges', 'double'),
 ('tenure_group', 'string')]

In [0]:
# Categorical features: every string-typed column except the id and the target.
non_feature_cols = id_col + target_col
cat_cols = [
    col_name
    for col_name, col_type in dtypes_list
    if col_type == 'string' and col_name not in non_feature_cols
]

In [0]:
cat_cols

['Contract',
 'Dependents',
 'DeviceProtection',
 'InternetService',
 'MultipleLines',
 'OnlineBackup',
 'OnlineSecurity',
 'PaperlessBilling',
 'Partner',
 'PhoneService',
 'SeniorCitizen',
 'StreamingMovies',
 'StreamingTV',
 'TechSupport',
 'gender',
 'PaymentMethod',
 'tenure_group']

In [0]:
# Numerical features: whatever is left after removing id, target and categoricals.
non_numeric_cols = id_col + target_col + cat_cols
num_cols = [
    col_name
    for col_name in combined_df.columns
    if col_name not in non_numeric_cols
]

In [0]:
# Remove the derived 'tenure_group' column so it is not fed to the model
# alongside the raw 'tenure' column it was built from.
combined_df = combined_df.drop('tenure_group')

In [0]:
# Remove 'tenure_group' (just dropped from the DataFrame) by name instead of
# slicing off the last element: this does not depend on list order and is safe
# to re-run, whereas cat_cols[:-1] silently discards one more feature each time.
cat_cols = [col_name for col_name in cat_cols if col_name != 'tenure_group']
cat_cols

['Contract',
 'Dependents',
 'DeviceProtection',
 'InternetService',
 'MultipleLines',
 'OnlineBackup',
 'OnlineSecurity',
 'PaperlessBilling',
 'Partner',
 'PhoneService',
 'SeniorCitizen',
 'StreamingMovies',
 'StreamingTV',
 'TechSupport',
 'gender',
 'PaymentMethod']

In [0]:
# Numeric target for Spark ML: 0.0 for Churn == 'No', 1.0 for everything else.
did_not_churn = F.col('Churn') == 'No'
combined_df = combined_df.withColumn(
    'label', F.when(did_not_churn, 0.0).otherwise(1.0))

In [0]:
combined_df.dtypes

[('customerID', 'string'),
 ('Churn', 'string'),
 ('Contract', 'string'),
 ('Dependents', 'string'),
 ('DeviceProtection', 'string'),
 ('InternetService', 'string'),
 ('MultipleLines', 'string'),
 ('OnlineBackup', 'string'),
 ('OnlineSecurity', 'string'),
 ('PaperlessBilling', 'string'),
 ('Partner', 'string'),
 ('PhoneService', 'string'),
 ('SeniorCitizen', 'string'),
 ('StreamingMovies', 'string'),
 ('StreamingTV', 'string'),
 ('TechSupport', 'string'),
 ('gender', 'string'),
 ('tenure', 'bigint'),
 ('MonthlyCharges', 'double'),
 ('PaymentMethod', 'string'),
 ('TotalCharges', 'double'),
 ('label', 'double')]

In [0]:
# The target is now encoded in 'label', so the original string column is removed.
combined_df = combined_df.drop('Churn')

## Pipelines

### Define SparkML pipelines
Stages
  * Preprocessing
  * Model Building & Evaluation
Define the Spark ML pipeline for the preprocessing stages as below.
  1. Combine all the numerical columns as vector (VectorAssembler for numerical columns)
  2. Standardize the values in the above vector using StandardScaler available in pyspark.ml.feature
  3. StringIndexer and OneHotEncoderEstimator for the categorical columns.
  4. Combine all the above vectors created by OneHotEncoderEstimator as a single vector. (VectorAssembler for categorical col vectors)
  5. Combine the scaled numerical vector and the combined categorical vector (the vectors obtained in steps 2 and 4) using VectorAssembler.



In [0]:
from pyspark.ml.feature import VectorAssembler

# Step 1: pack the numerical columns into a single vector column.
num_cols_assembler = VectorAssembler(
    inputCols=num_cols,
    outputCol='num_cols_vector',
)

In [0]:
from pyspark.ml.feature import StandardScaler

# Step 2: standardize the numerical vector (centering and unit-std scaling both on).
num_cols_standardizer = StandardScaler(
    inputCol='num_cols_vector',
    outputCol='scaled_num_features',
    withMean=True,
    withStd=True,
)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# Step 3: one (StringIndexer, OneHotEncoderEstimator) pair per categorical
# column, collected in column order.
encoding_stages = []

for each_col in cat_cols:
    index_col = each_col + 'Index'
    vector_col = each_col + 'classVector'

    # Turn the category strings into numeric indices.
    # NOTE(review): StringIndexer is left at its default handleInvalid while
    # the encoder below uses 'keep' - confirm categories unseen at fit time
    # are handled as intended.
    stringIndexer = StringIndexer(inputCol=each_col, outputCol=index_col)

    # One-hot encode the index column into a binary vector column.
    oneHotEncoder = OneHotEncoderEstimator(
        inputCols=[index_col],
        outputCols=[vector_col],
        handleInvalid='keep')

    # Nothing is fitted here; the stages only run when a Pipeline is fitted.
    encoding_stages.extend([stringIndexer, oneHotEncoder])

In [0]:
encoding_stages

[StringIndexer_07d3c0718ad6,
 OneHotEncoderEstimator_1c16ec8f8e6c,
 StringIndexer_f594ceee29fb,
 OneHotEncoderEstimator_465ffe67b8c1,
 StringIndexer_9dcb13c15635,
 OneHotEncoderEstimator_7ddb5c72e6ff,
 StringIndexer_22245c37aa11,
 OneHotEncoderEstimator_7f4b40f62cda,
 StringIndexer_5ea9978dbcd1,
 OneHotEncoderEstimator_0182290a8ab9,
 StringIndexer_51e19ebae141,
 OneHotEncoderEstimator_db12f8a2ccc0,
 StringIndexer_f94d912f708b,
 OneHotEncoderEstimator_338cbabe7660,
 StringIndexer_86eecf2b6d84,
 OneHotEncoderEstimator_f7a0d785e777,
 StringIndexer_26ed15ec53a3,
 OneHotEncoderEstimator_3eaf8326e1aa,
 StringIndexer_826fcaec8528,
 OneHotEncoderEstimator_3674a1287195,
 StringIndexer_64784358b353,
 OneHotEncoderEstimator_72d83293f9b4,
 StringIndexer_0fcb4da70e9a,
 OneHotEncoderEstimator_ffa08cd6ce93,
 StringIndexer_f32f3665d416,
 OneHotEncoderEstimator_25c98cac92a5,
 StringIndexer_e49f26f1ab76,
 OneHotEncoderEstimator_26139c26ad66,
 StringIndexer_e42976355912,
 OneHotEncoderEstimator_101dbc94e

In [0]:
# Step 4: concatenate every one-hot vector into one categorical vector.
cat_cols_vectors_list = ['{}classVector'.format(name) for name in cat_cols]
cat_vecs_assembler = VectorAssembler(
    inputCols=cat_cols_vectors_list,
    outputCol='cat_cols_vector',
)

In [0]:
# Step 5: final feature vector = scaled numericals followed by categoricals.
final_feature_inputs = ['scaled_num_features', 'cat_cols_vector']
assembler_all_features = VectorAssembler(
    inputCols=final_feature_inputs,
    outputCol='features',
)

In [0]:
# Full preprocessing sequence, in the order the stages must execute.
pre_processing_stages = [
    num_cols_assembler,
    num_cols_standardizer,
    *encoding_stages,
    cat_vecs_assembler,
    assembler_all_features,
]

In [0]:
pre_processing_stages

[VectorAssembler_585cff5c3188,
 StandardScaler_4a2c94481845,
 StringIndexer_07d3c0718ad6,
 OneHotEncoderEstimator_1c16ec8f8e6c,
 StringIndexer_f594ceee29fb,
 OneHotEncoderEstimator_465ffe67b8c1,
 StringIndexer_9dcb13c15635,
 OneHotEncoderEstimator_7ddb5c72e6ff,
 StringIndexer_22245c37aa11,
 OneHotEncoderEstimator_7f4b40f62cda,
 StringIndexer_5ea9978dbcd1,
 OneHotEncoderEstimator_0182290a8ab9,
 StringIndexer_51e19ebae141,
 OneHotEncoderEstimator_db12f8a2ccc0,
 StringIndexer_f94d912f708b,
 OneHotEncoderEstimator_338cbabe7660,
 StringIndexer_86eecf2b6d84,
 OneHotEncoderEstimator_f7a0d785e777,
 StringIndexer_26ed15ec53a3,
 OneHotEncoderEstimator_3eaf8326e1aa,
 StringIndexer_826fcaec8528,
 OneHotEncoderEstimator_3674a1287195,
 StringIndexer_64784358b353,
 OneHotEncoderEstimator_72d83293f9b4,
 StringIndexer_0fcb4da70e9a,
 OneHotEncoderEstimator_ffa08cd6ce93,
 StringIndexer_f32f3665d416,
 OneHotEncoderEstimator_25c98cac92a5,
 StringIndexer_e49f26f1ab76,
 OneHotEncoderEstimator_26139c26ad66,
 

In [0]:
from pyspark.ml import Pipeline

# Fit and apply the preprocessing stages alone, on the full dataset, to
# inspect the resulting feature vectors. The model pipelines further down
# are fitted separately on the training split.
partialPipeLine = Pipeline(stages=pre_processing_stages)
pipelineModel = partialPipeLine.fit(combined_df)
prepped_data_df = pipelineModel.transform(combined_df)
prepped_data_df.cache()

DataFrame[customerID: string, Contract: string, Dependents: string, DeviceProtection: string, InternetService: string, MultipleLines: string, OnlineBackup: string, OnlineSecurity: string, PaperlessBilling: string, Partner: string, PhoneService: string, SeniorCitizen: string, StreamingMovies: string, StreamingTV: string, TechSupport: string, gender: string, tenure: bigint, MonthlyCharges: double, PaymentMethod: string, TotalCharges: double, label: double, num_cols_vector: vector, scaled_num_features: vector, ContractIndex: double, ContractclassVector: vector, DependentsIndex: double, DependentsclassVector: vector, DeviceProtectionIndex: double, DeviceProtectionclassVector: vector, InternetServiceIndex: double, InternetServiceclassVector: vector, MultipleLinesIndex: double, MultipleLinesclassVector: vector, OnlineBackupIndex: double, OnlineBackupclassVector: vector, OnlineSecurityIndex: double, OnlineSecurityclassVector: vector, PaperlessBillingIndex: double, PaperlessBillingclassVector:

In [0]:
prepped_data_df.select('label', 'features').show(3,truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                |
+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0.0  |(46,[0,1,2,5,7,8,12,14,18,20,23,26,27,29,31,35,38,41,43],[-0.9640259084652045,0.024022318899511584,-0.7534155676632999,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|0.0  |(46,[0,1,2,3,6,8,12,15,17,20,24,25,27,29,32,34,37,40,43],[-0.9640259084652045,-0.16557129678453888,-0.7758451061827129,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|1.0  |(46,[0,1,2,3,6,9,11,14,17,20

In [0]:
# 70/30 split with a fixed seed so the partition - and every metric computed
# from it below - is the same on each Restart & Run All.
train, validation = combined_df.randomSplit([0.7, 0.3], seed=42)
print("There are %d training examples and %d test examples." %
      (train.count(), validation.count()))

There are 4381 training examples and 1919 test examples.


In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled 'features' column, capped at 10 iterations.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
)

In [0]:
from pyspark.ml import Pipeline

# Preprocessing followed by the classifier, fitted on the training split only.
lr_stages = pre_processing_stages + [lr]
lr_Pipeline = Pipeline(stages=lr_stages)

lr_Pipeline_model = lr_Pipeline.fit(train)

In [0]:
# The classifier was appended last when the pipeline was built, so the final
# fitted stage is the logistic regression model.
model_lr = lr_Pipeline_model.stages[-1]

In [0]:
model_lr.coefficients

DenseVector([-0.43, 0.1155, -0.241, 0.6132, -0.6281, -0.3499, 0.0136, -0.1102, 0.018, -0.0181, -0.1213, 0.3973, -0.4306, -0.1213, -0.1795, 0.084, 0.0476, 0.1383, -0.1486, -0.1213, 0.2382, -0.2867, -0.1213, 0.0715, -0.1553, -0.0185, -0.0621, -0.2751, 0.0476, -0.2063, 0.0588, 0.1508, -0.1498, -0.1213, 0.2005, -0.1996, -0.1213, 0.1556, -0.1882, -0.1213, -0.0678, -0.0126, 0.2964, -0.0371, -0.2632, -0.209])

In [0]:
# Score both splits with the fitted logistic-regression pipeline.
train_predictions_lr, test_predictions_lr = [
    lr_Pipeline_model.transform(split) for split in (train, validation)
]

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the logistic-regression pipeline on each split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

predictionAndLabels_train_lr = train_predictions_lr.select("prediction", "label")
predictionAndLabels_test_lr = test_predictions_lr.select("prediction", "label")

train_accuracy_lr = evaluator.evaluate(predictionAndLabels_train_lr)
test_accuracy_lr = evaluator.evaluate(predictionAndLabels_test_lr)

print("Train set accuracy  = " + str(train_accuracy_lr))
print("Train Error = %g" % (1.0 - train_accuracy_lr))

print("Test set accuracy = " + str(test_accuracy_lr))
print("Test Error = %g" % (1.0 - test_accuracy_lr))

Train set accuracy  = 0.8016434603971696
Train Error = 0.198357
Test set accuracy = 0.8014590932777488
Test Error = 0.198541


In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree on the same 'features' / 'label' columns, all other settings default.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
)

In [0]:
# Preprocessing followed by the decision tree, fitted on the training split only.
dt_stages = pre_processing_stages + [dt]
dt_Pipeline = Pipeline(stages=dt_stages)

dt_Pipeline_model = dt_Pipeline.fit(train)

In [0]:
# Score both splits with the fitted decision-tree pipeline.
train_predictions_dt, test_predictions_dt = [
    dt_Pipeline_model.transform(split) for split in (train, validation)
]

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the decision-tree pipeline on each split.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

predictionAndLabels_train_dt = train_predictions_dt.select("prediction", "label")
predictionAndLabels_test_dt = test_predictions_dt.select("prediction", "label")

train_accuracy = evaluator.evaluate(predictionAndLabels_train_dt)
test_accuracy = evaluator.evaluate(predictionAndLabels_test_dt)

print("Train set accuracy  = " + str(train_accuracy))
print("Train Error = %g" % (1.0 - train_accuracy))

print("Test set accuracy = " + str(test_accuracy))
print("Test Error = %g" % (1.0 - test_accuracy))

Train set accuracy  = 0.7986761013467245
Train Error = 0.201324
Test set accuracy = 0.7988535695674831
Test Error = 0.201146
