Add new import logs scenario #315

Open · wants to merge 5 commits into master
38 changes: 38 additions & 0 deletions scenarios/import_td_logs/README.md
@@ -0,0 +1,38 @@
# Workflow: Import Treasure Data Logs from Data Landing Area
This example shows how you can use a workflow to ingest Treasure Data logs from Data Landing Areas into your Treasure Data account.
Collaborator: Add a statement: "This is an opt-in feature. Contact your Customer Success rep or Technical Support if you have an interest in this feature."

Author: Thanks! Will fix.

This is an opt-in feature. Please contact your Customer Success representative or Technical Support if you are interested in this feature.

# How to Run
## Requirement
The workflow requires that the Data Landing Areas feature is enabled in your Treasure Data account and that you have received the User ID used to access it.

## Steps
First, edit the configuration. You can find the following settings in the `import_td_logs.dig` file (an excerpt of the relevant block follows the table).

| Parameter | Description |
| ---- | ---- |
| api_endpoint | The endpoint of the Treasure Data API. See this [document](https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints). (e.g. https://api.treasuredata.com) |
| dla_host | The hostname of the Data Landing Area (e.g. dla1.treasuredata-co.jp) |
| user_id | Your user_id received from TD when you enabled the Data Landing Areas feature |
| site | The site of your account (e.g. aws, aws-tokyo, eu01, ap02) |
| account_id | Your TD account_id |
| query_logs_table | The table name where query logs are stored (e.g. query_logs) |
| workflow_logs_table | The table name where workflow logs are stored (e.g. workflow_logs) |
| users_table | The table name where user data is stored (e.g. users) |

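For reference, these parameters are defined under the `_export:` block of `import_td_logs.dig`; the values shown here are the placeholder defaults from this PR:

```yaml
_export:
  td:
    database: treasure-data-logs
  api_endpoint: https://api.treasuredata.com
  dla_host: dla1.treasuredata-co.jp
  user_id: abcdefg012345
  site: aws
  account_id: 1
  query_logs_table: query_logs
  workflow_logs_table: workflow_logs
  users_table: users
```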
Next, upload the workflow to Treasure Data.

    # Upload
    $ td wf push import_td_logs

Set secrets with your private key, the counterpart of the public key you gave to TD when you enabled the Data Landing Areas feature.

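(The `~/.ssh/id_rsa_dla` path below is only an example; it assumes a key pair created along these lines when you originally registered the public key with TD:)

```sh
$ ssh-keygen -t rsa -f ~/.ssh/id_rsa_dla
```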
    $ td wf secrets --project import_td_logs --set sftp.dla_secret_key_file=@~/.ssh/id_rsa_dla
    $ td wf secrets --project import_td_logs --set td.apikey

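These secrets are referenced by name elsewhere in the scenario: the SFTP key is pulled into both Embulk load configs, and `td.apikey` is handed to the Python task through its `_env` block. For example, `config/query_log.yml` references the key as:

```yaml
secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"}
```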
You can trigger the session manually to watch it execute.

    # Run
    $ td wf start import_td_logs import_td_logs --session now

If you have any questions, contact support@treasuredata.com.
43 changes: 43 additions & 0 deletions scenarios/import_td_logs/config/query_log.yml
@@ -0,0 +1,43 @@
in:
  type: sftp
  host: ${dla_host}
  user: ${user_id}
  secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"}
  path_prefix: "/treasure-data-logs/${site}/${account_id}/query_logs/v1/data.csv"
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
      - {name: date, type: string}
      - {name: account_id, type: string}
      - {name: user_id, type: string}
      - {name: job_id, type: long}
      - {name: created_at, type: string}
      - {name: scheduled_at, type: string}
      - {name: start_at, type: string}
      - {name: end_at, type: string}
      - {name: queued_sec, type: long}
      - {name: running_sec, type: long}
      - {name: result_type, type: string}
      - {name: load_type, type: string}
      - {name: records, type: long}
      - {name: type, type: string}
      - {name: query_status, type: string}
      - {name: result_size, type: long}
      - {name: query_id, type: string}
      - {name: split_hours, type: double}
      - {name: average_hive_cores, type: double}
      - {name: project_name, type: string}
      - {name: workflow_name, type: string}
      - {name: task_id, type: string}
      - {name: time, type: long}
out:
  mode: replace
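This config is consumed by the `td_load>` operator; the corresponding task in `import_td_logs.dig` (shown here without its enclosing `+import:` group) just points at the file and names the destination table:

```yaml
+query_logs:
  td_load>: config/query_log.yml
  table: ${query_logs_table}
```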
41 changes: 41 additions & 0 deletions scenarios/import_td_logs/config/workflow_log.yml
@@ -0,0 +1,41 @@
in:
  type: sftp
  host: ${dla_host}
  user: ${user_id}
  secret_key_file: {"content": "${secret:sftp.dla_secret_key_file}"}
  path_prefix: "/treasure-data-logs/${site}/${account_id}/workflow_logs/v1/data.csv"
Contributor: How do you block access to other customers' data?

Author: Access is separated at the system level by the Data Landing Areas feature, which is built on AWS Transfer Family, so a customer can only access the space corresponding to the user_id issued by TD.
https://treasure-data.atlassian.net/wiki/spaces/EN/pages/1740343892/ERD+Data+Landing+Areas+SFTP
https://docs.aws.amazon.com/ja_jp/transfer/latest/userguide/how-aws-transfer-works.html

  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
      - {name: account_id, type: string}
      - {name: project_id, type: string}
      - {name: workflow_id, type: string}
      - {name: session_id, type: string}
      - {name: attempt_id, type: string}
      - {name: task_id, type: string}
      - {name: user_id, type: string}
      - {name: project_name, type: string}
      - {name: workflow_name, type: string}
      - {name: timezone, type: string}
      - {name: session_time, type: string}
      - {name: attempt_created_at, type: string}
      - {name: attempt_finished_at, type: string}
      - {name: task_name, type: string}
      - {name: task_start_at, type: string}
      - {name: task_end_at, type: string}
      - {name: attempt_running_sec, type: string}
      - {name: task_running_sec, type: string}
      - {name: state, type: string}
      - {name: date, type: string}
      - {name: time, type: long}
out:
  mode: replace
43 changes: 43 additions & 0 deletions scenarios/import_td_logs/import_td_logs.dig
@@ -0,0 +1,43 @@
timezone: UTC

schedule:
  daily>: 03:00:00

_export:
  td:
    database: treasure-data-logs
  api_endpoint: https://api.treasuredata.com
  dla_host: dla1.treasuredata-co.jp
  user_id: abcdefg012345
  site: aws
  account_id: 1
  query_logs_table: query_logs
  workflow_logs_table: workflow_logs
  users_table: users

+create_databases:
  td_ddl>:
  create_databases: [${td.database}]

+create_table:
  td_ddl>:
  create_tables: [${query_logs_table}, ${workflow_logs_table}, ${users_table}]

+import:
  +query_logs:
    td_load>: config/query_log.yml
    table: ${query_logs_table}

  +workflow_logs:
    td_load>: config/workflow_log.yml
    table: ${workflow_logs_table}

  +users:
    _env:
      TD_API_KEY: ${secret:td.apikey}
    py>: script.import_td_users.import_users
    database: ${td.database}
    table: ${users_table}
    api_endpoint: ${api_endpoint}
    docker:
      image: "digdag/digdag-python:3.9"
20 changes: 20 additions & 0 deletions scenarios/import_td_logs/script/import_td_users.py
@@ -0,0 +1,20 @@
import os
import sys

# Install dependencies at runtime; the digdag-python Docker image does not
# bundle pandas, requests, or pytd.
os.system(f"{sys.executable} -m pip install -U pandas requests pytd==1.3.0")

import pandas as pd
import pytd
import requests

# The API key is passed in via the _env block of the +users task.
td_apikey = os.getenv("TD_API_KEY")


def import_users(database, table, api_endpoint):
    # Fetch the list of users from the Treasure Data REST API.
    headers = {'Authorization': 'TD1 {}'.format(td_apikey)}
    r = requests.get('{}/v3/user/list'.format(api_endpoint), headers=headers)
    r.raise_for_status()

    # Flatten the "users" array into a DataFrame and load it into the target table.
    df = pd.json_normalize(r.json(), record_path=['users'])
    client = pytd.Client(apikey=td_apikey, database=database)
    client.load_table_from_dataframe(
        df, table, writer='bulk_import', if_exists='overwrite')
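For context, the `+users` task in `import_td_logs.dig` invokes this function through the `py>` operator, passing `database`, `table`, and `api_endpoint` as keyword arguments. A rough local smoke test (a sketch only; it assumes you have a valid TD API key at hand, and the values mirror the placeholder defaults from the workflow) might look like:

```python
import os

# Hypothetical placeholder: export a real TD API key before importing the
# module, since td_apikey is read at import time.
os.environ["TD_API_KEY"] = "<your TD API key>"

from script.import_td_users import import_users

# Placeholder values mirroring the _export block of import_td_logs.dig.
import_users(
    database="treasure-data-logs",
    table="users",
    api_endpoint="https://api.treasuredata.com",
)
```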