In [1]:
import pyspark
#SparkContext is the internal engine that allows the connections with the clusters
from pyspark import SparkContext
# getOrCreate() returns the already-running SparkContext when there is one (e.g. when this
# cell is re-run) instead of failing because a second context cannot be started.
sc = SparkContext.getOrCreate()

In [2]:
# Distribute a small Python list as an RDD (Resilient Distributed Dataset).
nums = sc.parallelize(list(range(1, 5)))

In [3]:
# Fetch only the first element of the RDD back to the driver.
nums.take(num=1)

[1]

In [4]:
# map() only describes the transformation; collect() runs it and returns a Python list.
squared = nums.map(lambda n: n ** 2).collect()
for num in squared:
    print(num)

1
4
9
16


In [5]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

# SQLContext is the entry point for the Spark SQL / DataFrame functionality used below.
sqlContext = SQLContext(sc)


In [6]:
list_p = [('John', 19), ('Smith', 29), ('Adam', 35), ('Henry', 50)]
rdd = sc.parallelize(list_p)
# Wrap each (name, age) tuple in a Row so createDataFrame can infer a schema from it.
ppl = rdd.map(lambda rec: Row(name=rec[0], age=int(rec[1])))
DF_ppl = sqlContext.createDataFrame(ppl)


In [7]:
# Display the schema Spark inferred from the Row objects.
DF_ppl.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



# Step 1) Basic Operations with PySpark

In [8]:
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/thomaspernet/data_csv_r/master/data/adult.csv"
# Register the remote CSV with Spark; SparkFiles.get("adult.csv") resolves its local copy later.
sc.addFile(url)
sqlContext = SQLContext(sc)

# inferSchema = True

In [9]:
# With inferSchema=True Spark infers numeric column types from the data;
# without it, every column is read as a string.
df = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=True)

In [10]:
# Column types as inferred from the CSV.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [11]:
# First five rows, without truncating long cell values.
df.show(n=5, truncate=False)

+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0  

# inferSchema = False

In [12]:
# Re-read the CSV with inferSchema=False: every column now comes back as a string,
# so the continuous variables have to be recast to a numeric type afterwards.
df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=False)
df_string.printSchema()
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    """Cast the listed columns of a DataFrame to a new Spark SQL type.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame. It is not modified; each ``withColumn`` call yields a new frame.
    names : iterable of str
        Columns to cast, processed in order.
    newType : pyspark.sql.types.DataType
        Target type, e.g. ``FloatType()``.

    Returns
    -------
    pyspark.sql.DataFrame
        The frame with every column in ``names`` replaced by its cast value.
    """
    converted = df
    for column_name in names:
        recast = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, recast)
    # NOTE: every withColumn call adds a projection to the query plan. That is fine for a
    # handful of columns; a single select() scales better when casting very many columns.
    return converted
