This repository demonstrates how to create ZenML pipelines for executing SQL scripts in sequence, with both simple and advanced approaches.
- Execute multiple SQL scripts in a specific order
- Track SQL execution results and performance
- Secure database credential management
- Advanced SQL query visualization and metadata extraction
```bash
pip install -r requirements.txt
zenml init
```

Run the basic example:

```bash
python simple_sql_pipeline.py
```

This demonstrates:
- Sequential SQL script execution
- Error handling and validation
- Basic result tracking
- ZenML secrets integration (optional)
Run the advanced example:

```bash
python sql_pipeline.py
```

This demonstrates:
- Custom materializer for rich visualizations
- Advanced metadata extraction
- Query performance analysis
- Dashboard integration
Perfect for straightforward SQL script execution:
```python
from zenml import step, pipeline
from zenml.client import Client


@step
def execute_sql_script(script_name: str, query: str) -> dict:
    """Execute a SQL script and return results."""
    # Get database credentials from ZenML secrets
    client = Client()
    try:
        db_secret = client.get_secret("db_credentials")
        # Use credentials to connect to the database
    except Exception:
        # Secret not found; fall back to mock execution
        pass
    # Execute SQL and return results
    return {"status": "success", "rows_affected": 100}


@pipeline
def simple_sql_pipeline():
    """Execute multiple SQL scripts in sequence."""
    # Define your SQL scripts
    scripts = [
        {"name": "create_tables", "query": "CREATE TABLE users (...)"},
        {"name": "insert_data", "query": "INSERT INTO users (...)"},
        {"name": "update_records", "query": "UPDATE users SET ..."},
    ]
    # Execute in sequence
    result = None
    for script in scripts:
        result = execute_sql_script(script["name"], script["query"])
    return result
```

For rich visualizations and metadata tracking:
```python
from zenml import step, pipeline
from sql_materializer import SQLQueryMaterializer
from sql_executor import SQLQuery


@step(output_materializers=SQLQueryMaterializer)
def create_sql_query() -> SQLQuery:
    """Create a SQL query with metadata."""
    return SQLQuery(
        query="SELECT * FROM users WHERE created_at > '2023-01-01'",
        name="user_analytics",
        description="Get recent users",
    )


@step
def execute_query(query: SQLQuery) -> dict:
    """Execute the SQL query."""
    return query.execute()


@pipeline
def advanced_sql_pipeline():
    """Pipeline with custom materializer for rich visualizations."""
    query = create_sql_query()
    result = execute_query(query)
    return result
```

```bash
zenml secret create db_credentials \
    --host=your-db-host \
    --username=your-username \
    --password=your-password \
    --database=your-database
```

```bash
zenml secret create bigquery_credentials \
    --project_id=your-project-id \
    --client_email=your-service-account@project.iam.gserviceaccount.com \
    --private_key=@path/to/private-key.json
```

Or run the setup script:
```bash
python setup_secrets.py
```

```
├── simple_sql_pipeline.py   # Basic SQL pipeline example
├── sql_pipeline.py          # Advanced pipeline with custom materializer
├── sql_executor.py          # SQLQuery class with execution logic
├── sql_materializer.py      # Custom materializer for visualizations
├── setup_secrets.py         # Helper to set up credentials
├── requirements.txt         # Python dependencies
└── README.md                # This file
```
- Sequential Execution: Scripts run in order with dependency tracking
- Error Handling: Pipeline stops if any script fails
- Result Validation: Validates execution results
- Mock Execution: Works without database connection for testing
- Secrets Integration: Secure credential management
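The result-validation behavior listed above can be pictured as a plain Python helper. This is a minimal sketch; `validate_result` is a hypothetical name, not a function shipped in this repository.

```python
def validate_result(result: dict) -> dict:
    """Check that a script execution result is well-formed and successful."""
    # Hypothetical helper: the required keys mirror the mock result shape
    # ({"status": ..., "rows_affected": ...}) used in the simple pipeline.
    missing = {"status", "rows_affected"} - result.keys()
    if missing:
        raise ValueError(f"result is missing keys: {sorted(missing)}")
    if result["status"] != "success":
        raise RuntimeError(f"script failed with status: {result['status']}")
    return result

# A successful result passes through unchanged
validate_result({"status": "success", "rows_affected": 100})
```

Raising inside a step is what makes the pipeline stop on failure: ZenML marks the step as failed and downstream steps never run.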
- Custom Materializer: Rich visualizations in ZenML dashboard
- Metadata Extraction: Query complexity, keywords, performance metrics
- HTML Visualizations: Interactive query results display
- Performance Analysis: Query optimization recommendations
- Artifact Tracking: Full query lineage and versioning
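The kind of metadata extraction listed above (query keywords, a complexity score) can be sketched with a few regular expressions. This is an illustrative stand-in; the actual heuristics in `sql_materializer.py` may differ.

```python
import re

def extract_query_metadata(query: str) -> dict:
    """Derive basic metadata from a SQL string: keywords present and a rough complexity bucket."""
    keywords = ["SELECT", "INSERT", "UPDATE", "DELETE", "JOIN",
                "WHERE", "GROUP BY", "ORDER BY", "HAVING"]
    # Match each keyword as a whole word; multi-word keywords tolerate any whitespace
    found = [
        kw for kw in keywords
        if re.search(r"\b" + kw.replace(" ", r"\s+") + r"\b", query, re.IGNORECASE)
    ]
    complexity = "high" if len(found) >= 5 else "medium" if len(found) >= 3 else "low"
    return {"keywords": found, "complexity": complexity, "length": len(query)}
```

Metadata like this can be attached to the output artifact so it shows up alongside the query in the dashboard.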
The advanced pipeline with the custom materializer provides rich visualizations in the ZenML dashboard, including:
- Pipeline DAG: Visual representation of SQL query execution flow
- SQL Query Visualization: Interactive display of your SQL queries with syntax highlighting
- Execution Results: Status, execution time, and rows affected
- Query Metadata: Descriptions, parameters, and performance metrics
- Result Preview: Sample data from query execution
The simple pipeline includes examples for:
- Table creation
- Data insertion
- Record updates
- Data querying
- Cleanup operations
Edit the `scripts` list in `simple_sql_pipeline.py`:

```python
scripts = [
    {
        "name": "your_script_name",
        "query": "YOUR SQL QUERY HERE",
    },
    # Add more scripts...
]
```

Replace the mock execution in `execute_sql_script()` with your database connection logic:
```python
# Example for PostgreSQL
import psycopg2

from zenml.client import Client


def execute_sql_script(script_name: str, query: str) -> dict:
    client = Client()
    db_secret = client.get_secret("db_credentials")
    conn = psycopg2.connect(
        host=db_secret.secret_values["host"],
        user=db_secret.secret_values["username"],  # psycopg2 expects "user", not "username"
        password=db_secret.secret_values["password"],
        database=db_secret.secret_values["database"],
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(query)
            conn.commit()
            # Return actual results
            return {"status": "success", "rows_affected": cursor.rowcount}
    finally:
        conn.close()
```

- Clone this repository
- Install dependencies: `pip install -r requirements.txt`
- Initialize ZenML: `zenml init`
- Set up credentials (optional): `python setup_secrets.py`
- Run the simple example: `python simple_sql_pipeline.py`
- Run the advanced example: `python sql_pipeline.py`
- Adapt the SQL scripts for your use case
- Set up real database connections
- Add error handling and retry logic
- Implement data validation steps
- Explore ZenML dashboard visualizations
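The retry logic suggested above can be added as an ordinary wrapper around the execution call. A minimal sketch (`with_retries` is a hypothetical helper; tune the attempt count and delay for your database):

```python
import time

def with_retries(fn, attempts: int = 3, delay: float = 0.1):
    """Call fn(), retrying on any exception with a fixed delay; re-raise after the last attempt."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(delay)

# Example: a call that fails twice with a transient error, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return {"status": "success"}
```

In practice you would retry only transient errors (connection drops, deadlocks), not SQL syntax errors, so narrow the `except` clause accordingly.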
This example provides a solid foundation for SQL pipeline execution in ZenML. Modify the scripts and connection logic to match your specific requirements!
