Skip to content

Conversation

@boneill42
Copy link
Contributor

… bad (non utf8) message through Kinesis.

Brian O'Neill added 2 commits October 29, 2015 14:49
@tdas
Copy link
Contributor

tdas commented Oct 29, 2015

Good catch. Could you add unit tests for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isnt it better to just change this line to s.decode('utf-8', errors='ignore')

@tdas
Copy link
Contributor

tdas commented Oct 30, 2015

this is ok to test

@tdas
Copy link
Contributor

tdas commented Oct 30, 2015

ok to test

@SparkQA
Copy link

SparkQA commented Oct 30, 2015

Test build #44649 has finished for PR 9360 at commit 0bbdc85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@boneill42
Copy link
Contributor Author

will do...
Note: The case that triggered this is KPL aggregation. I commented on SPARK-11198, and will explore a solution.

@boneill42
Copy link
Contributor Author

I don't believe the new signature for decode is available in 2.6....

(spark)blu:streaming brianoneill$ python2.6
Python 2.6.9 (unknown, Aug 22 2015, 20:33:41)
>>> s = "foo"
>>> s.decode('utf-8', errors='ignore')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: decode() takes no keyword arguments

(spark)blu:streaming brianoneill$ python2.7
Python 2.7.10 (default, Jul 13 2015, 12:05:58)
>>> s = "foo"
>>> s.decode('utf-8', errors='ignore')
u'foo'

I've left the code as-is, and added a test. Update coming...

@SparkQA
Copy link

SparkQA commented Nov 2, 2015

Test build #44833 has finished for PR 9360 at commit a2e9d18.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * public abstract class MemoryConsumer\n * final class ShuffleExternalSorter extends MemoryConsumer\n * public final class BytesToBytesMap extends MemoryConsumer\n * public final class MapIterator implements Iterator<Location>\n * public final class UnsafeExternalSorter extends MemoryConsumer\n * class SpillableIterator extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillReader extends UnsafeSorterIterator\n * public final class UnsafeSorterSpillWriter\n * abstract class CentralMomentAgg(child: Expression) extends ImperativeAggregate with Serializable\n * case class Variance(child: Expression,\n * case class VarianceSamp(child: Expression,\n * case class VariancePop(child: Expression,\n * case class Skewness(child: Expression,\n * case class Kurtosis(child: Expression,\n * case class Kurtosis(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class Skewness(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class Variance(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class VariancePop(child: Expression) extends UnaryExpression with AggregateExpression1\n * case class VarianceSamp(child: Expression) extends UnaryExpression with AggregateExpression1\n

@zsxwing
Copy link
Member

zsxwing commented Nov 4, 2015

I feel throwing an Error for invalid bytes does make sense. If people want to ignore such error, they can use a custom decoder.

@boneill42
Copy link
Contributor Author

I'm inclined to agree as long as there is a way to catch that Exception and continue. I'm not a python wizard, but it appeared as though the process died in worker.py, without giving the job a chance to catch the Exception.

@zsxwing
Copy link
Member

zsxwing commented Nov 4, 2015

I'm inclined to agree as long as there is a way to catch that Exception and continue. I'm not a python wizard, but it appeared as though the process died in worker.py, without giving the job a chance to catch the Exception.

You can use the decoder parameter in the createStream method to set a custom decoder to catch the exception.

@boneill42
Copy link
Contributor Author

I guess I'm just accustomed to explicit exceptions in Java, especially for something that might kill a job.

But perhaps its sufficient to document this and let people know that everyone should implement a custom decoder if they want to protect against bad bytes in a record.

(feel free to close PR)

@zsxwing
Copy link
Member

zsxwing commented Dec 11, 2015

@boneill42 could you close this PR? We don't have permission to close it. It would be better if you can submit another PR to document this method. Thanks a lot!

@andrewor14
Copy link
Contributor

@boneill42 can you close this issue?

@boneill42 boneill42 closed this Dec 15, 2015
@boneill42
Copy link
Contributor Author

Yep, closed.

@obaidcuet
Copy link

obaidcuet commented May 4, 2016

Hi,

Just FYI.
Hope it may be helpful for someone.

I had exactly same issue while reading twitter data from Kafka (collected using flume).
I have no idea where those non-UTF8 data coming from. The should be rejected by flume itself.
However, I do not need those special characters and used below to ignore them:

Create a decoder function with parameter "ignore":

def utf8_decoder_ignore_error(s): """ Decode the unicode as UTF-8 """ if s is None: return None return s.decode('utf-8', "ignore")

Then use that decoder in createDirectStream as below:

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}, keyDecoder=utf8_decoder_ignore_error, valueDecoder=utf8_decoder_ignore_error )

-Obaid

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants