Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INTEGRATION][AIRFLOW] Introduce a new extractor for Trino #1288

Merged
merged 5 commits into from
Dec 13, 2022

Conversation

sekikn
Copy link
Contributor

@sekikn sekikn commented Nov 11, 2022

Signed-off-by: Kengo Seki sekikn@apache.org

Problem

Currently OL doesn't have a built-in extractor for Trino.

Solution

This PR add TrinoExtractor corresponding to TrinoOperator.

Note: All schema changes require discussion. Please link the issue for context.

  • Your change modifies the core OpenLineage model
  • Your change modifies one or more OpenLineage facets

If you're contributing a new integration, please specify the scope of the integration and how/where it has been tested (e.g., Apache Spark integration supports S3 and GCS filesystem operations, tested with AWS EMR).

Checklist

  • You've signed-off your work
  • Your pull request title follows our guidelines
  • Your changes are accompanied by tests (if relevant)
  • Your change contains a small diff and is self-contained
  • You've updated any relevant documentation (if relevant)
  • You've updated the CHANGELOG.md with details about your change under the "Unreleased" section (if relevant, depending on the change, this may not be necessary)
  • You've versioned the core OpenLineage model or facets according to SchemaVer (if relevant)
  • You've added a header to source files (if relevant)

Signed-off-by: Kengo Seki <sekikn@apache.org>
@boring-cyborg boring-cyborg bot added area:documentation Improvements or additions to documentation extractor area:integration/airflow openlineage-airflow area:spec Specifications and standards for the project labels Nov 11, 2022
@sekikn
Copy link
Contributor Author

sekikn commented Nov 11, 2022

Though I submitted a PR, there's one known issue in the current implementation. Let me explain it.

It correctly works if input and output database (or catalog, in Trino's terminology) is the same as the one TrinoOperator connects (it's specified by the extra parameter of Airflow's connection).
For example, given the following Airflow connection and DAG:

$ airflow connections get trino_default
id | conn_id       | conn_type | description | host      | schema  | login | password | port | is_encrypted | is_extra_encrypted | extra_dejson          | get_uri                                            
===+===============+===========+=============+===========+=========+=======+==========+======+==============+====================+=======================+====================================================
58 | trino_default | trino     |             | localhost | default | trino | None     | 3400 | False        | False              | {'catalog': 'memory'} | trino://trino@localhost:3400/default?catalog=memory
                                                                                                                                                                                                              
$ cat airflow/dags/trino.py 
from airflow import DAG
from airflow.providers.trino.operators.trino import TrinoOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="test_dag", start_date=days_ago(7)) as dag:
    TrinoOperator(
        task_id="test_task",
        trino_conn_id="trino_default",
        sql="CREATE TABLE memory.default.customer2 AS SELECT * FROM memory.default.customer"
    )

In this case, the OL proxy receives the following event after triggering the DAG, which contains both input and output correctly.

