Commit
rebased : Handle spark's BinaryType (#377)
* Bump release version
* Add new behavior to type conversion docs
* Convert binary data into base64 strings

---------

Co-authored-by: Jeff <16146855+fishfood34@users.noreply.github.com>
asaharn and fishfood34 committed May 14, 2024
1 parent 36d6b2e commit 6a11e2d
Showing 5 changed files with 50 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .gitignore
@@ -51,3 +51,7 @@ src_managed/
project/boot/
project/plugins/project/

# VS Code and extensions
.vscode/
.bloop/
.metals/
@@ -4,6 +4,7 @@ import java.io.CharArrayWriter
import java.math.BigInteger
import java.time.temporal.ChronoUnit
import java.time.{Instant, LocalDateTime, ZoneId}
import java.util.Base64

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -61,6 +62,7 @@ object RowCSVWriterUtils {
nested: Boolean): Unit = {
dataType match {
case StringType => writeStringFromUTF8(row.getUTF8String(fieldIndexInRow), writer)
case BinaryType => writeStringFromBinary(row.getBinary(fieldIndexInRow), writer)
case DateType =>
writer.writeStringField(DateTimeUtils.toJavaDate(row.getInt(fieldIndexInRow)).toString)
case TimestampType =>
@@ -247,4 +249,8 @@ object RowCSVWriterUtils {
private def writeStringFromUTF8(str: UTF8String, writer: Writer): Unit = {
writer.writeStringField(str.toString)
}

private def writeStringFromBinary(bytes: Array[Byte], writer: Writer): Unit = {
writer.writeStringField(Base64.getEncoder.encodeToString(bytes))
}
}
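For reference, a minimal standalone sketch of what the new helper does (the object name below is illustrative; only `java.util.Base64` from the JDK is assumed):

```scala
import java.util.Base64

// Demonstrates the Base64 round trip behind writeStringFromBinary: binary
// column values are emitted as Base64 strings in the CSV sent to Kusto.
object Base64FieldDemo extends App {
  val original = "Test string for spark binary"

  // Encoding, as done by writeStringFromBinary above.
  val encoded = Base64.getEncoder.encodeToString(original.getBytes())
  println(encoded) // VGVzdCBzdHJpbmcgZm9yIHNwYXJrIGJpbmFyeQ== (the value asserted in WriterTests)

  // The original bytes remain recoverable from the ingested string.
  val decoded = new String(Base64.getDecoder.decode(encoded))
  assert(decoded == original)
}
```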
@@ -293,6 +293,42 @@ class WriterTests extends AnyFlatSpec with Matchers {
res5 shouldEqual "\"-123456789123456789.0000000000\"" + lineSep + "\"0.0000000000\"" + lineSep + "\"0.1000000000\"" +
lineSep + "\"-0.1000000000\"" + lineSep
}

"convertRowToCsv" should "convert the row as expected with binary data" in {
val sparkConf: SparkConf = new SparkConf()
.set("spark.testing", "true")
.set("spark.ui.enabled", "false")
.setAppName("SimpleKustoDataSink")
.setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val byteArrayOutputStream = new ByteArrayOutputStream()
val streamWriter = new OutputStreamWriter(byteArrayOutputStream)
val writer = new BufferedWriter(streamWriter)
val csvWriter = CountingWriter(writer)

val someData =
List(
"Test string for spark binary".getBytes(),
"A second test string for spark binary".getBytes(),
null
)

val someSchema = List(StructField("binaryString", BinaryType, nullable = true))

val df = sparkSession.createDataFrame(
sparkSession.sparkContext.parallelize(WriterTests.asRows(someData)),
StructType(WriterTests.asSchema(someSchema)))

val dfRows: Array[InternalRow] = df.queryExecution.toRdd.collect()

RowCSVWriterUtils.writeRowAsCSV(dfRows(0), df.schema, TimeZone.getTimeZone("UTC").toZoneId, csvWriter)
RowCSVWriterUtils.writeRowAsCSV(dfRows(1), df.schema, TimeZone.getTimeZone("UTC").toZoneId, csvWriter)
RowCSVWriterUtils.writeRowAsCSV(dfRows(2), df.schema, TimeZone.getTimeZone("UTC").toZoneId, csvWriter)

writer.flush()
val res1 = byteArrayOutputStream.toString
res1 shouldEqual "\"VGVzdCBzdHJpbmcgZm9yIHNwYXJrIGJpbmFyeQ==\"" + lineSep + "\"QSBzZWNvbmQgdGVzdCBzdHJpbmcgZm9yIHNwYXJrIGJpbmFyeQ==\"" + lineSep + lineSep
}
}

object WriterTests {
3 changes: 3 additions & 0 deletions docs/Spark-Kusto DataTypes mapping.md
@@ -8,6 +8,7 @@ When writing to or reading from a Kusto table, the connector converts types between Spark and Kusto as follows:
| Spark data type | Kusto data type |
|-----------------|-----------------|
| StringType | string |
| BinaryType | string |
| IntegerType | int |
| LongType | long |
| BooleanType | bool |
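As an illustration (not part of the commit), the mapping rows shown above expressed as a Spark schema; field names are hypothetical, and the comments give the Kusto type each column lands as:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("text",  StringType),  // -> string
  StructField("blob",  BinaryType),  // -> string (Base64-encoded, per this commit)
  StructField("count", IntegerType), // -> int
  StructField("total", LongType),    // -> long
  StructField("flag",  BooleanType)  // -> bool
))
```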
@@ -47,6 +48,8 @@ Kusto **datetime** data type is always read in '%Y-%m-%d %H:%M:%s' format, while Spark **DateType** uses '%Y-%m-%d' and **TimestampType** uses '%Y-%m-%d %H:%M:%s'. This is why the Kusto 'timespan' type is translated into a string by the connector, and **we recommend using only datetime and TimestampType**.

Spark **BinaryType** is converted to a Base64-encoded string.

Kusto **decimal** type
- Kusto as a source : The supported precision and scale are (38, 18) respectively.
- Kusto as a sink : The supported precision and scale are (34, 14) respectively.
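A short sketch of the limits above as Spark schema declarations (column name illustrative):

```scala
import org.apache.spark.sql.types.{DecimalType, StructField, StructType}

// Reading from Kusto: decimals up to precision 38 and scale 18.
val sourceSchema = StructType(Seq(StructField("d", DecimalType(38, 18))))

// Writing to Kusto: decimals up to precision 34 and scale 14.
val sinkSchema = StructType(Seq(StructField("d", DecimalType(34, 14))))
```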
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>5.0.6</revision>
<revision>5.0.7</revision>
<!-- Spark dependencies -->
<scala.version.major>2.12</scala.version.major>
<scalafmt.plugin.version>1.1.1640084764.9f463a9</scalafmt.plugin.version>
