From 5325cef60dfd00e0f7fa3156678677f1b3a2ef1a Mon Sep 17 00:00:00 2001 From: CHOIJAEHONG Date: Sat, 18 Jul 2015 20:57:26 +0900 Subject: [PATCH 1/4] [SPARK-8951] support Unicode characters in collect() --- R/pkg/inst/tests/test_sparkSQL.R | 22 +++++++++++++++++++ .../scala/org/apache/spark/api/r/SerDe.scala | 9 ++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index a3039d36c9402..bcf56e902c94c 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -417,6 +417,28 @@ test_that("collect() and take() on a DataFrame return the same number of rows an expect_equal(ncol(collect(df)), ncol(take(df, 10))) }) + +test_that("collect() support Unicode characters", { + lines <- c("{\"name\":\"안녕하세요\"}", + "{\"name\":\"您好\", \"age\":30}", + "{\"name\":\"こんにちは\", \"age\":19}", + "{\"name\":\"Xin chào\"}") + jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") + writeLines(lines, jsonPath) + + df <- read.df(sqlContext, jsonPath, "json") + rdf <- collect(df) + expect_true(is.data.frame(rdf)) + expect_equal(rdf$name[1], "안녕하세요") + expect_equal(rdf$name[2], "您好") + expect_equal(rdf$name[3], "こんにちは") + expect_equal(rdf$name[4], "Xin chào") + + df2 <- createDataFrame(sqlContext, rdf) + expect_equal(collect(where(df2, df2$name == "您好"))[[2]], "您好") +}) + + test_that("multiple pipeline transformations result in an RDD with the correct values", { df <- jsonFile(sqlContext, jsonPath) first <- lapply(df, function(row) { diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index d5b4260bf4529..422f9277948a0 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -303,12 +303,11 @@ private[spark] object SerDe { out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9) } - // NOTE: Only works for ASCII right now def writeString(out: DataOutputStream, value: String): Unit = { - val len = value.length - out.writeInt(len + 1) // For the \0 - out.writeBytes(value) - out.writeByte(0) + val utf8 = value.getBytes("UTF-8") + val len = utf8.length + out.writeInt(len) + out.write(utf8, 0, len) } def writeBytes(out: DataOutputStream, value: Array[Byte]): Unit = { From 15f6c7d169c1ae26f55cf429c1fb79897b5aa069 Mon Sep 17 00:00:00 2001 From: CHOIJAEHONG Date: Fri, 24 Jul 2015 12:59:20 +0900 Subject: [PATCH 2/4] modify R/pkg/R/deserialize.R to support unicode strings --- R/pkg/R/deserialize.R | 6 ++++-- R/pkg/inst/tests/test_sparkSQL.R | 20 +++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index 7d1f6b0819ed0..1808488cfa002 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -56,8 +56,10 @@ readTypedObject <- function(con, type) { readString <- function(con) { stringLen <- readInt(con) - string <- readBin(con, raw(), stringLen, endian = "big") - rawToChar(string) + raw <- readBin(con, raw(), stringLen, endian = "big") + string <- rawToChar(raw) + Encoding(string) <- "UTF-8" + enc2native(string) } readInt <- function(con) { diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index bcf56e902c94c..909d4e0b90438 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -417,8 +417,12 @@ test_that("collect() and take() on a DataFrame return the same number of rows an expect_equal(ncol(collect(df)), ncol(take(df, 10))) }) - test_that("collect() support Unicode characters", { + convertToNative <- function(s) { + Encoding(s) <- "UTF-8" + enc2native(s) + } + lines <- c("{\"name\":\"안녕하세요\"}", "{\"name\":\"您好\", \"age\":30}", "{\"name\":\"こんにちは\", \"age\":19}", @@ -430,12 +434,14 @@ test_that("collect() support Unicode characters", { rdf <- collect(df) expect_true(is.data.frame(rdf)) expect_equal(rdf$name[1], "안녕하세요") - expect_equal(rdf$name[2], "您好") - expect_equal(rdf$name[3], "こんにちは") - expect_equal(rdf$name[4], "Xin chào") - - df2 <- createDataFrame(sqlContext, rdf) - expect_equal(collect(where(df2, df2$name == "您好"))[[2]], "您好") + expect_equal(rdf$name[1], convertToNative("안녕하세요")) + expect_equal(rdf$name[2], convertToNative("您好")) + expect_equal(rdf$name[3], convertToNative("こんにちは")) + expect_equal(rdf$name[4], convertToNative("Xin chào")) + + df1 <- createDataFrame(sqlContext, rdf) + expect_equal(collect(where(df1, df1$name == convertToNative("您好")))$name, convertToNative("您好")) + expect_equal(collect(where(df1, df1$name == "您好"))$name, "您好") }) From bc469d8ae91024675abb37b76a1354c99206c5c2 Mon Sep 17 00:00:00 2001 From: CHOIJAEHONG Date: Fri, 24 Jul 2015 13:01:41 +0900 Subject: [PATCH 3/4] modify R/pkg/R/deserialize.R to support unicode strings --- R/pkg/inst/tests/test_sparkSQL.R | 2 -- 1 file changed, 2 deletions(-) diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 909d4e0b90438..6d7899faf9439 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -433,7 +433,6 @@ test_that("collect() support Unicode characters", { df <- read.df(sqlContext, jsonPath, "json") rdf <- collect(df) expect_true(is.data.frame(rdf)) - expect_equal(rdf$name[1], "안녕하세요") expect_equal(rdf$name[1], convertToNative("안녕하세요")) expect_equal(rdf$name[2], convertToNative("您好")) expect_equal(rdf$name[3], convertToNative("こんにちは")) @@ -441,7 +440,6 @@ test_that("collect() support Unicode characters", { df1 <- createDataFrame(sqlContext, rdf) expect_equal(collect(where(df1, df1$name == convertToNative("您好")))$name, convertToNative("您好")) - expect_equal(collect(where(df1, df1$name == "您好"))$name, "您好") }) From 3686f155749e4c96f9f7553907f84c21be78d5d1 Mon Sep 17 00:00:00 2001 From: CHOIJAEHONG Date: Wed, 26 Aug 2015 14:21:52 +0900 Subject: [PATCH 4/4] support unicode strings in SparkR (writeBin with useBytes=TRUE) --- R/pkg/R/deserialize.R | 2 +- R/pkg/R/serialize.R | 2 +- R/pkg/inst/tests/test_sparkSQL.R | 22 +++++++++++----------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index 1808488cfa002..32a1dc86196ef 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -59,7 +59,7 @@ readString <- function(con) { raw <- readBin(con, raw(), stringLen, endian = "big") string <- rawToChar(raw) Encoding(string) <- "UTF-8" - enc2native(string) + string } readInt <- function(con) { diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R index 311021e5d8473..c2e71171af386 100644 --- a/R/pkg/R/serialize.R +++ b/R/pkg/R/serialize.R @@ -79,7 +79,7 @@ writeJobj <- function(con, value) { writeString <- function(con, value) { utfVal <- enc2utf8(value) writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1)) - writeBin(utfVal, con, endian = "big") + writeBin(utfVal, con, endian = "big", useBytes=TRUE) } writeInt <- function(con, value) { diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 6d7899faf9439..30f48344a0ba2 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -418,31 +418,31 @@ test_that("collect() and take() on a DataFrame return the same number of rows an }) test_that("collect() support Unicode characters", { - convertToNative <- function(s) { + markUtf8 <- function(s) { Encoding(s) <- "UTF-8" - enc2native(s) + s } lines <- c("{\"name\":\"안녕하세요\"}", - "{\"name\":\"您好\", \"age\":30}", - "{\"name\":\"こんにちは\", \"age\":19}", - "{\"name\":\"Xin chào\"}") + "{\"name\":\"您好\", \"age\":30}", + "{\"name\":\"こんにちは\", \"age\":19}", + "{\"name\":\"Xin chào\"}") + jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") writeLines(lines, jsonPath) df <- read.df(sqlContext, jsonPath, "json") rdf <- collect(df) expect_true(is.data.frame(rdf)) - expect_equal(rdf$name[1], convertToNative("안녕하세요")) - expect_equal(rdf$name[2], convertToNative("您好")) - expect_equal(rdf$name[3], convertToNative("こんにちは")) - expect_equal(rdf$name[4], convertToNative("Xin chào")) + expect_equal(rdf$name[1], markUtf8("안녕하세요")) + expect_equal(rdf$name[2], markUtf8("您好")) + expect_equal(rdf$name[3], markUtf8("こんにちは")) + expect_equal(rdf$name[4], markUtf8("Xin chào")) df1 <- createDataFrame(sqlContext, rdf) - expect_equal(collect(where(df1, df1$name == convertToNative("您好")))$name, convertToNative("您好")) + expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好")) }) - test_that("multiple pipeline transformations result in an RDD with the correct values", { df <- jsonFile(sqlContext, jsonPath) first <- lapply(df, function(row) {