
Commit

Merge pull request #9 from scale-vector/quickstart_iteration
updating the getting started experience
matthauskrzykowski authored Jun 16, 2022
2 parents 6f6d257 + 0220d3f commit 4004c86
Showing 2 changed files with 158 additions and 75 deletions.
214 changes: 144 additions & 70 deletions QUICKSTART.md
# Quickstart Guide: Data Load Tool (DLT)

## **TL;DR: This guide shows you how to load a JSON document into Google BigQuery using DLT.**

![](docs/DLT-Pacman-Big.gif)

*Please open a pull request [here](https://github.com/scale-vector/dlt/edit/master/QUICKSTART.md) if there is something you can improve about this quickstart.*

## 1. Grab the demo

a. Clone the example repository:
```
git clone https://github.com/scale-vector/dlt-quickstart-example.git
```

b. Enter the directory:
```
cd dlt-quickstart-example
```

c. Open the files in your favorite IDE / text editor:
- `data.json` (i.e. the JSON document you will load; a sample of its contents is shown below)
- `credentials.json` (i.e. contains the credentials to our demo Google BigQuery warehouse)
- `quickstart.py` (i.e. the script that uses DLT)
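
For orientation, the demo `data.json` holds a small list of nested person records. Judging from the sample rows and query results shown later in this guide, its contents look roughly like this (a sketch, not the literal file):
```
[
    {"name": "Ana", "age": 30, "id": 456, "children": [
        {"name": "Bill", "id": 625},
        {"name": "Elli", "id": 591}
    ]},
    {"name": "Bob", "age": 30, "id": 455, "children": [
        {"name": "Bill", "id": 625},
        {"name": "Dave", "id": 621}
    ]}
]
```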

## 2. Set up a virtual environment

a. Ensure you are using either Python 3.8 or 3.9:
```
python3 --version
```

b. Create a new virtual environment:
```
python3 -m venv ./env
```

c. Activate the virtual environment:
```
source ./env/bin/activate
```

## 3. Install DLT and support for the target data warehouse

a. Install DLT using pip:
```
pip3 install python-dlt
```

b. Install support for Google BigQuery:
```
pip3 install python-dlt[gcp]
```

## 4. Configure DLT

a. Import the necessary libraries
```
import base64
import json
from dlt.common.utils import uniq_id
from dlt.pipeline import Pipeline, GCPPipelineCredentials
```

b. Create a unique prefix for your demo Google BigQuery table
```
schema_prefix = 'demo_' + uniq_id()[:4]
```

c. Name your schema
```
schema_name = 'example'
```

d. Name your table
```
parent_table = 'json_doc'
```

e. Specify your schema file location
```
schema_file_path = 'schema.yml'
```

f. Load credentials
```
with open('credentials.json', 'r') as f:
    gcp_credentials_json = json.load(f)

# Private key needs to be decoded (because we don't want to store it as plain text)
gcp_credentials_json["private_key"] = bytes([_a ^ _b for _a, _b in zip(base64.b64decode(gcp_credentials_json["private_key"]), b"quickstart-sv"*150)]).decode("utf-8")
credentials = GCPPipelineCredentials.from_services_dict(gcp_credentials_json, schema_prefix)
```
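
For reference, `credentials.json` is a Google Cloud service account key. Judging from the fields used above, it is shaped roughly as follows (key material shortened here; in this demo the `private_key` value is additionally obfuscated, which is why the decoding step above is needed):
```
{
    "type": "service_account",
    "project_id": "zinc-mantra-353207",
    "private_key": "<obfuscated key material>",
    "client_email": "data-load-tool-public-demo@zinc-mantra-353207.iam.gserviceaccount.com"
}
```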

## 5. Create a DLT pipeline

a. Instantiate a pipeline
```
pipeline = Pipeline(schema_name)
```

b. Create the pipeline with your credentials
```
pipeline.create_pipeline(credentials)
```

rows = [{"name":"Ana", "age":30, "id":456, "children":[{"name": "Bill", "id": 625},
{"name": "Elli", "id": 591}
]},
## 6. Load the data from the JSON document

{"name":"Bob", "age":30, "id":455, "children":[{"name": "Bill", "id": 625},
{"name": "Dave", "id": 621}
]}
]
a. Load JSON document into a dictionary
```
with open('data.json', 'r') as f:
    data = json.load(f)
```

## 7. Pass the data to the DLT pipeline

a. Extract the dictionary into a table
```
pipeline.extract(iter(data), table_name=parent_table)
```

b. Unpack the pipeline into a relational structure
```
pipeline.unpack()
```

c. Save schema to `schema.yml` file
```
schema = pipeline.get_default_schema()
schema_yaml = schema.as_yaml(remove_default_hints=True)
with open(schema_file_path, 'w') as f:
    f.write(schema_yaml)
```


## 8. Use DLT to load the data

a. Load
```
pipeline.load()
```


b. Make sure there are no errors
```
# now enumerate all complete loads to check if we have any failed packages
# complete but failed job will not raise any exceptions
completed_loads = pipeline.list_completed_loads()
for load_id in completed_loads:
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in pipeline.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")
```

c. Run the script:
```
python3 quickstart.py
```

d. Inspect the generated `schema.yml`:
```
vim schema.yml
```

## 9. Query the Google BigQuery table

query = f"SELECT * FROM `{schema_prefix}_example.my_json_doc__children` LIMIT 1000"
a. Run SQL queries
```
def run_query(query):
    df = c._execute_sql(query)
    print(query)
    print(list(df))
    print()

with pipeline.sql_client() as c:

    # Query table for parents
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc`"
    run_query(query)

    # Query table for children
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000"
    run_query(query)

    # Join previous two queries via auto generated keys
    query = f"""
        select p.name, p.age, p.id as parent_id,
            c.name as child_name, c.id as child_id, c._pos as child_order_in_list
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children` as c
        on p._record_hash = c._parent_hash
        """
    run_query(query)
```

b. See results like the following

table: json_doc
```
{ "name": "Ana", "age": "30", "id": "456", "_load_id": "1654787700.406905", "_record_hash": "5b018c1ba3364279a0ca1a231fbd8d90"}
{ "name": "Bob", "age": "30", "id": "455", "_load_id": "1654787700.406905", "_record_hash": "afc8506472a14a529bf3e6ebba3e0a9e"}
```

table: json_doc__children
```
# {"name": "Bill", "id": "625", "_parent_hash": "5b018c1ba3364279a0ca1a231fbd8d90", "_pos": "0", "_root_hash": "5b018c1ba3364279a0ca1a231fbd8d90",
# "_record_hash": "7993452627a98814cc7091f2c51faf5c"}
Expand All @@ -150,9 +221,12 @@ SQL result:
# { "name": "Bob", "age": "30", "parent_id": "455", "child_name": "Dave", "child_id": "621", "child_order_in_list": "1"}
```

## 10. Next steps

a. Replace `data.json` with data you want to explore

b. Check that the inferred types are correct in `schema.yml`

c. Set up your own Google BigQuery warehouse (and replace the credentials)

d. Use this new clean staging layer as the starting point for a semantic layer / analytical model (e.g. using dbt)
19 changes: 14 additions & 5 deletions README.md


# Data Load Tool (DLT)

Data Load Tool (DLT) enables simple, python-native data pipelining for data professionals.

It is an open source, scalable data loading framework that does not require any help from DevOps.

Learn more with the **[Quickstart Guide](QUICKSTART.md)** and check out **[Example Sources](examples/README.md)** to get started.

## How does it work?

In short, DLT does not require any specialist knowledge to use.
Advanced, commercial-grade use of DLT requires only some configuration.


## Supported data warehouses

Google BigQuery:
```pip3 install python-dlt[gcp]```

Amazon Redshift:
```pip3 install python-dlt[redshift]```

## How to load very large sources?

You will want to use a generator instead of an iterator, so that rows are produced lazily and the whole source never has to be held in memory at once.
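
A minimal sketch of that pattern, assuming a newline-delimited JSON source and a pipeline set up as in the [Quickstart Guide](QUICKSTART.md); the file name and the `read_rows` helper are illustrative only:
```
import json

def read_rows(path):
    # yield one parsed row at a time so the whole source never has to fit in memory
    with open(path, 'r') as f:
        for line in f:
            yield json.loads(line)

# `pipeline` and `parent_table` are assumed to be configured as in the quickstart
pipeline.extract(read_rows('large_source.jsonl'), table_name=parent_table)
```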