INFO  [2022-11-11 09:25:32,950] io.openlineage.proxy.api.models.ConsoleLineageStream: {                                                                       
  "eventTime" : "2022-11-11T09:25:32.834882Z",                                                                                                                
  "eventType" : "COMPLETE",                                                                                                                                   
  "inputs" : [ {                                                                                                                                              
    "facets" : {                                                                                                                                              
      "dataSource" : {                                                                                                                                        
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",                                                           
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DataSourceDatasetFacet",                                                                                                                                                                           
        "name" : "trino://localhost:3400",                                                                                                                    
        "uri" : "trino://localhost:3400/default"                                                                                                              
      },                                                                                                                                                      
      "schema" : {                                                                                                                                            
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",                                                           
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",                                                                                                                                                                               
        "fields" : [ {                                                                                                                                        
          "name" : "custkey",                                                                                                                                 
          "type" : "bigint"                                                                                                                                   
        }, {                                                                                                                                                  
          "name" : "name",                                                                                                                                    
          "type" : "varchar(25)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "address",                                                                                                                                 
          "type" : "varchar(40)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "nationkey",                                                                                                                               
          "type" : "bigint"                                                                                                                                   
        }, {                                                                                                                                                  
          "name" : "phone",                                                                                                                                   
          "type" : "varchar(15)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "acctbal",                                                                                                                                 
          "type" : "double"                                                                                                                                   
        }, {                                                                                                                                                  
          "name" : "mktsegment",                                                                                                                              
          "type" : "varchar(10)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "comment",                                                                                                                                 
          "type" : "varchar(117)"                                                                                                                             
        } ]                                                                                                                                                   
      }                                                                                                                                                       
    },                                                                                                                                                        
    "name" : "memory.default.customer",                                                                                                                       
    "namespace" : "trino://localhost:3400"                                                                                                                    
  } ],                                                                                                                                                        
  "job" : {                                                                                                                                                   
    "facets" : {                                                                                                                                              
      "sql" : {                                                                                                                                               
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",                                                           
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet",                                                                                                                                                                                      
        "query" : "CREATE TABLE memory.default.customer2 AS SELECT * FROM memory.default.customer"                                                            
      }                                                                                                                                                       
    },                                                                                                                                                        
    "name" : "test_dag.test_task",                                                                                                                            
    "namespace" : "default"                                                                                                                                   
  },                                                                                                                                                          
  "outputs" : [ {                                                                                                                                             
    "facets" : {                                                                                                                                              
      "dataSource" : {                                                                                                                                        
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",                                                           
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DataSourceDatasetFacet",                                                                                                                                                                           
        "name" : "trino://localhost:3400",                                                                                                                    
        "uri" : "trino://localhost:3400/default"                                                                                                              
      },                                                                                                                                                      
      "schema" : {                                                                                                                                            
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",                                                           
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",                                                                                                                                                                               
        "fields" : [ {                                                                                                                                        
          "name" : "custkey",                                                                                                                                 
          "type" : "bigint"                                                                                                                                   
        }, {                                                                                                                                                  
          "name" : "name",                                                                                                                                    
          "type" : "varchar(25)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "address",                                                                                                                                 
          "type" : "varchar(40)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "nationkey",                                                                                                                               
          "type" : "bigint"                                                                                                                                   
        }, {                                                                                                                                                  
          "name" : "phone",                                                                                                                                   
          "type" : "varchar(15)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "acctbal",                                                                                                                                 
          "type" : "double"                                                                                                                                   
        }, {                                                                                                                                                  
          "name" : "mktsegment",                                                                                                                              
          "type" : "varchar(10)"                                                                                                                              
        }, {                                                                                                                                                  
          "name" : "comment",                                                                                                                                 
          "type" : "varchar(117)"                                                                                                                             
        } ]                                                                                                                                                   
      }                                                                                                                                                       
    },                                                                                                                                                        
    "name" : "memory.default.customer2",                                                                                                                      
    "namespace" : "trino://localhost:3400"                                                                                                                    
  } ],                                                                                                                                                        
  "producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",                                                                  
  "run" : {                                                                                                                                                   
    "facets" : { },                                                                                                                                           
    "runId" : "5990b3b8-9871-4250-8394-a16a514641b4"                                                                                                          
  }                                                                                                                                                           
}                                                                                                                                                             

But if the catalog of input or output table is differ from the one TrinoOperator connects, that table is not picked up by the extractor (Trino allows users to insert or join tables over catalogs).
In the following example, the catalog of the input table (tpch) is differ from connection's (memory), so it doesn't appear in the received event as input.

$ cat airflow/dags/trino.py 
from airflow import DAG
from airflow.providers.trino.operators.trino import TrinoOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="test_dag", start_date=days_ago(7)) as dag:
    TrinoOperator(
        task_id="test_task",
        trino_conn_id="trino_default",
        #sql="CREATE TABLE memory.default.customer2 AS SELECT * FROM memory.default.customer"
        sql="CREATE TABLE memory.default.customer2 AS SELECT * FROM tpch.tiny.customer"
    )