# Continuous features. The order matters: this list is reused later for the feature vector.
CONTI_FEATURES = ['age', 'fnlwgt', 'capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Cast them from string to float, then confirm the new column types.
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
df_string.printSchema()


root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (null

In [13]:
# Project two columns and display the first five rows.
df.select('age', 'label').show(n=5)

+---+-----+
|age|label|
+---+-----+
| 39|<=50K|
| 50|<=50K|
| 38|<=50K|
| 53|<=50K|
| 28|<=50K|
+---+-----+
only showing top 5 rows



In [14]:
# Number of rows per education level, rarest first.
(df.groupBy('education')
   .count()
   .orderBy('count', ascending=True)
   .show())


+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+



In [15]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show(20)

+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null|

In [16]:
# Summary statistics restricted to a single column.
df.describe(['capital_gain']).show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+



# crosstab

In [17]:
# Pairwise frequency table: one row per age, one column per label value.
df.crosstab('age', 'label').show()


+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       69|   87|  21|
|       88|    3|   0|
|       56|  248| 118|
|       42|  510| 270|
|       24|  767|  31|
|       37|  566| 292|
|       25|  788|  53|
|       52|  286| 192|
|       20|  753|   0|
|       46|  445| 292|
|       57|  227| 131|
|       78|   18|   5|
|       29|  679| 134|
|       84|    9|   1|
|       61|  204|  96|
|       74|   39|  12|
|       60|  211| 101|
|       85|    3|   0|
|       28|  748| 119|
|       38|  545| 282|
+---------+-----+----+
only showing top 20 rows



In [18]:
# Summary statistics of the crosstab itself (73 distinct ages).
df.crosstab('age', 'label').describe().show()

+-------+------------------+------------------+------------------+
|summary|         age_label|             <=50K|              >50K|
+-------+------------------+------------------+------------------+
|  count|                73|                73|                73|
|   mean|53.013698630136986|338.63013698630135|107.41095890410959|
| stddev|21.241006692169833| 273.9200768903077|107.02982912571076|
|    min|                17|                 1|                 0|
|    max|                90|               865|               292|
+-------+------------------+------------------+------------------+



# sort the crosstab by age, then drop a column

In [19]:
# Same crosstab ordered by the age_label column.
df.crosstab('age', 'label').orderBy("age_label").show()

+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows



In [20]:

# drop() returns a new DataFrame; df itself still contains education_num.
df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

# filter data

In [21]:
# Number of people older than 40.
df.where(df['age'] > 40).count()

13443

# descriptive statistics by group

In [22]:
# Average capital gain per marital status, from lowest to highest.
(df.groupBy('marital')
   .agg({'capital_gain': 'mean'})
   .orderBy('avg(capital_gain)')
   .show())


+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|       Never-married|376.58831788823363|
|   Married-AF-spouse| 432.6521739130435|
|           Separated| 535.5687804878049|
|             Widowed| 571.0715005035247|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+



# Step 2) Data Preprocessing

In [23]:
from pyspark.sql.functions import *

# The same expression (age squared) is used in both steps below.
age_sq_expr = col("age") ** 2

# 1 Select the column: returns a new DataFrame holding only the squared values
age_square = df.select(age_sq_expr)

# 2 Apply the transformation and add it to the DataFrame as "age_square"
df = df.withColumn("age_square", age_sq_expr)

df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [24]:
# Change the order of the variables: put the engineered age_square right after age.
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
# Guard against typos or dropped columns: the new order must be a permutation of df's columns.
assert sorted(COLUMNS) == sorted(df.columns), set(COLUMNS) ^ set(df.columns)
df = df.select(COLUMNS)
df.first()


Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

In [25]:
# How many rows carry this (very rare) country value?
df.where(df['native_country'] == 'Holand-Netherlands').count()

1

In [26]:
# Number of rows per native country, rarest first.
(df.groupBy('native_country')
   .agg({'native_country': 'count'})
   .orderBy('count(native_country)', ascending=True)
   .show())

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|     Trinadad&Tobago|                   19|
|            Cambodia|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|         

In [27]:
# Drop the single Holand-Netherlands row before building the pipeline.
df_remove = df.where(df['native_country'] != 'Holand-Netherlands')

# Step 3) Build a data processing pipeline

The steps to transform the data are very similar to those in scikit-learn. You need to:

1. Index the string columns as numeric values
2. Create the one hot encoder
3. Transform the data

In [28]:
### Example encoder: index one categorical column, then one-hot encode that index.
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
# NOTE(review): this notebook appears to target Spark 2.x (OneHotEncoderEstimator is used
# below), where OneHotEncoder is applied with transform() directly. In Spark 3.x
# OneHotEncoder is an Estimator and must be fit() first.
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(n=2)

+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|   

# Build the pipeline

Build a pipeline that converts all the selected features and adds them to the final dataset:

1. Encode the categorical data
2. Index the label feature
3. Add continuous variable
4. Assemble the steps.

In [29]:
# 1. Encode the categorical data: for each categorical column a StringIndexer
#    (string -> numeric index) followed by a one-hot encoder (index -> sparse vector).
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator

CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = []  # stages in our Pipeline, in execution order
for categoricalCol in CATE_FEATURES:
    index_col = categoricalCol + "Index"
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=index_col)
    encoder = OneHotEncoderEstimator(inputCols=[index_col],
                                     outputCols=[categoricalCol + "classVec"])
    stages.extend([stringIndexer, encoder])
    

In [30]:
# 2. Index the label feature:
# StringIndexer converts the string label into numeric label indices stored in "newlabel".
label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel")
stages.append(label_stringIdx)

In [31]:
# 3. Feature columns: the one-hot vector of every categorical column, then the continuous ones.
assemblerInputs = [name + "classVec" for name in CATE_FEATURES]
assemblerInputs += CONTI_FEATURES


In [32]:
# 4. Assemble every feature column into one vector column named "features".
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [33]:
# Chain all stages, fit them on the cleaned data, then apply the fitted pipeline to it.
# NOTE(review): the indexers/encoders are fit on all rows, before the train/test split below.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [34]:
# Inspect one transformed row, including the assembled "features" vector.
model.take(num=1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

# Step 4) Build the classifier: logistic regression

In [35]:
from pyspark.ml.linalg import DenseVector
# Keep only (label index, dense feature vector) pairs for the classifier.
input_data = model.rdd.map(lambda row: (row["newlabel"], DenseVector(row["features"])))

In [36]:
# Back to a DataFrame with the column names LogisticRegression expects by default.
df_train = sqlContext.createDataFrame(input_data, schema=["label", "features"])


In [37]:
# Peek at the first two training rows.
df_train.show(n=2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



In [38]:
# 80/20 train/test split; the fixed seed makes the split reproducible.
train_data, test_data = df_train.randomSplit([0.8, 0.2], seed=1234)

In [39]:
# Class balance of the training split.
train_data.groupBy('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+



In [40]:
# Class balance of the test split.
test_data.groupBy('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+



# Build the logistic regression model

In [41]:
# Import `LogisticRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`: logistic regression on the 0/1 "label" and the assembled "features",
# with at most 10 optimizer iterations and regularization strength regParam=0.3
# (elasticNetParam is left at its default of 0.0, i.e. an L2 penalty).
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model.
# (The name `linearModel` is kept because later cells refer to it.)
linearModel = lr.fit(train_data)

In [42]:
# Fitted coefficient vector and intercept of the logistic regression.
print("Coefficients:", linearModel.coefficients)
print("Intercept:", linearModel.intercept)

Coefficients: [-0.0678914665261549,-0.15342552681259095,-0.07060095364074866,-0.16405758656176883,-0.12065529852803142,0.1629223308618494,0.1491768704379407,-0.6268363626108148,-0.19348366154058616,-0.07822699808382186,0.22266720383574284,0.3995710963814983,-0.022202434180362392,-0.3119258578592073,-0.04344977886882983,-0.30600774432783917,-0.4131820968804973,0.5479375042467433,-0.39583735085365634,-0.23166535957967996,0.6187439067333165,-0.3440886145459082,-0.38526688136921355,0.3173244630055744,-0.3505188891861338,-0.20133592313795853,-0.2328785600883082,-0.1334927886502298,-0.11976054249818813,0.17500602491015527,-0.048096810111753416,0.2884842539433672,-0.11631461674482928,0.05241634780629467,-0.3009526245513954,-0.22046421474044287,-0.16557996579005532,-0.11467623193932212,-0.3119664314530722,-0.3442261192327113,0.10553012950728703,0.1522430478143234,-0.29277454549653664,0.2636283344329226,-0.19995137407645105,-0.303294225829555,-0.23108751517817402,0.41891855100024084,-0.05659301

# Step 5) Train and evaluate the model

In [43]:
# Score the held-out rows: transform() appends rawPrediction, probability and prediction.
predictions = linearModel.transform(test_data)

In [44]:
# Columns produced by the fitted model.
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [45]:
# Compare the true label with the predicted class and its probability vector.
selected = predictions.select(["label", "prediction", "probability"])
selected.show(n=20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows



# Evaluate the model


In [46]:
# Keep only what a confusion-matrix style comparison needs.
cm = predictions.select(["label", "prediction"])


In [47]:
# Actual class counts in the test set.
cm.groupBy('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+



In [49]:
# Predicted class counts in the test set.
cm.groupBy('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+



In [50]:
# Accuracy: share of test rows whose prediction equals the label.
cm.where(cm['label'] == cm['prediction']).count() / cm.count()

0.8237611759357478

In [51]:
def accuracy_m(model, data=None):
    """Print the classification accuracy of a fitted model.

    Parameters
    ----------
    model : fitted Spark ML model
        Anything with a ``transform(DataFrame)`` method whose output has ``label``
        and ``prediction`` columns (e.g. ``linearModel`` or ``cvModel``).
    data : DataFrame, optional
        Rows to score. Defaults to the notebook-level ``test_data`` split, so
        existing calls ``accuracy_m(model=...)`` behave as before.

    Raises
    ------
    ValueError
        If ``data`` has no rows, since accuracy is undefined in that case.
    """
    if data is None:
        data = test_data
    cm = model.transform(data).select("label", "prediction")
    n_rows = cm.count()
    if n_rows == 0:
        raise ValueError("accuracy_m: no rows to evaluate")
    acc = cm.filter(cm.label == cm.prediction).count() / n_rows
    print("Model accuracy: %.3f%%" % (acc * 100))
accuracy_m(model=linearModel)

Model accuracy: 82.376%


# ROC metrics


In [52]:
### Use ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the test predictions from their rawPrediction column; with no metricName given,
# the evaluator reports areaUnderROC (printed on the second line).
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192
areaUnderROC


# Step 6) Tune the hyperparameters

In [53]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation: two candidate regularization strengths for `lr`.
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.5]).build()

In [54]:
from time import *
# perf_counter() is a monotonic clock meant for measuring elapsed time; unlike time(),
# it is not affected by system clock adjustments.
from time import perf_counter

start_time = perf_counter()

# Create 5-fold CrossValidator. The fixed seed (same value as the train/test split)
# makes the fold assignment reproducible across kernel restarts.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=1234)

# Run cross validations -- this can take several minutes.
cvModel = cv.fit(train_data)
end_time = perf_counter()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 539.649 seconds


In [55]:
# Accuracy of the cross-validated model on the same test split.
accuracy_m(model=cvModel)

Model accuracy: 85.316%


In [56]:
# Parameters of the model selected by cross-validation.
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_e39b6a0c4fe6', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_e39b6a0c4fe6', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_e39b6a0c4fe6', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_e39b6a0c4fe6', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_e39b6a0c4fe6', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_e39b6a0c4fe6', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_e39b6a0c4fe6', name='maxIter', doc='maximum number of iterations (>= 0)'): 