
Conversation

@zentol (Contributor) commented Jun 10, 2014

The StringComparator now works on serialized data.

To this end, new string read/write/copy/compare methods were introduced, which use a variable-length encoding for the characters.

Key points:
- The most significant bits are written/read first.
- The first 2 bits of the character are used to encode the size of the character.
- A character is at most 3 bytes big.

Additionally, the StringSerializer now has full Unicode support. I couldn't find a Unicode character that uses more than 22 bits, so 3 bytes should be sufficient.
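
For illustration, a minimal read-side sketch of that scheme. The method name and the exact bit assignment (00/01/10 for 1/2/3 bytes) are assumptions for the example, not the PR's actual code:

private static int readCodePointSketch(DataInput in) throws IOException {
    int first = in.readUnsignedByte();
    int numBytes = (first >>> 6) + 1;   // assumed: the top 2 bits encode the size (1-3 bytes)
    int value = first & 0x3F;           // the remaining 6 bits are the highest payload bits
    for (int i = 1; i < numBytes; i++) {
        value = (value << 8) | in.readUnsignedByte();  // lower bytes follow, most significant first
    }
    return value;                       // up to 6 + 8 + 8 = 22 payload bits
}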

@zentol (Contributor, Author) commented Jun 12, 2014

I reworked the serializer/comparator again. It now uses the first bit of every byte to indicate whether there is at least one more byte coming.

This has the bonus that all letters are serialized as one byte (as opposed to the previous version, which could only do this for numbers and a few special characters, which made the variable-length encoding rather pointless).

I currently do a selective shift starting at the flag positions to make space for them. I wonder if there is a more efficient way to do that; here's an example of how I do it:

char to send:
0010 0110 1111 1001
1) move the lowest 7 bits into a tmp variable (by doing & with 0000 0000 0111 1111)
0000 0000 0111 1001
2) shift char to the right by 7 positions to omit the lower part
0000 0000 0100 1101
3) shift char to the left by 8 positions (finalizing the shifting of the upper part)
0100 1101 0000 0000
4) char |= tmp
0100 1101 0111 1001
(this would be done recursively for every flag position needed, starting from the right, so up to 3 times)
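
As a rough sketch of how this write path could look (the method name and exact thresholds are assumptions, not the PR's code): each byte carries 7 payload bits, its high bit flags that at least one more byte follows, and the most significant group is written first.

private static void writeCharSketch(DataOutput out, int c) throws IOException {
    if (c < 0x80) {                                    // fits into 7 bits -> one byte, flag clear
        out.writeByte(c);
    } else if (c < 0x4000) {                           // fits into 14 bits -> two bytes
        out.writeByte(((c >>> 7) & 0x7F) | 0x80);      // upper 7 bits, continuation flag set
        out.writeByte(c & 0x7F);                       // lower 7 bits, last byte
    } else {                                           // up to 21 bits -> three bytes
        out.writeByte(((c >>> 14) & 0x7F) | 0x80);
        out.writeByte(((c >>> 7) & 0x7F) | 0x80);
        out.writeByte(c & 0x7F);
    }
}

For the example character above (0x26F9), this would emit the two bytes 1100 1101 and 0111 1001, i.e. the shifted value from step 4 with the continuation flag set on the first byte.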

@StephanEwen (Contributor)

This is a nice change. Do you have some micro-benchmark numbers on how the methods perform in comparison to the old writeString() methods?

Also, it would be interesting to see what the benefit of doing binary comparisons is. So, a micro-benchmark of sorting the strings with a NormalizedKeySorter, and a case where all prefixes are equal.

@StephanEwen (Contributor)

Also, can you add a few more strings where some of the branches get executed? Like collisions on the high byte of a code point, but differences in the low bytes (and vice versa)?

This code needs to be absolutely robust, or it will lead to impossible-to-debug situations.

@zentol (Contributor, Author) commented Jun 13, 2014

The new serialization seems to take longer, but due to faster deserialization it breaks even in total.

Some comparison benchmarks:

strings of length 90
1000 repetitions
Code for the "New" measurement:

long startTime = System.nanoTime();
cmp = StringValue.compareUnicode(in1, in2);
long endTime = System.nanoTime();

Code for the "Old" measurement:

long startTime = System.nanoTime();
cmp = StringValue.readString(in1).compareTo(StringValue.readString(in2));
long endTime = System.nanoTime();

result = SUM(endTime - startTime) / 1000

equality
New 26627
Old 25782

prefix (difference at the beginning of the string)
New 4259
Old 23431

infix (difference in the middle of the string)
New 13560
Old 30757

suffix (difference at the end of the string)
New 29385
Old 28293

Not particularly surprising results. No progress on the common-prefix issue (cases where the strings share a long prefix) :/

Adding some more tests now.

@StephanEwen (Contributor)

That is very nice progress! And I think the numbers are good, actually.

I was wondering whether it is possible to modify the encoding such that the comparison loop could still be something like:

byte[] bytes1 = ...;
byte[] bytes2 = ...;

int len = Math.min(bytes1.length, bytes2.length);
int cmp = 0;
for (int pos = 0; pos < len && (cmp = (bytes1[pos] & 0xff) - (bytes2[pos] & 0xff)) == 0; pos++);

That would allow us to push the comparisons into the memory segments (eventually doing something like a memcmp()) and avoid repeatedly grabbing individual bytes.
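
For illustration, a sketch of how that loop could run directly against Flink's MemorySegments (the method name is made up for the example; MemorySegment.get(int) reads a single byte):

private static int compareSerializedBytes(MemorySegment seg1, int off1, MemorySegment seg2, int off2, int len) {
    // compare the raw serialized bytes as unsigned values, like a memcmp()
    for (int pos = 0; pos < len; pos++) {
        int cmp = (seg1.get(off1 + pos) & 0xff) - (seg2.get(off2 + pos) & 0xff);
        if (cmp != 0) {
            return cmp;
        }
    }
    return 0;
}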

@zentol (Contributor, Author) commented Jun 18, 2014

The loop could look like that right now; the encoding scheme wouldn't need to be changed.

If you want it character-wise, you could have something like this:

...
for (int i = 0; i < Math.min(lengthFirst, lengthSecond); i++) {
    byte[] char1 = readUnicodeCharAsByteArray(firstSource);
    byte[] char2 = readUnicodeCharAsByteArray(secondSource);

    int len = Math.min(char1.length, char2.length);
    int cmp = 0;
    // compare the bytes as unsigned values until the first difference
    for (int pos = 0; pos < len && (cmp = (char1[pos] & 0xff) - (char2[pos] & 0xff)) == 0; pos++);
    if (cmp != 0) {
        return cmp;
    }
}
...

If you want to read the whole string, it would look like this:

...
byte[] string1 = readUnicodeStringAsByteArray(firstSource);
byte[] string2 = readUnicodeStringAsByteArray(secondSource);

int len = Math.min(string1.length, string2.length);
int cmp = 0;
// compare the bytes as unsigned values until the first difference
for (int pos = 0; pos < len && (cmp = (string1[pos] & 0xff) - (string2[pos] & 0xff)) == 0; pos++);
if (cmp != 0) {
    return cmp;
}
...

The string part can easily be split into several smaller parts.

(The readXAsByteArray methods are already written.)

@zentol (Contributor, Author) commented Jun 18, 2014

Waaait... scratch that. Let me think a bit more about it...

@zentol (Contributor, Author) commented Jun 18, 2014

OK, I was right after all :)
The readUnicodeCharAsByteArray method could look like this:

private static byte[] readUnicodeCharAsByteArray(DataInput in) throws IOException {
    // the high bit of a byte signals that at least one more byte follows;
    // mask to an unsigned value before comparing against HIGH_BIT (0x80)
    byte a;
    if (((a = in.readByte()) & 0xff) >= HIGH_BIT) {
        byte b;
        if (((b = in.readByte()) & 0xff) >= HIGH_BIT) {
            byte c;
            if (((c = in.readByte()) & 0xff) >= HIGH_BIT) {
                byte d = in.readByte();
                return new byte[]{a, b, c, d};
            }
            return new byte[]{0, a, b, c};
        }
        return new byte[]{0, 0, a, b};
    }
    return new byte[]{0, 0, 0, a};
}

and the string one like this:

public static final byte[] readUnicodeStringAsByteArray(DataInput in) throws IOException {
    int len = readLength(in);

    if (len == 0) {
        return new byte[0];
    }

    ByteArrayOutputStream out = new ByteArrayOutputStream(len);

    for (int i = 0; i < len; i++) {
        out.write(readUnicodeCharAsByteArray(in));
    }
    return out.toByteArray();
}

@zentol (Contributor, Author) commented Jun 20, 2014

I may have misunderstood your question. If you want to compare the data byte-wise, without having to modify it (like padding with zeroes) or analyze it (like reading the bytes as chars), then the encoding has to be changed. My idea would be to use the first 2 bits of every byte to encode the index of that particular byte within its character.

This would reduce the effectiveness of the variable-length encoding (all letters would be encoded with 2 bytes), but you could use the very loop you described.

@StephanEwen (Contributor)

Why would all letters need two bytes? We are currently using an encoding that wastes only one bit, using it as an indicator.

@zentol (Contributor, Author) commented Jun 20, 2014

Yes, currently one bit is used to indicate that another byte is coming (this is also what the current code in this PR does).
This means, though, that if you look only at single bytes (as this loop would do)

for (int pos = 0; pos < len && (cmp = (string1[pos] - string2[pos])) == 0; pos++);

you wouldn't be able to tell whether you are looking at a 3- or a 2-byte-long character. If you want to compare the pure serialized data byte-wise, you have to know at which position you are; since characters may end up 3 bytes long, you would need 2 bits to flag the position, reducing the payload to 6 bits, into which letters don't fit.
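
A concrete (hypothetical) illustration of the problem, assuming the MSB-first, continuation-bit encoding described above:

byte[] twoByteChar   = { (byte) 0xFF, (byte) 0x7F };              // U+3FFF: two bytes, all 14 payload bits set
byte[] threeByteChar = { (byte) 0x81, (byte) 0x80, (byte) 0x00 }; // U+4000: three bytes, 15 payload bits
// comparing the first bytes: 0xFF > 0x81, so U+3FFF would sort after U+4000 -- the wrong order
int cmp = (twoByteChar[0] & 0xff) - (threeByteChar[0] & 0xff);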

@StephanEwen (Contributor)

To make sure it does not matter whether you are looking at a two-byte char or a three-byte char, you can do the following: if the three-byte chars always sort higher than the two-byte chars, then we can use an encoding where the indicating bits for three-byte chars are larger than those for two-byte chars.
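
One possible shape for such an encoding (a sketch only, limited to code points below 0x10000 for brevity) is the UTF-8-style lead-byte pattern: longer sequences always start with a strictly larger lead byte, so comparing the raw bytes preserves code-point order.

private static void writeOrderPreserving(DataOutput out, int cp) throws IOException {
    if (cp < 0x80) {
        out.writeByte(cp);                          // 0xxxxxxx, always < 0x80
    } else if (cp < 0x800) {
        out.writeByte(0xC0 | (cp >>> 6));           // 110xxxxx, lead byte in [0xC0, 0xDF]
        out.writeByte(0x80 | (cp & 0x3F));          // 10xxxxxx continuation
    } else {
        out.writeByte(0xE0 | (cp >>> 12));          // 1110xxxx, lead byte in [0xE0, 0xEF]
        out.writeByte(0x80 | ((cp >>> 6) & 0x3F));
        out.writeByte(0x80 | (cp & 0x3F));
    }
}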

@zentol (Contributor, Author) commented Jun 20, 2014

I see, so we have more flag bits for the later bytes. I never thought of that.

@zentol (Contributor, Author) commented Jul 5, 2014

Should we postpone this PR until #53 and [FLINK-987] (Extend TypeSerializers and -Comparators to work directly on Memory Segments) are done?

@StephanEwen (Contributor)

Yes, I think we should merge the other two first.

Any updates on the comparison routines? Do they work strictly on bytes now?

@zentol (Contributor, Author) commented Jul 7, 2014

Yes, they do.

@StephanEwen (Contributor)

Here is a micro-benchmark of string serialization I did recently. It is extremely simple and performs well. Do you have any feeling for how it compares to your version?

The basic trick is not to use the variable-length encodings, but to represent the binary variant the same way as the object variant.

import java.util.Random;

import org.apache.flink.core.memory.MemoryUtils;

public class StringSortingBenchmark {

    private static final long SEED = 0xa2e2223fc6643bbl;

    private static final int MIN_LEN = 4;
    private static final int MAX_LEN = 213;

    private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
    private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

    public static long datasink = 0; // side effect variable to prevent optimizing loops away

    public static void main(String[] args) {
        // we run comparisons between elements in the arrays in the pattern quicksort uses: pointers coming from both ends

        final int NUM = 2000000; // 2 million

        final String[] elements = new String[NUM];

        final byte[] bytes = new byte[Integer.MAX_VALUE / 2];
        final int[] pointers = new int[NUM];

        System.out.println("Creating the strings");
        {
            Random rnd = new Random(SEED);

            int ptr = 0;

            for (int i = 0; i < NUM; i++) {
                String str = getRandomString(rnd);
                elements[i] = str;

                pointers[i] = ptr;
                ptr += serializeString(bytes, ptr, str);
            }
        }

        System.out.println("Verifying serialization and comparison methods");
        {
            int a = 0;
            int b = NUM - 1;

            while (a < NUM) {               
                String deser = deserializeString(bytes, pointers[a]);
                if (!elements[a].equals(deser)) {
                    throw new RuntimeException("Wrong serialization: " + elements[a] + " versus " + deser);
                }

                int cmp1 = elements[a].compareTo(elements[b]);
                int cmp2 = compareDeserializing(bytes, pointers[a], pointers[b]);
                int cmp3 = compareBinaryStrings(bytes, pointers[a], pointers[b]);

                if (Math.signum(cmp1) != Math.signum(cmp2)) {
                    throw new RuntimeException("Wrong comparison result between deserialized and original");
                }

                if (Math.signum(cmp1) != Math.signum(cmp3)) {
                    throw new RuntimeException("Wrong comparison result between binary and original");
                }

                a++;
                b--;
            }
        }

        long objectsTime;
        long deserializingTime;
        long binaryTime;

        System.out.println("Running experiments with string objects.");
        {
            int a = 0;
            int b = NUM - 1;

            long acc = 0;  // use this so no compiler optimizes the loop away
            long start = System.nanoTime();

            while (a < NUM) {
                long cmp = elements[a++].compareTo(elements[b--]);
                acc += cmp;
            }

            objectsTime = System.nanoTime() - start;
            datasink += acc;
        }

        System.out.println("Running experiments deserializing string objects.");
        {
            int a = 0;
            int b = NUM - 1;

            long acc = 0;  // use this so no compiler optimizes the loop away
            long start = System.nanoTime();

            while (a < NUM) {
                long cmp = compareDeserializing(bytes, pointers[a++], pointers[b--]);
                acc += cmp;
            }

            deserializingTime = System.nanoTime() - start;
            datasink += acc;
        }

        System.out.println("Running experiments with binary comparisons.");
        {
            int a = 0;
            int b = NUM - 1;

            long acc = 0;  // use this so no compiler optimizes the loop away
            long start = System.nanoTime();

            while (a < NUM) {
                long cmp = compareBinaryStrings(bytes, pointers[a++], pointers[b--]);
                acc += cmp;
            }

            binaryTime = System.nanoTime() - start; 
            datasink += acc;
        }

        System.out.println(String.format("Time taken\n   - String objects: %d µsecs\n   - String deserialization: %d µsecs\n   - Binary comparison: %d µsecs", objectsTime / 1000, deserializingTime / 1000, binaryTime / 1000));
    }

    private static String getRandomString(Random rnd) {
        final int len = rnd.nextInt(MAX_LEN - MIN_LEN + 1) + MIN_LEN;
        final StringBuilder bld = new StringBuilder(len);

        for (int i = 0; i < len; i++) {
            bld.append((char) (rnd.nextInt(100) + 33));
        }   
        return bld.toString();
    }

    private static final int serializeString(byte[] target, int offset, String str) {
        final int len = str.length();
        long pos = offset + BASE_OFFSET;

        UNSAFE.putInt(target, pos, len);
        pos += 4;

        for (int i = 0; i < len; i++, pos += 2) {
            char towrite = Character.reverseBytes(str.charAt(i));
            UNSAFE.putChar(target, pos, towrite);
        }

        return 4 + 2*len;
    }

    private static final String deserializeString(byte[] data, int offset) {
        long pos = offset + BASE_OFFSET;
        final int len = UNSAFE.getInt(data, pos);
        pos += 4;

        StringBuilder bld = new StringBuilder(len);

        for (int i = 0; i < len; i++, pos += 2) {
            char read = UNSAFE.getChar(data, pos);
            read = Character.reverseBytes(read);
            bld.append(read);
        }

        return bld.toString();
    }

    private static final int compareBinaryStrings(byte[] data, int off1, int off2) {    
        int len1 = UNSAFE.getInt(data, off1 + BASE_OFFSET);
        int len2 = UNSAFE.getInt(data, off2 + BASE_OFFSET);
        off1 += 4;
        off2 += 4;

        final int binCompLen = 2 * Math.min(len1, len2);
        int val = 0;

        // binary comparison
        for (int pos = 0; pos < binCompLen && (val = (data[off1 + pos] & 0xff) - (data[off2 + pos] & 0xff)) == 0; pos++);
        return val != 0 ? val : len1 - len2;
    }

    private static final int compareDeserializing(byte[] data, int off1, int off2) {
        String str1 = deserializeString(data, off1);
        String str2 = deserializeString(data, off2);
        return str1.compareTo(str2);
    }
}

@zentol (Contributor, Author) commented Sep 3, 2014

I integrated my code (with some modifications, like the use of StringBuilder when deserializing) into yours and got these results:

My code:
Time taken

  • String objects: 65559 µsecs
  • String deserialization: 1498175 µsecs
  • Binary comparison: 36816 µsecs

Your code:
Time taken

  • String objects: 65650 µsecs
  • String deserialization: 1243583 µsecs
  • Binary comparison: 71670 µsecs

@StephanEwen (Contributor)

Wow, does that mean that your binary comparison code is faster than the regular String.compareTo(anotherString)?

Or did I misinterpret the experiment setup?

@zentol (Contributor, Author) commented Sep 3, 2014

Yup, it appears to be faster than compareTo.

@fhueske (Contributor) commented Aug 5, 2015

Hi @zentol, @StephanEwen
What's the status of this PR? Is it still relevant? What is missing to merge it?

@StephanEwen (Contributor)

I am not sure about this pull request. Java Strings already support Unicode (interpreting multiple chars as one) and we simply write and read the chars.

I still like serialized string comparison, but that should be in sync with the Java String comparison, which actually works on the char[] model, not on the Unicode code point interpretation. Even though both may end up with the same collation, I fear this code makes Flink deal with Unicode code points without a need to do so.

@zentol (Contributor, Author) commented Aug 5, 2015

Oh well, this has been a while now; let's see...

From what I can tell, the comparison doesn't work on code points but compares single bytes, so it should be equivalent to the Java String comparison. It depends on the serialization code though, so it couldn't be merged without modifications.

I can only guess why I decided to use code points during serialization; I assume it had to do with the length of the string. Since some Unicode characters are represented as 2 chars internally, just counting the chars written would lead to a wrong result. Here we are also dealing with a CharSequence, which doesn't hide the Unicode aspect the way a String does.
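
For example (a small illustration of the char vs. code point distinction, not code from this PR):

String s = "a\uD834\uDD1E";                        // "a" followed by U+1D11E (a supplementary character)
int chars = s.length();                            // 3 -- the supplementary character occupies two chars
int codePoints = s.codePointCount(0, s.length());  // 2 -- but it is a single code point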

@zentol (Contributor, Author) commented Aug 27, 2015

I'll close this one; it couldn't be merged in its current state anyway. We can revisit the issue at a later time if need be.

zentol closed this Aug 27, 2015
zentol deleted the string-serialization-comparator branch Apr 28, 2022