1.-Basic operations with PySpark

In [1]:
!pip install pyspark

from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
from pyspark import SparkContext




In [2]:
# Reuse the running SparkContext when this cell is executed again: calling
# SparkContext() a second time raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
# Register the remote CSV so SparkFiles.get() can resolve its local copy.
sc.addFile(url)
sqlContext = SQLContext(sc)



In [3]:
 # Read the CSV registered with sc.addFile(); the first row is used as the
 # header and column types are inferred from the data.
 df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

In [4]:
 # Show the column names and the types Spark inferred for them.
 df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [5]:
 # Preview the first five rows without truncating long cell values.
 df.show(5, truncate = False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

In [6]:
# Replacement column names, applied positionally by toDF(): hyphens become
# underscores and some columns get shorter names (e.g. income -> label).
cols = [
    'x', 'age', 'workclass', 'fnlwgt', 'education', 'education_num',
    'marital_status', 'occupation', 'relationship', 'race', 'gender',
    'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label',
]

df = df.toDF(*cols)

# Import only the type that is used; a star import hides where names come from.
from pyspark.sql.types import FloatType
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    """Cast every column listed in ``names`` to ``newType``.

    Parameters
    ----------
    df : DataFrame whose columns are converted.
    names : iterable of column names to cast.
    newType : target type passed to each column's ``cast``.

    Returns
    -------
    The DataFrame obtained after replacing each named column with its
    cast version (``df`` itself when ``names`` is empty).
    """
    converted = df
    for column_name in names:
        cast_column = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, cast_column)
    return converted
# Names of the continuous (numeric) feature columns
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Cast every continuous column to float
df = convertColumn(df, CONTI_FEATURES, FloatType())
# Confirm the new column types
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [7]:
 # Show the first five values of two of the converted columns.
 df.select('age','fnlwgt').show(5)

+----+--------+
| age|  fnlwgt|
+----+--------+
|25.0|226802.0|
|38.0| 89814.0|
|28.0|336951.0|
|44.0|160323.0|
|18.0|103497.0|
+----+--------+
only showing top 5 rows



In [8]:
# Number of rows per education level, smallest group first.
df.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [9]:
# Summary statistics for every column.
df.describe().show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+-----+
|summary|                 x|               age|  workclass|            fnlwgt|   education|     education_num|marital_status|      occupation|relationship|              race|gender|      capital_gain|     capital_loss|        hours_week|native_country|label|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+-----+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|     

In [10]:
# Summary statistics restricted to a single column.
df.describe('capital_gain').show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655413|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



In [11]:
# Contingency table of age against the income label, ordered by the age column.
df.crosstab('age', 'label').sort("age_label").show()

+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|     17.0|  595|   0|
|     18.0|  862|   0|
|     19.0| 1050|   3|
|     20.0| 1112|   1|
|     21.0| 1090|   6|
|     22.0| 1161|  17|
|     23.0| 1307|  22|
|     24.0| 1162|  44|
|     25.0| 1119|  76|
|     26.0| 1068|  85|
|     27.0| 1117| 115|
|     28.0| 1101| 179|
|     29.0| 1025| 198|
|     30.0| 1031| 247|
|     31.0| 1050| 275|
|     32.0|  957| 296|
|     33.0| 1045| 290|
|     34.0|  949| 354|
|     35.0|  997| 340|
|     36.0|  948| 400|
+---------+-----+----+
only showing top 20 rows



In [12]:
# List the columns left after dropping education_num; the result is not
# assigned, so df itself keeps the column.
df.drop('education_num').columns

