In [1]:
import pyspark
from pyspark import SparkContext

In [2]:
sc = SparkContext()

In [3]:
nums = sc.parallelize([1,2,3,4])

In [4]:
nums.take(1)

[1]

In [5]:
squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i' %(num))

1
4
9
16


In [6]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [7]:
list_p = [('John', 19), ('Smith', 29), ('Adam', 35), ('Henry', 50)]
list_p

[('John', 19), ('Smith', 29), ('Adam', 35), ('Henry', 50)]

In [9]:
rdd = sc.parallelize(list_p)

In [12]:
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

In [13]:
DF_ppl = sqlContext.createDataFrame(ppl)

In [14]:
DF_ppl.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [15]:
import pandas as pd
url = "https://raw.githubusercontent.com/sadhana1002/PredictingSalaryClass-Classification/master/adult.csv"
df = sqlContext.createDataFrame(pd.read_csv(url, 
                                      names=['Age','workclass',
                                             'fnlwgt','education',
                                             'education_num',
                                             'marital',
                                             'occupation',
                                             'relationship','race',
                                             'sex','capital_gain',
                                             'capital_loss',
                                             'hours_week',
                                             'native_country','label']))
df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [16]:
df.show(5, truncate=False)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|Age|workclass        |fnlwgt|education |education_num|marital            |occupation        |relationship  |race  |sex    |capital_gain|capital_loss|hours_week|native_country|label |
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male  |2174        |0           |40        | United-States| <=50K|
|50 | Self-emp-not-inc|83311 | Bachelors|13           | Married-civ-spouse| Exec-managerial  | Husband      | White| Male  |0           |0           |13        | United-States| <=50K|
|38 | Private         |215646| HS-grad  |9            | Divorced          | Hand

In [17]:
from pyspark.sql.types import *

# write a custom function to convert the data type to dataframe columns
def convertColumn(df,names,newType):
    for name in names:
        df = df.withColumn(name,df[name].cast(newType))
    return df

# list of continuous features
CONTI_FEATURES = ['age', 'fnlwgt', 'capital_gain', 'education_num', 'capital_loss', 'hours_week']

# convert the type
df = convertColumn(df, CONTI_FEATURES, FloatType())

df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [18]:
df.select('age','fnlwgt').show(5)

+----+--------+
| age|  fnlwgt|
+----+--------+
|39.0| 77516.0|
|50.0| 83311.0|
|38.0|215646.0|
|53.0|234721.0|
|28.0|338409.0|
+----+--------+
only showing top 5 rows



In [19]:
df.groupBy('education').count().sort('count',ascending=True).show()

+-------------+-----+
|    education|count|
+-------------+-----+
|    Preschool|   51|
|      1st-4th|  168|
|      5th-6th|  333|
|    Doctorate|  413|
|         12th|  433|
|          9th|  514|
|  Prof-school|  576|
|      7th-8th|  646|
|         10th|  933|
|   Assoc-acdm| 1067|
|         11th| 1175|
|    Assoc-voc| 1382|
|      Masters| 1723|
|    Bachelors| 5355|
| Some-college| 7291|
|      HS-grad|10501|
+-------------+-----+



In [20]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+------------------+---------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|     education_num|  marital|       occupation|relationship|               race|    sex|      capital_gain|      capital_loss|        hours_week|native_country| label|
+-------+------------------+------------+------------------+-------------+------------------+---------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|             32561|    32561|            32561|       32561|              32561|  32561|             32561|             32561|             32561|         32561| 32561|
|   mean| 38.58164675532078|    

In [21]:
df.describe('capital_gain').show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840335|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



In [22]:
df.crosstab('age','label').sort('age_label').show()

+---------+------+-----+
|age_label| <=50K| >50K|
+---------+------+-----+
|     17.0|   395|    0|
|     18.0|   550|    0|
|     19.0|   710|    2|
|     20.0|   753|    0|
|     21.0|   717|    3|
|     22.0|   752|   13|
|     23.0|   865|   12|
|     24.0|   767|   31|
|     25.0|   788|   53|
|     26.0|   722|   63|
|     27.0|   754|   81|
|     28.0|   748|  119|
|     29.0|   679|  134|
|     30.0|   690|  171|
|     31.0|   705|  183|
|     32.0|   639|  189|
|     33.0|   684|  191|
|     34.0|   643|  243|
|     35.0|   659|  217|
|     36.0|   635|  263|
+---------+------+-----+
only showing top 20 rows



In [23]:
df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

In [24]:
df.filter(df.age>40).count()

13443

In [25]:
df.groupby('marital').agg({'capital_gain':'mean'}).show()

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|             Widowed| 571.0715005035247|
| Married-spouse-a...| 653.9832535885167|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
|            Divorced| 728.4148098131893|
|       Never-married|376.58831788823363|
|           Separated| 535.5687804878049|
+--------------------+------------------+



In [26]:
# add square age
from pyspark.sql.functions import *

age_square = df.select(col('age')**2)

df = df.withColumn('age_square',col('age')**2)

df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [28]:
df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country':'count'}).sort(asc('count(native_country)')).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
| Outlying-US(Guam...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|     Trinadad&Tobago|                   19|
|            Cambodia|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|         

In [29]:
df_remove = df.filter(df.native_country != 'Holand-Netherlands')

In [45]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [61]:
CAT_FEATURES = ['workclass','education','marital','occupation','relationship','race','sex','native_country']
stages = []
for categoricalCol in CAT_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols = [categoricalCol+"classVec"])
    stages+= [stringIndexer,encoder]
