
Support for s3 compatible object storage #750

Closed · mohittalele opened this issue Aug 25, 2022 · 10 comments
Labels: priority/high, product/python-sdk
Milestone: 1.2.1

Comments

@mohittalele

Describe the bug
I am trying to use the Astro SDK with Airflow 2.3.3, PostgreSQL as the database, and MinIO as an S3-compatible object store.
Version

  • Astro: 1.0.2
  • OS: Debian
  • Airflow: 2.3.3

To Reproduce
Steps to reproduce the behavior:

  1. Write a task to load the CSV file into a pandas DataFrame or a database table:
test_s3_load = aql.load_file(
    input_file=File(path=f"{s3_bucket}/data/falcon_heartbeat.csv", conn_id="s3_conn"),
)
my_homes_table = aql.load_file(
    input_file=File(path=f"{s3_bucket}/data/falcon_heartbeat.csv", conn_id="s3_conn"),
    # [START temp_table_example]  skipcq: PY-W0069
    output_table=Table(
        conn_id="falcon_postgresql_conn",
        name="astro_example_test",
    ),
)
  2. Create the connections using environment variables:
AIRFLOW_CONN_S3_CONN: 's3://MINIO_USERNAME:MINIO_PASSWORD@?host=http%3A%2F%2Fminio.airflow.svc.cluster.local%3A9000'
AIRFLOW_CONN_FALCON_POSTGRESQL_CONN: 'postgresql://falcon:falcon@falcon-postgres-postgresql.airflow.svc.cluster.local:5432/falcon_db'
  3. Run the DAG.
  4. See the error:
  File "/home/airflow/.local/lib/python3.7/site-packages/botocore/retryhandler.py", line 374, in _check_caught_exception
    raise caught_exception
  File "/home/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 249, in _do_get_response
    http_response = self._send(request)
  File "/home/airflow/.local/lib/python3.7/site-packages/botocore/endpoint.py", line 321, in _send
    return self.http_session.send(request)
  File "/home/airflow/.local/lib/python3.7/site-packages/botocore/httpsession.py", line 434, in send
    raise EndpointConnectionError(endpoint_url=request.url, error=e)
botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "https://falcon.s3.amazonaws.com/data/falcon_heartbeat.csv"
[2022-08-25, 16:54:05 UTC] {taskinstance.py:1420} INFO - Marking task as FAILED. dag_id=example_amazon_s3_***, task_id=load_file, execution_date=20220825T165259, start_date=20220825T165401, end_date=20220825T165405
[2022-08-25, 16:54:05 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 125 for task load_file (Could not connect to the endpoint URL: "https://falcon.s3.amazonaws.com/data/falcon_heartbeat.csv"; 73)

I have tested the connections to the PostgreSQL and MinIO services in my k8s cluster, and they both work. From the error message, it is clear that the SDK is trying to connect to the default Amazon AWS endpoint instead of the MinIO host.
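
As a quick way to see which endpoint the AWS hook actually resolves, here is a rough diagnostic sketch (not part of the original report; it assumes the amazon provider's S3Hook is available and the connection id is s3_conn):

# Hypothetical check: print the endpoint of the boto3 client built from the
# Airflow connection. If MinIO is picked up correctly, this should show
# http://minio.airflow.svc.cluster.local:9000 rather than an amazonaws.com URL.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id="s3_conn")
client = hook.get_conn()  # boto3 S3 client derived from the connection
print(client.meta.endpoint_url)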

@kaxil
Collaborator

kaxil commented Aug 31, 2022

Thanks for creating the issue. cc @utkarsharma2

@utkarsharma2 utkarsharma2 self-assigned this Sep 5, 2022
@utkarsharma2
Collaborator

utkarsharma2 commented Sep 5, 2022

@mohittalele

I'm seeing different results with the code below:

import os
from datetime import datetime, timedelta

from airflow.models import DAG
from pandas import DataFrame

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table

s3_bucket = os.getenv("S3_BUCKET", "s3://tmp9")
os.environ["AIRFLOW_CONN_S3_CONN"] = "s3://MINIO_USERNAME:MINIO_PASSWORD@?host=http%3A%2F%2Fminio.airflow.svc.cluster.local%3A9000"

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": 0,
}

dag = DAG(
    dag_id="example_amazon_s3_postgres",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30),
    default_args=default_args,
)

with dag:
    test_s3_load = aql.load_file(
        input_file=File(path=f"{s3_bucket}/data/falcon_heartbeat.csv", conn_id="s3_conn"),
    )

Output:

  File "/Users/utkarsharma/sandbox/astronomer/astro/.nox/dev/lib/python3.8/site-packages/botocore/httpsession.py", line 434, in send
    raise EndpointConnectionError(endpoint_url=request.url, error=e)
botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "http://minio.airflow.svc.cluster.local:9000/tmp9?list-type=2&prefix=data%2Ffalcon_heartbeat.csv&delimiter=&start-after=&encoding-type=url"
[2022-09-05 18:58:59,114] {backfill_job.py:190} ERROR - Task instance <TaskInstance: example_amazon_s3_postgres.load_file backfill__2016-01-01T00:00:00+00:00 [failed]> failed
[2022-09-05 18:58:59,121] {dagrun.py:549} ERROR - Marking run <DagRun example_amazon_s3_postgres @ 2016-01-01T00:00:00+00:00: backfill__2016-01-01T00:00:00+00:00, externally triggered: False> failed
[2022-09-05 18:58:59,122] {dagrun.py:609} INFO - DagRun Finished: dag_id=example_amazon_s3_postgres, execution_date=2016-01-01T00:00:00+00:00, run_id=backfill__2016-01-01T00:00:00+00:00, run_start_date=2022-09-05 13:28:15.436141+00:00, run_end_date=2022-09-05 13:28:59.122271+00:00, run_duration=43.68613, state=failed, external_trigger=False, run_type=backfill, data_interval_start=2016-01-01T00:00:00+00:00, data_interval_end=2016-01-01T00:30:00+00:00, dag_hash=None
[2022-09-05 18:58:59,123] {backfill_job.py:367} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0

For me, the Astro SDK is picking up the right URL; since I don't have MinIO running locally, it fails to connect to it.

Are you sure AIRFLOW_CONN_S3_CONN is the env var that is used to create a connection?
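
One quick way to confirm would be to resolve the connection directly and inspect its fields (a rough sketch, assuming it runs inside the same Airflow environment as the DAG):

# Hypothetical check: confirm that the s3_conn connection created from
# AIRFLOW_CONN_S3_CONN is visible to Airflow and carries the MinIO host extra.
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("s3_conn")
print(conn.conn_type, conn.login)
print(conn.extra_dejson)  # expect something like {"host": "http://minio.airflow.svc.cluster.local:9000"}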

@mohittalele
Author

Interesting. I am pretty sure the connection is created from the env var, since non-Astro-SDK DAGs that use it are running fine. But I will double-check with your code.

By the way, which Astro SDK version did you use to test?

@utkarsharma2
Collaborator

@mohittalele Can you try the above DAG and see if it works for you?

I was using version 1.0.2.

@mohittalele
Author

@utkarsharma2
I am also using the same version. I still get the same error upon running your code.
It looks like there is some redirect going on.
https://gist.github.com/mohittalele/74d7133c852de407aa3fa20a78d0b627

@mohittalele
Author

@utkarsharma2 were you able to reproduce it?

@utkarsharma2
Collaborator

@mohittalele I'm still seeing the same results locally. Can you try creating a separate instance with a new Airflow home dir and run the above-listed DAG?

@pankajkoti pankajkoti added the priority/high High priority label Sep 14, 2022
@kaxil kaxil added the product/python-sdk Label describing products label Oct 6, 2022
@kaxil
Collaborator

kaxil commented Oct 20, 2022

@fletchjeff was trying it with MinIO and had some issues too: https://astronomer.slack.com/archives/C02B8SPT93K/p1666289425280609

@fletchjeff
Contributor

I think the issue here is that the transport_params argument needs to be modified for smart_open to work with MinIO.

This works for me running MinIO locally in a plain Python session:

import boto3
from smart_open import open

client = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000/",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

for line in open("s3://test/smaller.csv", transport_params={"client": client}):
    print(line)

The smart_open call is here:

 with smart_open.open(self.path, mode="wb", transport_params=self.location.transport_params) as stream:

I don't think that transport_params=self.location.transport_params has the required information, specifically endpoint_url.
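
In other words, for that call to reach MinIO, transport_params would have to carry a client configured with the endpoint from the Airflow connection, roughly along these lines (just a sketch using the values from this report, not the SDK's actual code):

# Illustrative only: a boto3 client carrying the MinIO endpoint and credentials,
# handed to smart_open via transport_params. Bucket name and credentials here
# are placeholders taken from this report.
import boto3
import smart_open

client = boto3.client(
    "s3",
    endpoint_url="http://minio.airflow.svc.cluster.local:9000",
    aws_access_key_id="MINIO_USERNAME",
    aws_secret_access_key="MINIO_PASSWORD",
)

with smart_open.open(
    "s3://falcon/data/falcon_heartbeat.csv", transport_params={"client": client}
) as stream:
    print(stream.readline())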

@phanikumv phanikumv added this to the 1.3.0 milestone Oct 26, 2022
@utkarsharma2 utkarsharma2 modified the milestones: 1.3.0, 1.2.1 Oct 27, 2022
utkarsharma2 added a commit that referenced this issue Oct 28, 2022
# Description
## What is the current behavior?
Currently, we are not able to connect to MinIO (https://min.io/) as a
replacement for S3.
Impact: load_file(), export_file(), and MinIO as a custom XCom backend.

related: #750
and slack thread -
https://astronomer.slack.com/archives/C02B8SPT93K/p1666289425280609


## What is the new behavior?
We now pass all the required parameters to smart_open() so that we can
communicate with the MinIO server.

## Does this introduce a breaking change?
Nope

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
@sunank200
Contributor

@mohittalele this PR resolves the issue. Please check on your side.

utkarsharma2 added a commit that referenced this issue Nov 4, 2022