INFO  [2022-11-11 09:30:04,757] io.openlineage.proxy.api.models.ConsoleLineageStream: {
  "eventTime" : "2022-11-11T09:30:04.664059Z",
  "eventType" : "COMPLETE",
  "inputs" : [ ],
  "job" : {
    "facets" : {
      "sql" : {
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet",
        "query" : "CREATE TABLE memory.default.customer2 AS SELECT * FROM tpch.tiny.customer"
      }
    },
    "name" : "test_dag.test_task",
    "namespace" : "default"
  },
  "outputs" : [ {
    "facets" : {
      "dataSource" : {
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DataSourceDatasetFacet",
        "name" : "trino://localhost:3400",
        "uri" : "trino://localhost:3400/default"
      },
      "schema" : {
        "_producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",
        "_schemaURL" : "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
        "fields" : [ {
          "name" : "custkey",
          "type" : "bigint"
        }, {
          "name" : "name",
          "type" : "varchar(25)"
        }, {
          "name" : "address",
          "type" : "varchar(40)"
        }, {
          "name" : "nationkey",
          "type" : "bigint"
        }, {
          "name" : "phone",
          "type" : "varchar(15)"
        }, {
          "name" : "acctbal",
          "type" : "double"
        }, {
          "name" : "mktsegment",
          "type" : "varchar(10)"
        }, {
          "name" : "comment",
          "type" : "varchar(117)"
        } ]
      }
    },
    "name" : "memory.default.customer2",
    "namespace" : "trino://localhost:3400"
  } ],
  "producer" : "https://github.com/OpenLineage/OpenLineage/tree/0.17.0/integration/airflow",
  "run" : {
    "facets" : { },
    "runId" : "ea82edce-2bc3-4cb3-a994-04f6e93cf6a2"
  }
}

This is because SqlExtractor doesn't support the multiple database situation for now. I'd like to fix it in the future, but it may be time-taking, especially if it requires to dig into openlineage-sql, because I'm not familar with Rust :)
So I'd like to land this PR first, since it still works within the same catalog.

@mobuchowski
Copy link
Member

As a note, there is separate Trino integration: https://github.com/takezoe/trino-openlineage/

In the end, internal integration is better because database always has the correct info, and it might be able to avoid any of the issues resulting from the use of a SQL parser.

But if the catalog of input or output table is differ from the one TrinoOperator connects, that table is not picked up by the extractor (Trino allows users to insert or join tables over catalogs).

I think the bug results from us not being able to get information about this table by using information_schema. I wonder whether Trino disallows this queries. @JDarDagran I think you worked on the table/database information schema query segregation - might be that you have something to add

@sekikn
Copy link
Contributor Author

sekikn commented Nov 16, 2022

As a note, there is separate Trino integration: https://github.com/takezoe/trino-openlineage/

In the end, internal integration is better because database always has the correct info

Totally agreed. I just got in touch with its author @takezoe, who is a famous developer in Japan as a creator of open source git-hosting software and an author of several Java and Scala books. He said that it is OK for him to contribute his code if the OL community welcomes it, but the current implementation is just a prototype and it will take some time to improve its quality. So I'm looking forward to his contribution in the future, when he finds some time to do it. ;)

By the way, I think this extractor is still useful in some cases, for example users leverage Trino but don't have its ownership. So I'll also investigate the real cause of the problem in question.

@mobuchowski
Copy link
Member

@sekikn Thanks for reaching out to @takezoe! We'll be happy to accept the contribution, and help with improving native Trino integration.

By the way, I think this extractor is still useful in some cases, for example users leverage Trino but don't have its ownership.

I agree. I'm happy to accept the current extractor, just that we have similar issue related to Airflow 2.1.4 as SFTP Extractor PR: #1263

In addition, could you open GitHub issue regarding multiple database problem?

@takezoe
Copy link

takezoe commented Nov 16, 2022

Thank you for summoning me :-)

