In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from operator import add
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# Obtain the SparkSession for this application, creating it if none exists yet.
spark = (
    SparkSession.builder
    .appName("Logistic regression with binomial")
    .getOrCreate()
)
	
# Read the CSV into a DataFrame: the header row supplies the column names and
# the column types are inferred from the data instead of defaulting to string.
ad_data = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("/Users/akashsoni/adult5.csv")
)

# Expose the raw data as the temporary view "adult" and read it back by name;
# the rest of the script works on `dataset`.
ad_data.createOrReplaceTempView("adult")
dataset = spark.table("adult")

# Keep the original column names so they can be re-selected after the
# feature pipeline has appended its derived columns.
cols = dataset.columns


# Categorical columns to encode. Only "workclass" is enabled here; the full
# candidate list is:
#   "workclass", "education", "marital_status", "occupation",
#   "relationship", "race", "sex", "native_country"
categoricalColumns = ["workclass"]

# Queue two pipeline stages per categorical column:
#   1. StringIndexer:  <col>       -> <col>Index     (e.g. workclass -> workclassIndex)
#   2. OneHotEncoder:  <col>Index  -> <col>classVec  (e.g. workclassIndex -> workclassclassVec)
stages = []
for categoricalCol in categoricalColumns:
    indexedCol = categoricalCol + "Index"
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    encoder = OneHotEncoder(inputCol=indexedCol, outputCol=categoricalCol + "classVec")
    stages.extend([stringIndexer, encoder])


#
# Index the "income" strings into the numeric "label" column used for training.
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages.append(label_stringIdx)

# Numeric columns fed into the feature vector unchanged. Only two are enabled
# here; the full candidate list is:
#   "age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"
numericCols = ["age", "hours_per_week"]

# The "features" vector is the one-hot vectors of the categorical columns
# followed by the numeric columns, in that order.
assemblerInputs = [categoricalCol + "classVec" for categoricalCol in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

# Bundle every queued stage (indexers, encoders, assembler) into one Pipeline.
pipeline = Pipeline(stages=stages)

# fit() lets each stage compute whatever it needs from the data;
# the resulting model is what actually transforms rows.
pipelineModel = pipeline.fit(dataset)

# Keep only the model inputs ("label", "features") plus the original columns,
# dropping the intermediate index / one-hot columns the pipeline appended.
selectedcols = ["label", "features"] + cols
dataset = pipelineModel.transform(dataset).select(*selectedcols)


# Randomly split the prepared rows into training (weight 0.7) and test
# (weight 0.3) sets. The fixed seed makes the split reproducible.
splitWeights = [0.7, 0.3]
trainingData, testData = dataset.randomSplit(splitWeights, seed=100)



# Configure a binomial (two-class) logistic regression estimator.
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
    family="binomial",
)

# Fit the estimator on the training split.
lrModel = lr.fit(trainingData)

# Print the fitted weights. With family="binomial" the model exposes them
# through `coefficients`.
print(lrModel.coefficients)

############################################################################################################

# Score the held-out test split with the fitted model.
predictions = lrModel.transform(testData)

# Show the true label next to the predicted class, the per-class
# probabilities and the original "income" string, without truncating cells.
displayCols = ["label", "prediction", "probability", "income"]
selected = predictions.select(*displayCols)
selected.show(truncate=False)


# Evaluate the binary classifier from the "rawPrediction" column of the
# scored test rows, using the evaluator's default label column and metric.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
metricValue = evaluator.evaluate(predictions)
print(metricValue)

# Bare expression on purpose: as the last statement of a notebook cell its
# value (the name of the metric that was computed) is echoed as cell output.
evaluator.getMetricName()




[-7.19184160783,-10.220627656,0.316523376544,-0.170779945632]
+-----+----------+------------------------------------------+------+
|label|prediction|probability                               |income|
+-----+----------+------------------------------------------+------+
|1.0  |0.0       |[0.9996270840069971,3.7291599300297534E-4]| >50K |
+-----+----------+------------------------------------------+------+

1.0


'areaUnderROC'