In [165]:
import pyspark
from pyspark.ml.feature import Imputer
import pyspark.sql.functions as F
from pyspark.sql.functions import col, sum, mean, median, mode, concat_ws, col, split
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import SparseVector, DenseVector, Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
# handling categorical features
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [166]:
import numpy as np
import pandas as pd

In [167]:
# Load the California housing CSV with pandas; the frame is only displayed, not stored.
# NOTE(review): none of the later cells visible here use this file — confirm it is still needed.
pd.read_csv("cal_housing.csv")

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,
...,...,...,...,...,...,...,...,...,...,...
20635,-121.09,39.48,25.0,1665.0,374.0,845.0,330.0,1.5603,78100.0,
20636,-121.21,39.49,18.0,697.0,150.0,356.0,114.0,2.5568,77100.0,
20637,-121.22,39.43,17.0,2254.0,485.0,1007.0,433.0,1.7000,92300.0,
20638,-121.32,39.43,18.0,1860.0,409.0,741.0,349.0,1.8672,84700.0,


In [168]:
# Create the notebook's SparkSession, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)

In [169]:
# Display the SparkSession object as the cell output.
spark

In [170]:
# Read the customer churn CSV: the first row is the header and column types are inferred.
df_pyspark = spark.read.csv(
    "customer_churn.csv",
    inferSchema=True,
    header=True,
)

In [171]:
# Print the first rows of the raw churn data.
df_pyspark.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|  

In [172]:
# Index a single categorical column ("gender") into a numeric "gender_category" column.
indexer = StringIndexer(
    inputCol="gender",
    outputCol="gender_category",
)
gender_indexer_model = indexer.fit(df_pyspark)
df_r = gender_indexer_model.transform(df_pyspark)
df_r.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+---------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|gender_category|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+---------------+
|7590-VHVEG|F

In [173]:
# Index every categorical column (and the Churn label) in one StringIndexer pass.
# StringIndexer is already imported in the notebook's import cell, so it is not
# re-imported here.
categorical_columns = [
    "gender",
    "Partner",
    "Dependents",
    "PhoneService",
    "MultipleLines",
    "InternetService",
    "OnlineSecurity",
    "OnlineBackup",
    "DeviceProtection",
    "TechSupport",
    "StreamingTV",
    "StreamingMovies",
    "Contract",
    "PaperlessBilling",
    "PaymentMethod",
    "Churn",
]

# Output names are derived as "<input>_category" so the input and output lists
# cannot drift out of sync when a column is added or removed.
indexer = StringIndexer(
    inputCols=categorical_columns,
    outputCols=[f"{name}_category" for name in categorical_columns],
)
df_r = indexer.fit(df_pyspark).transform(df_pyspark)
df_r.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+---------------+----------------+-------------------+---------------------+----------------------+------------------------+-----------------------+---------------------+-------------------------+--------------------+--------------------+------------------------+-----------------+-------------------------+----------------------+--------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|gender_category|Partner_categor

In [174]:
# Keep the customer ID, the indexed categorical features, the two numeric charge
# columns and the indexed label; the original string columns are dropped.
# The label is referenced by its exact name "Churn_category" (as produced by the
# StringIndexer) instead of "churn_category", which only resolved through
# case-insensitive column matching and renamed the column in the result.
df_r = df_r.select(
    "customerID",
    "gender_category",
    "Partner_category",
    "Dependents_category",
    "PhoneService_category",
    "MultipleLines_category",
    "InternetService_category",
    "OnlineSecurity_category",
    "OnlineBackup_category",
    "DeviceProtection_category",
    "TechSupport_category",
    "StreamingTV_category",
    "StreamingMovies_category",
    "Contract_category",
    "PaperlessBilling_category",
    "PaymentMethod_category",
    "MonthlyCharges",
    "TotalCharges",
    "Churn_category",
)
df_r.show()

+----------+---------------+----------------+-------------------+---------------------+----------------------+------------------------+-----------------------+---------------------+-------------------------+--------------------+--------------------+------------------------+-----------------+-------------------------+----------------------+--------------+------------+--------------+
|customerID|gender_category|Partner_category|Dependents_category|PhoneService_category|MultipleLines_category|InternetService_category|OnlineSecurity_category|OnlineBackup_category|DeviceProtection_category|TechSupport_category|StreamingTV_category|StreamingMovies_category|Contract_category|PaperlessBilling_category|PaymentMethod_category|MonthlyCharges|TotalCharges|churn_category|
+----------+---------------+----------------+-------------------+---------------------+----------------------+------------------------+-----------------------+---------------------+-------------------------+--------------------+--

