Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-8264][SQL]add substring_index function #7533

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ object FunctionRegistry {
expression[StringSplit]("split"),
expression[Substring]("substr"),
expression[Substring]("substring"),
expression[Substring_index]("substring_index"),
expression[StringTrim]("trim"),
expression[UnBase64]("unbase64"),
expression[Upper]("ucase"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.text.DecimalFormat
import java.util.Locale
import java.util.regex.{MatchResult, Pattern}

import org.apache.commons.lang.StringUtils

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions.codegen._
Expand Down Expand Up @@ -355,6 +357,92 @@ case class StringInstr(str: Expression, substr: Expression)
}
}

/**
* Returns the substring from string str before count occurrences of the delimiter delim.
* If count is positive, everything the left of the final delimiter (counting from left) is
* returned. If count is negative, every to the right of the final delimiter (counting from the
* right) is returned. substring_index performs a case-sensitive match when searching for delim.
*/
case class Substring_index(strExpr: Expression, delimExpr: Expression, countExpr: Expression)
extends Expression with ImplicitCastInputTypes with CodegenFallback {

override def dataType: DataType = StringType
override def inputTypes: Seq[DataType] = Seq(StringType, StringType, IntegerType)
override def nullable: Boolean = strExpr.nullable || delimExpr.nullable || countExpr.nullable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to override the nullable method, as the default value is false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's abstract in Expression which Substring_index inherited from?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my bad, actually I mean the function foldable, not the nullable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, actually I mean we need to override the function foldable, which is false by default, but obviously in this expression, which depends on its children.

override def children: Seq[Expression] = Seq(strExpr, delimExpr, countExpr)
override def prettyName: String = "substring_index"
override def toString: String = s"substring_index($strExpr, $delimExpr, $countExpr)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this line, overriding the prettyName is enough.


override def eval(input: InternalRow): Any = {
val str = strExpr.eval(input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do that like:

   val str = ...
   if (str != null) {
      val delim = ...
      if (delim != null) {
         val count = ...

That's kind of short circuit evaluation.

val delim = delimExpr.eval(input)
val count = countExpr.eval(input)
if (str == null || delim == null || count == null) {
null
} else {
subStrIndex(
str.asInstanceOf[UTF8String],
delim.asInstanceOf[UTF8String],
count.asInstanceOf[Int])
}
}

private def lastOrdinalIndexOf(
str: UTF8String, searchStr: UTF8String, ordinal: Int, lastIndex: Boolean = false): Int = {
ordinalIndexOf(str, searchStr, ordinal, true)
}

private def ordinalIndexOf(
str: UTF8String, searchStr: UTF8String, ordinal: Int, lastIndex: Boolean = false): Int = {
if (str == null || searchStr == null || ordinal <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the null checking here?

return -1
}
val strNumChars = str.numChars()
if (searchStr.numBytes() == 0) {
return if (lastIndex) {strNumChars} else {0}
}
var found = 0
var index = if (lastIndex) {strNumChars} else {0}
do {
if (lastIndex) {
index = str.lastIndexOf(searchStr, index - 1)
} else {
index = str.indexOf(searchStr, index + 1)
}
if (index < 0) {
return index
}
found += 1
} while (found < ordinal)
index
}

private def subStrIndex(strUtf8: UTF8String, delimUtf8: UTF8String, count: Int): UTF8String = {
if (strUtf8 == null || delimUtf8 == null || count == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the null checking here?

return null
}
if (strUtf8.numBytes() == 0 || delimUtf8.numBytes() == 0 || count == 0) {
return UTF8String.fromString("")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UTF8String.EMPTY_UTF8

}
val res = if (count > 0) {
val idx = ordinalIndexOf(strUtf8, delimUtf8, count)
if (idx != -1) {
strUtf8.substring(0, idx)
} else {
strUtf8
}
} else {
val idx = lastOrdinalIndexOf(strUtf8, delimUtf8, -count)
if (idx != -1) {
strUtf8.substring(idx + delimUtf8.numChars(), strUtf8.numChars())
} else {
strUtf8
}
}
res
}
}

/**
* A function that returns the position of the first occurrence of substr
* in given string after position pos.
Expand Down
25 changes: 24 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1777,8 +1777,31 @@ object functions {
def instr(str: Column, substring: String): Column = StringInstr(str.expr, lit(substring).expr)

/**
* Locate the position of the first occurrence of substr in a string column.
* Returns the substring from string str before count occurrences of the delimiter delim.
* If count is positive, everything the left of the final delimiter (counting from left) is
* returned. If count is negative, every to the right of the final delimiter (counting from the
* right) is returned. substring_index performs a case-sensitive match when searching for delim.
*
* @group string_funcs
* @since 1.5.0
*/
def substring_index(str: String, delim: String, count: Int): Column =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this version of API. @rxin actually made some clean up, and removed the string (the column name) version API.

substring_index(Column(str), delim, count)

/**
* Returns the substring from string str before count occurrences of the delimiter delim.
* If count is positive, everything the left of the final delimiter (counting from left) is
* returned. If count is negative, every to the right of the final delimiter (counting from the
* right) is returned. substring_index performs a case-sensitive match when searching for delim.
*
* @group string_funcs
* @since 1.5.0
*/
def substring_index(str: Column, delim: String, count: Int): Column =
Substring_index(str.expr, lit(delim).expr, lit(count).expr)

/**
* Locate the position of the first occurrence of substr.
* NOTE: The position is not zero based, but 1 based index, returns 0 if substr
* could not be found in str.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,63 @@ class StringFunctionsSuite extends QueryTest {
Row(1))
}

test("string substring_index function") {
val df = Seq(("www.apache.org", ".", "zz")).toDF("a", "b", "c")
checkAnswer(
df.select(substring_index($"a", ".", 3)),
Row("www.apache.org"))
checkAnswer(
df.select(substring_index($"a", ".", 2)),
Row("www.apache"))
checkAnswer(
df.select(substring_index($"a", ".", 1)),
Row("www"))
checkAnswer(
df.select(substring_index($"a", ".", 0)),
Row(""))
checkAnswer(
df.select(substring_index(lit("www.apache.org"), ".", -1)),
Row("org"))
checkAnswer(
df.select(substring_index(lit("www.apache.org"), ".", -2)),
Row("apache.org"))
checkAnswer(
df.select(substring_index(lit("www.apache.org"), ".", -3)),
Row("www.apache.org"))
// str is empty string
checkAnswer(
df.select(substring_index(lit(""), ".", 1)),
Row(""))
// empty string delim
checkAnswer(
df.select(substring_index(lit("www.apache.org"), "", 1)),
Row(""))
// delim does not exist in str
checkAnswer(
df.select(substring_index(lit("www.apache.org"), "#", 1)),
Row("www.apache.org"))
// delim is 2 chars
checkAnswer(
df.select(substring_index(lit("www||apache||org"), "||", 2)),
Row("www||apache"))
checkAnswer(
df.select(substring_index(lit("www||apache||org"), "||", -2)),
Row("apache||org"))
// null
checkAnswer(
df.select(substring_index(lit(null), "||", 2)),
Row(null))
checkAnswer(
df.select(substring_index(lit("www.apache.org"), null, 2)),
Row(null))
// non ascii chars
// scalastyle:off
checkAnswer(
df.selectExpr("""substring_index("大千世界大千世界", "千", 2)"""),
Row("大千世界大"))
// scalastyle:on
}

test("string locate function") {
val df = Seq(("aaads", "aa", "zz", 1)).toDF("a", "b", "c", "d")

Expand Down
63 changes: 63 additions & 0 deletions unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,69 @@ public int indexOf(UTF8String v, int start) {
return -1;
}

private enum ByteType {FIRSTBYTE, MIDBYTE, SINGLEBYTECHAR};

private ByteType checkByteType(Byte b) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

byte b, we can avoid the primitive boxing.

int firstTwoBits = (b >>> 6) & 0x03;
if (firstTwoBits == 3) {
return ByteType.FIRSTBYTE;
} else if (firstTwoBits == 2) {
return ByteType.MIDBYTE;
} else {
return ByteType.SINGLEBYTECHAR;
}
}

/**
* Return the first byte position for a given byte which shared the same code point.
* @param bytePos any byte within the code point
* @return the first byte position of a given code point, throw exception if not a valid UTF8 str
*/
private int firstOfCurrentCodePoint(int bytePos) {
while (bytePos >= 0) {
if (ByteType.FIRSTBYTE == checkByteType(getByte(bytePos))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use a tmp variable for the result of checkByteType(getByte(bytePos)), as it will be used later.

|| ByteType.SINGLEBYTECHAR == checkByteType(getByte(bytePos))) {
return bytePos;
}
bytePos--;
}
throw new RuntimeException("Invalid utf8 string");
}

private int indexEnd(int startCodePoint) {
int i = numBytes -1; // position in byte
int c = numChars() - 1; // position in character
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numChars() is actually very expensive. as the indexEnd (or lastIndexOf) will be called multiple times for the substring_index, we need to think about how to optimize that by removing the function calls.

while (i >=0 && c > startCodePoint) {
i = firstOfCurrentCodePoint(i) - 1;
c -= 1;
}
return i;
}

public int lastIndexOf(UTF8String v, int startCodePoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be protected? (for tests)

if (v.numBytes == 0) {
return 0;
}
if (numBytes == 0) {
return -1;
}
int fromIndexEnd = indexEnd(startCodePoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since numChars() will be called within indexEnd, which mean we will iterate all of the current UTF8String, can we reduce the overhead by going through the bytes directly ?

int count = startCodePoint;
int vNumChars = v.numChars();
do {
if (fromIndexEnd - v.numBytes + 1 < 0 ) {
return -1;
}
if (ByteArrayMethods.arrayEquals(
base, offset + fromIndexEnd - v.numBytes + 1, v.base, v.offset, v.numBytes)) {
return count - vNumChars + 1;
}
fromIndexEnd = firstOfCurrentCodePoint(fromIndexEnd) - 1;
count--;
} while (fromIndexEnd >= 0);
return -1;
}

/**
* Returns str, right-padded with pad to a length of len
* For example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ public void indexOf() {
assertEquals(3, fromString("数据砖头").indexOf(fromString("头"), 0));
}

@Test
public void lastIndexOf() {
assertEquals(0, fromString("").lastIndexOf(fromString(""), 0));
assertEquals(-1, fromString("").lastIndexOf(fromString("l"), 0));
assertEquals(0, fromString("hello").lastIndexOf(fromString(""), 0));
assertEquals(-1, fromString("hello").lastIndexOf(fromString("l"), 0));
assertEquals(3, fromString("hello").lastIndexOf(fromString("l"), 3));
assertEquals(-1, fromString("hello").lastIndexOf(fromString("a"), 4));
assertEquals(2, fromString("hello").lastIndexOf(fromString("ll"), 4));
assertEquals(-1, fromString("hello").lastIndexOf(fromString("ll"), 0));
assertEquals(5, fromString("数据砖头数据砖头").lastIndexOf(fromString("据砖"), 7));
assertEquals(0, fromString("数据砖头").lastIndexOf(fromString("数"), 3));
assertEquals(0, fromString("数据砖头").lastIndexOf(fromString("数"), 0));
assertEquals(3, fromString("数据砖头").lastIndexOf(fromString("头"), 3));
}

@Test
public void reverse() {
assertEquals(fromString("olleh"), fromString("hello").reverse());
Expand Down