In [1]:
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

sqlContext = SQLContext(sc)

In [4]:
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [5]:
df.groupby('education').count().sort('count',ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [6]:
df.filter(df.age>40).count()

20211

In [7]:
df.groupby('marital-status').agg({'capital-gain': 'sum'}).show()

+--------------------+-----------------+
|      marital-status|sum(capital-gain)|
+--------------------+-----------------+
|           Separated|           890219|
|       Never-married|          6195095|
|Married-spouse-ab...|           395015|
|            Divorced|          5264450|
|             Widowed|           916332|
|   Married-AF-spouse|           109950|
|  Married-civ-spouse|         38932760|
+--------------------+-----------------+



In [8]:
df = df.withColumn("age_square", col("age")**2)

In [9]:
df.groupby('native-country').agg({'native-country': 'count'}).sort(asc("count(native-country)")).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|Outlying-US(Guam-...|                   23|
|          Yugoslavia|                   23|
|                Laos|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [10]:
df = df.filter(df['native-country'] !='Holand-Netherlands')

In [11]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded[['workclass_encoded','workclass_vec']].show(10)

+-----------------+-------------+
|workclass_encoded|workclass_vec|
+-----------------+-------------+
|              0.0|(9,[0],[1.0])|
|              0.0|(9,[0],[1.0])|
|              2.0|(9,[2],[1.0])|
|              0.0|(9,[0],[1.0])|
|              3.0|(9,[3],[1.0])|
|              0.0|(9,[0],[1.0])|
|              3.0|(9,[3],[1.0])|
|              1.0|(9,[1],[1.0])|
|              0.0|(9,[0],[1.0])|
|              0.0|(9,[0],[1.0])|
+-----------------+-------------+
only showing top 10 rows



In [43]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [44]:
# Convert label into label indices using the StringIndexer
# label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
# stages += [label_stringIdx]

In [45]:
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'capital-loss', 'hours-per-week']
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [46]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [47]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
model = pipelineModel.transform(df)

In [56]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["age"], DenseVector(x["features"])))

In [57]:
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

In [59]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [60]:
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [64]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficientMatrix))
print("Intercept: " + str(linearModel.interceptVector))

Coefficients: DenseMatrix([[-3.36257343e-04, -1.19392432e-04, -1.13536880e-04, ...,
              -2.19076373e-09, -5.70804746e-08, -2.70520816e-05],
             [-3.36257343e-04, -1.19392432e-04, -1.13536880e-04, ...,
              -2.19076373e-09, -5.70804746e-08, -2.70520816e-05],
             [-3.36257343e-04, -1.19392432e-04, -1.13536880e-04, ...,
              -2.19076373e-09, -5.70804746e-08, -2.70520816e-05],
             ...,
             [-1.53513259e-03,  5.03380085e-03, -6.88393521e-04, ...,
              -3.61400848e-10, -3.46254471e-07, -5.21171278e-05],
             [-5.14673589e-04, -3.59901036e-04, -3.41789670e-04, ...,
              -6.60308109e-09, -1.71983183e-07, -4.08102124e-05],
             [-1.57819275e-03, -1.27281197e-03,  3.49861650e-03, ...,
               1.01872719e-07,  3.87923655e-06, -1.99779135e-04]])
Intercept: [-4.478820004113719,-4.478820004113719,-4.478820004113719,-4.478820004113719,-4.478820004113719,-4.478820004113719,-4.478820004113719,-4.478

In [62]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)
selected = predictions.select("label", "prediction", "probability")
selected.show(20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|   17|      20.0|[2.41103232471095...|
|   17|      18.0|[2.37807091928234...|
|   17|      18.0|[2.37795406494152...|
|   17|      18.0|[2.38455178890594...|
|   17|      19.0|[2.41614259994857...|
|   17|      19.0|[2.41821986206058...|
|   17|      19.0|[2.40694232223421...|
|   17|      19.0|[2.40609218138815...|
|   17|      20.0|[2.42326031107704...|
|   17|      20.0|[2.42023086462885...|
|   17|      19.0|[2.43277010000363...|
|   17|      18.0|[2.38398015933459...|
|   17|      18.0|[2.46521409456441...|
|   17|      18.0|[2.37181770281334...|
|   17|      23.0|[2.42398772111925...|
|   17|      18.0|[2.38040155270063...|
|   17|      18.0|[2.38299942877415...|
|   17|      18.0|[2.39298827085825...|
|   17|      18.0|[2.40294216352775...|
|   17|      18.0|[2.31816811960394...|
+-----+----------+--------------------+
only showing top 20 rows



In [65]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)

Model accuracy: 5.747%


In [66]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

1.0
areaUnderROC


In [67]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

In [68]:
from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 803.533 seconds


In [69]:
accuracy_m(model = cvModel)

Model accuracy: 7.642%


In [70]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_dba256522b9c', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_dba256522b9c', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_dba256522b9c', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_dba256522b9c', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_dba256522b9c', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_dba256522b9c', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_dba256522b9c', name='maxIter', doc='maximum number of iterations (>= 0)'): 