# Using Spark to build Logistic Regression model

Purpose: Predict income classification (below of above 50K)

Data and Code Source: 
https://www.guru99.com/pyspark-tutorial.html

In [86]:
import pyspark
from pyspark import SparkContext

sc = SparkContext()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-1-c3c95c6330c7>:4 

In [None]:
nums = sc.parallelize([1,2,3,4])

In [None]:
nums.take(1)

In [None]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

In [None]:
sqlContext = SQLContext(sc)

In [None]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)

In [None]:
rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

In [None]:
list_p = [('John',19), ('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

In [None]:
DF_ppl.printSchema()

# Logit 

In [87]:
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc.addFile(url)
sqlCont = SQLContext(sc)

In [4]:
df = sqlCont.read.csv(SparkFiles.get('adult_data.csv'), header=True, inferSchema=True)

In [5]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [None]:
df.show(5, truncate=False)

In [None]:
df.select('age','race').show(5)

In [None]:
df.groupBy('education').count().sort('count', ascending=True).show()

In [None]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'educational_num', 'capital_loss', 'hours-per-week']
# Convert the type
df_string = convertColumn(df, CONTI_FEATURES, FloatType())

df_string.printSchema()

In [7]:
df.describe().show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

In [8]:
df.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|                 0|
|    max|             99999|
+-------+------------------+



In [9]:
df.crosstab('age', 'income').sort('age_income').show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|        17|  595|   0|
|        18|  862|   0|
|        19| 1050|   3|
|        20| 1112|   1|
|        21| 1090|   6|
|        22| 1161|  17|
|        23| 1307|  22|
|        24| 1162|  44|
|        25| 1119|  76|
|        26| 1068|  85|
|        27| 1117| 115|
|        28| 1101| 179|
|        29| 1025| 198|
|        30| 1031| 247|
|        31| 1050| 275|
|        32|  957| 296|
|        33| 1045| 290|
|        34|  949| 354|
|        35|  997| 340|
|        36|  948| 400|
+----------+-----+----+
only showing top 20 rows



In [11]:
df.filter(df.age>20).count()

45219

In [13]:
df.groupby('marital-status').agg({'capital-gain':'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



In [15]:
from pyspark.sql.functions import *
age_square = df.select(col('age')**2)

In [16]:
df = df.withColumn('age_square', col('age')**2)

In [17]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



In [20]:
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt',
             'education',
             'educational-num',
             'marital-status',
             'occupation',
             'relationship',
             'race',
             'gender',
             'capital-gain',
             'capital-loss',
             'hours-per-week',
             'native-country',
             'income']
df = df.select(COLUMNS)
df.first()

Row(age=25, age_square=625.0, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K')

In [21]:
df.groupby('native-country').agg({'native-country':'count'}).sort(asc('count(native-country)')).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|Outlying-US(Guam-...|                   23|
|                Laos|                   23|
|          Yugoslavia|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [55]:
from functools import reduce

oldColumns = df.columns
newColumns = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
              'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
              'hours_week', 'native_country', 'label']

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(newColumns)), df)
df.printSchema()
#df.show()

root
 |-- age: integer (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [56]:
df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

In [57]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [59]:
stringIndexer = StringIndexer(inputCol = 'workclass', outputCol = 'workclass_encoded')
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol='workclass_encoded', outputCol = 'workclass_vec')
encoded = encoder.transform(indexed)
encoded.show(2)

+---+----------+---------+------+---------+-------------+------------------+-----------------+------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|workclass|fnlwgt|education|education_num|           marital|       occupation|relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+---------+------+---------+-------------+------------------+-----------------+------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 25|     625.0|  Private|226802|     11th|            7|     Never-married|Machine-op-inspct|   Own-child|Black|Male|           0|           0|        40| United-States|<=50K|              0.0|(9,[0],[1.0])|
| 38|    1444.0|  Private| 89814|  HS-grad|            9|Married-civ-spouse|  Farming-fishing|     Husband|White|Male|           0|           0|        50| United-S

In [77]:
df_remove.printSchema()

root
 |-- age: integer (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [67]:
encoded.select('workclass', 'workclass_encoded', 'workclass_vec').show(50)

+----------------+-----------------+-------------+
|       workclass|workclass_encoded|workclass_vec|
+----------------+-----------------+-------------+
|         Private|              0.0|(9,[0],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|       Local-gov|              2.0|(9,[2],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|               ?|              3.0|(9,[3],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|               ?|              3.0|(9,[3],[1.0])|
|Self-emp-not-inc|              1.0|(9,[1],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|     Federal-gov|              6.0|(9,[6],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|               ?|              3.0|(9,[3],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|         Private|              0.0|(9,[0],[1.0])|
|       State-gov|             

In [None]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [79]:
df_remove.drop('education_num').columns

['age',
 'age_square',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

In [80]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator

# encode all categorical features
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
CONT_FEATURES  = ['age', 'fnlwgt','capital_gain', 'capital_loss', 'hours_week']
stages = []
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                    outputCols=[categoricalCol+'classVec'])
    stages += [stringIndexer, encoder]

# convert label into label indices using StringIndexer
label_stringIdx = StringIndexer(inputCol='label', outputCol='newlabel')
stages += [label_stringIdx]

assemblerInputs = [c + 'classVec' for c in CATE_FEATURES] + CONT_FEATURES
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]

# create a Pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [81]:
model.take(1)

[Row(age=25, age_square=625.0, workclass='Private', fnlwgt=226802, education='11th', education_num=7, marital='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', sex='Male', capital_gain=0, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(98, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 226802.0, 97: 40.0}))]

In [82]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x['newlabel'], DenseVector(x['features'])))

In [89]:
df_train = sqlCont.createDataFrame(input_data, ['label', 'features'])

In [90]:
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



In [91]:
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [92]:
train_data.groupby('label').agg({'label':'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       29675|
|  1.0|        9300|
+-----+------------+



In [93]:
from pyspark.ml.classification import LogisticRegression

In [94]:
lr = LogisticRegression(labelCol='label',
                       featuresCol='features',
                       maxIter=10,
                       regParam=0.3)
linearModel = lr.fit(train_data)

print('Coefficients: ' + str(linearModel.coefficients))
print('Intercept: ' + str(linearModel.intercept))

Coefficients: [-0.08291539859596332,-0.13128066168454205,-0.04252824549956917,-0.17214090867403273,-0.11991371324225043,0.18083586244548436,0.18694646950356447,-0.2290196886156488,-0.2046951680603273,-0.07381112960044971,0.24417859780038137,0.4251011428795726,-0.005025027787783692,-0.32244065352932066,-0.002005553788510388,-0.35369017203669645,-0.45656053992458595,0.5837186488197419,-0.39515581561627877,-0.22052505393370983,0.6551297756686417,-0.3845787134116139,-0.43651855202718437,0.3399871308683097,-0.35317733907262716,-0.21954520883308581,-0.21604462791611415,-0.15408684324137564,-0.13290278710374342,0.20353935103663778,-0.06675847079764424,0.30272896939230737,-0.11550020822836823,0.03597464698587911,-0.2930436336857631,-0.21310883386418822,-0.17321026318855953,-0.1287492129276133,-0.28548918359835057,-0.31436273662006436,0.10885928474274373,0.10377900382844367,-0.2711712213034461,0.28911931666613055,-0.2116341641683871,-0.30307754144148485,-0.24975019508815438,0.38072418546682296,

In [95]:
prediction = linearModel.transform(test_data)

In [96]:
prediction.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [97]:
selected = prediction.select('label', 'prediction', 'probability')
selected.show(20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.90738310397271...|
|  0.0|       0.0|[0.92210643707431...|
|  0.0|       0.0|[0.91653112086434...|
|  0.0|       0.0|[0.92165294477832...|
|  0.0|       0.0|[0.63014613036744...|
|  0.0|       0.0|[0.64568980001892...|
|  0.0|       0.0|[0.65019152944581...|
|  0.0|       0.0|[0.73140740027626...|
|  0.0|       0.0|[0.91872221474052...|
|  0.0|       0.0|[0.83200245622189...|
|  0.0|       0.0|[0.83100080914289...|
|  0.0|       0.0|[0.83186483418696...|
|  0.0|       0.0|[0.54057652000698...|
|  0.0|       0.0|[0.64758660638965...|
|  0.0|       0.0|[0.65490647855304...|
|  0.0|       0.0|[0.57995884653068...|
|  0.0|       0.0|[0.86698614738953...|
|  0.0|       0.0|[0.79980152459084...|
|  0.0|       0.0|[0.84109588766540...|
|  0.0|       0.0|[0.51846565288158...|
+-----+----------+--------------------+
only showing top 20 rows



In [98]:
cm = prediction.select('label', 'prediction')

In [99]:
cm.groupby('label').agg({'label':'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7479|
|  1.0|        2387|
+-----+------------+



In [100]:
def accuracy_m(model):
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print('Model accuracy: {:.3f%}' + acc*100)

In [101]:
acc = cm.filter(cm.label == cm.prediction).count() / cm.count()

TypeError: can only concatenate str (not "float") to str

In [105]:
print('Model accuracy: {:.3f}%'.format(acc*100))

Model accuracy: 81.928%


In [109]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [111]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
print(evaluator.evaluate(prediction))
print(evaluator.getMetricName())

0.8918109654105872
areaUnderROC


In [106]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [107]:
paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.5]).build())

In [112]:
from time import * 
t0 = time()

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds = 5)
cvModel = cv.fit(train_data)

t1 = time()
elapse = t1-t0
print('time to train mode: {:.3f}'.format(elapse))

time to train mode: 1187.530


In [113]:
prediction1 = cvModel.transform(test_data)
cm1 = prediction1.select('label', 'prediction')
acc1 = cm1.filter(cm1.label == cm1.prediction).count() / cm1.count()

In [118]:
'{:.4f}%'.format(acc1*100)

'84.7659%'

In [119]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_eee1a4c06d9e', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_eee1a4c06d9e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_eee1a4c06d9e', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_eee1a4c06d9e', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_eee1a4c06d9e', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_eee1a4c06d9e', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_eee1a4c06d9e', name='maxIter', doc='maximum number of iterations (>= 0)'): 