Serializers should be topic aware.
From the Java reference documentation:
byte[] serialize(java.lang.String topic, T data)
T deserialize(java.lang.String topic, byte[] data)
The implementation in Python is missing this information.
KafkaProducer._serialize is called with the topic name, but the topic is not propagated to the serializer.
KafkaConsumer calls Fetcher, whose _deserialize API is missing the topic information.
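
A rough sketch of what a topic-aware interface could look like in Python, mirroring the Java signatures above. This is only an illustration, not the library's actual API: TopicAwareJsonSerializer, its encodings mapping, and the call_serializer helper are all hypothetical names. The helper assumes that existing serializers are plain callables taking only the value, so it falls back to that form to stay backward compatible.

```python
import json


class TopicAwareJsonSerializer:
    """Hypothetical serializer whose methods receive the topic name."""

    def __init__(self, encodings=None, default="utf-8"):
        # Map of topic name -> text encoding (illustrative only).
        self.encodings = encodings or {}
        self.default = default

    def serialize(self, topic, data):
        if data is None:
            return None
        encoding = self.encodings.get(topic, self.default)
        return json.dumps(data).encode(encoding)

    def deserialize(self, topic, data):
        if data is None:
            return None
        encoding = self.encodings.get(topic, self.default)
        return json.loads(data.decode(encoding))


def call_serializer(serializer, topic, data):
    """Hypothetical dispatch helper (not part of any library).

    Passes the topic to objects exposing serialize(topic, data) and
    falls back to plain callables of the form f(data).
    """
    if serializer is None:
        return data
    if hasattr(serializer, "serialize"):
        return serializer.serialize(topic, data)
    return serializer(data)
```

With something like this, one serializer instance can pick a different encoding (or schema) per topic, while a plain function such as lambda v: v.encode("utf-8") keeps working unchanged. The consumer side would need the same kind of dispatch for deserialize(topic, data).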