[Bug] Error when Python Model Goes To Write To Database #628

Closed
2 tasks done
cuff-links opened this issue Apr 23, 2024 · 14 comments · Fixed by #665
Labels
bug Something isn't working

Comments

@cuff-links
Contributor

cuff-links commented Apr 23, 2024

Is this a new bug in dbt-athena?

  • I believe this is a new bug in dbt-athena
  • I have searched the existing issues, and I could not find an existing issue for this bug

Current Behavior

When I run dbt run for a Python model, it crashes while inserting the data into the output table. This does not happen with an equivalent SQL model.

Expected Behavior

We expect the database and tables to be created without error, just as they are with the equivalent SQL model.

Steps To Reproduce

1. With the config and models provided in the Additional Context section below, run dbt run against that single model (example command below).
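
For example, assuming the model file is named sugardemo_accounts.py (as in this report):

dbt run --select sugardemo_accounts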

Environment

- OS: OSX Sonoma
- Python: 3.10.1
- dbt: 1.7.11
- dbt-athena-community: 1.7.2

Additional Context

Our Python Model

# import pyspark.sql.functions as F

def model(dbt, spark_session):
    dbt.config(materialized="table")

    # I have also tried with 
    # df = dbt.this
    df = dbt.source('sugardemo_accounts', 'sugardemo_accounts')
    
    return df

Our Equivalent SQL Model

{{
    config(materialized='table')
}}

SELECT *
FROM {{ source('sugardemo_accounts', 'sugardemo_accounts') }}

Our Profile and Source Configuration

reef_aws_pipeline:
  outputs:
    dev:
      aws_access_key_id: {key}
      aws_profile_name: {name}
      aws_secret_access_key: {secret key}
      database: awsdatacatalog
      region_name: us-west-2
      s3_staging_dir: s3://temp-client-data-bucket/athena_results/
      schema: silver
      threads: 2
      type: athena
      work_group: primary
      spark_work_group: athena-spark
  target: dev
sources:
  - name: sugardemo_accounts
    schema: sugardemo_sugarcrm 
    tables:
      - name: sugardemo_accounts

Tables and database list in AWS (screenshot)

Stack Trace from Spark Session


  File "<stdin>", line 122, in <module>
  File "<stdin>", line 93, in materialize
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 777, in saveAsTable
    self._jwrite.saveAsTable(name)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Can not create a Path from an empty string

Output from console with debug flag on

00:53:35  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x107ff3cd0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x111e5bb50>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x111e5beb0>]}
00:53:35  Running with dbt=1.7.13
00:53:35  running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'log_cache_events': 'False', 'write_json': 'True', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/Users/cuffs/.dbt', 'debug': 'True', 'warn_error': 'None', 'log_path': '/Users/cuffs/Desktop/workspace/reef-aws-pipeline/reef_aws_pipeline/logs', 'fail_fast': 'False', 'version_check': 'True', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'invocation_command': 'dbt run -d', 'introspect': 'True', 'log_format': 'default', 'target_path': 'None', 'static_parser': 'True', 'send_anonymous_usage_stats': 'True'}
00:53:35  Sending event: {'category': 'dbt', 'action': 'project_id', 'label': '15ab759e-0be6-4c1b-8005-98a98a2f6030', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x113c5ff10>]}
00:53:35  Sending event: {'category': 'dbt', 'action': 'adapter_info', 'label': '15ab759e-0be6-4c1b-8005-98a98a2f6030', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x113be2260>]}
00:53:35  Registered adapter: athena=1.7.2
00:53:35  checksum: 6c24e2ee36a42cedd7e5de2827d214776eb7a8e8f425f6d8acb06c31add0afef, vars: {}, profile: , target: , version: 1.7.13
00:53:35  Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
00:53:35  Partial parsing enabled, no changes found, skipping parsing
00:53:35  Sending event: {'category': 'dbt', 'action': 'load_project', 'label': '15ab759e-0be6-4c1b-8005-98a98a2f6030', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x113fd8a00>]}
00:53:35  Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': '15ab759e-0be6-4c1b-8005-98a98a2f6030', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x113f9e740>]}
00:53:35  Found 2 models, 2 tests, 1 source, 0 exposures, 0 metrics, 427 macros, 0 groups, 0 semantic models
00:53:35  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '15ab759e-0be6-4c1b-8005-98a98a2f6030', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x113f9ec50>]}
00:53:35
00:53:35  Acquiring new athena connection 'master'
00:53:35  Acquiring new athena connection 'list_awsdatacatalog'
00:53:35  Opening a new connection, currently in state init
00:53:35  On list_awsdatacatalog: Close
00:53:35  Re-using an available connection from the pool (formerly list_awsdatacatalog, now list_awsdatacatalog_silver)
00:53:35  Opening a new connection, currently in state closed
00:53:36  On list_awsdatacatalog_silver: Close
00:53:36  Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '15ab759e-0be6-4c1b-8005-98a98a2f6030', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x113f9d4e0>]}
00:53:36  Concurrency: 2 threads (target='dev')
00:53:36
00:53:36  Began running node model.reef_aws_pipeline.sugardemo_accounts
00:53:36  1 of 1 START python table model silver.sugardemo_accounts ...................... [RUN]
00:53:36  Re-using an available connection from the pool (formerly list_awsdatacatalog_silver, now model.reef_aws_pipeline.sugardemo_accounts)
00:53:36  Began compiling node model.reef_aws_pipeline.sugardemo_accounts
00:53:36  Writing injected SQL for node "model.reef_aws_pipeline.sugardemo_accounts"
00:53:36  Timing info for model.reef_aws_pipeline.sugardemo_accounts (compile): 20:53:36.553003 => 20:53:36.577295
00:53:36  Began executing node model.reef_aws_pipeline.sugardemo_accounts
00:53:36  Skip partitioning: False
00:53:36  Opening a new connection, currently in state closed
00:53:37  dbt.adapters.athena.constants adapter: S3 path does not exist
00:53:37  On model.reef_aws_pipeline.sugardemo_accounts:

import pyspark


# import pyspark.sql.functions as F

def model(dbt, spark_session):
    # dbt.config(materialized="incremental", table_type='iceberg', incremental_strategy='append')
    dbt.config(materialized='table')
    df = dbt.source('sugardemo_accounts', 'sugardemo_accounts')

    return df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args, **kwargs):
    refs = {}
    key = '.'.join(args)
    version = kwargs.get("v") or kwargs.get("version")
    if version:
        key += f".v{version}"
    dbt_load_df_function = kwargs.get("dbt_load_df_function")
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {"sugardemo_accounts.sugardemo_accounts": "\"awsdatacatalog\".\"sugardemo_sugarcrm\".\"sugardemo_accounts\""}
    key = '.'.join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = "awsdatacatalog"
    schema = "silver"
    identifier = "sugardemo_accounts"

    def __repr__(self):
        return '"awsdatacatalog"."silver"."sugardemo_accounts"'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args, **kwargs: ref(*args, **kwargs, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False

# COMMAND ----------



def materialize(spark_session, df, target_relation):
    import pandas
    if isinstance(df, pyspark.sql.dataframe.DataFrame):
        pass
    elif isinstance(df, pandas.core.frame.DataFrame):
        df = spark_session.createDataFrame(df)
    else:
        msg = f"{type(df)} is not a supported type for dbt Python materialization"
        raise Exception(msg)


    writer = df.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "s3://temp-client-data-bucket/athena_results/tables/silver/sugardemo_accounts/87fd25c1-2631-425b-9c0c-92bb3f4cf4d7") \
    .option("compression", "None") \
    .option("mergeSchema", "True") \
    .option("delimiter", "None")
    if None is not None:
        writer = writer.partitionBy(None)
    if None is not None:
        writer = writer.bucketBy(None,None)
    if None is not None:
        writer = writer.sortBy(None)

    writer.saveAsTable(
        name="silver.sugardemo_accounts",
    )


    return "Success: silver.sugardemo_accounts"

def get_spark_df(identifier):
    """
    Override the arguments to ref and source dynamically

    spark.table('awsdatacatalog.analytics_dev.model')
    Raises pyspark.sql.utils.AnalysisException:
    spark_catalog requires a single-part namespace,
    but got [awsdatacatalog, analytics_dev]

    So the override removes the catalog component and only
    provides the schema and identifer to spark.table()
    """
    return spark.table(".".join(identifier.split(".")[1:]).replace('"', ''))

class SparkdbtObj(dbtObj):
    def __init__(self):
        super().__init__(load_df_function=get_spark_df)
        self.source = lambda *args: source(*args, dbt_load_df_function=get_spark_df)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=get_spark_df)

dbt = SparkdbtObj()
df = model(dbt, spark)
materialize(spark, df, dbt.this)

00:53:37  dbt.adapters.athena.constants adapter: Setting timeout: 43200
00:53:37  dbt.adapters.athena.constants adapter: Setting polling_interval: 1.0
00:53:37  dbt.adapters.athena.constants adapter: Setting engine configuration: {'CoordinatorDpuSize': 1, 'MaxConcurrentDpus': 2, 'DefaultExecutorDpuSize': 1, 'SparkProperties': {}}
00:53:37  dbt.adapters.athena.constants adapter: Within thread limit, creating new session for model: "awsdatacatalog"."silver"."sugardemo_accounts" with session description: dbt: 15ab759e-0be6-4c1b-8005-98a98a2f6030 - 55e70a64c772e0a5d72d52af5f80f748.
00:53:41  dbt.adapters.athena.constants adapter: Session: dec786cc-0344-0e5a-f7b0-b0a3ea87430e created
00:53:41  dbt.adapters.athena.constants adapter: Model "awsdatacatalog"."silver"."sugardemo_accounts" - Using session: dec786cc-0344-0e5a-f7b0-b0a3ea87430e to start calculation execution.
00:53:59  Timing info for model.reef_aws_pipeline.sugardemo_accounts (execute): 20:53:36.578154 => 20:53:59.624478
00:53:59  On model.reef_aws_pipeline.sugardemo_accounts: Close
00:53:59  Runtime Error in model sugardemo_accounts (models/sugardemo_accounts.py)
  Calculation Id:   f4c786cc-0adb-8780-1813-ed4464486f59
  Session Id:     dec786cc-0344-0e5a-f7b0-b0a3ea87430e
  Status:         FAILED
  Reason:         None
  Stderr s3 path: s3://temp-client-data-bucket/athena_spark_results/session/dec786cc-0344-0e5a-f7b0-b0a3ea87430e/f4c786cc-0adb-8780-1813-ed4464486f59/stderr

00:53:59  Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '15ab759e-0be6-4c1b-8005-98a98a2f6030', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x115a08e50>]}
00:53:59  1 of 1 ERROR creating python table model silver.sugardemo_accounts ............. [ERROR in 23.09s]
00:53:59  Finished running node model.reef_aws_pipeline.sugardemo_accounts
00:53:59  Connection 'master' was properly closed.
00:53:59  Connection 'model.reef_aws_pipeline.sugardemo_accounts' was properly closed.
00:53:59
00:53:59  Finished running 1 table model in 0 hours 0 minutes and 24.35 seconds (24.35s).
00:53:59  Command end result
00:53:59
00:53:59  Completed with 1 error and 0 warnings:
00:53:59
00:53:59    Runtime Error in model sugardemo_accounts (models/sugardemo_accounts.py)
  Calculation Id:   f4c786cc-0adb-8780-1813-ed4464486f59
  Session Id:     dec786cc-0344-0e5a-f7b0-b0a3ea87430e
  Status:         FAILED
  Reason:         None
  Stderr s3 path: s3://temp-client-data-bucket/athena_spark_results/session/dec786cc-0344-0e5a-f7b0-b0a3ea87430e/f4c786cc-0adb-8780-1813-ed4464486f59/stderr

00:53:59
00:53:59  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
00:53:59  Resource report: {"command_name": "run", "command_wall_clock_time": 24.68739, "process_user_time": 1.707424, "process_kernel_time": 0.257221, "process_mem_max_rss": "187400192", "command_success": false, "process_in_blocks": "0", "process_out_blocks": "0"}
00:53:59  Command `dbt run` failed at 20:53:59.659130 after 24.69 seconds
00:53:59  Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x107ff3cd0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x115a53d90>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1134a5f90>]}
00:53:59  Flushing usage events

Output from console

Using python3.10 (3.10.14)
03:09:40  Running with dbt=1.7.13
03:09:41  Registered adapter: athena=1.7.2
03:09:41  Found 2 models, 2 tests, 1 source, 0 exposures, 0 metrics, 427 macros, 0 groups, 0 semantic models
03:09:41
03:09:42  Concurrency: 2 threads (target='dev')
03:09:42
03:09:42  1 of 1 START python incremental model silver.sugardemo_accounts ................ [RUN]
03:10:10  1 of 1 ERROR creating python incremental model silver.sugardemo_accounts ....... [ERROR in 28.77s]
03:10:10
03:10:10  Finished running 1 incremental model in 0 hours 0 minutes and 29.85 seconds (29.85s).
03:10:10
03:10:10  Completed with 1 error and 0 warnings:
03:10:10
03:10:10    Runtime Error in model sugardemo_accounts (models/sugardemo_accounts.py)
  Calculation Id:   c2c78477-28ef-d99c-abcf-17789d5a095f
  Session Id:     64c78477-214e-fbf7-2d09-3eac41854188
  Status:         FAILED
  Reason:         None
  Stderr s3 path: s3://temp-client-data-bucket/athena_spark_results/session/64c78477-214e-fbf7-2d09-3eac41854188/c2c78477-28ef-d99c-abcf-17789d5a095f/stderr

03:10:10
03:10:10  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
cuff-links added the bug label on Apr 23, 2024
@nicor88
Member

nicor88 commented Apr 24, 2024

@Avinash-1394 maybe you have some clue here, since you implemented the Python models feature?

@Avinash-1394
Contributor

I think the get_spark_df function needs to be slightly modified for sources. dbt's default implementation uses the same logic for both sources and refs, but somehow that does not seem to hold here.

I will try to recreate this issue.

@iamrobo

iamrobo commented Apr 24, 2024

@Avinash-1394 do you think this could be the issue? https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-troubleshooting-tables.html#notebooks-spark-troubleshooting-tables-illegal-argument-exception

My guess is that the database was initially created by an Athena SQL model, which didn't set the location field at the database level. Then, when the user tries to write to the same database with a Python model, it throws the error.
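
One way to confirm that diagnosis (a hedged sketch against the Glue API via boto3; the database name and region are taken from this report) is to check whether the database has any LocationUri at all:

import boto3

# Look up the Glue database that backs the dbt schema and print its location.
# An empty or missing LocationUri matches the failure mode described above.
glue = boto3.client("glue", region_name="us-west-2")
database = glue.get_database(Name="silver")["Database"]
print(database.get("LocationUri", "<no location set>"))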

@Avinash-1394
Contributor

@iamrobo That does make sense. I didn't test the source feature when I added Python models, so I will have to recreate this to fully understand what is happening. I just assumed it would be very similar to ref, but it looks like there are some differences I didn't foresee.

@cuff-links
Contributor Author

cuff-links commented Apr 24, 2024

> @Avinash-1394 do you think this could be the issue? https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-troubleshooting-tables.html#notebooks-spark-troubleshooting-tables-illegal-argument-exception
>
> My guess is that the database was initially created by an Athena SQL model, which didn't set the location field at the database level. Then, when the user tries to write to the same database with a Python model, it throws the error.

This is correct! I went in and modified the table directly and added a location and it did work.

@Avinash-1394
Contributor

@cuff-links Glad you found a resolution so quickly. Do you mind providing the steps you took to fix it, so that it is documented in the GH issue itself? I will let @nicor88 decide if the issue should be closed or kept open.

@nicor88
Member

nicor88 commented Apr 24, 2024

@Avinash-1394 if there is no action needed in terms of code changes, we can close it.
It seems to me that the issue has been resolved and there are no changes to be made in the adapter. If this is correct, we can close it.

@cuff-links
Contributor Author

> @cuff-links Glad you found a resolution so quickly. Do you mind providing the steps you took to fix it, so that it is documented in the GH issue itself? I will let @nicor88 decide if the issue should be closed or kept open.

@nicor88 I would say that it's more of a workaround; the issue still persists. I was following the dialog here and saw the comments about the location field on the database. It was empty, so I filled it out in AWS, reran the model, and it worked. But I think there should still be a fix, since this would require manual intervention whenever the same situation comes up. I think SQL and Python models should be usable interchangeably.
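
For the record, the manual workaround sketched with the Glue API (a hedged example: the database name is from this report, the S3 path is illustrative, and the same change can be made in the AWS console by filling in the database's Location field):

import boto3

# Give the Glue database a LocationUri so Spark can derive a path for new tables.
glue = boto3.client("glue", region_name="us-west-2")
glue.update_database(
    Name="silver",
    DatabaseInput={
        "Name": "silver",
        "LocationUri": "s3://temp-client-data-bucket/athena_results/tables/silver/",
    },
)

After the location is set, the Python model's saveAsTable call no longer fails with the empty-path error.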

@nicor88
Member

nicor88 commented Apr 24, 2024

@cuff-links Let me get this right: without a location set on the database, this won't work? Also, I really recommend handling database creation outside dbt; this applies to whatever system you use. I'm not sure, then, that we need to fix the issue here.
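
For reference, a database created up front (outside dbt) can be given a location at creation time, either with Athena's CREATE DATABASE ... LOCATION DDL or through the Glue API. A minimal sketch with boto3, using illustrative names from this report:

import boto3

# Create the target database with an explicit LocationUri before any dbt run.
glue = boto3.client("glue", region_name="us-west-2")
glue.create_database(
    DatabaseInput={
        "Name": "silver",
        "LocationUri": "s3://temp-client-data-bucket/athena_results/tables/silver/",
    }
)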

@iamrobo

iamrobo commented Apr 24, 2024

@nicor88, would it make sense to add the location to the database creation in the SQL materialization macros, to add the location in the Python materialization macros if it doesn't already exist, or just to throw a better exception?

@nicor88
Member

nicor88 commented Apr 24, 2024

I believe we should throw a better error, and we should also add a section about Python models to the documentation somewhere (in the README), specifying that the location must be set.

I'm unsure that we should add the location in the SQL materialization macro. If the db is created from scratch that could make sense, but if the db already exists we shouldn't touch it.

@cuff-links in your case the source db already existed, right?

@cuff-links
Contributor Author

cuff-links commented Apr 24, 2024

> I believe we should throw a better error, and we should also add a section about Python models to the documentation somewhere (in the README), specifying that the location must be set.
>
> I'm unsure that we should add the location in the SQL materialization macro. If the db is created from scratch that could make sense, but if the db already exists we shouldn't touch it.
>
> @cuff-links in your case the source db already existed, right?

Yes, that is correct. I agree that a better error and some documentation would be good for this.
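
As a rough illustration of the better error, a pre-flight check along these lines (a hypothetical helper, not the adapter's actual code; it assumes Glue access via boto3) would fail fast before the Spark calculation is submitted:

import boto3

def assert_database_has_location(database: str, region_name: str) -> None:
    # Fail with an explicit message instead of Spark's opaque
    # "Can not create a Path from an empty string".
    glue = boto3.client("glue", region_name=region_name)
    db = glue.get_database(Name=database)["Database"]
    if not db.get("LocationUri"):
        raise RuntimeError(
            f"Glue database '{database}' has no location set. Python models need a "
            "database location to derive the table path; set one in the AWS console "
            "or via glue.update_database(), then re-run the model."
        )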

@cuff-links
Contributor Author

Should this still be open?

@nicor88
Member

nicor88 commented May 27, 2024

@cuff-links yes, none of the actions above have been taken yet.
Feel free to pick this up, at least the documentation part.
