In [0]:
from pyspark.sql.functions import col
import tensorflow as tf
# Load the flowers Delta table, keep only the `content` and `label_index`
# columns, and cap the sample at 100 rows.
spark_df = (
  spark.read
  .format("delta")
  .load("/databricks-datasets/flowers/delta")
  .select(col("content"), col("label_index"))
  .limit(100)
)


In [0]:
# Destination directory for the TFRecord export.
path = '/ml/flowersData/converted_data.tfrecord'

# Write the DataFrame in the "tfrecords" format, replacing any previous
# export at the same location.
(
  spark_df.write
  .format("tfrecords")
  .mode("overwrite")
  .save(path)
)

# Show what was written.
display(dbutils.fs.ls(path))

path,name,size
dbfs:/ml/flowersData/converted_data.tfrecord/_SUCCESS,_SUCCESS,0
dbfs:/ml/flowersData/converted_data.tfrecord/part-r-00000,part-r-00000,17175166


In [0]:
import os
# Build the /dbfs-prefixed paths of the "part*" files in the export directory;
# other entries (e.g. _SUCCESS) are skipped.
filenames = [
  f"/dbfs{path}/{entry}"
  for entry in os.listdir("/dbfs" + path)
  if entry.startswith("part")
]

# Dataset of raw serialized records read from those files.
dataset = tf.data.TFRecordDataset(filenames)


In [0]:
def decode_and_normalize(serialized_example, image_size = 224):
  """Parse one serialized Example into a normalized image tensor and its label.

  Args:
    serialized_example: Scalar string tensor holding one serialized Example
      with a `content` string feature (JPEG bytes) and a `label_index` int64
      feature.
    image_size: Side length, in pixels, of the square output image.

  Returns:
    A tuple `(image, label)`. `image` is a float32 tensor of shape
    `[image_size, image_size, 3]` with values scaled to [-0.5, 0.5];
    `label` is an int32 scalar.
  """
  # Parse from single example
  feature_dataset = tf.io.parse_single_example(
      serialized_example,
      features={
          'content': tf.io.FixedLenFeature([], tf.string),
          'label_index': tf.io.FixedLenFeature([], tf.int64),
      })
  # Decode the parsed data. Force three channels so grayscale/CMYK JPEGs do
  # not yield a different channel count and the channel dimension is
  # statically known for batching and model input.
  image = tf.io.decode_jpeg(feature_dataset['content'], channels=3)
  label = tf.cast(feature_dataset['label_index'], tf.int32)
  # Resize the decoded data into the desired size
  image = tf.image.resize(image, [image_size, image_size])
  # Finally, normalize the data: map [0, 255] to [-0.5, 0.5]
  image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
  return image, label

In [0]:
# Lazily apply the parse/decode/normalize step to every serialized record.
parsed_dataset = dataset.map(map_func=decode_and_normalize)

In [0]:
# Display the dataset's repr as the cell output.
parsed_dataset

In [0]:
from pyspark.sql.types import *
# Schema of a small demo DataFrame covering integer, long, float, double,
# array-of-double and string columns.
fields = [
  StructField("id", IntegerType()),
  StructField("IntegerCol", IntegerType()),
  StructField("LongCol", LongType()),
  StructField("FloatCol", FloatType()),
  StructField("DoubleCol", DoubleType()),
  StructField("VectorCol", ArrayType(DoubleType(), True)),
  StructField("StringCol", StringType()),
]
schema = StructType(fields)

# Two sample rows matching the schema above.
test_rows = [[11, 1, 23, 10.0, 14.0, [1.0, 2.0], "r1"], [21, 2, 24, 12.0, 15.0, [2.0, 2.0], "r2"]]
rdd = spark.sparkContext.parallelize(test_rows)
df = spark.createDataFrame(rdd, schema)

# NOTE: this rebinds the notebook-level `path` used by the earlier flowers
# export cells; the following read-back cell relies on this value.
path = 'dbfs:/tmp/dataset'

# Write as TFRecords with recordType "Example". mode("overwrite") keeps the
# cell re-runnable: without it the write fails once the path already exists.
(
  df.write
  .format("tfrecords")
  .option("recordType", "Example")
  .mode("overwrite")
  .save(path)
)
display(df)

id,IntegerCol,LongCol,FloatCol,DoubleCol,VectorCol,StringCol
11,1,23,10.0,14.0,"List(1.0, 2.0)",r1
21,2,24,12.0,15.0,"List(2.0, 2.0)",r2


In [0]:
# Read the TFRecord files at `path` back into a DataFrame and print its rows.
df = (
  spark.read
  .format("tfrecords")
  .option("recordType", "Example")
  .load(path)
)
df.show()


