
feat: add python model support #188

Closed

Conversation

@Avinash-1394 (Contributor) commented Mar 27, 2023

Description

Support dbt Python models using Spark.

Docs - https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html

Bugs currently identified:

  • mock_athena does not support these functions yet
  • The incremental model does not fully utilize Spark's capabilities

Prerequisites

  1. A Spark-enabled workgroup created in Athena
  2. A Spark execution role granted access to Athena, Glue, and S3
  3. The Spark workgroup is added to the ~/.dbt/profiles.yml file and the profile is referenced in dbt_project.yml (e.g. profile: 'analytics_spark')
analytics_spark:
  outputs:
    dev:
      database: awsdatacatalog
      region_name: us-east-1
      s3_data_dir: s3://dbt-athena/
      s3_staging_dir: s3://<user>-athena-query-results/
      schema: analytics_dev
      threads: 4
      type: athena
      work_group: primary
      spark_work_group: spark
  target: dev

Models used for testing (optional)

You can add the models below to your dbt project, or clone this repository, install the dependencies with Poetry, and run them.

model
{{ config(materialized="table") }}
select 1 as column_1, 2 as column_2, '{{ run_started_at.strftime("%Y-%m-%d") }}' as run_date
python_table
import pandas as pd


def model(dbt, session):
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df
python_incremental
import pandas as pd


def model(dbt, session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        max_from_this = f"select max(run_date) from {dbt.this}"
        df = df.filter(df.run_date >= session.sql(max_from_this).collect()[0][0])

    return df
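For a quick local sanity check of the incremental filter logic, here is a sketch using plain pandas in place of a Spark session; the rows and the target table's max run_date are illustrative stand-ins, not values from the test run:

import pandas as pd

# Rows shaped like the output of the "model" example above (illustrative).
df = pd.DataFrame(
    {
        "column_1": [1, 1],
        "column_2": [2, 2],
        "run_date": ["2023-04-02", "2023-04-03"],
    }
)

# Stand-in for `select max(run_date) from {dbt.this}`, which the model runs
# against the target table via session.sql() on an incremental run.
max_run_date_in_target = "2023-04-03"

# The same predicate the model applies to the Spark DataFrame.
new_rows = df[df["run_date"] >= max_run_date_in_target]
print(new_rows)  # keeps only rows at or after the target's max run_date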

Build

dbt output


============================== 2023-04-03 23:24:42.103241 | 3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75 ==============================
23:24:42.103241 [info ] [MainThread]: Running with dbt=1.4.4
23:24:42.105171 [debug] [MainThread]: running dbt with arguments {'debug': True, 'write_json': True, 'use_colors': True, 'printer_width': 80, 'version_check': True, 'partial_parse': True, 'static_parser': True, 'profiles_dir': '/home/avinash1394/.dbt', 'send_anonymous_usage_stats': True, 'quiet': False, 'no_print': False, 'cache_selected_only': False, 'which': 'run', 'rpc_method': 'run', 'indirect_selection': 'eager'}
23:24:42.105507 [debug] [MainThread]: Tracking: tracking
23:24:42.106708 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efecc7e7d30>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efeca585c60>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efeca5847c0>]}
23:24:42.115289 [debug] [MainThread]: checksum: 170d819e8a7f11e09c497566dd7f61e1355cb9fb514921503937b951cb4a2250, vars: {}, profile: None, target: None, version: 1.4.4
23:24:42.135156 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
23:24:42.135605 [debug] [MainThread]: Partial parsing enabled, no changes found, skipping parsing
23:24:42.141326 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'load_project', 'label': '3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efeca5480d0>]}
23:24:42.146448 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': '3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efeca518490>]}
23:24:42.146964 [info ] [MainThread]: Found 3 models, 0 tests, 0 snapshots, 0 analyses, 313 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
23:24:42.147303 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efecc7e7d30>]}
23:24:42.148463 [info ] [MainThread]: 
23:24:42.150216 [debug] [MainThread]: Acquiring new athena connection 'master'
23:24:42.151734 [debug] [ThreadPool]: Acquiring new athena connection 'list_awsdatacatalog'
23:24:42.152344 [debug] [ThreadPool]: Opening a new connection, currently in state init
23:24:42.723778 [debug] [ThreadPool]: On list_awsdatacatalog: Close
23:24:42.727164 [debug] [ThreadPool]: Acquiring new athena connection 'list_awsdatacatalog_analytics_dev'
23:24:42.730022 [debug] [ThreadPool]: Opening a new connection, currently in state closed
23:24:43.445183 [debug] [ThreadPool]: On list_awsdatacatalog_analytics_dev: Close
23:24:43.450169 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efeca5c9390>]}
23:24:43.451946 [info ] [MainThread]: Concurrency: 4 threads (target='dev')
23:24:43.452818 [info ] [MainThread]: 
23:24:43.464314 [debug] [Thread-1 (]: Began running node model.dbt_athena_spark.model
23:24:43.465632 [debug] [Thread-2 (]: Began running node model.dbt_athena_spark.my_first_dbt_model
23:24:43.466841 [info ] [Thread-1 (]: 1 of 3 START sql table model analytics_dev.model ............................... [RUN]
23:24:43.467984 [info ] [Thread-2 (]: 2 of 3 START python table model analytics_dev.my_first_dbt_model ............... [RUN]
23:24:43.469556 [debug] [Thread-1 (]: Acquiring new athena connection 'model.dbt_athena_spark.model'
23:24:43.471026 [debug] [Thread-2 (]: Acquiring new athena connection 'model.dbt_athena_spark.my_first_dbt_model'
23:24:43.472068 [debug] [Thread-1 (]: Began compiling node model.dbt_athena_spark.model
23:24:43.473049 [debug] [Thread-2 (]: Began compiling node model.dbt_athena_spark.my_first_dbt_model
23:24:43.478703 [debug] [Thread-1 (]: Writing injected SQL for node "model.dbt_athena_spark.model"
23:24:43.517472 [debug] [Thread-2 (]: Writing injected SQL for node "model.dbt_athena_spark.my_first_dbt_model"
23:24:43.518586 [debug] [Thread-1 (]: Timing info for model.dbt_athena_spark.model (compile): 2023-04-03 23:24:43.473840 => 2023-04-03 23:24:43.518438
23:24:43.519504 [debug] [Thread-2 (]: Timing info for model.dbt_athena_spark.my_first_dbt_model (compile): 2023-04-03 23:24:43.479242 => 2023-04-03 23:24:43.519394
23:24:43.520381 [debug] [Thread-1 (]: Began executing node model.dbt_athena_spark.model
23:24:43.521304 [debug] [Thread-2 (]: Began executing node model.dbt_athena_spark.my_first_dbt_model
23:24:43.561076 [debug] [Thread-1 (]: Opening a new connection, currently in state closed
23:24:43.564212 [debug] [Thread-2 (]: Opening a new connection, currently in state init
23:24:44.145523 [debug] [Thread-2 (]: Athena adapter: table_name : my_first_dbt_model
23:24:44.146605 [debug] [Thread-2 (]: Athena adapter: table type : table
23:24:44.161340 [debug] [Thread-1 (]: Athena adapter: table_name : model
23:24:44.162252 [debug] [Thread-1 (]: Athena adapter: table type : table
23:24:44.619308 [debug] [Thread-1 (]: Athena adapter: analytics_dev.model is stored in s3://dbt-athena/analytics_dev/model/1ab22d75-15e7-48f6-bf5c-c7a9d6b33ff9/
23:24:44.645411 [debug] [Thread-2 (]: Athena adapter: analytics_dev.my_first_dbt_model is stored in s3://dbt-athena/analytics_dev/my_first_dbt_model/ab81468d-6efc-48ac-a6f5-1ce660ae758c
23:24:45.158371 [debug] [Thread-1 (]: Athena adapter: Deleting table data: path='s3://dbt-athena/analytics_dev/model/1ab22d75-15e7-48f6-bf5c-c7a9d6b33ff9/', bucket='dbt-athena', prefix='analytics_dev/model/1ab22d75-15e7-48f6-bf5c-c7a9d6b33ff9/'
23:24:45.161924 [debug] [Thread-2 (]: Athena adapter: Deleting table data: path='s3://dbt-athena/analytics_dev/my_first_dbt_model/ab81468d-6efc-48ac-a6f5-1ce660ae758c', bucket='dbt-athena', prefix='analytics_dev/my_first_dbt_model/ab81468d-6efc-48ac-a6f5-1ce660ae758c/'
23:24:45.680401 [debug] [Thread-1 (]: Using athena connection "model.dbt_athena_spark.model"
23:24:45.681385 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: drop table if exists `analytics_dev`.`model`
23:24:45.685620 [debug] [Thread-2 (]: Using athena connection "model.dbt_athena_spark.my_first_dbt_model"
23:24:45.690158 [debug] [Thread-2 (]: On model.dbt_athena_spark.my_first_dbt_model: drop table if exists `analytics_dev`.`my_first_dbt_model`
23:24:47.550615 [debug] [Thread-1 (]: SQL status: OK -1 in 2 seconds
23:24:47.564600 [debug] [Thread-2 (]: SQL status: OK -1 in 2 seconds
23:24:48.314214 [debug] [Thread-1 (]: Athena adapter: S3 path does not exist
23:24:48.315321 [debug] [Thread-1 (]: Writing runtime sql for node "model.dbt_athena_spark.model"
23:24:48.317033 [debug] [Thread-1 (]: Using athena connection "model.dbt_athena_spark.model"
23:24:48.317810 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: -- /* {"app": "dbt", "dbt_version": "1.4.4", "profile_name": "analytics_spark", "target_name": "dev", "node_id": "model.dbt_athena_spark.model"} */

  
    

  create table "analytics_dev"."model"
    with (
      table_type='hive',
      is_external=true,
      external_location='s3://dbt-athena/analytics_dev/model/6d1b473e-ee73-4a6c-b9f2-195e54b2b40d',
      format='parquet'
    )
    as
      
select 1 as column_1, 2 as column_2, '2023-04-03' as run_date
  
23:24:48.350239 [debug] [Thread-2 (]: Athena adapter: S3 path does not exist
23:24:48.355072 [debug] [Thread-2 (]: Writing runtime python for node "model.dbt_athena_spark.my_first_dbt_model"
23:24:48.356260 [debug] [Thread-2 (]: On model.dbt_athena_spark.my_first_dbt_model: 
  
    

  import pandas as pd


def model(dbt, session):
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args,dbt_load_df_function):
    refs = {}
    key = ".".join(args)
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = ".".join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = 'awsdatacatalog'
    schema = 'analytics_dev'
    identifier = 'my_first_dbt_model'
    def __repr__(self):
        return 'analytics_dev.my_first_dbt_model'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False

# COMMAND ----------



def materialize(spark_session, df, target_relation):
    import pandas
    try:
        if isinstance(df, pandas.core.frame.DataFrame):
            df = spark_session.createDataFrame(df)
        df.write \
        .format("parquet") \
        .option("path", "s3://dbt-athena/analytics_dev/my_first_dbt_model/bd1ebe1c-e14f-4f6a-839c-d265353e15ce") \
        .mode("overwrite") \
        .saveAsTable(
            name="analytics_dev.my_first_dbt_model",
        )
        return "OK"
    except Exception:
        raise

dbt = dbtObj(spark.table)
df = model(dbt, spark)
materialize(spark, df, dbt.this)
  
23:24:49.277160 [debug] [Thread-2 (]: Athena adapter: Submitted calculation execution id 20c3a4b7-ffb7-dbed-4ffa-20f9fd060d07
23:24:49.792450 [debug] [Thread-1 (]: SQL status: OK -1 in 1 seconds
23:24:49.799724 [debug] [Thread-1 (]: Using athena connection "model.dbt_athena_spark.model"
23:24:49.800766 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: alter table `analytics_dev`.`model` set tblproperties ('classification' = 'parquet')
23:24:51.256504 [debug] [Thread-1 (]: SQL status: OK -1 in 1 seconds
23:24:51.270813 [debug] [Thread-1 (]: Timing info for model.dbt_athena_spark.model (execute): 2023-04-03 23:24:43.522016 => 2023-04-03 23:24:51.270699
23:24:51.271712 [debug] [Thread-1 (]: On model.dbt_athena_spark.model: Close
23:24:51.273239 [debug] [Thread-1 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efeca585ab0>]}
23:24:51.274397 [info ] [Thread-1 (]: 1 of 3 OK created sql table model analytics_dev.model .......................... [OK -1 in 7.80s]
23:24:51.277656 [debug] [Thread-1 (]: Finished running node model.dbt_athena_spark.model
23:24:51.279058 [debug] [Thread-4 (]: Began running node model.dbt_athena_spark.my_incremental_dbt_model
23:24:51.280067 [info ] [Thread-4 (]: 3 of 3 START python incremental model analytics_dev.my_incremental_dbt_model ... [RUN]
23:24:51.281391 [debug] [Thread-4 (]: Acquiring new athena connection 'model.dbt_athena_spark.my_incremental_dbt_model'
23:24:51.282138 [debug] [Thread-4 (]: Began compiling node model.dbt_athena_spark.my_incremental_dbt_model
23:24:51.293378 [debug] [Thread-4 (]: Writing injected SQL for node "model.dbt_athena_spark.my_incremental_dbt_model"
23:24:51.294473 [debug] [Thread-4 (]: Timing info for model.dbt_athena_spark.my_incremental_dbt_model (compile): 2023-04-03 23:24:51.282741 => 2023-04-03 23:24:51.294358
23:24:51.295253 [debug] [Thread-4 (]: Began executing node model.dbt_athena_spark.my_incremental_dbt_model
23:24:51.365718 [debug] [Thread-4 (]: temporary relation isanalytics_devmy_incremental_dbt_model__dbt_tmp
23:24:51.367263 [debug] [Thread-4 (]: Opening a new connection, currently in state init
23:24:51.897433 [debug] [Thread-4 (]: Athena adapter: Error calling Glue get_table: An error occurred (EntityNotFoundException) when calling the GetTable operation: Table my_incremental_dbt_model__dbt_tmp not found.
23:24:52.640014 [debug] [Thread-4 (]: Athena adapter: S3 path does not exist
23:24:52.641257 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model: 
  
    

  import pandas as pd


def model(dbt, session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        max_from_this = f"select max(run_date) from {dbt.this}"
        df = df.filter(df.run_date >= session.sql(max_from_this).collect()[0][0])

    return df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args,dbt_load_df_function):
    refs = {"model": "analytics_dev.model"}
    key = ".".join(args)
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = ".".join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = 'awsdatacatalog'
    schema = 'analytics_dev'
    identifier = 'my_incremental_dbt_model'
    def __repr__(self):
        return 'analytics_dev.my_incremental_dbt_model'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args: ref(*args, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = True

# COMMAND ----------



def materialize(spark_session, df, target_relation):
    import pandas
    try:
        if isinstance(df, pandas.core.frame.DataFrame):
            df = spark_session.createDataFrame(df)
        df.write \
        .format("parquet") \
        .option("path", "s3://dbt-athena/analytics_dev/my_incremental_dbt_model__dbt_tmp/024a5a18-9176-477e-9ef6-71a912166b6c") \
        .mode("overwrite") \
        .saveAsTable(
            name="analytics_dev.my_incremental_dbt_model__dbt_tmp",
        )
        return "OK"
    except Exception:
        raise

dbt = dbtObj(spark.table)
df = model(dbt, spark)
materialize(spark, df, dbt.this)
  
23:24:53.376039 [debug] [Thread-4 (]: Athena adapter: Submitted calculation execution id 00c3a4b8-07b8-b91c-7333-4444fd3b2875
23:25:03.888763 [debug] [Thread-2 (]: Athena adapter: Received execution status COMPLETED
23:25:03.976429 [debug] [Thread-2 (]: Execution status: OK in 15.62 seconds
23:25:03.978580 [debug] [Thread-2 (]: Timing info for model.dbt_athena_spark.my_first_dbt_model (execute): 2023-04-03 23:24:43.542495 => 2023-04-03 23:25:03.978517
23:25:03.979020 [debug] [Thread-2 (]: On model.dbt_athena_spark.my_first_dbt_model: Close
23:25:03.979870 [debug] [Thread-2 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efec9415420>]}
23:25:03.980409 [info ] [Thread-2 (]: 2 of 3 OK created python table model analytics_dev.my_first_dbt_model .......... [OK in 20.51s]
23:25:03.980877 [debug] [Thread-2 (]: Finished running node model.dbt_athena_spark.my_first_dbt_model
23:25:07.865205 [debug] [Thread-4 (]: Athena adapter: Received execution status COMPLETED
23:25:08.007879 [debug] [Thread-4 (]: Execution status: OK in 15.37 seconds
23:25:08.486506 [debug] [Thread-4 (]: Athena adapter: Columns in relation my_incremental_dbt_model: [{'Name': 'column_1', 'Type': 'int', 'Comment': ''}, {'Name': 'column_2', 'Type': 'int', 'Comment': ''}, {'Name': 'run_date', 'Type': 'varchar(10)', 'Comment': ''}]
23:25:08.488714 [debug] [Thread-4 (]: 
    
    insert into "analytics_dev"."my_incremental_dbt_model" ("column_1", "column_2", "run_date")
    (
       select "column_1", "column_2", "run_date"
       from "analytics_dev"."my_incremental_dbt_model__dbt_tmp"
    );
23:25:08.492212 [debug] [Thread-4 (]: Writing runtime python for node "model.dbt_athena_spark.my_incremental_dbt_model"
23:25:08.494996 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model: 
    
      
          
          
      
    
  
23:25:09.347206 [debug] [Thread-4 (]: Athena adapter: Submitted calculation execution id d8c3a4b8-26e7-e6a5-cbcc-e64157ed6fbf
23:25:09.476405 [debug] [Thread-4 (]: Athena adapter: Received execution status COMPLETED
23:25:09.560324 [debug] [Thread-4 (]: Execution status: OK in 1.06 seconds
23:25:10.052199 [debug] [Thread-4 (]: Athena adapter: table_name : my_incremental_dbt_model__dbt_tmp
23:25:10.052961 [debug] [Thread-4 (]: Athena adapter: table type : table
23:25:10.551817 [debug] [Thread-4 (]: Athena adapter: analytics_dev.my_incremental_dbt_model__dbt_tmp is stored in s3://dbt-athena/analytics_dev/my_incremental_dbt_model__dbt_tmp/024a5a18-9176-477e-9ef6-71a912166b6c
23:25:10.918950 [debug] [Thread-4 (]: Athena adapter: Deleting table data: path='s3://dbt-athena/analytics_dev/my_incremental_dbt_model__dbt_tmp/024a5a18-9176-477e-9ef6-71a912166b6c', bucket='dbt-athena', prefix='analytics_dev/my_incremental_dbt_model__dbt_tmp/024a5a18-9176-477e-9ef6-71a912166b6c/'
23:25:11.408505 [debug] [Thread-4 (]: Using athena connection "model.dbt_athena_spark.my_incremental_dbt_model"
23:25:11.410070 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model: drop table if exists `analytics_dev`.`my_incremental_dbt_model__dbt_tmp`
23:25:13.086708 [debug] [Thread-4 (]: SQL status: OK -1 in 2 seconds
23:25:13.091217 [debug] [Thread-4 (]: Timing info for model.dbt_athena_spark.my_incremental_dbt_model (execute): 2023-04-03 23:24:51.295881 => 2023-04-03 23:25:13.091109
23:25:13.092127 [debug] [Thread-4 (]: On model.dbt_athena_spark.my_incremental_dbt_model: Close
23:25:13.093652 [debug] [Thread-4 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '3d6bdb09-bba6-4b8c-9bdc-ff76eb1afb75', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efea76d69e0>]}
23:25:13.094774 [info ] [Thread-4 (]: 3 of 3 OK created python incremental model analytics_dev.my_incremental_dbt_model  [OK in 21.81s]
23:25:13.095632 [debug] [Thread-4 (]: Finished running node model.dbt_athena_spark.my_incremental_dbt_model
23:25:13.098636 [debug] [MainThread]: Acquiring new athena connection 'master'
23:25:13.100103 [debug] [MainThread]: Connection 'master' was properly closed.
23:25:13.100840 [debug] [MainThread]: Connection 'model.dbt_athena_spark.model' was properly closed.
23:25:13.101504 [debug] [MainThread]: Connection 'model.dbt_athena_spark.my_first_dbt_model' was properly closed.
23:25:13.102163 [debug] [MainThread]: Connection 'model.dbt_athena_spark.my_incremental_dbt_model' was properly closed.
23:25:13.104309 [info ] [MainThread]: 
23:25:13.105121 [info ] [MainThread]: Finished running 2 table models, 1 incremental model in 0 hours 0 minutes and 30.96 seconds (30.96s).
23:25:13.106144 [debug] [MainThread]: Command end result
23:25:13.120593 [info ] [MainThread]: 
23:25:13.121505 [info ] [MainThread]: Completed successfully
23:25:13.122211 [info ] [MainThread]: 
23:25:13.122947 [info ] [MainThread]: Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
23:25:13.123752 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efec9266290>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efea76a3df0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7efea76a3a60>]}
23:25:13.124622 [debug] [MainThread]: Flushing usage events

Tests added

  1. Check if session id is obtained

TODO:

  1. Check if calculation is submitted
  2. Check if execution is polled (one way to stub these calls is sketched below)
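Since mock_athena does not cover these APIs yet, one interim option is to stub botocore's request dispatch with monkeypatch. This is only a hedged sketch, not the test code in this PR: the IDs and response payloads are made up, and patching _make_api_call bypasses real parameter validation. The operations used (start_session, start_calculation_execution, get_calculation_execution_status) do exist on the boto3 Athena client:

import boto3


def test_calculation_is_submitted_and_polled(monkeypatch):
    # Canned responses keyed by botocore operation name; each is a minimal
    # subset of the real response shape.
    responses = {
        "StartSession": {"SessionId": "test-session", "State": "IDLE"},
        "StartCalculationExecution": {
            "CalculationExecutionId": "calc-123",
            "State": "CREATING",
        },
        "GetCalculationExecutionStatus": {"Status": {"State": "COMPLETED"}},
    }

    def fake_api_call(self, operation_name, api_params):
        # Every boto3 client call funnels through BaseClient._make_api_call,
        # so patching it intercepts the request before it reaches AWS.
        return responses[operation_name]

    monkeypatch.setattr("botocore.client.BaseClient._make_api_call", fake_api_call)

    client = boto3.client("athena", region_name="us-east-1")
    session = client.start_session(
        WorkGroup="spark", EngineConfiguration={"MaxConcurrentDpus": 2}
    )
    calc = client.start_calculation_execution(
        SessionId=session["SessionId"], CodeBlock="print('hello')"
    )
    status = client.get_calculation_execution_status(
        CalculationExecutionId=calc["CalculationExecutionId"]
    )
    assert status["Status"]["State"] == "COMPLETED"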

Checklist

  • You followed the contributing section
  • You added unit testing when necessary
  • You added functional testing when necessary

@Avinash-1394 marked this pull request as draft March 27, 2023 19:24
@Avinash-1394 changed the title from "Add python model support" to "feat: Add python model support" Mar 27, 2023
@Avinash-1394 (Contributor Author)

@nicor88
Can you provide some help on how to test this with mock_athena? I want to mock some boto3.Session.client calls so they return dict results.

Also, I want to use cached_property, which was only introduced in Python 3.8, so the unit tests for 3.7 are failing. Since 3.7 is almost at end of support, can we ignore it?
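For reference, one common workaround for the cached_property availability issue (a sketch under the assumption that a conditional import is acceptable; this is not necessarily the suggestion left in review):

try:
    from functools import cached_property  # Python 3.8+
except ImportError:
    # Python 3.7: use the third-party backport (pip install cached-property)
    from cached_property import cached_property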

@nicor88 (Member) commented Mar 28, 2023

@Avinash-1394 have a look at monkeypatch to mock specific calls from a function - https://www.patricksoftwareblog.com/monkeypatching-with-pytest/

Regarding the issue with cached_property, I left you a suggestion; it should work, but I didn't test it.

@Jrmyy (Member) left a comment


Overall LGTM, we should add some documentation on:

  • the requirements
  • how it works

@Avinash-1394 marked this pull request as ready for review April 5, 2023 02:15
@Avinash-1394 (Contributor Author)

Overall LGTM, we should add some documentation on:

  • the requirements
  • how it works

@Jrmyy I have updated the README with instructions and also provided docstrings to explain what happens behind the scenes. Let me know if I need to add more details.

@Avinash-1394 (Contributor Author)

@Avinash-1394 have a look at monkeypatch to mock specific calls from a function - https://www.patricksoftwareblog.com/monkeypatching-with-pytest/

Regarding the issue with cached_property, I left you a suggestion; it should work, but I didn't test it.

@nicor88 I have added one test, but mock_athena does not have support for these client calls since they were added very recently. It's making it a little difficult to test these. Reference

I am going to try and contribute to moto for these. But can we merge this and do that in a future MR? Or should we try to add more tests before we can merge this?

@nicor88 (Member) commented Apr 7, 2023

@Avinash-1394 what you did here is an amazing feature. Regarding mocking, the requests could be stubbed with mock.patch, but let's consider opening an issue in moto and adding a TODO in the tests.

I want to take some more time to review the changes proposed here. @Jrmyy @mattiamatrix, could you have a look too? Ideally we include this in the next release :)

@nicor88 added the "feature" (New feature or request) label Apr 18, 2023
@nicor88 mentioned this pull request Apr 18, 2023
@mattiamatrix changed the title from "feat: Add python model support" to "feat: add python model support" Apr 18, 2023
@Avinash-1394 marked this pull request as draft April 18, 2023 12:24