
Read timeout for getHash function #98

Open
beyhangl opened this issue Oct 11, 2018 · 9 comments

@beyhangl

Hello,

While I'm reading a table from Redis, I get the error below.

The following code normally works well:

val readDF = spark.sparkContext.fromRedisKeyPattern(tableName,5).getHash().toDS()

It normally works for fewer than 2 million rows, but reading a bigger table produces this error:

18/10/11 17:08:25 ERROR Executor: Exception in task 37.0 in stage 3.0 (TID 338)
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202)
    at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)

val redis = spark.sparkContext.fromRedisKeyPattern(tableName,100).getHash().toDS()

I also changed some settings on Redis, but I don't think that's the cause.
Do you know how I can solve this problem?
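
For reference, here is roughly how the read is wired up end to end. This is a minimal sketch with placeholder values: the host/port, the key pattern and the app name are assumptions, and older spark-redis releases read the "redis.*" property names while newer ones read "spark.redis.*".

import org.apache.spark.{SparkConf, SparkContext}
import com.redislabs.provider.redis._ // spark-redis implicits (fromRedisKeyPattern, getHash)

// Placeholder connection settings; adjust to the actual Redis instance.
val conf = new SparkConf()
  .setAppName("redis-hash-read")
  .setMaster("local[*]")
  .set("redis.host", "127.0.0.1") // "spark.redis.host" on newer spark-redis versions
  .set("redis.port", "6379")

val sc = new SparkContext(conf)

// Scan keys matching the pattern in 5 partitions, then fetch each hash with HGETALL.
val tableName = "customerTesttim*" // placeholder key pattern
val readRDD = sc.fromRedisKeyPattern(tableName, 5).getHash()
println(readRDD.count())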

@fe2s
Collaborator

fe2s commented Oct 12, 2018

Hello,

Could you please provide some details on your setup: how many Spark executors and Redis nodes do you use?

@beyhangl
Author

Hello,

Redis is installed on only one node and Spark runs in standalone mode.

Is it because of that? 2 million rows come back in seconds, but 7 million rows give me this error. Could it be the memory size of the Redis driver?

@itamarhaber
Member

@fe2s
Collaborator

fe2s commented Oct 15, 2018

val redis = spark.sparkContext.fromRedisKeyPattern(tableName,100).getHash().toDS()

I don't think 100 partitions make sense if you are running a single Spark executor. Reading keys doesn't scale; see #86 (comment) for an explanation.
Try changing to 1 partition:
spark.sparkContext.fromRedisKeyPattern(tableName,1).getHash()

I inserted 9M hashes into a local Redis with default settings.
Redis memory: used_memory_human:1.21G
Running Spark in local mode with -Xmx8G, reading the hashes took 410 seconds for me.

val rdd = spark.sparkContext.fromRedisKeyPattern("person:*", 1).getHash()
println(rdd.count())

@fe2s
Collaborator

fe2s commented Oct 15, 2018

@beyhangl, could you please also include a full stacktrace of the exception?

@beyhangl
Author

@fe2s Hello, here is the full log:

`/usr/lib/jvm/java-8-oracle/bin/java -javaagent:/home/kahin/Downloads/idea-IC-182.3684.101/lib/idea_rt.jar=40953:/home/kahin/Downloads/idea-IC-182.3684.101/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-8-oracle/jre/lib/charsets.jar:/usr/lib/jvm/java-8-oracle/jre/lib/deploy.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jfxrt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-8-oracle/jre/lib/javaws.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jce.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfr.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfxswt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jsse.jar:/usr/lib/jvm/java-8-oracle/jre/lib/management-agent.jar:/usr/lib/jvm/java-8-oracle/jre/lib/plugin.jar:/usr/lib/jvm/java-8-oracle/jre/lib/resources.jar:/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar:/home/kahin/Downloads/SparkStKafka/target/scala-2.11/classes:/home/kahin/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.11.8.jar:/home/kahin/.ivy2/cache/redis.clients/jedis/jars/jedis-2.9.0.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-pool2/jars/commons-pool2-2.5.0.jar:/home/kahin/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.16.jar:/home/kahin/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.25.jar:/home/kahin/.ivy2/cache/org.lz4/lz4-java/jars/lz4-java-1.4.0.jar:/home/kahin/.ivy2/cache/org.codehaus.janino/janino/jars/janino-3.0.8.jar:/home/kahin/.ivy2/cache/org.codehaus.janino/commons-compiler/jars/commons-compiler-3.0.8.jar:/home/kahin/.ivy2/cache/org.bouncycastle/bcprov-jdk15on/jars/bcprov-jdk15on-1.52.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-unsafe_2.11/jars/spark-unsafe_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-tags_2.11/jars/spark-tags_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-streaming_2.11/jars/spark-streaming_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-sql_2.11/jars/spark-sql_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-sketch_2.11/jars/spark-sketch_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-network-shuffle_2.11/jars/spark-network-shuffle_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-network-common_2.11/jars/spark-network-common_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-launcher_2.11/jars/spark-launcher_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-kvstore_2.11/jars/spark-kvstore_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-core_2.11/jars/spark-core_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-catalyst_2.11/jars/spark-catalyst_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-jackson/jars/parquet-jackson-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-hadoop/jars/parquet-hadoop-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-encoding/jars/parquet-encoding-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-common/jars/parquet-common-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-column/jars/parquet-column-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.orc/orc-mapreduce/jars/orc-mapreduce-1.4.4-nohive
.jar:/home/kahin/.ivy2/cache/org.apache.orc/orc-core/jars/orc-core-1.4.4-nohive.jar:/home/kahin/.ivy2/cache/org.apache.httpcomponents/httpcore/jars/httpcore-4.4.1.jar:/home/kahin/.ivy2/cache/org.apache.httpcomponents/httpclient/jars/httpclient-4.5.jar:/home/kahin/.ivy2/cache/org.apache.arrow/arrow-vector/jars/arrow-vector-0.8.0.jar:/home/kahin/.ivy2/cache/org.apache.arrow/arrow-memory/jars/arrow-memory-0.8.0.jar:/home/kahin/.ivy2/cache/org.apache.arrow/arrow-format/jars/arrow-format-0.8.0.jar:/home/kahin/.ivy2/cache/org.antlr/antlr4-runtime/jars/antlr4-runtime-4.7.jar:/home/kahin/.ivy2/cache/net.sf.py4j/py4j/jars/py4j-0.10.7.jar:/home/kahin/.ivy2/cache/net.java.dev.jets3t/jets3t/jars/jets3t-0.9.4.jar:/home/kahin/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.9.9.jar:/home/kahin/.ivy2/cache/io.netty/netty-all/jars/netty-all-4.1.17.Final.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-jvm/bundles/metrics-jvm-3.1.5.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-json/bundles/metrics-json-3.1.5.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-graphite/bundles/metrics-graphite-3.1.5.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-core/bundles/metrics-core-3.1.5.jar:/home/kahin/.ivy2/cache/io.airlift/aircompressor/jars/aircompressor-0.8.jar:/home/kahin/.ivy2/cache/commons-codec/commons-codec/jars/commons-codec-1.11.jar:/home/kahin/.ivy2/cache/com.vlkan/flatbuffers/jars/flatbuffers-1.2.0-3f79e055.jar:/home/kahin/.ivy2/cache/com.univocity/univocity-parsers/jars/univocity-parsers-2.5.9.jar:/home/kahin/.ivy2/cache/com.typesafe.scala-logging/scala-logging_2.11/bundles/scala-logging_2.11-3.8.0.jar:/home/kahin/.ivy2/cache/com.twitter/chill_2.11/jars/chill_2.11-0.8.4.jar:/home/kahin/.ivy2/cache/com.twitter/chill-java/jars/chill-java-0.8.4.jar:/home/kahin/.ivy2/cache/com.thoughtworks.paranamer/paranamer/bundles/paranamer-2.8.jar:/home/kahin/.ivy2/cache/com.jamesmurty.utils/java-xmlbuilder/jars/java-xmlbuilder-1.1.jar:/home/kahin/.ivy2/cache/com.google.code.findbugs/jsr305/jars/jsr305-3.0.2.jar:/home/kahin/.ivy2/cache/com.github.luben/zstd-jni/bundles/zstd-jni-1.3.2-2.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.module/jackson-module-scala_2.11/bundles/jackson-module-scala_2.11-2.6.7.1.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.module/jackson-module-paranamer/bundles/jackson-module-paranamer-2.7.9.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.core/jackson-databind/bundles/jackson-databind-2.6.7.1.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.core/jackson-core/bundles/jackson-core-2.7.9.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/bundles/jackson-annotations-2.6.7.jar:/home/kahin/.ivy2/cache/com.carrotsearch/hppc/bundles/hppc-0.7.2.jar:/home/kahin/.ivy2/cache/aopalliance/aopalliance/jars/aopalliance-1.0.jar:/home/kahin/.ivy2/cache/xmlenc/xmlenc/jars/xmlenc-0.52.jar:/home/kahin/.ivy2/cache/oro/oro/jars/oro-2.0.8.jar:/home/kahin/.ivy2/cache/org.xerial.snappy/snappy-java/bundles/snappy-java-1.1.2.6.jar:/home/kahin/.ivy2/cache/org.tukaani/xz/jars/xz-1.0.jar:/home/kahin/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:/home/kahin/.ivy2/cache/org.sonatype.sisu.inject/cglib/jars/cglib-2.2.1-v20090111.jar:/home/kahin/.ivy2/cache/org.slf4j/jul-to-slf4j/jars/jul-to-slf4j-1.7.16.jar:/home/kahin/.ivy2/cache/org.slf4j/jcl-over-slf4j/jars/jcl-over-slf4j-1.7.16.jar:/home/kahin/.ivy2/cache/org.scala-lang.modules/scala-xml_2.11/bundles/scala-xml_2.11-1.0.4.jar:/home/kahin/.ivy2/cache/org.scala-lang.modules/scala-parser
-combinators_2.11/bundles/scala-parser-combinators_2.11-1.0.4.jar:/home/kahin/.ivy2/cache/org.scala-lang/scalap/jars/scalap-2.11.8.jar:/home/kahin/.ivy2/cache/org.scala-lang/scala-reflect/jars/scala-reflect-2.11.8.jar:/home/kahin/.ivy2/cache/org.scala-lang/scala-compiler/jars/scala-compiler-2.11.8.jar:/home/kahin/.ivy2/cache/org.roaringbitmap/RoaringBitmap/bundles/RoaringBitmap-0.5.11.jar:/home/kahin/.ivy2/cache/org.objenesis/objenesis/jars/objenesis-2.1.jar:/home/kahin/.ivy2/cache/org.mortbay.jetty/jetty-util/jars/jetty-util-6.1.26.jar:/home/kahin/.ivy2/cache/org.json4s/json4s-jackson_2.11/jars/json4s-jackson_2.11-3.2.11.jar:/home/kahin/.ivy2/cache/org.json4s/json4s-core_2.11/jars/json4s-core_2.11-3.2.11.jar:/home/kahin/.ivy2/cache/org.json4s/json4s-ast_2.11/jars/json4s-ast_2.11-3.2.11.jar:/home/kahin/.ivy2/cache/org.javassist/javassist/bundles/javassist-3.18.1-GA.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.media/jersey-media-jaxb/jars/jersey-media-jaxb-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.core/jersey-server/jars/jersey-server-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.core/jersey-common/jars/jersey-common-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.core/jersey-client/jars/jersey-client-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.containers/jersey-container-servlet-core/jars/jersey-container-servlet-core-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.containers/jersey-container-servlet/jars/jersey-container-servlet-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.bundles.repackaged/jersey-guava/bundles/jersey-guava-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2.external/javax.inject/jars/javax.inject-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2.external/aopalliance-repackaged/jars/aopalliance-repackaged-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/osgi-resource-locator/jars/osgi-resource-locator-1.0.1.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/hk2-utils/jars/hk2-utils-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/hk2-locator/jars/hk2-locator-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/hk2-api/jars/hk2-api-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.fusesource.leveldbjni/leveldbjni-all/bundles/leveldbjni-all-1.8.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-mapper-asl/jars/jackson-mapper-asl-1.9.13.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-core-asl/jars/jackson-core-asl-1.9.13.jar:/home/kahin/.ivy2/cache/org.apache.xbean/xbean-asm5-shaded/bundles/xbean-asm5-shaded-4.4.jar:/home/kahin/.ivy2/cache/org.apache.ivy/ivy/jars/ivy-2.4.0.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-math3/jars/commons-math3-3.4.1.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-compress/jars/commons-compress-1.4.1.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro-mapred/jars/avro-mapred-1.7.7-hadoop2.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro-ipc/jars/avro-ipc-1.7.7-tests.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro-ipc/jars/avro-ipc-1.7.7.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro/jars/avro-1.7.7.jar:/home/kahin/.ivy2/cache/net.razorvine/pyrolite/jars/pyrolite-4.13.jar:/home/kahin/.ivy2/cache/log4j/log4j/bundles/log4j-1.2.17.jar:/home/kahin/.ivy2/cache/javax.ws.rs/javax.ws.rs-api/jars/javax.ws.rs-api-2.0.1.jar:/home/kahin/.ivy2/cache/javax.validation/validation-api/jars/validation-api-1.1.0.Final.jar:/home/kahin/.ivy2/cache/javax.servlet/javax.servlet-api/jars/javax.servlet-api-3.1.0.jar:/home/kahin/.ivy2/cache/javax.inject/javax.inje
ct/jars/javax.inject-1.jar:/home/kahin/.ivy2/cache/javax.annotation/javax.annotation-api/jars/javax.annotation-api-1.2.jar:/home/kahin/.ivy2/cache/commons-net/commons-net/jars/commons-net-2.2.jar:/home/kahin/.ivy2/cache/commons-httpclient/commons-httpclient/jars/commons-httpclient-3.1.jar:/home/kahin/.ivy2/cache/commons-digester/commons-digester/jars/commons-digester-1.8.jar:/home/kahin/.ivy2/cache/commons-configuration/commons-configuration/jars/commons-configuration-1.6.jar:/home/kahin/.ivy2/cache/commons-cli/commons-cli/jars/commons-cli-1.2.jar:/home/kahin/.ivy2/cache/commons-beanutils/commons-beanutils-core/jars/commons-beanutils-core-1.8.0.jar:/home/kahin/.ivy2/cache/commons-beanutils/commons-beanutils/jars/commons-beanutils-1.7.0.jar:/home/kahin/.ivy2/cache/com.ning/compress-lzf/bundles/compress-lzf-1.0.3.jar:/home/kahin/.ivy2/cache/com.google.protobuf/protobuf-java/bundles/protobuf-java-2.5.0.jar:/home/kahin/.ivy2/cache/com.google.inject/guice/jars/guice-3.0.jar:/home/kahin/.ivy2/cache/com.esotericsoftware/minlog/bundles/minlog-1.3.0.jar:/home/kahin/.ivy2/cache/com.esotericsoftware/kryo-shaded/bundles/kryo-shaded-3.0.3.jar:/home/kahin/.ivy2/cache/com.clearspring.analytics/stream/jars/stream-2.7.0.jar:/home/kahin/.ivy2/cache/com.google.code.gson/gson/jars/gson-2.2.4.jar:/home/kahin/.ivy2/cache/com.google.guava/guava/jars/guava-11.0.2.jar:/home/kahin/.ivy2/cache/commons-collections/commons-collections/jars/commons-collections-3.2.2.jar:/home/kahin/.ivy2/cache/commons-io/commons-io/jars/commons-io-2.4.jar:/home/kahin/.ivy2/cache/commons-lang/commons-lang/jars/commons-lang-2.6.jar:/home/kahin/.ivy2/cache/io.netty/netty/bundles/netty-3.9.9.Final.jar:/home/kahin/.ivy2/cache/javax.activation/activation/jars/activation-1.1.1.jar:/home/kahin/.ivy2/cache/javax.xml.bind/jaxb-api/jars/jaxb-api-2.2.2.jar:/home/kahin/.ivy2/cache/javax.xml.stream/stax-api/jars/stax-api-1.0-2.jar:/home/kahin/.ivy2/cache/jline/jline/jars/jline-0.9.94.jar:/home/kahin/.ivy2/cache/net.iharder/base64/jars/base64-2.3.8.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-crypto/jars/commons-crypto-1.0.0.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.5.jar:/home/kahin/.ivy2/cache/org.apache.curator/curator-client/bundles/curator-client-2.6.0.jar:/home/kahin/.ivy2/cache/org.apache.curator/curator-framework/bundles/curator-framework-2.6.0.jar:/home/kahin/.ivy2/cache/org.apache.curator/curator-recipes/bundles/curator-recipes-2.6.0.jar:/home/kahin/.ivy2/cache/org.apache.directory.api/api-asn1-api/bundles/api-asn1-api-1.0.0-M20.jar:/home/kahin/.ivy2/cache/org.apache.directory.api/api-util/bundles/api-util-1.0.0-M20.jar:/home/kahin/.ivy2/cache/org.apache.directory.server/apacheds-i18n/bundles/apacheds-i18n-2.0.0-M15.jar:/home/kahin/.ivy2/cache/org.apache.directory.server/apacheds-kerberos-codec/bundles/apacheds-kerberos-codec-2.0.0-M15.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-annotations/jars/hadoop-annotations-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-auth/jars/hadoop-auth-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-client/jars/hadoop-client-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-common/jars/hadoop-common-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-hdfs/jars/hadoop-hdfs-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-app/jars/hadoop-mapreduce-client-app-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-common/jars/hadoop-mapreduce-client-common-2.6.5.jar:/home/
kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-core/jars/hadoop-mapreduce-client-core-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-jobclient/jars/hadoop-mapreduce-client-jobclient-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-shuffle/jars/hadoop-mapreduce-client-shuffle-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-api/jars/hadoop-yarn-api-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-client/jars/hadoop-yarn-client-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-common/jars/hadoop-yarn-common-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-server-common/jars/hadoop-yarn-server-common-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.zookeeper/zookeeper/jars/zookeeper-3.4.6.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-jaxrs/jars/jackson-jaxrs-1.9.13.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-xc/jars/jackson-xc-1.9.13.jar:/home/kahin/.ivy2/cache/org.codehaus.jettison/jettison/bundles/jettison-1.1.jar:/home/kahin/.ivy2/cache/org.htrace/htrace-core/jars/htrace-core-3.0.4.jar:/home/kahin/.ivy2/cache/xerces/xercesImpl/jars/xercesImpl-2.9.1.jar:/home/kahin/.ivy2/cache/xml-apis/xml-apis/jars/xml-apis-1.3.04.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-format/jars/parquet-format-2.3.1.jar:/home/kahin/.ivy2/cache/RedisLabs/spark-redis/jars/spark-redis-0.3.2.jar:/home/kahin/.ivy2/cache/net.jpountz.lz4/lz4/jars/lz4-1.3.0.jar:/home/kahin/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-sql-kafka-0-10_2.11/jars/spark-sql-kafka-0-10_2.11-2.3.0.jar:/home/kahin/Downloads/SparkStKafka/ojdbc6.jar MainStreaming
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/10/15 17:29:27 WARN Utils: Your hostname, kahin resolves to a loopback address: 127.0.1.1; using 192.168.34.216 instead (on interface enp33s0)
18/10/15 17:29:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/10/15 17:29:27 INFO SparkContext: Running Spark version 2.3.1
18/10/15 17:29:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/10/15 17:29:28 INFO SparkContext: Submitted application: MainStreaming
18/10/15 17:29:28 INFO SecurityManager: Changing view acls to: kahin
18/10/15 17:29:28 INFO SecurityManager: Changing modify acls to: kahin
18/10/15 17:29:28 INFO SecurityManager: Changing view acls groups to:
18/10/15 17:29:28 INFO SecurityManager: Changing modify acls groups to:
18/10/15 17:29:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(kahin); groups with view permissions: Set(); users with modify permissions: Set(kahin); groups with modify permissions: Set()
18/10/15 17:29:28 INFO Utils: Successfully started service 'sparkDriver' on port 37343.
18/10/15 17:29:28 INFO SparkEnv: Registering MapOutputTracker
18/10/15 17:29:28 INFO SparkEnv: Registering BlockManagerMaster
18/10/15 17:29:28 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/10/15 17:29:28 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/10/15 17:29:28 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-ff18929a-4ff2-40db-894c-86f937840dca
18/10/15 17:29:28 INFO MemoryStore: MemoryStore started with capacity 1962.0 MB
18/10/15 17:29:28 INFO SparkEnv: Registering OutputCommitCoordinator
18/10/15 17:29:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
18/10/15 17:29:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
18/10/15 17:29:28 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
18/10/15 17:29:28 INFO Utils: Successfully started service 'SparkUI' on port 4043.
18/10/15 17:29:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.34.216:4043
18/10/15 17:29:29 INFO Executor: Starting executor ID driver on host localhost
18/10/15 17:29:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40639.
18/10/15 17:29:29 INFO NettyBlockTransferService: Server created on 192.168.34.216:40639
18/10/15 17:29:29 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/10/15 17:29:29 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.34.216, 40639, None)
18/10/15 17:29:29 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.34.216:40639 with 1962.0 MB RAM, BlockManagerId(driver, 192.168.34.216, 40639, None)
18/10/15 17:29:29 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.34.216, 40639, None)
18/10/15 17:29:29 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.34.216, 40639, None)
18/10/15 17:29:29 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/kahin/Downloads/SparkStKafka/spark-warehouse/').
18/10/15 17:29:29 INFO SharedState: Warehouse path is 'file:/home/kahin/Downloads/SparkStKafka/spark-warehouse/'.
18/10/15 17:29:29 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
18/10/15 17:29:30 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [192.168.34.216:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-1f8c3bd1-53fd-4eca-bcde-6179f50082cb-111795803-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

18/10/15 17:29:30 INFO ConsumerConfig: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [192.168.34.216:9092]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = consumer-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 1
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = spark-kafka-source-1f8c3bd1-53fd-4eca-bcde-6179f50082cb-111795803-driver-0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

18/10/15 17:29:30 INFO AppInfoParser: Kafka version : 0.10.0.1
18/10/15 17:29:30 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
Replace view ==campaignNoneWLView
18/10/15 17:29:32 INFO SparkContext: Starting job: count at MainStreaming.scala:43
18/10/15 17:29:32 INFO DAGScheduler: Got job 0 (count at MainStreaming.scala:43) with 1 output partitions
18/10/15 17:29:32 INFO DAGScheduler: Final stage: ResultStage 0 (count at MainStreaming.scala:43)
18/10/15 17:29:32 INFO DAGScheduler: Parents of final stage: List()
18/10/15 17:29:32 INFO DAGScheduler: Missing parents: List()
18/10/15 17:29:32 INFO DAGScheduler: Submitting ResultStage 0 (RedisKVRDD[1] at RDD at RedisRDD.scala:19), which has no missing parents
18/10/15 17:29:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 1962.0 MB)
18/10/15 17:29:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1438.0 B, free 1962.0 MB)
18/10/15 17:29:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.34.216:40639 (size: 1438.0 B, free: 1962.0 MB)
18/10/15 17:29:32 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039
18/10/15 17:29:32 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (RedisKVRDD[1] at RDD at RedisRDD.scala:19) (first 15 tasks are for partitions Vector(0))
18/10/15 17:29:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/10/15 17:29:32 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 8460 bytes)
18/10/15 17:29:32 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/10/15 17:29:32 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 832 bytes result sent to driver
18/10/15 17:29:32 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 257 ms on localhost (executor driver) (1/1)
18/10/15 17:29:32 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/10/15 17:29:32 INFO DAGScheduler: ResultStage 0 (count at MainStreaming.scala:43) finished in 0.397 s
18/10/15 17:29:32 INFO DAGScheduler: Job 0 finished: count at MainStreaming.scala:43, took 0.446641 s
18/10/15 17:29:33 INFO CodeGenerator: Code generated in 255.077498 ms
18/10/15 17:29:33 INFO MapPartitionsRDD: Removing RDD 6 from persistence list
18/10/15 17:29:33 INFO BlockManager: Removing RDD 6
Replace view ==campaignWLView
18/10/15 17:29:33 INFO SparkContext: Starting job: count at MainStreaming.scala:43
18/10/15 17:29:33 INFO DAGScheduler: Got job 1 (count at MainStreaming.scala:43) with 1 output partitions
18/10/15 17:29:33 INFO DAGScheduler: Final stage: ResultStage 1 (count at MainStreaming.scala:43)
18/10/15 17:29:33 INFO DAGScheduler: Parents of final stage: List()
18/10/15 17:29:33 INFO DAGScheduler: Missing parents: List()
18/10/15 17:29:33 INFO DAGScheduler: Submitting ResultStage 1 (RedisKVRDD[11] at RDD at RedisRDD.scala:19), which has no missing parents
18/10/15 17:29:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.1 KB, free 1962.0 MB)
18/10/15 17:29:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1436.0 B, free 1962.0 MB)
18/10/15 17:29:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.34.216:40639 (size: 1436.0 B, free: 1962.0 MB)
18/10/15 17:29:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
18/10/15 17:29:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (RedisKVRDD[11] at RDD at RedisRDD.scala:19) (first 15 tasks are for partitions Vector(0))
18/10/15 17:29:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/10/15 17:29:33 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 8460 bytes)
18/10/15 17:29:33 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
18/10/15 17:29:33 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 746 bytes result sent to driver
18/10/15 17:29:33 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 9 ms on localhost (executor driver) (1/1)
18/10/15 17:29:33 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/10/15 17:29:33 INFO DAGScheduler: ResultStage 1 (count at MainStreaming.scala:43) finished in 0.019 s
18/10/15 17:29:33 INFO DAGScheduler: Job 1 finished: count at MainStreaming.scala:43, took 0.023213 s
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 38
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 43
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 40
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 41
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 42
18/10/15 17:29:34 INFO BlockManager: Removing RDD 6
18/10/15 17:29:34 INFO ContextCleaner: Cleaned RDD 6
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 46
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 54
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 52
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 33
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 49
18/10/15 17:29:34 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.34.216:40639 in memory (size: 1436.0 B, free: 1962.0 MB)
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 48
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 31
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 37
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 34
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 36
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 25
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 53
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 47
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 32
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 55
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 26
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 27
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 51
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 35
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 45
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 39
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 44
18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 50
18/10/15 17:29:34 INFO MapPartitionsRDD: Removing RDD 16 from persistence list
18/10/15 17:29:34 INFO BlockManager: Removing RDD 16
Replace view ==customerView
18/10/15 17:29:34 INFO SparkContext: Starting job: count at MainStreaming.scala:43
18/10/15 17:29:34 INFO DAGScheduler: Got job 2 (count at MainStreaming.scala:43) with 1 output partitions
18/10/15 17:29:34 INFO DAGScheduler: Final stage: ResultStage 2 (count at MainStreaming.scala:43)
18/10/15 17:29:34 INFO DAGScheduler: Parents of final stage: List()
18/10/15 17:29:34 INFO DAGScheduler: Missing parents: List()
18/10/15 17:29:34 INFO DAGScheduler: Submitting ResultStage 2 (RedisKVRDD[21] at RDD at RedisRDD.scala:19), which has no missing parents
18/10/15 17:29:34 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.1 KB, free 1962.0 MB)
18/10/15 17:29:34 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1441.0 B, free 1962.0 MB)
18/10/15 17:29:34 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.34.216:40639 (size: 1441.0 B, free: 1962.0 MB)
18/10/15 17:29:34 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039
18/10/15 17:29:34 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (RedisKVRDD[21] at RDD at RedisRDD.scala:19) (first 15 tasks are for partitions Vector(0))
18/10/15 17:29:34 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/10/15 17:29:34 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, ANY, 8460 bytes)
18/10/15 17:29:34 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/10/15 17:29:36 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202)
at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
at redis.clients.jedis.Protocol.process(Protocol.java:151)
at redis.clients.jedis.Protocol.read(Protocol.java:215)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276)
at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55)
at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:127)
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196)
... 32 more
18/10/15 17:29:36 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202)
at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
at redis.clients.jedis.Protocol.process(Protocol.java:151)
at redis.clients.jedis.Protocol.read(Protocol.java:215)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276)
at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55)
at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:127)
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196)
... 32 more

18/10/15 17:29:36 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
18/10/15 17:29:36 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/10/15 17:29:36 INFO TaskSchedulerImpl: Cancelling stage 2
18/10/15 17:29:36 INFO DAGScheduler: ResultStage 2 (count at MainStreaming.scala:43) failed in 2.048 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202)
at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
at redis.clients.jedis.Protocol.process(Protocol.java:151)
at redis.clients.jedis.Protocol.read(Protocol.java:215)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276)
at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55)
at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:127)
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196)
... 32 more

Driver stacktrace:
18/10/15 17:29:36 INFO DAGScheduler: Job 2 failed: count at MainStreaming.scala:43, took 2.052060 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202)
at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
at redis.clients.jedis.Protocol.process(Protocol.java:151)
at redis.clients.jedis.Protocol.read(Protocol.java:215)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276)
at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55)
at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:127)
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196)
... 32 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at MainStreaming$.createReplaceRedisView(MainStreaming.scala:43)
at MainStreaming$.customerUpdates(MainStreaming.scala:165)
at MainStreaming$.main(MainStreaming.scala:240)
at MainStreaming.main(MainStreaming.scala)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202)
at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
at redis.clients.jedis.Protocol.process(Protocol.java:151)
at redis.clients.jedis.Protocol.read(Protocol.java:215)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276)
at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60)
at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55)
at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:127)
at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196)
... 32 more
18/10/15 17:29:45 INFO SparkContext: Invoking stop() from shutdown hook
18/10/15 17:29:45 INFO SparkUI: Stopped Spark web UI at http://192.168.34.216:4043
18/10/15 17:29:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/10/15 17:29:45 INFO MemoryStore: MemoryStore cleared
18/10/15 17:29:45 INFO BlockManager: BlockManager stopped
18/10/15 17:29:45 INFO BlockManagerMaster: BlockManagerMaster stopped
18/10/15 17:29:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/10/15 17:29:45 INFO SparkContext: Successfully stopped SparkContext
18/10/15 17:29:45 INFO ShutdownHookManager: Shutdown hook called
18/10/15 17:29:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-11ef82ef-a4b2-49ea-9e65-66955bc948cb
18/10/15 17:29:45 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-c274d20d-87f4-4c51-963e-bd7699233ab0

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
`

@itamarhaber
Member

@beyhangl Is it also possible to look at the redis.log file from around that time? Also, does running SLOWLOG GET return anything?

@beyhangl
Author

Redis log

16184:C 11 Oct 16:53:01.976 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
16184:C 11 Oct 16:53:01.976 # Redis version=4.0.11, bits=64, commit=00000000, modified=0, pid=16184, just started
16184:C 11 Oct 16:53:01.976 # Configuration loaded
16184:M 11 Oct 16:53:01.977 * Increased maximum number of open files to 10032 (it was originally set to 1024).
[Redis ASCII-art startup banner: Redis 4.0.11 (00000000/0) 64 bit, running in standalone mode, port 6379, PID 16184, http://redis.io]

16184:M 11 Oct 16:53:01.978 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
16184:M 11 Oct 16:53:01.978 # Server initialized
16184:M 11 Oct 16:53:01.978 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
16184:M 11 Oct 16:53:01.978 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
16184:M 11 Oct 16:53:11.189 * DB loaded from disk: 9.210 seconds
16184:M 11 Oct 16:53:11.189 * Ready to accept connections

SLOWLOG GET

127.0.0.1:6379> SLOWLOG GET
1) 1) (integer) 0
   2) (integer) 1539615703
   3) (integer) 4652680
   4) 1) "HGETALL"
      2) "customerTesttim"
   5) "127.0.0.1:48558"
   6) ""

@fe2s
Collaborator

fe2s commented Oct 18, 2018

As a workaround you may try to increase the timeout by setting
.set("redis.timeout", "10000") or
.set("spark.redis.timeout", "10000")
depending on which spark-redis version you use.
The default timeout is 2000 ms (2 seconds).
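
Applied to the session configuration, it would look roughly like this. This is a minimal sketch: the host, port, key pattern and app name are placeholders, and the "spark.redis.*" names apply to newer spark-redis releases while older ones use "redis.*".

import org.apache.spark.sql.SparkSession
import com.redislabs.provider.redis._ // spark-redis implicits

// Raise the Jedis read timeout so slow HGETALL replies on large hashes
// don't exceed the 2-second default.
val spark = SparkSession.builder()
  .appName("redis-hash-read")
  .master("local[*]")
  .config("spark.redis.host", "127.0.0.1") // placeholder; "redis.host" on older versions
  .config("spark.redis.port", "6379")
  .config("spark.redis.timeout", "10000") // 10 seconds instead of the 2-second default
  .getOrCreate()

// Same read as above, with a single partition for a single executor.
val tableName = "customerTesttim*" // placeholder key pattern
val rdd = spark.sparkContext.fromRedisKeyPattern(tableName, 1).getHash()
println(rdd.count())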