In [0]:
import os
import subprocess
import uuid
# Per-run scratch directory: a fresh UUID under /ml/tmp/petastorm.
work_dir = os.path.join("/ml/tmp/petastorm", f"{uuid.uuid4()}")
dbutils.fs.mkdirs(work_dir)
def get_local_path(dbfs_path):
  """Translate a DBFS path into its local path under the /dbfs mount.

  Args:
    dbfs_path: A DBFS path, either scheme-less ("/ml/tmp/x" or "ml/tmp/x")
      or with the "dbfs:" scheme ("dbfs:/ml/tmp/x").

  Returns:
    The corresponding path rooted at "/dbfs", e.g. "/dbfs/ml/tmp/x".
  """
  scheme = "dbfs:"
  # Drop the URI scheme first; otherwise "dbfs:/x" would become "/dbfs/dbfs:/x".
  if dbfs_path.startswith(scheme):
    dbfs_path = dbfs_path[len(scheme):]
  # Strip leading slashes so os.path.join does not discard the "/dbfs" root.
  return os.path.join("/dbfs", dbfs_path.lstrip("/"))


In [0]:
# Source archive and where it is stored under the work directory.
data_url = "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2"
libsvm_path = os.path.join(work_dir, "mnist.bz2")

# Download with wget to the /dbfs-mounted location; check_output raises
# CalledProcessError if wget exits with a non-zero status.
subprocess.check_output(
  ["wget", data_url, "-O", get_local_path(libsvm_path)]
)

# Load the downloaded file with the "libsvm" reader, declaring 784 features.
df = (
  spark.read
  .format("libsvm")
  .option("numFeatures", "784")
  .load(libsvm_path)
)


In [0]:
%scala
import org.apache.spark.ml.linalg.Vector
// UDF that converts an ML Vector into a plain array via Vector.toArray.
val toArray = udf((v: Vector) => v.toArray)
// Register it under the name "toArray" so it can be referenced by name in SQL expressions.
spark.sqlContext.udf.register("toArray", toArray)

In [0]:
# Output location for the Parquet copy of the dataset.
parquet_path = os.path.join(work_dir, "parquet")

# Convert `features` with the registered toArray UDF, cast `label` to int,
# and write 10 partitions as Parquet with a 1 MiB block size, replacing any
# existing output.
(
  df.selectExpr("toArray(features) AS features", "int(label) AS label")
  .repartition(10)
  .write
  .mode("overwrite")
  .option("parquet.block.size", 1024 * 1024)
  .parquet(parquet_path)
)

In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models, layers
from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
def get_model():
  """Build (but do not compile) a small CNN for 28x28x1 inputs and 10 classes.

  Returns:
    A Keras Sequential model: two 3x3 ReLU convolutions (32 then 64 filters),
    2x2 max-pooling, dropout, a 128-unit ReLU dense layer, dropout, and a
    10-way softmax output.
  """
  return models.Sequential([
    layers.Conv2D(32, kernel_size=(3, 3),
                  activation='relu',
                  input_shape=(28, 28, 1)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(10, activation='softmax'),
  ])


In [0]:
# file:// URL of the Parquet directory as seen through the local /dbfs mount.
petastorm_dataset_url = f"file://{get_local_path(parquet_path)}"

In [0]:
with make_batch_reader(petastorm_dataset_url, num_epochs=100) as reader:
  def to_model_inputs(batch):
    """Reshape the feature batch to [-1, 28, 28, 1] and one-hot the labels (10 classes)."""
    images = tf.reshape(batch.features, [-1, 28, 28, 1])
    labels = tf.one_hot(batch.label, 10)
    return (images, labels)

  # Wrap the petastorm reader as a tf.data dataset of (images, labels) pairs.
  dataset = make_petastorm_dataset(reader).map(to_model_inputs)

  model = get_model()
  optimizer = keras.optimizers.Adadelta()
  model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
  )
  # 10 epochs of 10 steps each, all inside the reader's context.
  model.fit(dataset, steps_per_epoch=10, epochs=10)


In [0]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

# Load every *.jpg under the flower_photos dataset (searching subdirectories)
# using the binaryFile source.
images = (
  spark.read
  .format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load("/databricks-datasets/flower_photos")
)

# Preview the first five rows.
display(images.limit(5))