['x',
 'age',
 'workclass',
 'fnlwgt',
 'education',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

In [13]:
# Count the rows whose age is above 40.
df.filter(df.age > 40).count()

20211

In [14]:
# Mean capital gain for each marital status.
df.groupby('marital_status').agg({'capital_gain': 'mean'}).show()

+--------------------+------------------+
|      marital_status| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



2.-Data preprocessing

In [15]:
# Import only the helpers this notebook uses: a star import from
# pyspark.sql.functions shadows Python built-ins such as sum, min, max and round.
from pyspark.sql.functions import asc, col
# 1 Select the column (standalone projection holding only the squared age)
age_square = df.select(col("age")**2)
# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [16]:
# Column order for the working DataFrame: age_square sits right after age.
COLUMNS = [
    'x', 'age', 'age_square', 'workclass', 'fnlwgt', 'education',
    'education_num', 'marital_status', 'occupation', 'relationship',
    'race', 'gender', 'capital_gain', 'capital_loss', 'hours_week',
    'native_country', 'label',
]
df = df.select(COLUMNS)
# Look at the first row to confirm the new ordering.
df.first()

Row(x=1, age=25.0, age_square=625.0, workclass='Private', fnlwgt=226802.0, education='11th', education_num=7.0, marital_status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital_gain=0.0, capital_loss=0.0, hours_week=40.0, native_country='United-States', label='<=50K')

In [17]:
# A bare expression in the middle of a cell is evaluated but never displayed,
# so print the count explicitly.
print(df.filter(df.native_country == 'Holand-Netherlands').count())
# Row count per native country, rarest first.
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|                Laos|                   23|
|          Yugoslavia|                   23|
|Outlying-US(Guam-...|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [18]:
# Keep every row except those whose native_country is 'Holand-Netherlands'.
df_remove = df.filter(df.native_country!='Holand-Netherlands')

In [19]:
# Recompute the per-country counts on the filtered data to check the removal.
df_remove.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|                Laos|                   23|
|          Yugoslavia|                   23|
|Outlying-US(Guam-...|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|            Portugal|                   67|
|         

3.- Build a data processing pipeline

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [21]:
CATE_FEATURES = [
    'workclass', 'education', 'marital_status', 'occupation',
    'relationship', 'race', 'gender', 'native_country',
]

# Pipeline stages: for every categorical column, index its string values and
# then one-hot encode the resulting index.
stages = []
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + "Index",
    )
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    stages.extend([stringIndexer, encoder])

In [22]:
 # Convert the string label into numeric label indices using the StringIndexer.
 # NOTE: this appends to the shared `stages` list, so running the cell again
 # without first re-running the cell that resets `stages` adds a duplicate stage.
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

In [23]:
# Feature columns: the one-hot vectors of the categorical columns followed by
# the continuous columns.
# NOTE(review): age_square is not in CONTI_FEATURES, so it is not part of the
# assembled features - confirm this is intended.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [24]:
# Combine all input columns into a single "features" vector column.
# NOTE: appends to the shared `stages` list; re-running only this cell adds a
# duplicate assembler stage.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [25]:
 # Create a Pipeline from the collected stages and fit it on the filtered data.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
# Despite its name, `model` is the transformed DataFrame (original columns plus
# the index, one-hot, newlabel and features columns), not a fitted model.
model = pipelineModel.transform(df_remove)

In [26]:
 # Inspect the first transformed row, including the generated feature vector.
 model.take(1)

[Row(x=1, age=25.0, age_square=625.0, workclass='Private', fnlwgt=226802.0, education='11th', education_num=7.0, marital_status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital_gain=0.0, capital_loss=0.0, hours_week=40.0, native_country='United-States', label='<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital_statusIndex=1.0, marital_statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 226802.

4.-Build the classifier:logistic

In [27]:
from pyspark.ml.linalg import DenseVector
# Reduce every transformed row to a (numeric label, dense feature vector) pair.
input_data = model.rdd.map(
    lambda row: (row["newlabel"], DenseVector(row["features"]))
)

In [28]:
# Turn the (label, features) pairs back into a two-column DataFrame.
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

In [29]:
 # Preview the first two rows of the modelling DataFrame.
 df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



In [30]:
# Split the data into train (80%) and test (20%) sets; the fixed seed keeps the
# split repeatable.
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [31]:
# Number of rows per label in the training set.
train_data.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       29701|
|  1.0|        9345|
+-----+------------+



In [32]:
# Number of rows per label in the test set.
test_data.groupby('label').agg({'label':'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7453|
|  1.0|        2342|
+-----+------------+



In [33]:
from pyspark.ml.classification import LogisticRegression
# Configure the estimator: at most 10 iterations, regularization parameter 0.3
lr = LogisticRegression(
    labelCol="label", featuresCol="features", maxIter=10, regParam=0.3
)
# Train on the training split
linearModel = lr.fit(train_data)

In [34]:
# Print the fitted coefficient vector and the intercept of the model.
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.01214505924991046,-0.06823860934764871,0.02590973176806412,-0.15292515783828683,-0.044840458857517224,0.27970562888971845,0.24647580835764304,-0.4560424712738613,-0.1365223031970391,-0.03315220059099741,0.22625745712162823,0.3701547467909734,0.0037020958989733704,-0.22829460750030028,0.019811878548210746,-0.23459262516102183,-0.2607630592044288,0.5250358407234751,-0.23543784877427493,-0.19113553874878203,0.5107503253509824,-0.22817086177572463,-0.2610155931858554,0.411606905887486,-0.31119334767374346,-0.18014793011827138,-0.18777770284996675,-0.15923275299794762,-0.15174412850202196,0.24989942019871186,-0.03146648002530881,0.33886264346132794,-0.09526103834724899,0.06129512879112693,-0.25564705012497235,-0.16869242642823615,-0.15344218378465108,-0.06958149377296703,-0.2180335954061276,-0.2754137819908856,0.10549640739397056,0.11078385575328129,-0.22011813710286274,0.3434580549808888,-0.19388305268234202,-0.27958466874037746,-0.2138644021840045,0.4479481504436871,0.08

5.-Train and evaluate the model

In [35]:
# Make predictions on test data using the transform() method; the result keeps
# label/features and adds the rawPrediction, probability and prediction columns.
predictions = linearModel.transform(test_data)

In [36]:
# Show the columns of the prediction DataFrame.
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [37]:
# Compare the true label with the predicted label and class probabilities
# for the first 20 test rows.
selected = predictions.select("label", "prediction", "probability")
selected.show(20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.94897211752093...|
|  0.0|       0.0|[0.96181340180899...|
|  0.0|       0.0|[0.81544680161191...|
|  0.0|       0.0|[0.93838588013706...|
|  0.0|       0.0|[0.55062963693331...|
|  0.0|       1.0|[0.28223281796724...|
|  0.0|       1.0|[0.34214769708732...|
|  0.0|       0.0|[0.92861466273695...|
|  0.0|       1.0|[0.41427127276005...|
|  0.0|       1.0|[0.31821655911344...|
|  0.0|       0.0|[0.91805481465498...|
|  0.0|       0.0|[0.87671197102811...|
|  0.0|       0.0|[0.86521764677453...|
|  0.0|       0.0|[0.94658924875362...|
|  0.0|       0.0|[0.67979060749181...|
|  0.0|       0.0|[0.78793006516059...|
|  0.0|       0.0|[0.85662515535784...|
|  0.0|       0.0|[0.84317330097427...|
|  0.0|       0.0|[0.84126323762299...|
|  0.0|       0.0|[0.86178242748983...|
+-----+----------+--------------------+
only showing top 20 rows



In [38]:
# Evaluate model: keep only the true label and the predicted label.
cm = predictions.select("label", "prediction")

In [39]:
# Number of test rows per true label.
cm.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7453|
|  1.0|        2342|
+-----+------------+



In [40]:
# Number of test rows per predicted label.
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             8768|
|       1.0|             1027|
+----------+-----------------+



In [41]:
# Accuracy: share of test rows whose prediction equals the true label.
cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8265441551812149

In [42]:
def accuracy_m(model, data=None):
    """Print the accuracy of a fitted model on a labelled DataFrame.

    Parameters
    ----------
    model : object exposing ``transform(data)``; the transformed result must
        contain ``label`` and ``prediction`` columns.
    data : DataFrame to evaluate on, optional. When omitted, the
        notebook-level ``test_data`` is used, as before.

    Returns
    -------
    None. The accuracy is printed as a percentage; when ``data`` has no rows
    a message is printed instead of raising ``ZeroDivisionError``.
    """
    if data is None:
        data = test_data
    cm = model.transform(data).select("label", "prediction")
    total = cm.count()
    if total == 0:
        print("Model accuracy: undefined (no rows to evaluate)")
        return
    correct = cm.filter(cm.label == cm.prediction).count()
    acc = correct / total
    print("Model accuracy: %.3f%%" % (acc * 100))
# Report the accuracy of the logistic regression fitted above.
accuracy_m(model = linearModel)

Model accuracy: 82.654%


In [46]:
### Use ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model: the evaluator scores the rawPrediction column. No metric name
# is passed, so getMetricName() reports the evaluator's default metric.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8879099229638652
areaUnderROC


In [47]:
# NOTE: repeats the evaluation already printed by the previous cell.
print(evaluator.evaluate(predictions))

0.8879123864518226


6.-Tune the hyperparameter

In [43]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation

In [50]:
# Candidate values of the regularization parameter to try during cross-validation.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.5])
    .build()
)

In [51]:
# Import only the function that is used instead of everything in `time`.
from time import time

start_time = time()
# Create 5-fold CrossValidator; a fixed seed makes the fold assignment
# reproducible, in line with the seeded train/test split.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=1234)
# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 658.110 seconds


In [52]:
# Report the accuracy of the cross-validated model.
accuracy_m(model = cvModel)

Model accuracy: 84.941%


In [53]:
# Retrieve the best model found by cross-validation and list its parameters.
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_0af33b721060', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_0af33b721060', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_0af33b721060', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_0af33b721060', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_0af33b721060', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_0af33b721060', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_0af33b721060', name='maxBlockSizeInMB', doc='maximum memory in MB for s