BigQuery Storage API: RuntimeError: ("can't start new thread", 'occurred at index 0') #9

Closed
megancooper opened this issue Dec 11, 2019 · 3 comments
Labels
api: bigquery Issues related to the googleapis/python-bigquery API. needs more info This issue needs more information from the customer to proceed. type: question Request for information or clarification. Not an issue.

Comments

@megancooper

Issue

I am running a Spark script that needs to perform a count(*) query 30 times for every row in a dataframe. The dataframe has 25,000 rows on average, which means that by the time it completes, the script should have made about 750,000 requests/queries to the BigQuery table.

For some reason, with a large number of executors, I ran into the RuntimeError detailed in the stack trace below, where it seems the Google API core is unable to create a BigQuery client. Is this because my script is creating too many clients? Apologies if my code is incorrect or I am using the client wrong; I am new to BigQuery and have not used this API before. What would be the best way to use the BigQuery Storage API in this use case?

Environment details

  1. PySpark script running in AWS EMR clusters, with 30+ executors
  2. BigQuery Storage API (python)

Steps to reproduce

  1. ?

Code Example Of PySpark Script

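from pyspark.sql.functions import pandas_udf, PandasUDFType

def query_cache(row, query):
    # clients() is a helper (not shown) that returns a BigQuery client and a
    # BigQuery Storage client; it is called once per row here.
    bqclient, bqstorageclient = clients()

    dataframe = (
        bqclient.query(query)
            .result()
            .to_dataframe(bqstorage_client=bqstorageclient)
    )
    # An unaliased count(*) comes back in a column named 'f0_'.
    return dataframe['f0_'][0]

# schema() is a helper (not shown) that returns the output schema.
@pandas_udf(schema(), PandasUDFType.GROUPED_MAP)
def calc_counts(df):
    query = "select count(*) from dataset.table where ...{some column filters}..."
    # args must be a tuple, hence the trailing comma.
    df['count'] = df.apply(query_cache, args=(query,), axis=1)
    return df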

Stack trace

py4j.protocol.Py4JJavaError: An error occurred while calling o526.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 54, ip-172-31-8-118.us-west-2.compute.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 113, in wrapped
    result = f(pd.concat(value_series, axis=1))
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/util.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/home/hadoop/metrics_bq.py", line 724, in calc_comps
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/frame.py", line 6928, in apply
    return op.get_result()
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 186, in get_result
    return self.apply_standard()
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 292, in apply_standard
    self.apply_series_generator()
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 321, in apply_series_generator
    results[i] = self.f(v)
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 112, in f
    return func(x, *args, **kwds)
  File "/home/hadoop/metrics_bq.py", line 718, in count_comps_sql
  File "/home/hadoop/metrics_bq.py", line 652, in query_cache
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1636, in to_dataframe
    bqstorage_client=bqstorage_client, dtypes=dtypes
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1414, in _to_page_iterable
    for item in bqstorage_download():
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 632, in _download_table_bqstorage
    requested_streams=requested_streams,
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1beta1/gapic/big_query_storage_client.py", line 318, in create_read_session
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 277, in retry_wrapped_func
    on_error=on_error,
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 182, in retry_target
    return target()
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 689, in __call__
    wait_for_ready, compression)
  File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 676, in _blocking
    ),), self._context)
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 500, in grpc._cython.cygrpc.Channel.segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 368, in grpc._cython.cygrpc._segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 362, in grpc._cython.cygrpc._segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 222, in grpc._cython.cygrpc._call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 250, in grpc._cython.cygrpc._call
  File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 56, in grpc._cython.cygrpc._get_metadata
  File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 31, in grpc._cython.cygrpc._spawn_callback_async
  File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 22, in grpc._cython.cygrpc._spawn_callback_in_thread
  File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 119, in grpc._cython.cygrpc.ForkManagedThread.start
  File "/home/hadoop/conda/lib/python3.7/threading.py", line 852, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: ("can't start new thread", 'occurred at index 0')

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/worker.py", line 113, in wrapped
    result = f(pd.concat(value_series, axis=1))
  File "/mnt/yarn/usercache/hadoop/appcache/application_1576044279513_0001/container_1576044279513_0001_01_000009/pyspark.zip/pyspark/util.py", line 113, in wrapper
    return f(*args, **kwargs)
  File "/home/hadoop/metrics_bq.py", line 724, in calc_comps
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/frame.py", line 6928, in apply
    return op.get_result()
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 186, in get_result
    return self.apply_standard()
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 292, in apply_standard
    self.apply_series_generator()
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 321, in apply_series_generator
    results[i] = self.f(v)
  File "/home/hadoop/conda/lib/python3.7/site-packages/pandas/core/apply.py", line 112, in f
    return func(x, *args, **kwds)
  File "/home/hadoop/metrics_bq.py", line 718, in count_comps_sql
  File "/home/hadoop/metrics_bq.py", line 652, in query_cache
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1636, in to_dataframe
    bqstorage_client=bqstorage_client, dtypes=dtypes
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1414, in _to_page_iterable
    for item in bqstorage_download():
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 632, in _download_table_bqstorage
    requested_streams=requested_streams,
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/cloud/bigquery_storage_v1beta1/gapic/big_query_storage_client.py", line 318, in create_read_session
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 277, in retry_wrapped_func
    on_error=on_error,
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/retry.py", line 182, in retry_target
    return target()
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/home/hadoop/conda/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 689, in __call__
    wait_for_ready, compression)
  File "/home/hadoop/conda/lib/python3.7/site-packages/grpc/_channel.py", line 676, in _blocking
    ),), self._context)
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 500, in grpc._cython.cygrpc.Channel.segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 368, in grpc._cython.cygrpc._segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 362, in grpc._cython.cygrpc._segregated_call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 222, in grpc._cython.cygrpc._call
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 250, in grpc._cython.cygrpc._call
  File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 56, in grpc._cython.cygrpc._get_metadata
  File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 31, in grpc._cython.cygrpc._spawn_callback_async
  File "src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi", line 22, in grpc._cython.cygrpc._spawn_callback_in_thread
  File "src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi", line 119, in grpc._cython.cygrpc.ForkManagedThread.start
  File "/home/hadoop/conda/lib/python3.7/threading.py", line 852, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: ("can't start new thread", 'occurred at index 0')

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:156)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:295)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:266)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
@plamut plamut transferred this issue from googleapis/google-cloud-python Feb 4, 2020
@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/python-bigquery API. label Feb 4, 2020
@plamut plamut added the type: process A process-related concern. May include testing, release, or the like. label Feb 4, 2020
@plamut
Contributor

plamut commented Mar 5, 2020

@megancooper This is more of an educated guess, but the following bit indeed seems relevant:

  ...
  File "/home/hadoop/conda/lib/python3.7/threading.py", line 852, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: ("can't start new thread", 'occurred at index 0')

An error is raised when Python cannot start a new thread. Since you mention a high number of executors, could it be that a max thread limit is hit and no more threads can be created?

You could log the number of active threads at key points in the program and see whether the number gets too high just before the crash:

import threading
threading.active_count()   # this should be logged
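
To make that check a bit more concrete, here is a minimal sketch of a logging helper. The name `log_thread_usage` is hypothetical, and reading `RLIMIT_NPROC` is an assumption about which limit might matter: on Linux it caps processes and threads per user, but the actual ceiling on an EMR node could come from somewhere else (memory, cgroups, YARN settings).

```python
import logging
import threading

try:
    import resource  # Unix only; not available on Windows
except ImportError:
    resource = None

logger = logging.getLogger(__name__)


def log_thread_usage(label):
    """Log how many Python threads are alive and, if known, the NPROC soft limit.

    `label` is a free-form marker for where in the program the call was made.
    """
    # Counts only threading.Thread objects alive in this Python process.
    active = threading.active_count()

    soft_limit = None
    if resource is not None and hasattr(resource, "RLIMIT_NPROC"):
        # Assumption: RLIMIT_NPROC is the relevant limit on this platform.
        soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_NPROC)

    logger.warning(
        "%s: active_threads=%d rlimit_nproc=%s", label, active, soft_limit
    )
    return active, soft_limit
```

Calling `log_thread_usage("before query")` just before each `bqclient.query(...)` call would show whether the count keeps climbing within a single worker process.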

If too many threads are indeed being created, one possible fix would be to reduce the number of concurrent queries or to throttle them somehow (I'm not familiar with Spark, unfortunately).
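
One way to keep the number of clients down is to build them once per Python worker process and reuse them for every row. The sketch below is untested against this workload, and whether it resolves the error is an assumption. `once_per_process` is a hypothetical helper, and the body of `clients()` is a guess, since the original helper is not shown in the issue; the storage client class is taken from the `bigquery_storage_v1beta1` package that appears in the stack trace.

```python
import functools
import threading


def once_per_process(factory):
    """Wrap `factory` so that it runs at most once per Python process."""
    lock = threading.Lock()
    cache = []

    @functools.wraps(factory)
    def wrapper():
        with lock:
            if not cache:
                cache.append(factory())
            return cache[0]

    return wrapper


@once_per_process
def clients():
    # Imported lazily so the import happens on the worker that uses it.
    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    bqclient = bigquery.Client()
    bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient()
    return bqclient, bqstorageclient
```

With this in place, `query_cache` could keep calling `clients()` as before, but only the first call in each worker process would construct anything; later calls would return the same pair.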

@meredithslota meredithslota added needs more info This issue needs more information from the customer to proceed. type: question Request for information or clarification. Not an issue. and removed type: process A process-related concern. May include testing, release, or the like. labels Jun 16, 2020
@shell-no-shell

Hi, how did you solve this problem?

@meredithslota
Contributor

Hi @shell-no-shell — we ended up closing this since the OP did not respond. If you have a similar issue, can you please file a new ticket (if you think it's a bug in the library) or a support issue (if it would require digging into project details, since we don't have access to that info here). Thanks!
