In [1]:
# Register the access key for the `zbatchaistorage` blob account on the Spark
# session so that wasbs:// paths on that account can be read directly.
# NOTE(review): "your_storage_key" is a placeholder -- never commit a real key;
# prefer pulling it from a secret store at run time.
storage_key_conf = "fs.azure.account.key.zbatchaistorage.blob.core.windows.net"
spark.conf.set(storage_key_conf, "your_storage_key")

In [2]:
### Mount the blob container on Databricks. This only needs to happen once:
### the mount persists afterwards, and mounting an already-mounted path raises
### an error. The guard below makes the cell safe to re-run (Run All).
mount_point = "/mnt/root/"

# Compare without trailing slashes so "/mnt/root" and "/mnt/root/" match.
already_mounted = any(
    existing.mountPoint.rstrip("/") == mount_point.rstrip("/")
    for existing in dbutils.fs.mounts()
)

if not already_mounted:
    dbutils.fs.mount(
        source="wasbs://mntdatabrick@zbatchaistorage.blob.core.windows.net/faces/",
        mount_point=mount_point,
        # NOTE(review): placeholder value -- supply the real key from a secret
        # store rather than hardcoding it in the notebook.
        extra_configs={"fs.azure.account.key.zbatchaistorage.blob.core.windows.net": "your_storage_access_key"},
    )

In [3]:
# Load the semicolon-delimited breast-cancer CSV (with a header row) straight
# from blob storage into a Spark DataFrame, letting Spark infer column types.
# Reference: https://blog.arinti.be/databricks-importing-data-from-a-blob-storage-2b8dc700d029
df = (
    spark.read.format("csv")
    .option("delimiter", ';')
    .option("header", "true")
    .load(
        "wasbs://mntdatabrick@zbatchaistorage.blob.core.windows.net/BreastCancerData.csv",
        inferSchema=True,
    )
)

# Print the first three rows as a sanity check.
df.show(3)

In [4]:
# Keep the nine predictor columns plus the "Class" label column.
data = df.select(
    [
        "age",
        "menopause",
        "tumor-size",
        "inv-nodes",
        "node-caps",
        "deg-malig",
        "breast",
        "breast-quad",
        "irradiat",
        "Class",
    ]
)

# 75/25 train/test split; the fixed seed keeps the split repeatable.
train, test = data.randomSplit([0.75, 0.25], seed=123)

# Preview only a handful of rows: calling toPandas() on the whole split would
# collect every training row onto the driver just to display it.
train.limit(5).toPandas()

In [5]:
from mmlspark import TuneHyperparameters
from mmlspark.TrainClassifier import TrainClassifier
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
# Candidate Spark ML classifiers, all left at their default settings here;
# the hyperparameters to search are declared in a later cell.
logReg = LogisticRegression()
randForest = RandomForestClassifier()
gbt = GBTClassifier()
smlmodels = [logReg, randForest, gbt]

# Wrap each classifier in an MMLSpark TrainClassifier that uses "Class" as the
# label column.
mmlmodels = list(
    map(lambda estimator: TrainClassifier(model=estimator, labelCol="Class"), smlmodels)
)

In [6]:
from mmlspark import HyperparamBuilder
from mmlspark import RangeHyperParam
from mmlspark import DiscreteHyperParam
from mmlspark import RandomSpace
# Declare the hyperparameter search space, one entry per (model, parameter).
paramBuilder = (
    HyperparamBuilder()
    # Logistic regression: regularisation strength drawn from the 0.1-0.3 range.
    .addHyperparam(logReg, logReg.regParam, RangeHyperParam(0.1, 0.3, isDouble=True))
    # Random forest: number of trees and tree depth picked from fixed candidates.
    .addHyperparam(randForest, randForest.numTrees, DiscreteHyperParam([5,10]))
    .addHyperparam(randForest, randForest.maxDepth, DiscreteHyperParam([3,5]))
    # Gradient-boosted trees: bin count from the 8-16 range, depth from fixed candidates.
    .addHyperparam(gbt, gbt.maxBins, RangeHyperParam(8,16))
    .addHyperparam(gbt, gbt.maxDepth, DiscreteHyperParam([3,5]))
)