In [175]:
# Cast every feature/label column to DoubleType, but leave the identifier alone:
# customerID holds strings such as "7590-VHVEG", and casting those to double
# would replace every ID with null.
ID_COLUMN = "customerID"

# Get the list of column names
columns = df_r.columns

# Convert all non-ID columns to DoubleType, preserving the original column order.
# With Spark's default (non-ANSI) cast semantics, values that cannot be parsed
# as numbers become null.
df_r = df_r.select([
    col(column) if column == ID_COLUMN
    else col(column).cast(DoubleType()).alias(column)
    for column in columns
])

# Show the modified DataFrame
df_r.show(truncate=False)

+----------+---------------+----------------+-------------------+---------------------+----------------------+------------------------+-----------------------+---------------------+-------------------------+--------------------+--------------------+------------------------+-----------------+-------------------------+----------------------+--------------+------------+--------------+
|customerID|gender_category|Partner_category|Dependents_category|PhoneService_category|MultipleLines_category|InternetService_category|OnlineSecurity_category|OnlineBackup_category|DeviceProtection_category|TechSupport_category|StreamingTV_category|StreamingMovies_category|Contract_category|PaperlessBilling_category|PaymentMethod_category|MonthlyCharges|TotalCharges|churn_category|
+----------+---------------+----------------+-------------------+---------------------+----------------------+------------------------+-----------------------+---------------------+-------------------------+--------------------+--

In [176]:
# Make sure the two continuous charge columns are typed as double.
for charge_col in ("MonthlyCharges", "TotalCharges"):
    df_r = df_r.withColumn(charge_col, df_r[charge_col].cast("double"))

# Discard rows in which either charge column is null.
df_r = df_r.na.drop(subset=["MonthlyCharges", "TotalCharges"])

# The indexed categorical columns are passed to the assembler as plain numbers;
# no one-hot encoding is applied in this cell.
feature_columns = [
    "gender_category",
    "Partner_category",
    "Dependents_category",
    "PhoneService_category",
    "MultipleLines_category",
    "InternetService_category",
    "OnlineSecurity_category",
    "OnlineBackup_category",
    "DeviceProtection_category",
    "TechSupport_category",
    "StreamingTV_category",
    "StreamingMovies_category",
    "Contract_category",
    "PaperlessBilling_category",
    "PaymentMethod_category",
    "MonthlyCharges",
    "TotalCharges",
]

# Pack all feature columns into a single vector column.
feature_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="independent_features",
)

# Append the assembled vector column to the DataFrame.
output = feature_assembler.transform(df_r)

In [177]:
# Preview only the assembled feature-vector column.
output.select("independent_features").show()