path,modificationTime,length,content
dbfs:/databricks-datasets/flower_photos/tulips/2431737309_1468526f8b.jpg,2019-12-11T22:18:32.000+0000,281953,/9j/4AAQSkZJRgABAQEBLAEsAAD/4gxYSUNDX1BST0ZJTEUAAQEAAAxITGlubwIQAABtbnRyUkdCIFhZWiAHzgACAAkABgAxAABhY3NwTVNGVAAAAABJRUMgc1JHQgAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLUhQICAAAAAAAAA= (truncated)
dbfs:/databricks-datasets/flower_photos/sunflowers/4932735362_6e1017140f.jpg,2019-12-11T22:18:00.000+0000,277326,/9j/4AAQSkZJRgABAQEASABIAAD/2wBDAAEBAQEBAQEBAQEBAQECAgMCAgICAgQDAwIDBQQFBQUEBAQFBgcGBQUHBgQEBgkGBwgICAgIBQYJCgkICgcICAj/2wBDAQEBAQICAgQCAgQIBQQFCAgICAgICAgICAgICAgICAgICAg= (truncated)
dbfs:/databricks-datasets/flower_photos/tulips/8717900362_2aa508e9e5.jpg,2019-12-11T22:18:52.000+0000,265806,/9j/4AAQSkZJRgABAQEASABIAAD/4gxYSUNDX1BST0ZJTEUAAQEAAAxITGlubwIQAABtbnRyUkdCIFhZWiAHzgACAAkABgAxAABhY3NwTVNGVAAAAABJRUMgc1JHQgAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLUhQICAAAAAAAAA= (truncated)
dbfs:/databricks-datasets/flower_photos/sunflowers/4341530649_c17bbc5d01.jpg,2019-12-11T22:17:56.000+0000,257418,/9j/4AAQSkZJRgABAQEASABIAAD/4gxYSUNDX1BST0ZJTEUAAQEAAAxITGlubwIQAABtbnRyUkdCIFhZWiAHzgACAAkABgAxAABhY3NwTVNGVAAAAABJRUMgc1JHQgAAAAAAAAAAAAAAAAAA9tYAAQAAAADTLUhQICAAAAAAAAA= (truncated)
dbfs:/databricks-datasets/flower_photos/daisy/5693459303_e61d9a9533.jpg,2019-12-11T22:16:30.000+0000,248540,/9j/4AAQSkZJRgABAQEAYABgAAD/2wBDAAEBAQEBAQEBAQEBAQECAgMCAgICAgQDAwIDBQQFBQUEBAQFBgcGBQUHBgQEBgkGBwgICAgIBQYJCgkICgcICAj/2wBDAQEBAQICAgQCAgQIBQQFCAgICAgICAgICAgICAgICAgICAg= (truncated)


In [0]:
# Driver-side ResNet50 built with include_top=False.
model = ResNet50(include_top=False)
model.summary()  # verify that the top layer is removed
# Broadcast the weights so model_fn() can read them via bc_model_weights.value.
bc_model_weights = sc.broadcast(model.get_weights())

def model_fn():
  """Return a top-less ResNet50 loaded with the broadcast weights.

  The network is created with `weights=None` and then populated from
  `bc_model_weights.value`.
  """
  resnet = ResNet50(weights=None, include_top=False)
  resnet.set_weights(bc_model_weights.value)
  return resnet

def preprocess(content):
  """Turn raw image file bytes into a ResNet50-ready array.

  Args:
    content: Bytes of an encoded image file.

  Returns:
    The result of `preprocess_input` applied to the 224x224 RGB image array.
  """
  # Force RGB so grayscale/CMYK/RGBA files do not produce arrays with a
  # different channel count, which would break np.stack on a batch and the
  # 3-channel input expected by ResNet50.
  img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))
  arr = img_to_array(img)
  return preprocess_input(arr)

def featurize_series(model, content_series):
  """Featurize a pandas Series of raw image bytes with `model`.

  Args:
    model: Object exposing `predict`, called once on the stacked batch.
    content_series: pandas Series whose elements are passed to `preprocess`.

  Returns:
    A pandas Series with one flattened prediction array per input element.
  """
  # Preprocess each element and stack the results into a single batch array.
  batch = np.stack(content_series.map(preprocess))
  predictions = model.predict(batch)
  # Flatten each prediction so every row holds a 1-D feature vector.
  return pd.Series([prediction.flatten() for prediction in predictions])


In [0]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
  """Scalar-iterator pandas UDF yielding one feature Series per input batch.

  The model is built once per call via `model_fn()` and reused for every
  Series pulled from `content_series_iter`.
  """
  resnet = model_fn()
  for batch_series in content_series_iter:
    yield featurize_series(resnet, batch_series)

In [0]:
# Limit Arrow batches handed to the pandas UDF to 1024 records each.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

# Featurize the `content` column across 16 partitions, keeping the file path.
features_df = (
  images
  .repartition(16)
  .select(col("path"), featurize_udf("content").alias("features"))
)

# Persist the features as Parquet, replacing any previous output.
(
  features_df.write
  .mode("overwrite")
  .parquet("dbfs:/ml/tmp/flower_photos_features")
)
