In [1]:
# Echo the notebook's `spark` entry point so the cell output confirms it is
# available; it is used further down to read the `cancer` table.
spark

In [2]:
%fs ls /FileStore/tables/breast_cancer_wisconsin_data-e1a95.csv

In [3]:
%fs head /FileStore/tables/breast_cancer_wisconsin_data-e1a95.csv

In [4]:
%sql
-- Remove any previous `cancer` table so the CREATE TABLE that follows can be re-run.
DROP TABLE IF EXISTS cancer

In [5]:
%sql
-- Register table `cancer` over the uploaded CSV using an explicit schema.
-- Columns: an id, the diagnosis string, then three groups of ten FLOAT
-- measurements distinguished by suffix: *_mean, *_se and *_worst.
-- NOTE(review): the names below are declared here rather than read from the
-- file; presumably they line up with the CSV columns by position — confirm.
CREATE TABLE cancer(
  id INT,
  diagnosis STRING,
  -- *_mean measurements
  radius_mean FLOAT,
  texture_mean FLOAT,
  perimeter_mean FLOAT,
  area_mean FLOAT,
  smoothness_mean FLOAT,
  compactness_mean FLOAT,
  concavity_mean FLOAT,
  concave_points_mean FLOAT,
  symmetry_mean FLOAT,
  fractal_dimension_mean FLOAT,
  -- *_se measurements
  radius_se FLOAT,
  texture_se FLOAT,
  perimeter_se FLOAT,
  area_se FLOAT,
  smoothness_se FLOAT,
  compactness_se FLOAT, 
  concavity_se FLOAT,
  concave_points_se FLOAT,
  symmetry_se FLOAT,
  fractal_dimension_se FLOAT,
  -- *_worst measurements
  radius_worst FLOAT,
  texture_worst FLOAT,
  perimeter_worst FLOAT,
  area_worst FLOAT,
  smoothness_worst FLOAT,
  compactness_worst FLOAT,
  concavity_worst FLOAT,
  concave_points_worst FLOAT,
  symmetry_worst FLOAT,
  fractal_dimension_worst FLOAT)
-- Read through the CSV data source; header "true" marks the file's first line
-- as a header row rather than data.
USING com.databricks.spark.csv
OPTIONS (path "/FileStore/tables/breast_cancer_wisconsin_data-e1a95.csv", header "true")

In [6]:
# Load the `cancer` table as a DataFrame and remember its original column
# names; they are re-used later when choosing which columns to keep.
dataset = spark.table("cancer")
cols = dataset.columns

In [7]:
# Render the freshly loaded table to eyeball its columns and values.
display(dataset)

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# Categorical columns to index and one-hot encode.
# NOTE(review): `diagnosis` is also the column the label is built from, so its
# encoded vector must not be used as a model feature.
categoricalColumn = ["diagnosis"]

# Ordered list of Pipeline stages. Nothing runs here; the stages execute
# together once the Pipeline is fit.
stages = []

for categoricalCol in categoricalColumn:
    indexedCol = "%sIndex" % categoricalCol
    vectorCol = "%sclassVec" % categoricalCol
    # String category -> numeric category index.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    # Numeric category index -> one-hot encoded vector.
    encoder = OneHotEncoder(inputCol=indexedCol, outputCol=vectorCol)
    stages.extend([stringIndexer, encoder])

In [9]:
# Index the diagnosis strings into the numeric "label" column the classifiers train on.
label_stringIdx = StringIndexer(inputCol="diagnosis", outputCol="label")
stages.append(label_stringIdx)

In [10]:
# Assemble the model inputs into a single "features" vector column.
numericCols = [
    "radius_mean",
    "texture_mean",
    "perimeter_mean",
    "area_mean",
    "smoothness_mean",
    "compactness_mean",
    "concavity_mean",
    "concave_points_mean",
    "symmetry_mean",
    "fractal_dimension_mean",
]