+--------------------+
|independent_features|
+--------------------+
|(17,[0,1,3,4,5,7,...|
|(17,[5,6,8,12,13,...|
|(17,[5,6,7,14,15,...|
|[0.0,0.0,0.0,1.0,...|
|(17,[0,15,16],[1....|
|(17,[0,4,8,10,11,...|
|(17,[2,4,7,10,14,...|
|(17,[0,3,4,5,6,13...|
|(17,[0,1,4,8,9,10...|
|(17,[2,5,6,7,12,1...|
|(17,[1,2,5,6,14,1...|
|[0.0,0.0,0.0,0.0,...|
|(17,[1,4,8,10,11,...|
|(17,[4,7,8,10,11,...|
|(17,[6,8,9,10,11,...|
|[1.0,1.0,1.0,0.0,...|
|[1.0,0.0,0.0,0.0,...|
|[0.0,0.0,1.0,0.0,...|
|(17,[0,1,2,5,8,9,...|
|(17,[0,7,8,11,15,...|
+--------------------+
only showing top 20 rows



In [178]:
# Preview the full DataFrame, including the new independent_features column.
output.show()

+----------+---------------+----------------+-------------------+---------------------+----------------------+------------------------+-----------------------+---------------------+-------------------------+--------------------+--------------------+------------------------+-----------------+-------------------------+----------------------+--------------+------------+--------------+--------------------+
|customerID|gender_category|Partner_category|Dependents_category|PhoneService_category|MultipleLines_category|InternetService_category|OnlineSecurity_category|OnlineBackup_category|DeviceProtection_category|TechSupport_category|StreamingTV_category|StreamingMovies_category|Contract_category|PaperlessBilling_category|PaymentMethod_category|MonthlyCharges|TotalCharges|churn_category|independent_features|
+----------+---------------+----------------+-------------------+---------------------+----------------------+------------------------+-----------------------+---------------------+-------

In [179]:
# Reduce the data to what the model needs: the feature vector and the indexed label.
model_columns = ["independent_features", "Churn_category"]
finalize_data = output.select(*model_columns)
finalize_data.show()

+--------------------+--------------+
|independent_features|Churn_category|
+--------------------+--------------+
|(17,[0,1,3,4,5,7,...|           0.0|
|(17,[5,6,8,12,13,...|           0.0|
|(17,[5,6,7,14,15,...|           1.0|
|[0.0,0.0,0.0,1.0,...|           0.0|
|(17,[0,15,16],[1....|           1.0|
|(17,[0,4,8,10,11,...|           1.0|
|(17,[2,4,7,10,14,...|           0.0|
|(17,[0,3,4,5,6,13...|           0.0|
|(17,[0,1,4,8,9,10...|           1.0|
|(17,[2,5,6,7,12,1...|           0.0|
|(17,[1,2,5,6,14,1...|           0.0|
|[0.0,0.0,0.0,0.0,...|           0.0|
|(17,[1,4,8,10,11,...|           0.0|
|(17,[4,7,8,10,11,...|           1.0|
|(17,[6,8,9,10,11,...|           0.0|
|[1.0,1.0,1.0,0.0,...|           0.0|
|[1.0,0.0,0.0,0.0,...|           0.0|
|[0.0,0.0,1.0,0.0,...|           0.0|
|(17,[0,1,2,5,8,9,...|           1.0|
|(17,[0,7,8,11,15,...|           0.0|
+--------------------+--------------+
only showing top 20 rows



In [180]:
# 75/25 train/test split. A fixed seed keeps the split, and therefore the fitted
# coefficients and evaluation results, reproducible across runs.
train_data, test_data = finalize_data.randomSplit([0.75, 0.25], seed=42)

In [181]:
# Configure a regularised linear regression on the assembled features.
# NOTE(review): Churn_category appears to be a 0.0/1.0 label, so a classifier
# may be a better fit than a regression — confirm the intent.
regressor = LinearRegression(
    featuresCol="independent_features",
    labelCol="Churn_category",
    regParam=0.1,
)
regressor

LinearRegression_9523e55caca1

In [182]:
# Fit the linear regression on the training split.
# NOTE(review): this rebinds `regressor` from the estimator to the fitted model, so
# re-running this cell without first re-running the previous one will presumably
# fail — confirm, and consider a separate name for the fitted model.
regressor = regressor.fit(train_data)

23/06/18 19:28:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/06/18 19:28:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/06/18 19:28:10 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [183]:
# Display the fitted coefficients (one per entry of the assembled feature vector).
regressor.coefficients

DenseVector([0.0033, -0.0175, -0.0345, -0.0479, 0.0199, -0.0857, -0.0543, -0.0161, -0.0104, -0.0428, 0.0328, 0.032, -0.0737, -0.0493, -0.033, 0.0007, -0.0])

In [184]:
# Display the fitted intercept term.
regressor.intercept

0.540723805083816

In [185]:
# Evaluate the fitted model on the held-out test split; the returned summary
# exposes the per-row predictions through `.predictions`.
pred_result = regressor.evaluate(test_data)

In [186]:
# Preview the test rows alongside the model's predicted value for each.
pred_result.predictions.show()

+--------------------+--------------+-------------------+
|independent_features|Churn_category|         prediction|
+--------------------+--------------+-------------------+
|(17,[0,1,2,3,4,5,...|           0.0|0.33323665879356235|
|(17,[0,1,2,3,4,5,...|           0.0| 0.3645384764326631|
|(17,[0,1,2,4,5,7,...|           0.0|0.25404660052313366|
|(17,[0,1,2,4,5,8,...|           0.0|0.17731622248700873|
|(17,[0,1,2,4,5,13...|           0.0|0.31943035962846905|
|(17,[0,1,2,4,5,14...|           0.0| 0.3512325620918417|
|(17,[0,1,2,4,6,9,...|           0.0| 0.1818259036294282|
|(17,[0,1,2,4,7,8,...|           1.0| 0.3039724521137345|
|(17,[0,1,2,4,7,8,...|           0.0| 0.1791509653303075|
|(17,[0,1,2,4,7,8,...|           0.0| 0.1259197503631846|
|(17,[0,1,2,4,7,8,...|           1.0| 0.5074581793491141|
|(17,[0,1,2,4,7,10...|           0.0| 0.4209401914407275|
|(17,[0,1,2,4,7,10...|           0.0|0.39133629235364226|
|(17,[0,1,2,4,7,10...|           0.0| 0.4489892001339491|
|(17,[0,1,2,4,