In [1]:
import dlt, requests
import pandas as pd

In [None]:
URL = "https://jaffle-shop.scalevector.ai/api/v1/customers"

req = requests.get(URL)

df = pd.json_normalize(req.json())

df.head()

In [None]:
# 1. dlt를 사용하여 S3의 CSV 파일을 데이터 소스로 읽어오는 코드 예시입니다.

import dlt
import pandas as pd
from dlt.sources.filesystem import filesystem

# dlt의 s3 파일 커넥터를 사용하여 S3의 CSV 파일을 읽어옵니다.
# dlt.sources.filesystem.filesystem_source를 사용합니다.

filesystem_source = filesystem(
  bucket_url="s3://lakeformation-demo-hyunsoo/dlt_dataset/",
  file_glob="*.csv"
)

# 데이터를 DataFrame으로 변환
emp_df = pd.DataFrame(list(filesystem_source))

# 데이터 확인
emp_df.head()


In [None]:
# S3에서 읽은 emp_df(DataFrame)를 PostgreSQL에 저장하는 dlt 파이프라인 생성
emp_pg_pipeline = dlt.pipeline(
    pipeline_name="emp_data_pg_pipeline3",
    destination="postgres",
    dataset_name="emp_data_pg"
)

# emp_df를 "employees" 테이블로 적재
emp_pg_load_info = emp_pg_pipeline.run(
    data=filesystem_source,
    table_name="employees3"
)

print("S3 CSV 데이터를 PostgreSQL에 저장한 결과:", emp_pg_load_info)


In [None]:
pipeline = dlt.pipeline(
    pipeline_name="customer_data_pipeline",
    destination="filesystem",
    dataset_name="customer_data"
)

load_info = pipeline.run(
    data=df, 
    table_name="customers"
    )

print("파이프라인 실행 결과:", load_info)

In [None]:
pd.read_parquet('./api_data_dir/customer_data/customers/')

In [None]:
import dlt

# PostgreSQL로 데이터를 저장하는 dlt 파이프라인 생성
# 동일 코드라도 데이터가 다르면 pipeline_name을 다르게 설정해야 함
pg_pipeline = dlt.pipeline(
    pipeline_name="customer_data_pg_pipeline2",
    destination="postgres",
    dataset_name="customer_data_pg"
)

# 데이터 적재 실행
pg_load_info = pg_pipeline.run(
    data=df,
    table_name="stores"
)

print("PostgreSQL 파이프라인 실행 결과:", pg_load_info)



In [None]:
import dlt

# AWS Athena로 데이터를 저장하는 dlt 파이프라인 생성
athena_pipeline = dlt.pipeline(
    pipeline_name="customer_data_athena_pipeline",
    destination="athena",
    dataset_name="customer_data_athena"
)

# 데이터 적재 실행
athena_load_info = athena_pipeline.run(
    data=df,
    table_name="customers"
)

print("Athena 파이프라인 실행 결과:", athena_load_info)

In [None]:


# Athena에서 S3 버킷에 접근 권한이 없어서 발생하는 에러입니다.
# Lake Formation 또는 S3 버킷의 권한 설정을 확인해야 합니다.
# 아래는 문제 해결을 위한 안내 코드와 설명입니다.

print("""
[문제 원인]
- Athena가 S3 버킷(s3://personal-golight-image-bucket/dataset/customer_data_athena/_dlt_pipeline_state)에 접근할 권한이 없습니다.
- Lake Formation 또는 S3 버킷의 권한 정책에서 Athena(및 Glue, Data Catalog) 역할/사용자에게 충분한 권한이 부여되어야 합니다.

[해결 방법]
1. AWS 콘솔에서 S3 버킷(s3://personal-golight-image-bucket)에 대해 다음 권한을 부여하세요.
   - s3:GetObject, s3:PutObject, s3:DeleteObject, s3:ListBucket 등
   - 권한을 부여할 주체: Athena, Glue, Data Catalog에서 사용하는 IAM Role 또는 User

2. Lake Formation을 사용하는 경우:
   - Lake Formation 콘솔에서 해당 S3 경로에 대해 Athena, Glue, Data Catalog에 '데이터 위치 권한'을 부여하세요.
   - '데이터 위치' 등록 및 '데이터 위치 권한' 부여 필요

3. 권한이 정상적으로 부여된 후 파이프라인을 다시 실행하세요.

[예시: S3 버킷 정책]
{
  "Effect": "Allow",
  "Principal": {
    "Service": [
      "athena.amazonaws.com",
      "glue.amazonaws.com"
    ],
    "AWS": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/<YOUR_ATHENA_GLUE_ROLE>"
  },
  "Action": [
    "s3:GetObject",
    "s3:PutObject",
    "s3:DeleteObject",
    "s3:ListBucket"
  ],
  "Resource": [
    "arn:aws:s3:::personal-golight-image-bucket/dataset/*",
    "arn:aws:s3:::personal-golight-image-bucket/dataset"
  ]
}

[참고]
- 권한 변경 후에도 문제가 지속되면, Lake Formation에서 '데이터 위치 권한'과 '테이블 권한'을 모두 확인하세요.
- 자세한 내용은 AWS 공식 문서(https://docs.aws.amazon.com/ko_kr/athena/latest/ug/lake-formation.html) 참고

""")


In [None]:
import dlt, requests
import pandas as pd


def get_customer_data(url: str) -> pd.DataFrame:
    """API로부터 고객 데이터를 가져오는 함수"""
    response = requests.get(url)
    df = pd.json_normalize(response.json())
    return df
    
def load_to_s3(data: pd.DataFrame, **kwargs):
    """데이터를 S3에 적재하는 함수"""
    pipeline = dlt.pipeline(
        pipeline_name=kwargs["pipeline_name"],
        destination="filesystem", 
        dataset_name=kwargs["dataset_name"]
    )
    return pipeline

In [None]:
API_URL = "https://jaffle-shop.scalevector.ai/api/v1/customers"

# 데이터 가져오기
customer_df = get_customer_data(API_URL)

# S3에 데이터 적재
pipeline = load_to_s3(
    data=customer_df,
    pipeline_name="customer_data_pipeline",
    dataset_name="customer_data"
    )

In [None]:
pipeline.run(
    data=customer_df, 
    table_name="customers",
    write_disposition="append"
    )

In [None]:
import dlt, requests
import pandas as pd
from dlt.sources.filesystem import filesystem


def get_customer_data(url: str) -> pd.DataFrame:
    """API로부터 고객 데이터를 가져오는 함수"""
    response = requests.get(url)
    df = pd.json_normalize(response.json())
    return df

def load_to_s3(data: pd.DataFrame, **kwargs):
    """데이터를 S3에 적재하는 함수"""
    pipeline = dlt.pipeline(
        pipeline_name=kwargs["pipeline_name"],
        destination="filesystem", 
        dataset_name=kwargs["dataset_name"]
    )
    return pipeline

if __name__ == "__main__":
    # API URL 설정
    API_URL = "https://jaffle-shop.scalevector.ai/api/v1/customers"
    
    # 데이터 가져오기
    customer_df = get_customer_data(API_URL)
    
    # S3에 데이터 적재
    pipeline = load_to_s3(
        data=customer_df,
        pipeline_name="customer_data_pipeline",
        dataset_name="customer_data"
        )
    
    pipeline.run(data=customer_df, table_name="customers")
    
    print("파이프라인 실행 결과:", pipeline)




In [None]:
# GCS의 parquet 파일을 소스로 하여 PostgreSQL에 적재하는 dlt 파이프라인 예시입니다.

import dlt
import pandas as pd
from dlt.sources.filesystem import filesystem, read_parquet

source = filesystem(
    bucket_url="gs://hyunsoo_de_bucket/dataset/pokemon/",
    file_glob="*.parquet"
)

filesystem_pipe = (source | read_parquet())

filesystem_pipe

In [None]:
# dlt 파이프라인 생성 (destination: postgres)
pipeline = dlt.pipeline(
    pipeline_name="gcs_to_postgres_pipeline",
    destination="duckdb",
)
    # dataset_name="public"  # PostgreSQL의 스키마명

# 파이프라인 실행 (parquet 파일을 customers 테이블로 적재, 필요시 테이블명 변경)
load_info = pipeline.run(
    filesystem_pipe.with_name("pokemon"),
)
    # table_name="pokemon",
    # write_disposition="replace"  # 기존 테이블 덮어쓰기, append로 변경 가능

print("파이프라인 실행 결과:", load_info)

In [None]:
## GCS 를 source로 하는 파이프라인 생성
import dlt
from dlt.sources.filesystem import filesystem
from dlt.sources.filesystem import readers

@dlt.source
def gcs_pokemon_source():
    @dlt.resource(table_name="emp", write_disposition="replace")
    def get_parquet_data():
        yield from readers(
            bucket_url="gs://hyunsoo_de_bucket/dataset/",
            file_glob="emp.parquet"
        ).read_parquet()

    @dlt.resource(table_name="emp_csv", write_disposition="replace")
    def get_csv_data():
        yield from readers(
            bucket_url="gs://hyunsoo_de_bucket/dataset/",
            file_glob="emp.csv"
        ).read_csv()

    return get_csv_data

if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="gcs_pokemon_pipeline",
        destination="postgres",
        dataset_name="docker_pg"
    )
    result = pipeline.run(gcs_pokemon_source())
    print("파이프라인 실행 결과:", result)


In [None]:
## Database를 source로 하는 파이프라인 생성
# https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database/configuration

import dlt
from dlt.sources.filesystem import filesystem
from dlt.sources.filesystem import readers

@dlt.source
def gcs_pokemon_source():
    @dlt.resource(table_name="emp_c", write_disposition="replace")
    def get_csv_data():
        yield from readers(
            bucket_url="gs://hyunsoo_de_bucket/dataset/",
            file_glob="emp.csv"
        ).read_csv()

    return get_csv_data

aa = gcs_pokemon_source()

In [None]:
aa.run(
    destination="postgres",
    dataset_name="docker_pg"
)

In [13]:
## PG 데이터베이스를 소스로 하는 파이프라인 생성
# uv add 'dlt[sql_database]'
import dlt
from dlt.sources.sql_database import sql_database

# 테이블 이름을 지정하면 해당 테이블만 불러옴
# 테이블 이름을 지정하지 않으면 데이터가 있는 모든 테이블을 일단 불러옴
source = sql_database(
    table_names=['ducklake_metadata', 'ducklake_schema']
)

pipeline = dlt.pipeline(
    pipeline_name="pg_to_gcs",
    destination='filesystem',
    dataset_name="pg_data"
)

In [16]:
source

<@dlt.source(name='sql_database', n_resources=21, resources=['ducklake_metadata', 'ducklake_snapshot', 'ducklake_snapshot_changes', 'ducklake_schema', 'ducklake_table', 'ducklake_view', 'ducklake_tag', 'ducklake_column_tag', 'ducklake_data_file', 'ducklake_file_column_statistics', 'ducklake_delete_file', 'ducklake_column', 'ducklake_table_stats', 'ducklake_table_column_stats', 'ducklake_partition_info', 'ducklake_partition_column', 'ducklake_file_partition_value', 'ducklake_files_scheduled_for_deletion', 'ducklake_inlined_data_tables', 'ducklake_column_mapping', 'ducklake_name_mapping'])>

In [None]:
# Run the pipeline
"""
pipeline.run(data=source) 이렇게만 적으면 데이터가 있는 모든 테이블을 로드함.
ROW가 0이면 로드하지 않음.
"""
info = pipeline.run(
    data=source, 
    )
    # table_name="ducklake_snapshot_changes",
    # write_disposition="replace"

# Print load info
print(info)

  - file_order
  - partition_id
  - encryption_key
  - partial_file_info
  - mapping_id
  - parent_column

Unless type hints are provided, these columns will not be materialized in the destination.
One way to provide type hints is to use the 'columns' argument in the '@dlt.resource' decorator.  For example:

@dlt.resource(columns={'file_order': {'data_type': 'text'}})

  - file_order
  - partition_id
  - encryption_key
  - partial_file_info
  - mapping_id
  - parent_column

Unless type hints are provided, these columns will not be materialized in the destination.
One way to provide type hints is to use the 'columns' argument in the '@dlt.resource' decorator.  For example:

@dlt.resource(columns={'file_order': {'data_type': 'text'}})



Pipeline pg_to_gcs load step completed in 3.46 seconds
1 load package(s) were loaded to destination filesystem and into dataset pg_data
The filesystem destination used gs://hyunsoo_de_bucket/dlt_destination location to store data
Load package 1752153853.246578 is LOADED and contains no failed jobs