# The label is derived from `diagnosis`, so the one-hot vector of that same
# column must never be a feature: including it hands the model the answer
# (target leakage) and makes every evaluation score meaningless.
labelSourceCol = "diagnosis"

# A list comprehension is used instead of map(): on Python 3 map() returns an
# iterator, which cannot be concatenated with a list.
categoricalFeatureCols = [
    c + "classVec" for c in categoricalColumn if c != labelSourceCol
]

assemblerInputs = categoricalFeatureCols + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler]

In [11]:
# Build the Pipeline from the stages collected so far.
pipeline = Pipeline(stages=stages)

# Always start from the original table columns. On a first run this select is
# a no-op; on a re-run it strips the "label"/"features" columns added by the
# previous run, which would otherwise make fit() fail because the stages'
# output columns already exist.
rawDataset = dataset.select(cols)

# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(rawDataset)
transformed = pipelineModel.transform(rawDataset)

# Keep the label, the assembled features and the original columns.
selectedcols = ["label", "features"] + cols
dataset = transformed.select(selectedcols)
display(dataset)

In [12]:
# Randomly split the data into training (70%) and test (30%) sets; the seed is
# fixed so the split is reproducible.
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

# print(...) with a single argument produces the same output on Python 2 and
# Python 3, whereas the bare print statement is a SyntaxError on Python 3.
print(trainingData.count())
print(testData.count())

### Logistic Regression

In [14]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression over the assembled features, capped at 10 iterations.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
)

# Fit on the training split only; the test split stays untouched for evaluation.
lrModel = lr.fit(trainingData)

In [15]:
# Score the held-out test split with the fitted logistic regression model.
# The estimator was created with featuresCol="features", so that is the input
# column transform() reads; the remaining columns are carried along unchanged.
predictions = lrModel.transform(testData)

In [16]:
# Score the held-out test split with the fitted logistic regression model.
# NOTE(review): this cell repeats the previous cell verbatim and recomputes the
# same `predictions`; it looks safe to delete — confirm.
predictions = lrModel.transform(testData)

In [17]:
# List the columns of the scored DataFrame to see what can be selected next.
predictions.printSchema()

In [18]:
# View the model's predictions and the probabilities of each prediction class.
# Any column of the `predictions` schema can be selected; here we show the
# indexed label, the predicted class, the class probabilities and the original
# diagnosis string.
selected = predictions.select("label", "prediction", "probability", "diagnosis")
display(selected)

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the test predictions with a binary-classification evaluator.
# labelCol and metricName are spelled out at their documented default values,
# so the evaluator behaves exactly as when they are omitted.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)
evaluator.evaluate(predictions)

In [20]:
# Show which metric the evaluator reports when evaluate() is called.
evaluator.getMetricName()

In [21]:
# Print the documentation and current value of every LogisticRegression
# parameter, to decide which ones to tune. The function-call form of print
# runs on both Python 2 and Python 3.
print(lr.explainParams())

In [22]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for cross validation: every combination of the values
# registered below is tried.
lrGridBuilder = ParamGridBuilder()
lrGridBuilder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
lrGridBuilder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
lrGridBuilder.addGrid(lr.maxIter, [1, 5, 10])
paramGrid = lrGridBuilder.build()

In [23]:
# 5-fold cross validation over the logistic regression parameter grid.
# The seed pins the fold assignment so repeated runs select the same best
# model; it matches the seed used for the train/test split.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=100,
)

# Run cross validation. This will likely take a fair amount of time because
# one model is trained per parameter combination per fold.
cvModel = cv.fit(trainingData)

In [24]:
# Score the held-out test split with the cross-validated model so it is judged
# on data it was not trained on.
# NOTE(review): the score computed from these predictions is whatever metric
# `evaluator` is configured with (see getMetricName()), presumably area under
# ROC rather than accuracy — confirm.
predictions = cvModel.transform(testData)

In [25]:
# cvModel scores with the best model found during cross validation.
# Evaluate that best model's predictions on the test split.
evaluator.evaluate(predictions)