label_stringIdx = StringIndexer(inputCol='label',outputCol='newlabel')
stages += [label_stringIdx]
assemblerInputs = [c+'classVec' for c in CAT_FEATURES]+CONTI_FEATURES
assembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')
stages += [assembler]

In [62]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [63]:
model.take(1)

[Row(age=39.0, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_week=40.0, native_country=' United-States', label=' <=50K', age_square=1521.0, workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(41, {0: 1.0}), newlabel=0.0, features=SparseVector(100, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 39.0, 95: 77516.0, 96: 2174.0, 9

In [64]:
from pyspark.ml.linalg import DenseVector

input_data = model.rdd.map(lambda x : (x['newlabel'], DenseVector(x['features'])))

In [66]:
df_train = sqlContext.createDataFrame(input_data,['label','features'])
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



In [68]:
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
train_data.groupby('label').agg({'label':'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19845|
|  1.0|        6300|
+-----+------------+



In [69]:
test_data.groupby('label').agg({'label':'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4875|
|  1.0|        1541|
+-----+------------+



In [72]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='label',
                       featuresCol='features',
                       maxIter=10,
                       regParam=0.3)

linearModel = lr.fit(train_data)

Exception ignored in: <function JavaWrapper.__del__ at 0x7fa20e1f24c0>
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LogisticRegression' object has no attribute '_java_obj'


In [74]:
print('Coefficients: ' + str(linearModel.coefficients))
print('Intercept: ' + str(linearModel.intercept))

Coefficients: [-0.08092508665280052,-0.12950714746811462,-0.08150398087537934,-0.16920463822399648,-0.1398426935211465,0.16929665899340499,0.19441669709066356,-0.591559598420185,-0.19258893943587022,-0.08054445107206264,0.21383633873683927,0.39835027643221305,-0.007143521372851746,-0.3051585346419393,-0.029758834375507547,-0.3527048207304103,-0.42866354323242567,0.5457909895653656,-0.39060381285827733,-0.18336682389099437,0.6450330629495779,-0.4013477332728405,-0.3829915828120792,0.3204233869817907,-0.34971346076080656,-0.2006559601971576,-0.24448344527598845,-0.14426259221652804,-0.17667670054673612,0.17399311013247104,-0.05860883325168089,0.29167331096438764,-0.11122939022514383,0.043806409459561574,-0.2938610063466789,-0.2264041078772474,-0.17069506892688802,-0.14803507157998047,-0.29673501533172314,-0.34224958329390115,0.15399777334715062,0.1499355064910023,-0.3207127382567393,0.2620836043507478,-0.19024560076661465,-0.2999374790257602,-0.25082040275722733,0.4304539998913642,-0.075

In [75]:
predictions = linearModel.transform(test_data)
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [76]:
selected = predictions.select('label','prediction','probability')
selected.show(20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.67264667156573...|
|  0.0|       0.0|[0.74767042053991...|
|  0.0|       0.0|[0.84128229944305...|
|  0.0|       0.0|[0.82426786691641...|
|  0.0|       0.0|[0.75612484924238...|
|  0.0|       0.0|[0.73064218526286...|
|  0.0|       0.0|[0.81338323719044...|
|  0.0|       0.0|[0.75233435774951...|
|  0.0|       0.0|[0.86637068175599...|
|  0.0|       0.0|[0.79559826017480...|
|  0.0|       0.0|[0.84893801103873...|
|  0.0|       0.0|[0.86364509835917...|
|  0.0|       0.0|[0.85151722042409...|
|  0.0|       0.0|[0.88071263315657...|
|  0.0|       0.0|[0.88299907705955...|
|  0.0|       0.0|[0.64802095182998...|
|  0.0|       1.0|[0.32452120454135...|
|  0.0|       0.0|[0.67247236341054...|
|  0.0|       0.0|[0.88865543922044...|
|  0.0|       0.0|[0.84328923204674...|
+-----+----------+--------------------+
only showing top 20 rows



In [80]:
cm = predictions.select('label','prediction')
cm.groupby('label').agg({'label':'count'}).show()

Exception ignored in: <function JavaWrapper.__del__ at 0x7fa20e1f24c0>
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 42, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LogisticRegression' object has no attribute '_java_obj'


+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4875|
|  1.0|        1541|
+-----+------------+



In [81]:
cm.groupby('prediction').agg({'prediction':'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5788|
|       1.0|              628|
+----------+-----------------+



In [82]:
cm.filter(cm.label==cm.prediction).count()/cm.count()

0.8243453865336658

In [84]:
def accuracy_m(model):
    predictions = model.transform(test_data)
    cm = predictions.select('label','prediction')
    acc = cm.filter(cm.label==cm.prediction).count()/cm.count()
    print('Model accuracy: %.3f%%' %(acc*100))
accuracy_m(model=linearModel)

Model accuracy: 82.435%


In [85]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8872402535815906
areaUnderROC


In [86]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam,[0.01, 0.5]).build())

In [87]:
from time import *

start_time = time()
cv = CrossValidator(estimator=lr,
                   estimatorParamMaps = paramGrid,
                   evaluator = evaluator, numFolds=5)

cvModel = cv.fit(train_data)

end_time = time()
elapsed_time = end_time-start_time

print('Time to train model: %.3f seconds' % elapsed_time)

Time to train model: 211.163 seconds


In [88]:
accuracy_m(model=cvModel)

Model accuracy: 84.991%


In [89]:
bestModel=cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_e79b415ec3a5', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_e79b415ec3a5', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_e79b415ec3a5', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_e79b415ec3a5', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_e79b415ec3a5', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_e79b415ec3a5', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='LogisticRegression_e79b415ec3a5', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These