
375 filesystem storage staging #451

Merged: 67 commits merged into devel from d#/375-filesystem-storage-staging on Jul 15, 2023

Conversation

@sh-rp (Collaborator) commented Jun 23, 2023

@netlify netlify bot commented Jun 23, 2023

Deploy Preview for dlt-hub-docs ready!

🔨 Latest commit: 4ae7fe6
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/64b2cb94d2d5780008fe8a52
😎 Deploy Preview: https://deploy-preview-451--dlt-hub-docs.netlify.app

@sh-rp force-pushed the d#/375-filesystem-storage-staging branch from 41ec238 to 1d42d4b on June 23, 2023 17:36
@sh-rp force-pushed the d#/375-filesystem-storage-staging branch from 89aa188 to 6af9a5c on June 25, 2023 16:44
@sh-rp force-pushed the d#/375-filesystem-storage-staging branch from 651fdeb to 876ec3f on June 26, 2023 14:25
@sh-rp force-pushed the d#/375-filesystem-storage-staging branch from 5d4c6e3 to fed914b on June 26, 2023 15:05
dlt/common/destination/capabilities.py (outdated, resolved)
dlt/common/destination/capabilities.py (outdated, resolved)
dlt/common/destination/reference.py (resolved)
dlt/destinations/bigquery/bigquery.py (outdated, resolved)
dlt/destinations/bigquery/bigquery.py (outdated, resolved)
@@ -113,6 +115,8 @@ def pipeline(
pipelines_dir = get_dlt_pipelines_dir()

destination = DestinationReference.from_name(destination or kwargs["destination_name"])
staging = DestinationReference.from_name(staging or kwargs["staging_name"]) if staging is not None else None
Collaborator: here you could also check whether the spec supports the staging client configuration (i.e., is a subclass of it; see my remarks above). A sketch of such a check follows this exchange.

Collaborator: is it done?
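A minimal sketch of the suggested check; validate_staging_reference and the spec() accessor are assumptions of this sketch, and DestinationClientStagingConfiguration (introduced in this PR) would be passed as staging_spec_base:

def validate_staging_reference(staging_ref, staging_spec_base) -> None:
    # hypothetical sketch: reject a staging destination whose configuration
    # spec does not derive from the staging configuration base class
    if staging_ref is not None and not issubclass(staging_ref.spec(), staging_spec_base):
        raise TypeError(f"{staging_ref.__name__} cannot be used as a staging destination")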

@@ -186,15 +186,17 @@ def __init__(
progress: _Collector,
must_attach_to_local_pipeline: bool,
config: PipelineConfiguration,
-   runtime: RunConfiguration
+   runtime: RunConfiguration,
+   staging: DestinationReference = None
Collaborator: after destination, I'd say

Collaborator: I think the run method takes destination, so it should take staging too

Collaborator: one important thing: the presence of staging must be serialized to state; see _state_to_props and _props_to_state further below. You must test that staging is still functional after the state is restored from the destination.

Collaborator: what else you need to do: maybe we should add all staging information to LoadInfo (in reference.py), so that when you print it, it is clear that staging was used.

Also, the dlt pipeline ... info command should display staging information.

Same for the built-in streamlit app.

Collaborator (Author): I think the pipeline info automatically picked up the staging info after I changed the props-to-state code, no?

dlt/pipeline/pipeline.py (resolved)
dlt/pipeline/pipeline.py (outdated, resolved)
dlt/pipeline/pipeline.py (outdated, resolved)
@sh-rp force-pushed the d#/375-filesystem-storage-staging branch from 77d2e22 to 1d4b648 on July 10, 2023 18:37
@rudolfix rudolfix left a comment

If we go for DestinationClientDwhWithStagingConfiguration, which is a destination that contains the staging config, we must go all the way and refactor the whole thing, i.e.:

  • DestinationClientStagingConfiguration is not needed; the embedded config is always the staging config.
  • There is no need for a staging parameter in the Pipeline class; it is simply part of the destination.

You also de facto force the usage of filesystem as staging (which is fine, maybe?).

Let's discuss it before I move forward with the review.

@rudolfix rudolfix left a comment

OK, I like it now. I suggested some simplifications that we can make; there will also be less code :) Thanks for the docs update. I'll review it and push my changes to this branch.



@configspec
class AwsCredentialsPure(CredentialsConfiguration, CredentialsWithDefault):
Collaborator: yeah, that should have been like this from the very start. I'd just keep the convention we have in GcpCredentials (see the sketch after this list):

  1. You can keep it in one file.
  2. AwsCredentialsPure -> rename to AwsCredentialsWithoutDefaults.
  3. AwsCredentialsWithoutDefaults -> do not inherit from CredentialsWithDefault.
  4. AwsCredentials -> inherit from CredentialsWithDefault.

There are several places (e.g., the dbt helper) that check for CredentialsWithDefault inheritance and then call the CredentialsWithDefault interface, so it should be implemented only where we really check for and retrieve default credentials.
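A minimal sketch of that layout, assuming the dlt config decorators and base classes are importable as shown; the field names and the default-credentials lookup are assumptions:

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import CredentialsConfiguration, CredentialsWithDefault

@configspec
class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
    # field names are assumptions for this sketch
    aws_access_key_id: str = None
    aws_secret_access_key: str = None

@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
    def on_partial(self) -> None:
        # only here do we check for and retrieve default credentials
        # (e.g. from a boto3 session), then mark the config resolved
        ...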

...

@configspec(init=True)
class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfiguration):
Collaborator: OK, I like this idea! So let's double down on it (see the sketch after this list):

  1. It seems you do not need the full config, just the staging credentials!
  2. Put those credentials on DestinationClientDwhConfiguration as staging_credentials and use the generic credentials type (see credentials in the base class).
  3. Another big +1: the user can fill those credentials explicitly (it will not work fully now; we will deal with that later).
  4. We do not need DestinationClientDwhWithStagingConfiguration.
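A minimal sketch of the suggested shape; the class and field names follow the thread and the diff above, while the exact defaults are assumptions:

from typing import Optional

@configspec(init=True)
class DestinationClientDwhConfiguration(DestinationClientConfiguration):
    dataset_name: str = None
    default_schema_name: Optional[str] = None
    # generic credentials type, mirroring credentials in the base class; filled
    # from the staging destination by the pipeline, or explicitly by the user
    staging_credentials: Optional[CredentialsConfiguration] = None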

dlt/common/destination/reference.py (resolved)
- destination_file_name = LoadFilesystemJob.make_destination_filename(file_name, schema_name, load_id)
- fs_client.put_file(local_path, posixpath.join(dataset_path, destination_file_name))
+ self.destination_file_name = LoadFilesystemJob.make_destination_filename(file_name, schema_name, load_id)
+ fs_client.put_file(local_path, posixpath.join(dataset_path, self.destination_file_name))
Collaborator: use make_remote_path instead of posixpath.join; that will work with fsspec (90% sure ;>)

@@ -58,6 +59,38 @@ def is_sql_job(file_path: str) -> bool:
return os.path.splitext(file_path)[1][1:] == "sql"


class CopyFileLoadJob(LoadJob, FollowupJob):
def __init__(self, table: TTableSchema, file_path: str, sql_client: SqlClientBase[Any], forward_staging_credentials: bool = True, staging_config: Optional[DestinationClientStagingConfiguration] = None) -> None:
Collaborator: looks like we do not need forward_staging_credentials; we forward when staging credentials are present, and that's it! (A sketch of the simplified constructor follows below.)

Collaborator (Author): removed!
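A minimal sketch of the simplified constructor; the parameter names mirror the quoted diff, while the attribute used to hold the forwarded credentials is an assumption:

class CopyFileLoadJob(LoadJob, FollowupJob):
    def __init__(
        self,
        table: TTableSchema,
        file_path: str,
        sql_client: SqlClientBase[Any],
        staging_config: Optional[DestinationClientStagingConfiguration] = None,
    ) -> None:
        # no forward_staging_credentials flag: credentials are forwarded
        # exactly when the staging config carries them
        self.staging_credentials = staging_config.credentials if staging_config else None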

return "completed"

@staticmethod
def get_bucket_path(file_path: str) -> str:
Collaborator: it is still not moved. I think those methods are generic enough to belong in the copy job class.

files_clause = ""
stage_file_path = ""

# create storage integration stage for reference jobs if defined, for now gcs only
Collaborator: AFAIK this comment is outdated

credentials_clause = f"""CREDENTIALS=(AWS_KEY_ID='{aws_access_key}' AWS_SECRET_KEY='{aws_secret_key}')"""
from_clause = f"FROM '{bucket_path}'"
# create a stage if so defined
elif stage_name:
Collaborator: I'd always assume that the stage exists if a name is provided, so we can drop this part and simplify the if condition.

Collaborator (Author): I am not quite sure why this is done this way; it was like this before I started working on Snowflake, so I don't understand the reasoning, tbh. I will keep it for now, and if you insist, I will change it :)

Collaborator: heh, when we wrote the code I didn't know exactly how stages work in Snowflake. There's no use case where creating a named stage makes sense (a sketch of the resulting branching follows this list):

  • if stage_name is set, we assume that it exists and use it
  • if it is not set and we have a bucket url (external files), then we fail the load (terminal exception) with a user-friendly message
  • for a local load with no stage, we use the built-in table stage (we do that now)
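A minimal sketch of those rules; client, stage_name, bucket_path and make_qualified_table_name come from the surrounding diff, while TerminalLoadError is a hypothetical stand-in for the loader's terminal exception type:

class TerminalLoadError(Exception):
    # hypothetical stand-in for the loader's terminal exception type
    pass

def resolve_stage(client, table_name: str, stage_name: str, bucket_path: str) -> str:
    if stage_name:
        # a named stage is assumed to already exist
        return stage_name
    if bucket_path:
        # external files without a configured stage: fail the load with
        # a user-friendly, terminal message
        raise TerminalLoadError(f"Cannot load {bucket_path}: set stage_name in the destination configuration")
    # local load with no stage: use the implicit table stage "SCHEMA_NAME"."%TABLE_NAME"
    return client.make_qualified_table_name("%" + table_name)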

else:
# Use implicit table stage by default: "SCHEMA_NAME"."%TABLE_NAME"
stage_name = client.make_qualified_table_name('%'+table_name)

if not bucket_path: # this means we have a local file
Collaborator: theoretically, someone can use file:// with staging. Do you think it makes sense to handle it?

Collaborator (Author): Hm, how would file:// end up in this job? We either have reference jobs to buckets coming in, or the parquet files that come out of the normaliser stage with the right file_path present. Or am I missing something here?

Collaborator: you can use the filesystem destination with file://

staging_client = self._get_staging_client(self.default_schema)
# inject staging config into destination config, TODO: Not super clean I think?
if isinstance(client.config, DestinationClientDwhWithStagingConfiguration):
client.config.staging_config = staging_client.config # type: ignore
Collaborator: I'd set it only if the staging config is not already set (see the sketch below).
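A minimal sketch of the guarded assignment, mirroring the quoted diff (this lives inside a pipeline method, hence self):

staging_client = self._get_staging_client(self.default_schema)
# inject the staging config only when the user has not already set one explicitly
if isinstance(client.config, DestinationClientDwhWithStagingConfiguration):
    if client.config.staging_config is None:
        client.config.staging_config = staging_client.config  # type: ignore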

@sh-rp (Collaborator, Author) commented Jul 13, 2023

Not sure where you want def get_bucket_path(file_path: str) -> str: to be; I moved it into CopyFileLoadJob, but maybe you prefer it to be in the reference job?

# Conflicts:
#	docs/website/docs/dlt-ecosystem/destinations/redshift.md
@rudolfix (Collaborator) replied:

> Not sure where you want def get_bucket_path(file_path: str) -> str: to be; I moved it into CopyFileLoadJob, but maybe you prefer it to be in the reference job?

yes, we should. There's a review comment asking for that which has been hanging for some time :)

@rudolfix rudolfix left a comment

just 2 small things

dataset_name: str = None,
default_schema_name: Optional[str] = None,
as_staging: bool = False,
bucket_url: str = None,
Collaborator: not present in the init

return "completed"

@staticmethod
def get_bucket_path(file_path: str) -> str:
Collaborator: resolve_remote_path would be a better name, IMO

@rudolfix force-pushed the d#/375-filesystem-storage-staging branch from faddc8a to d8f2f2c on July 15, 2023 15:40
@rudolfix force-pushed the d#/375-filesystem-storage-staging branch from d8f2f2c to 4ae7fe6 on July 15, 2023 16:38
@rudolfix rudolfix left a comment

@sh-rp I've fixed a few things, and this is ready to go! Thanks, that was a lot of work.

Here are my final fixes: 4ae7fe6
Let's go through them on Monday; there are a few learnings there.

@rudolfix rudolfix merged commit fb9dabf into devel Jul 15, 2023
35 checks passed
@rudolfix rudolfix deleted the d#/375-filesystem-storage-staging branch July 15, 2023 19:45
codingcyclist pushed a commit to codingcyclist/dlt that referenced this pull request Aug 23, 2023
* add creation of reference followup jobs

* copy job

* add parquet format

* make staging destination none by default

* add automatic resolving of correct file format

* renaming staging destination to staging

* refactor redshift job and inject fs config info

* small cleanup and full parquet file loading test

* add redshift test

* fix resuming existing jobs

* linter fixes and something else i forgot

* move reference job follow up creation into job

* add bigquery staging with gcs

* add jsonl loading for bigquery staging

* better supported file format resolution

* move to existing bigquery load job

* change pipeline args order

* add staging run arg

* some more pipeline fixes

* configure staging via config

* enhance staging load tests

* fix merge disposition on redshift

* add comprehensive staging tests

* fix redshift jsonl loading

* add doc page (not in hierarchy for now)

* move redshift credentials testing to redshift loading location

* change rotation test

* change timing test

* implement snowflake file staging

* switch to staging instead of integration

* add s3 stage (which does not currently work)

* filter out certain combinations for tests

* update docs for snowflake staging

* forward staging config to supported destination configuration types

* move boto out of staging credentials

* authentication to s3 with iam role for redshift

* verify support for snowflake s3 stage and update docs for this

* adds named put stage to snowflake, improves exception messages, fixes and re-enables staging tests

---------

Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>