# Wrap the built definitions in a RandomSpace for use by the tuner.
randomSpace = RandomSpace(paramBuilder.build())

In [7]:
# Tune all wrapped models on the training split and keep the fitted result.
# Settings: accuracy as the metric, 2 folds, two runs per candidate model,
# no parallelism, and a fixed seed.
bestModel = (
    TuneHyperparameters(
        evaluationMetric="accuracy",
        models=mmlmodels,
        numFolds=2,
        numRuns=len(mmlmodels) * 2,
        parallelism=1,
        paramSpace=randomSpace.space(),
        seed=0,
    )
    .fit(train)
)

In [8]:
from mmlspark import ComputeModelStatistics
# Score the held-out test split with the tuned model.
prediction = bestModel.transform(test)

# Compute evaluation statistics from the scored rows, then pull the resulting
# metrics table to pandas so the notebook renders it.
stats_transformer = ComputeModelStatistics()
metrics = stats_transformer.transform(prediction)
m = metrics.toPandas()
m

In [9]:
### operationalize the model for an api call
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
import uuid

# Streaming source: an HTTP endpoint at localhost:8898/my_api. The request
# body (column "value") is parsed as JSON using the test split's schema, and
# the parsed fields are flattened alongside the request "id".
serving_inputs = (
    spark.readStream.server()
    .address("localhost", 8898, "my_api")
    .load()
    .withColumn("variables", from_json(col("value"), test.schema))
    .select("id", "variables.*")
)

# Score each incoming request and cast the predicted label to a string so it
# can be used as the reply column.
serving_outputs = bestModel.transform(serving_inputs).withColumn(
    "scored_labels", col("scored_labels").cast("string")
)

# Streaming sink: reply to the caller with the "scored_labels" column. A fresh
# uuid-suffixed checkpoint directory is used on every start.
server = (
    serving_outputs.writeStream
    .server()
    .option("name", "my_api")
    .queryName("my_query")
    .option("replyCol", "scored_labels")
    .option("checkpointLocation", "checkpoints-{}".format(uuid.uuid1()))
    .start()
)

In [10]:
# Print a single row of the source data, e.g. as a template for an API payload.
df.show(n=1)


In [11]:
# Display the repr of the streaming input DataFrame defined in the serving cell.
serving_inputs

In [12]:
import requests
# Sample JSON payload whose keys match the columns selected for training.
dat = u'{"age":5,"menopause":1,"tumor-size":1,"inv-nodes":1,"node-caps":2,"deg-malig":1,"breast":3,"breast-quad":1,"irradiat":1,"Class":0}'

# POST the payload to the local serving endpoint and print the reply body.
r = requests.post(url="http://localhost:8898/my_api", data=dat)
response_text = r.text
print("Response {}".format(response_text))

In [13]:
import mmlspark
import numpy as np
from mmlspark import toNDArray

# Read the female and male face images from blob storage (recursively, with a
# sampleRatio of 0.1) and cache both DataFrames.
femaleimageDir = "wasbs://mntdatabrick@zbatchaistorage.blob.core.windows.net/faces/female"
fimages = spark.readImages(femaleimageDir, recursive = True, sampleRatio = 0.1).cache()

# Fix: the original pointed maleimageDir at the "female" folder and also read
# mimages from femaleimageDir, so both DataFrames held female images.
# NOTE(review): assumes the male images live under faces/male -- confirm the
# container layout.
maleimageDir = "wasbs://mntdatabrick@zbatchaistorage.blob.core.windows.net/faces/male"
mimages = spark.readImages(maleimageDir, recursive = True, sampleRatio = 0.1).cache()

# Show the image schema and how many images were read for each class.
mimages.printSchema()
print(fimages.count(), mimages.count())