My trino-openlineage implementation is very experimental as @sekikn mentioned so it may take some time to make it practical. Anyway, I will try to find a time to do it!

@JDarDagran
Copy link
Contributor

I think the bug results from us not being able to get information about this table by using information_schema. I wonder whether Trino disallows this queries. @JDarDagran I think you worked on the table/database information schema query segregation - might be that you have something to add

Yes, I did. @sekikn, you should set _is_information_schema_cross_db = True. It's not documented (we should certainly do that) but it tells if extractor should support cross database queries.

Integration tests need fixes - DAG lacks start_date, Trino has same port set as Airflow Webserver.

@sekikn
Copy link
Contributor Author

sekikn commented Nov 16, 2022

Thanks everyone, will update the PR!

@sekikn
Copy link
Contributor Author

sekikn commented Nov 20, 2022

Hmm, only setting _is_information_schema_cross_db to True doesn't seem to resolve the problem completely in this case. Input source is correctly extracted by setting that, but output isn't instead for some reason. I need some more time to figure out the cause...

@JDarDagran
Copy link
Contributor

JDarDagran commented Nov 20, 2022

@sekikn I'm not too familiar with how Trino works with Airflow. I tested it with postgresql and tpch catalogs and after this change it works. Somehow I could not create table in memory catalog with TrinoOperator and, in consequence, get output dataset. Maybe this has something to do? Sorry I can't help you more!

@sekikn
Copy link
Contributor Author

sekikn commented Nov 21, 2022

Thank you for the investigation, @JDarDagran! Yes, I also ensured it works in some cases, but it doesn't seem to work with the current integration test for some reason.

With _is_information_schema_cross_db = True, both input and output are correctly extracted at the first step in SQLExtractor.extract().

[2022-11-21T00:20:37.916+0000] {sql_extractor.py:55} DEBUG - Got meta {"in_tables": [DbTableMeta(DbTableMeta { database: Some("postgresql"), schema: Some("public"), name: "top_delivery_times" })], "out_tables": [DbTableMeta(DbTableMeta { database: Some("memory"), schema: Some("default"), name: "popular_orders_day_of_week" })] }

But the latter is lost from the event sent to the backend, and additionally, wrong database and schema+table seem to be wrongly combined. It still occurs if I inserted an additional task to create "default" schema inside the "memory" catalog before the insertion query.

[2022-11-21T00:20:41.269+0000] {http.py:121} DEBUG - Sending openlineage event {"eventTime": "2022-11-21T00:20:37.779912Z", "eventType": "START", "inputs": [{"facets": {"dataSource": {..., "name": "trino://trino:8080", "uri": "trino://trino:8080/default"}, "schema": {..., "fields": [{"name": "order_id", "type": "integer"}, ..., {"name": "driver_id", "type": "integer"}]}}, "name": "memory.public.top_delivery_times", "namespace": "trino://trino:8080"}], ..., "outputs": [], ...}

I'm not familar enough with the functions in sql_extractor.py and dbapi_utils.py yet, so I think I should understand them first.

@sekikn
Copy link
Contributor Author

sekikn commented Dec 11, 2022

But the latter is lost from the event sent to the backend, and additionally, wrong database and schema+table seem to be wrongly combined.

I finally figured out its cause. I had to give the optional column which refers to the database name in information_schema to TrinoExtractor, otherwise dbapi_utils assumes that all tables and columns are in the current database. Thank you for your help @JDarDagran @mobuchowski, I'll update the PR soon!

…fferent databases

Signed-off-by: Kengo Seki <sekikn@apache.org>
@sekikn
Copy link
Contributor Author

sekikn commented Dec 12, 2022

The CI failure occurred on the Spark tests and seems unrelated with this fix.

Copy link
Contributor

@JDarDagran JDarDagran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sekikn, thank you for your marvelous contribution!

@JDarDagran JDarDagran merged commit ab26286 into OpenLineage:main Dec 13, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:documentation Improvements or additions to documentation area:integration/airflow openlineage-airflow area:spec Specifications and standards for the project
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants