From 5c8a0d0b9a1d550a14cdb328eeddc374717cda81 Mon Sep 17 00:00:00 2001
From: Peng Yu
Date: Tue, 20 Mar 2018 13:22:46 -0400
Subject: [PATCH 1/5] add append shape

---
 .../org/tensorframes/ExperimentalOperations.scala     | 10 +++++++++-
 .../scala/org/tensorframes/BasicOperationsSuite.scala | 10 ++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/org/tensorframes/ExperimentalOperations.scala b/src/main/scala/org/tensorframes/ExperimentalOperations.scala
index 57ae8d1..c925fd0 100644
--- a/src/main/scala/org/tensorframes/ExperimentalOperations.scala
+++ b/src/main/scala/org/tensorframes/ExperimentalOperations.scala
@@ -1,8 +1,9 @@
 package org.tensorframes
 
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.{ArrayType, DataType, NumericType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MetadataBuilder, NumericType}
 
 import org.tensorframes.impl.{ScalarType, SupportedOperations}
 
@@ -45,6 +46,13 @@ trait ExperimentalOperations {
     }
     df.select(cols: _*)
   }
+
+  def appendShape(df: DataFrame, col: Column, shape: Array[Double]): DataFrame = {
+    val meta = new MetadataBuilder
+    meta.putString("org.sparktf.type", "DoubleType")
+    meta.putDoubleArray("org.spartf.shape", shape)
+    df.withColumn(col.toString(), col.as("", meta.build()))
+  }
 }
 
 private[tensorframes] object ExtraOperations extends ExperimentalOperations with Logging {
diff --git a/src/test/scala/org/tensorframes/BasicOperationsSuite.scala b/src/test/scala/org/tensorframes/BasicOperationsSuite.scala
index 5b1a218..129476b 100644
--- a/src/test/scala/org/tensorframes/BasicOperationsSuite.scala
+++ b/src/test/scala/org/tensorframes/BasicOperationsSuite.scala
@@ -3,6 +3,7 @@ package org.tensorframes
 import org.scalatest.FunSuite
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
 
 import org.tensorframes.dsl.Implicits._
 import org.tensorframes.dsl._
@@ -42,6 +43,15 @@ class BasicOperationsSuite
     compareRows(df2.collect(), Array(Row(Seq(1.0), Seq(1.0)), Row(Seq(2.0), Seq(2.0))))
   }
 
+  testGraph("Identity - 1 dim with manfully") {
+    val df = make1(Seq(Seq(1.0), Seq(2.0)), "in")
+    val adf = ops.appendShape(df, col("in"), Array(-1.0, 1.0))
+    val p1 = placeholder[Double](Unknown, 1) named "in"
+    val out = identity(p1) named "out"
+    val df2 = adf.mapBlocks(out).select("in", "out")
+    compareRows(df2.collect(), Array(Row(Seq(1.0), Seq(1.0)), Row(Seq(2.0), Seq(2.0))))
+  }
+
   testGraph("Simple add - 1 dim") {
     val a = placeholder[Double](Unknown, 1) named "a"
     val b = placeholder[Double](Unknown, 1) named "b"
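
At this stage appendShape is nothing more than Spark column metadata: it re-aliases the column with a MetadataBuilder payload. For reference, the same effect can be reproduced and inspected with public PySpark APIs alone. A minimal sketch, assuming a SparkSession named spark; the key names mirror this commit, including the misspelled "org.spartf.shape" key that a later commit replaces with MetadataConstants:

    from pyspark.sql.functions import col

    df = spark.createDataFrame([(1.0,), (2.0,)], ['x'])

    # Equivalent of this commit's appendShape: re-alias the column,
    # attaching metadata to its StructField.
    meta = {'org.sparktf.type': 'DoubleType', 'org.spartf.shape': [-1.0, 1.0]}
    df2 = df.withColumn('x', col('x').alias('x', metadata=meta))

    # The metadata lands on the column's StructField in the schema.
    print(df2.schema['x'].metadata)
    # {'org.sparktf.type': 'DoubleType', 'org.spartf.shape': [-1.0, 1.0]}
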
From 7e443f0e63d1b5d4c422c2f046f78d036b069cb6 Mon Sep 17 00:00:00 2001
From: Peng Yu
Date: Tue, 20 Mar 2018 17:27:09 -0400
Subject: [PATCH 2/5] add python bindings

---
 src/main/python/tensorframes/core.py           | 20 +++++++++++++++-
 src/main/python/tensorframes/core_test.py      | 18 ++++++++++++++-
 .../tensorframes/ExperimentalOperations.scala  | 16 +++++++++----
 .../tensorframes/BasicOperationsSuite.scala    | 23 +++++++++++++++++--
 4 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/src/main/python/tensorframes/core.py b/src/main/python/tensorframes/core.py
index 251e74e..dc7e9f4 100644
--- a/src/main/python/tensorframes/core.py
+++ b/src/main/python/tensorframes/core.py
@@ -9,7 +9,8 @@ from pyspark.sql.types import DoubleType, IntegerType, LongType, FloatType, ArrayType
 
 __all__ = ['reduce_rows', 'map_rows', 'reduce_blocks', 'map_blocks',
-           'analyze', 'print_schema', 'aggregate', 'block', 'row']
+           'analyze', 'print_schema', 'aggregate', 'block', 'row',
+           'append_shape']
 
 _sc = None
 _sql = None
@@ -377,6 +378,23 @@ def analyze(dframe):
     """
     return DataFrame(_java_api().analyze(dframe._jdf), _sql)
 
+def append_shape(dframe, col, size):
+    """Append extra metadata for a dataframe that
+    describes the numerical shape of the content.
+
+    This method is useful when a dataframe contains non-scalar tensors, for which the shape must be checked beforehand.
+
+    Note: nullable fields are not accepted.
+
+    The function [print_schema] lets users introspect the information added to the DataFrame.
+
+    :param dframe: a Spark DataFrame
+    :param col: a Column expression
+    :param size: a shape corresponding to the tensor
+    :return: a Spark DataFrame with metadata information embedded.
+    """
+    return DataFrame(_java_api().appendShape(dframe._jdf, col, size), _sql)
+
 def aggregate(fetches, grouped_data, initial_variables=_initial_variables_default):
     """
diff --git a/src/main/python/tensorframes/core_test.py b/src/main/python/tensorframes/core_test.py
index a5ea321..01451cf 100644
--- a/src/main/python/tensorframes/core_test.py
+++ b/src/main/python/tensorframes/core_test.py
@@ -4,6 +4,7 @@
 from pyspark import SparkContext
 from pyspark.sql import DataFrame, SQLContext
 from pyspark.sql import Row
+from pyspark.sql.functions import col
 import tensorflow as tf
 import pandas as pd
 
@@ -198,7 +199,22 @@ def test_reduce_rows_1(self):
         res = tfs.reduce_rows(x, df)
         assert res == sum([r.x for r in data])
 
-    # This test fails
+    def test_append_shape(self):
+        data = [Row(x=float(x)) for x in range(5)]
+        df = self.sql.createDataFrame(data)
+        ddf = tfs.append_shape(df, col('x'), [-1, 1])
+        import ipdb; ipdb.set_trace()
+        with tf.Graph().as_default():
+            # The placeholder that corresponds to column 'x'
+            x_1 = tf.placeholder(tf.double, shape=[], name="x_1")
+            x_2 = tf.placeholder(tf.double, shape=[], name="x_2")
+            # The output that adds the two placeholders
+            x = tf.add(x_1, x_2, name='x')
+            # The resulting number
+            res = tfs.reduce_rows(x, ddf)
+            assert res == sum([r.x for r in data])
+
+# This test fails
     def test_reduce_blocks_1(self):
         data = [Row(x=float(x)) for x in range(5)]
         df = self.sql.createDataFrame(data)
diff --git a/src/main/scala/org/tensorframes/ExperimentalOperations.scala b/src/main/scala/org/tensorframes/ExperimentalOperations.scala
index c925fd0..49ebc6a 100644
--- a/src/main/scala/org/tensorframes/ExperimentalOperations.scala
+++ b/src/main/scala/org/tensorframes/ExperimentalOperations.scala
@@ -4,7 +4,6 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{ArrayType, DataType, MetadataBuilder, NumericType}
-
 import org.tensorframes.impl.{ScalarType, SupportedOperations}
 
 /**
@@ -47,10 +46,17 @@ trait ExperimentalOperations {
     df.select(cols: _*)
   }
 
-  def appendShape(df: DataFrame, col: Column, shape: Array[Double]): DataFrame = {
+  def appendShape(df: DataFrame, col: Column, shape: Array[Long]): DataFrame = {
+
     val meta = new MetadataBuilder
-    meta.putString("org.sparktf.type", "DoubleType")
-    meta.putDoubleArray("org.spartf.shape", shape)
+    val colDtypes = df.select(col).schema.fields.head.dataType
+    val basicDatatype = {
+      ExtraOperations.extractBasicType(colDtypes).getOrElse(throw new Exception(s"'$colDtypes' was not supported"))
+    }
+    meta.putString(MetadataConstants.tensorStructType,
+      SupportedOperations.opsFor(basicDatatype).sqlType.toString
+    )
+    meta.putLongArray(MetadataConstants.shapeKey, shape)
     df.withColumn(col.toString(), col.as("", meta.build()))
   }
 }
@@ -118,7 +124,7 @@ private[tensorframes] object ExtraOperations extends ExperimentalOperations with
     DataFrameInfo(allInfo)
   }
 
-  private def extractBasicType(dt: DataType): Option[ScalarType] = dt match {
+  def extractBasicType(dt: DataType): Option[ScalarType] = dt match {
     case x: NumericType => Some(SupportedOperations.opsFor(x).scalarType)
     case x: ArrayType => extractBasicType(x.elementType)
     case _ => None
diff --git a/src/test/scala/org/tensorframes/BasicOperationsSuite.scala b/src/test/scala/org/tensorframes/BasicOperationsSuite.scala
index 129476b..e5e68eb 100644
--- a/src/test/scala/org/tensorframes/BasicOperationsSuite.scala
+++ b/src/test/scala/org/tensorframes/BasicOperationsSuite.scala
@@ -43,9 +43,9 @@ class BasicOperationsSuite
     compareRows(df2.collect(), Array(Row(Seq(1.0), Seq(1.0)), Row(Seq(2.0), Seq(2.0))))
   }
 
-  testGraph("Identity - 1 dim with manfully") {
+  testGraph("Identity - 1 dim, Manually") {
     val df = make1(Seq(Seq(1.0), Seq(2.0)), "in")
-    val adf = ops.appendShape(df, col("in"), Array(-1.0, 1.0))
+    val adf = ops.appendShape(df, col("in"), Array(-1, 1))
     val p1 = placeholder[Double](Unknown, 1) named "in"
     val out = identity(p1) named "out"
     val df2 = adf.mapBlocks(out).select("in", "out")
@@ -67,6 +67,25 @@ class BasicOperationsSuite
       Row(Seq(2.0), Seq(2.2), Seq(4.2))))
   }
 
+  testGraph("Simple add - 1 dim, Manually") {
+    val a = placeholder[Double](Unknown, 1) named "a"
+    val b = placeholder[Double](Unknown, 1) named "b"
+    val out = a + b named "out"
+
+    val df = sql.createDataFrame(Seq(
+      Seq(1.0)->Seq(1.1),
+      Seq(2.0)->Seq(2.2))).toDF("a", "b")
+    val adf = {
+      ops.appendShape(
+        ops.appendShape(df, col("a"), Array(-1, 1)),
+        col("b"), Array(-1, 1))
+    }
+    val df2 = adf.mapBlocks(out).select("a", "b", "out")
+    compareRows(df2.collect(), Array(
+      Row(Seq(1.0), Seq(1.1), Seq(2.1)),
+      Row(Seq(2.0), Seq(2.2), Seq(4.2))))
+  }
+
   testGraph("Reduce - sum double") {
     val df = make1(Seq(1.0, 2.0), "x")
     val x1 = placeholder[Double]() named "x_1"
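
The two "Manually" tests show the point of the feature: with the shape annotated up front, mapBlocks runs without a prior analyze pass. Roughly the same flow through the Python binding added here, as a sketch assuming a SparkSession named spark; note that the binding as committed still hands the raw Python Column and list to Py4J, which only starts to work after the fixes in the following patches:

    import tensorflow as tf
    import tensorframes as tfs
    from pyspark.sql.functions import col

    df = spark.createDataFrame([([1.0],), ([2.0],)], ['in'])
    # Each cell of 'in' is a 1-element vector; -1 marks the unknown block size.
    adf = tfs.append_shape(df, col('in'), [-1, 1])

    with tf.Graph().as_default():
        # Placeholder matching the annotated column.
        x = tf.placeholder(tf.float64, shape=[None, 1], name='in')
        out = tf.identity(x, name='out')
        df2 = tfs.map_blocks(out, adf)
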
From 0cf78390943f59eaaf01eebb83340782aa4a4154 Mon Sep 17 00:00:00 2001
From: Peng Yu
Date: Wed, 21 Mar 2018 13:03:39 -0400
Subject: [PATCH 3/5] fix the column

---
 src/main/python/tensorframes/core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/python/tensorframes/core.py b/src/main/python/tensorframes/core.py
index dc7e9f4..af32e5f 100644
--- a/src/main/python/tensorframes/core.py
+++ b/src/main/python/tensorframes/core.py
@@ -393,7 +393,7 @@ def append_shape(dframe, col, size):
     :param size: a shape corresponding to the tensor
     :return: a Spark DataFrame with metadata information embedded.
     """
-    return DataFrame(_java_api().appendShape(dframe._jdf, col, size), _sql)
+    return DataFrame(_java_api().appendShape(dframe._jdf, col._jc, size), _sql)
 
 def aggregate(fetches, grouped_data, initial_variables=_initial_variables_default):
     """
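
The one-line fix follows the standard PySpark convention: Python-side wrappers carry the underlying Java object, so a pyspark Column must be unwrapped to its `_jc` before crossing the Py4J gateway, exactly as the DataFrame is unwrapped to `_jdf` on the same line. A short illustration of the convention:

    from pyspark.sql.functions import col

    c = col('x')
    print(type(c))      # <class 'pyspark.sql.column.Column'>   (Python wrapper)
    print(type(c._jc))  # <class 'py4j.java_gateway.JavaObject'> (the JVM Column)
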
""" - return DataFrame(_java_api().appendShape(dframe._jdf, col, size), _sql) + return DataFrame(_java_api().appendShape(dframe._jdf, col._jc, size), _sql) def aggregate(fetches, grouped_data, initial_variables=_initial_variables_default): """ From 0efca130450e7b31474261882650cfb09e9a230e Mon Sep 17 00:00:00 2001 From: Peng Yu Date: Wed, 21 Mar 2018 13:22:10 -0400 Subject: [PATCH 4/5] aaaa --- .../scala/org/tensorframes/ExperimentalOperations.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/tensorframes/ExperimentalOperations.scala b/src/main/scala/org/tensorframes/ExperimentalOperations.scala index 49ebc6a..c12cb0e 100644 --- a/src/main/scala/org/tensorframes/ExperimentalOperations.scala +++ b/src/main/scala/org/tensorframes/ExperimentalOperations.scala @@ -6,6 +6,8 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{ArrayType, DataType, MetadataBuilder, NumericType} import org.tensorframes.impl.{ScalarType, SupportedOperations} +import java.util + /** * Some useful methods for operating on dataframes that are not part of the official API (and thus may change anytime). */ @@ -46,7 +48,7 @@ trait ExperimentalOperations { df.select(cols: _*) } - def appendShape(df: DataFrame, col: Column, shape: Array[Long]): DataFrame = { + def appendShape(df: DataFrame, col: Column, shape: Array[Int]): DataFrame = { val meta = new MetadataBuilder val colDtypes = df.select(col).schema.fields.head.dataType @@ -56,9 +58,12 @@ trait ExperimentalOperations { meta.putString(MetadataConstants.tensorStructType, SupportedOperations.opsFor(basicDatatype).sqlType.toString ) - meta.putLongArray(MetadataConstants.shapeKey, shape) + meta.putLongArray(MetadataConstants.shapeKey, shape.map(_.asInstanceOf[Long])) df.withColumn(col.toString(), col.as("", meta.build())) } + + def appendShape(df: DataFrame, col:Column, shape: util.ArrayList[Int]): DataFrame = + appendShape(df, col, shape) } private[tensorframes] object ExtraOperations extends ExperimentalOperations with Logging { From 068fa747c1037865c9ccb1c4af5813c6e2a20649 Mon Sep 17 00:00:00 2001 From: Peng Yu Date: Wed, 21 Mar 2018 16:05:30 -0400 Subject: [PATCH 5/5] address beatiful comments --- src/main/python/tensorframes/core.py | 9 ++++++--- src/main/python/tensorframes/core_test.py | 5 ++--- .../org/tensorframes/ExperimentalOperations.scala | 10 ++++++---- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/main/python/tensorframes/core.py b/src/main/python/tensorframes/core.py index af32e5f..a9da016 100644 --- a/src/main/python/tensorframes/core.py +++ b/src/main/python/tensorframes/core.py @@ -378,11 +378,12 @@ def analyze(dframe): """ return DataFrame(_java_api().analyze(dframe._jdf), _sql) -def append_shape(dframe, col, size): +def append_shape(dframe, col, shape): """Append extra metadata for a dataframe that describes the numerical shape of the content. This method is useful when a dataframe contains non-scalar tensors, for which the shape must be checked beforehand. + The user is responsible for providing the right shape, any mismatch will trigger eventually an exception in Spark Note: nullable fields are not accepted. 
From 068fa747c1037865c9ccb1c4af5813c6e2a20649 Mon Sep 17 00:00:00 2001
From: Peng Yu
Date: Wed, 21 Mar 2018 16:05:30 -0400
Subject: [PATCH 5/5] address review comments

---
 src/main/python/tensorframes/core.py               |  9 ++++++---
 src/main/python/tensorframes/core_test.py          |  5 ++---
 .../org/tensorframes/ExperimentalOperations.scala  | 10 ++++++----
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/src/main/python/tensorframes/core.py b/src/main/python/tensorframes/core.py
index af32e5f..a9da016 100644
--- a/src/main/python/tensorframes/core.py
+++ b/src/main/python/tensorframes/core.py
@@ -378,11 +378,12 @@ def analyze(dframe):
     """
     return DataFrame(_java_api().analyze(dframe._jdf), _sql)
 
-def append_shape(dframe, col, size):
+def append_shape(dframe, col, shape):
     """Append extra metadata for a dataframe that
     describes the numerical shape of the content.
 
     This method is useful when a dataframe contains non-scalar tensors, for which the shape must be checked beforehand.
+    The user is responsible for providing the right shape; any mismatch will eventually trigger an exception in Spark.
 
     Note: nullable fields are not accepted.
 
@@ -390,10 +391,12 @@ def append_shape(dframe, col, shape):
 
     :param dframe: a Spark DataFrame
     :param col: a Column expression
-    :param size: a shape corresponding to the tensor
+    :param shape: a shape corresponding to the tensor; see
+        https://www.tensorflow.org/programmers_guide/tensors#shape for details
     :return: a Spark DataFrame with metadata information embedded.
     """
-    return DataFrame(_java_api().appendShape(dframe._jdf, col._jc, size), _sql)
+    shape = [i or -1 for i in shape]
+    return DataFrame(_java_api().appendShape(dframe._jdf, col._jc, shape), _sql)
 
 def aggregate(fetches, grouped_data, initial_variables=_initial_variables_default):
     """
diff --git a/src/main/python/tensorframes/core_test.py b/src/main/python/tensorframes/core_test.py
index 01451cf..21608a9 100644
--- a/src/main/python/tensorframes/core_test.py
+++ b/src/main/python/tensorframes/core_test.py
@@ -202,8 +202,7 @@ def test_reduce_rows_1(self):
     def test_append_shape(self):
         data = [Row(x=float(x)) for x in range(5)]
         df = self.sql.createDataFrame(data)
-        ddf = tfs.append_shape(df, col('x'), [-1, 1])
-        import ipdb; ipdb.set_trace()
+        ddf = tfs.append_shape(df, col('x'), [-1])
         with tf.Graph().as_default():
             # The placeholder that corresponds to column 'x'
             x_1 = tf.placeholder(tf.double, shape=[], name="x_1")
@@ -214,7 +213,7 @@ def test_append_shape(self):
             res = tfs.reduce_rows(x, ddf)
             assert res == sum([r.x for r in data])
 
-# This test fails
+    # This test fails
     def test_reduce_blocks_1(self):
         data = [Row(x=float(x)) for x in range(5)]
         df = self.sql.createDataFrame(data)
diff --git a/src/main/scala/org/tensorframes/ExperimentalOperations.scala b/src/main/scala/org/tensorframes/ExperimentalOperations.scala
index c12cb0e..347c093 100644
--- a/src/main/scala/org/tensorframes/ExperimentalOperations.scala
+++ b/src/main/scala/org/tensorframes/ExperimentalOperations.scala
@@ -1,12 +1,14 @@
 package org.tensorframes
 
+import java.util
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{ArrayType, DataType, MetadataBuilder, NumericType}
 import org.tensorframes.impl.{ScalarType, SupportedOperations}
+import scala.collection.JavaConverters._
+
+
 
-
-import java.util
 
 /**
  * Some useful methods for operating on dataframes that are not part of the official API (and thus may change anytime).
@@ -52,9 +54,9 @@ trait ExperimentalOperations {
 
     val meta = new MetadataBuilder
     val colDtypes = df.select(col).schema.fields.head.dataType
-    val basicDatatype = {
+    val basicDatatype =
       ExtraOperations.extractBasicType(colDtypes).getOrElse(throw new Exception(s"'$colDtypes' was not supported"))
-    }
+
     meta.putString(MetadataConstants.tensorStructType,
       SupportedOperations.opsFor(basicDatatype).sqlType.toString
     )
@@ -63,7 +65,7 @@ trait ExperimentalOperations {
     df.withColumn(col.toString(), col.as("", meta.build()))
   }
 
   def appendShape(df: DataFrame, col:Column, shape: util.ArrayList[Int]): DataFrame =
-    appendShape(df, col, shape)
+    appendShape(df, col, shape.asScala.toArray[Int])
 }
 
 private[tensorframes] object ExtraOperations extends ExperimentalOperations with Logging {
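
With the whole series applied, append_shape normalizes the Python-side shape with `i or -1`, so None (and any other falsy entry, including 0) becomes -1, the unknown-dimension marker, before the list crosses to the JVM overload. A final end-to-end sketch mirroring the test above, assuming a SQLContext named `sql`:

    import tensorflow as tf
    import tensorframes as tfs
    from pyspark.sql import Row
    from pyspark.sql.functions import col

    df = sql.createDataFrame([Row(x=float(i)) for i in range(5)])
    # Scalar column with an unknown number of rows: [None] normalizes to [-1].
    ddf = tfs.append_shape(df, col('x'), [None])

    with tf.Graph().as_default():
        x_1 = tf.placeholder(tf.double, shape=[], name='x_1')
        x_2 = tf.placeholder(tf.double, shape=[], name='x_2')
        x = tf.add(x_1, x_2, name='x')
        res = tfs.reduce_rows(x, ddf)  # no tfs.analyze pass needed

    assert res == sum(float(i) for i in range(5))
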