
Diff Failure on AWS Glue #105

Closed
bobhaffner opened this issue Jul 28, 2022 · 7 comments

Comments

@bobhaffner

Hi

Thanks for this awesome lib!

I'm looking for some guidance on an issue I'm having.

I'm trying to compare two dataframes for equality. It's not a requirement to know what's different, just whether they're different.

It works great when both dataframes are small (1M to 10M rows), but fails when both frames are over 10 million. The same 59 columns exist in both frames. No crazy data types. Fairly sparse, with a fair amount of NULLs.

Any ideas or things I should try? Any additional details I can provide?

Additional Details

  • AWS Glue
  • 10 G.2X workers (32 GB of RAM, 8 vCPUs each; I tried up to 50 workers)
  • Spark 3.1
  • spark-extension_2.12-2.1.0-3.1.jar

Code

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
import boto3

import rsg_gis_utils.core as rsg
import rsg_gis_utils.extract_core as rsg_extract

from gresearch.spark.diff import *

glue_client = boto3.client("glue")
# database connection settings (JDBC URL, credentials, driver) from a stored secret
ss = rsg.get_db_secret('pg_wh_db')


# glue and spark stuff
sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session


# read both sides of the comparison from the database via JDBC
a_frame = (
    spark.read.format("jdbc")
    .option("url", ss["jdbc_url"])
    .option("user", ss["username"])
    .option("password", ss["password"])
    .option("dbtable", "(SELECT * FROM table WHERE something) as t")
    .option("driver", ss["driver"])
    .load()
)

b_frame = (
    spark.read.format("jdbc")
    .option("url", ss["jdbc_url"])
    .option("user", ss["username"])
    .option("password", ss["password"])
    .option("dbtable", "(SELECT * FROM table WHERE something) as t")
    .option("driver", ss["driver"])
    .load()
)

print('dataframes are the same', a_frame.diff(b_frame).where("diff != 'N'").count() == 0)

Errors that I've gotten
An error occurred while calling o118.count. Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 14) (10.226.42.94 executor 25): ExecutorLostFailure (executor 25 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

An error occurred while calling o110.count. Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6) (10.226.42.117 executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 652032 ms

@EnricoMi
Contributor

EnricoMi commented Jul 28, 2022

Minor optimization:

Instead of

a_frame.diff(b_frame).where("diff != 'N'").count() == 0

you could do

not a_frame.diff(b_frame).where("diff != 'N'").isEmpty()

The former makes Spark compute the entire diff in order to return the count, while the latter lets Spark stop computing the diff as soon as a single non-'N' row is found.
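
For example, a minimal sketch of that check wrapped in a helper (frames_equal is just an illustrative name; take(1) is a portable stand-in for isEmpty(), which may not exist on older PySpark versions):

def frames_equal(left, right):
    # diff() marks unchanged rows with 'N'; anything else is a difference
    changed = left.diff(right).where("diff != 'N'")
    # take(1) stops the computation as soon as one differing row is found,
    # unlike count(), which materializes the full diff
    return len(changed.take(1)) == 0

print('dataframes are the same', frames_equal(a_frame, b_frame))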

@EnricoMi
Contributor

It looks like your a_frame and b_frame are single-partition dataframes. This means Spark holds all the data in a single executor, which runs out of memory with sufficiently many rows. Adding more workers does not help then.

Try adding a .repartition(100) after .load().
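
For example, applied to the reader from the original post (a sketch; 100 is just an illustrative partition count):

a_frame = (
    spark.read.format("jdbc")
    .option("url", ss["jdbc_url"])
    .option("user", ss["username"])
    .option("password", ss["password"])
    .option("dbtable", "(SELECT * FROM table WHERE something) as t")
    .option("driver", ss["driver"])
    .load()
    # spread the single JDBC partition across the cluster before diffing
    .repartition(100)
)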

@bobhaffner
Author

Thanks, @EnricoMi ! I'll try out your suggestions

@bobhaffner
Author

Hello! I tried not a_frame.diff(b_frame).where("diff != 'N'").isEmpty() and a few values of N for .repartition(N), but no luck, unfortunately. Any other suggestions are much appreciated!

@EnricoMi
Contributor

EnricoMi commented Aug 4, 2022

Can you go to the Spark UI -> SQL tab and click on the job that fails? If you could save that HTML page or take a screenshot, that would help a lot. That page looks like this:

[screenshot: Spark UI SQL query details page]

Also useful are the Executors tab and the Stages tab (click on the stage that fails).

Does a .count() right after .load() work at all for over 10M rows?

@bobhaffner
Author

So I stumbled across something...

Setting a fetchsize (e.g. .option("fetchsize", "100000")) in the JDBC config allows me to successfully perform the subsequent diff on two 21M-row dataframes.
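
For reference, a sketch of the reader with that option added (100000 is the value that worked here; a good value depends on row width and available memory):

a_frame = (
    spark.read.format("jdbc")
    .option("url", ss["jdbc_url"])
    .option("user", ss["username"])
    .option("password", ss["password"])
    .option("dbtable", "(SELECT * FROM table WHERE something) as t")
    .option("driver", ss["driver"])
    # fetch rows from the database in batches of 100k instead of all at once,
    # bounding the memory the JDBC driver needs per partition
    .option("fetchsize", "100000")
    .load()
)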

@EnricoMi
Contributor

EnricoMi commented Sep 6, 2022

So it looks like this is unrelated to spark-extension and is solved. Closing.

EnricoMi closed this as completed Sep 6, 2022