In [26]:
# Print the intercept of the best cross-validated logistic regression model.
# %-formatting reproduces the Python 2 statement's output (label, separator
# space, value) while also running on Python 3.
print('Model Intercept:  %s' % (cvModel.bestModel.interceptVector,))

In [27]:
# Show the fitted coefficients of the best model, one per row.
# On Spark 2.x the weights are exposed as coefficients / coefficientMatrix.
# coefficientMatrix is a matrix object that cannot be iterated element by
# element, so it is first converted to a NumPy array and flattened.
coefficientValues = cvModel.bestModel.coefficientMatrix.toArray().ravel()

# Convert each NumPy float to a plain Python float wrapped in a 1-tuple (one
# row per coefficient). A list comprehension is used because createDataFrame
# needs a concrete list, which map() does not return on Python 3.
weights = [(float(w),) for w in coefficientValues]

# `spark` is the same entry point used earlier to load the table.
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

In [28]:
# Show, for the best cross-validated model, the indexed label next to the
# predicted class, the class probabilities and the original diagnosis string.
selected = predictions.select(
    "label", "prediction", "probability", "diagnosis"
)
display(selected)

### Decision Tree

In [30]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree over the assembled features, limited to a depth of 3.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=3,
)

# Fit on the training split only.
dtModel = dt.fit(trainingData)

In [31]:
# Report the size of the fitted tree. %-formatting reproduces the Python 2
# statement's output (label, separator space, value) and also runs on Python 3.
print("numNodes =  %s" % dtModel.numNodes)
print("depth =  %s" % dtModel.depth)

In [32]:
# Score the held-out test split with the fitted decision tree model.
# Note: this rebinds `predictions`, replacing the earlier logistic regression results.
predictions = dtModel.transform(testData)

In [33]:
# List the columns of the decision tree's scored DataFrame.
predictions.printSchema()

In [34]:
# Show the decision tree's predicted class and class probabilities next to the
# indexed label and the original diagnosis string.
selected = predictions.select(
    "label", "prediction", "probability", "diagnosis"
)
display(selected)

In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the decision tree's test predictions with a binary-classification
# evaluator. The arguments are spelled out at their documented default values,
# so the evaluator behaves exactly as when constructed with no arguments.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)
evaluator.evaluate(predictions)

In [36]:
# Show the impurity setting currently configured on the decision tree estimator.
dt.getImpurity()

In [37]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for the decision tree: every combination of the depth
# and bin-count values registered below is tried.
dtGridBuilder = ParamGridBuilder()
dtGridBuilder.addGrid(dt.maxDepth, [1, 2, 6, 10])
dtGridBuilder.addGrid(dt.maxBins, [20, 40, 80])
paramGrid = dtGridBuilder.build()

In [38]:
# 5-fold cross validation over the decision tree parameter grid.
# The seed pins the fold assignment so repeated runs select the same best
# model; it matches the seed used for the train/test split.
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=100,
)

# Run cross validation (takes ~5 minutes).
cvModel = cv.fit(trainingData)

In [39]:
# Report the size of the best tree found by cross validation. %-formatting
# reproduces the Python 2 statement's output (label, separator space, value)
# and also runs on Python 3.
print("numNodes =  %s" % cvModel.bestModel.numNodes)
print("depth =  %s" % cvModel.bestModel.depth)

In [40]:
# Score the held-out test split with the cross-validated decision tree so it is
# judged on data it was not trained on.
# NOTE(review): the score computed from these predictions is whatever metric
# `evaluator` is configured with, presumably area under ROC rather than
# accuracy — confirm.
predictions = cvModel.transform(testData)

In [41]:
# cvModel scores with the best model found during cross validation.
# Evaluate that best decision tree's predictions on the test split.
evaluator.evaluate(predictions)

In [42]:
# Show, for the best cross-validated decision tree, the indexed label next to
# the predicted class, the class probabilities and the original diagnosis string.
selected = predictions.select(
    "label", "prediction", "probability", "diagnosis"
)
display(selected)