A comprehensive, production-ready Python library for Salesforce integration with support for any Salesforce object, flexible data mapping, and powerful ETL pipelines.
-
π Multiple Authentication Methods
- JWT Bearer Flow (recommended for production)
- OAuth 2.0 Password Flow
- Environment-based configuration
-
π Complete CRUD Operations
- Works with any Salesforce object (standard or custom)
- Create, Read, Update, Delete, Upsert
- Bulk operations via Composite API
- Query with automatic pagination
-
πΊοΈ Flexible Field Mapping
- Simple field renaming
- Value transformations with custom functions
- Default values
- Nested field access (dot notation)
- Conditional mapping
-
π ETL Pipeline Framework
- Configuration-driven sync pipelines
- Multiple sync modes (INSERT, UPDATE, UPSERT, DELETE)
- Batch processing
- Progress tracking with callbacks
- Comprehensive error handling
-
π Production-Ready Logging
- File and console output
- Automatic log rotation
- Colored console output
- Contextual logging
- Configurable log levels
-
π οΈ Command-Line Interface
- Query, create, update, delete from terminal
- Run sync pipelines from YAML config
- Describe Salesforce objects
- Test authentication
pip install salesforce-toolkitgit clone https://github.com/yourusername/salesforce-toolkit.git
cd salesforce-toolkit
pip install -e .# Database support
pip install salesforce-toolkit[database]
# Data manipulation (pandas, numpy)
pip install salesforce-toolkit[data]
# Development tools
pip install salesforce-toolkit[dev]Create a .env file in your project root:
# JWT Authentication (Recommended)
SF_CLIENT_ID=3MVG9...
SF_USERNAME=user@example.com.sandbox
SF_PRIVATE_KEY_PATH=/path/to/server.key
SF_LOGIN_URL=https://test.salesforce.com
# Logging
LOG_DIR=./logs
LOG_LEVEL=INFOfrom salesforce_toolkit import JWTAuthenticator, SalesforceClient
# Authenticate
auth = JWTAuthenticator.from_env()
session = auth.authenticate()
# Create client
client = SalesforceClient(session)
# Create a record
account_id = client.create("Account", {
"Name": "ACME Corporation",
"Industry": "Technology"
})
# Query records
accounts = client.query("SELECT Id, Name FROM Account LIMIT 10")
# Update a record
client.update("Account", account_id, {"Phone": "555-1234"})
# Delete a record
client.delete("Account", account_id)from salesforce_toolkit import (
JWTAuthenticator,
SalesforceClient,
FieldMapper,
SyncPipeline,
SyncMode
)
# Authenticate
auth = JWTAuthenticator.from_env()
session = auth.authenticate()
client = SalesforceClient(session)
# Define field mapping
mapper = FieldMapper({
"customer_name": "Name",
"customer_email": "Email",
"industry_code": ("Industry", lambda x: x.title()) # Transform
})
# Create pipeline
pipeline = SyncPipeline(
client=client,
sobject="Account",
mapper=mapper,
mode=SyncMode.INSERT,
batch_size=200
)
# Sync data
source_data = [
{"customer_name": "ACME", "customer_email": "info@acme.com"},
{"customer_name": "Globex", "customer_email": "contact@globex.com"}
]
result = pipeline.sync(source_data)
print(f"Synced {result.success_count}/{result.total_records} records")# Test authentication
sf-toolkit auth --method jwt
# Query Salesforce
sf-toolkit query "SELECT Id, Name FROM Account LIMIT 10"
# Create a record
sf-toolkit create Account --data '{"Name": "ACME Corp"}'
# Run a sync pipeline
sf-toolkit sync --config sync_config.yaml
# Describe an object
sf-toolkit describe Account --fieldsfrom salesforce_toolkit import JWTAuthenticator
# From environment variables
auth = JWTAuthenticator.from_env()
# Or manual configuration
auth = JWTAuthenticator(
client_id="3MVG9...",
username="user@example.com",
private_key_path="/path/to/server.key",
login_url="https://test.salesforce.com"
)
session = auth.authenticate()from salesforce_toolkit import OAuthAuthenticator
# From environment variables
auth = OAuthAuthenticator.from_env()
# Or manual configuration
auth = OAuthAuthenticator(
client_id="3MVG9...",
client_secret="1234567890ABCDEF",
username="user@example.com",
password="your_password",
security_token="ABC123",
login_url="https://login.salesforce.com"
)
session = auth.authenticate()# Single record
account_id = client.create("Account", {
"Name": "ACME Corp",
"Industry": "Technology"
})
# Batch create (up to 200 records)
results = client.create_batch("Contact", [
{"FirstName": "John", "LastName": "Doe"},
{"FirstName": "Jane", "LastName": "Smith"}
])# SOQL query with automatic pagination
accounts = client.query(
"SELECT Id, Name, Industry FROM Account WHERE Industry = 'Technology'"
)
# Query first result
account = client.query_one(
"SELECT Id, Name FROM Account WHERE Name = 'ACME Corp'"
)
# Get by ID
account = client.get("Account", "001XXXXXXXXXXXX")
# Count records
total = client.count("Account")
tech_count = client.count("Account", "Industry = 'Technology'")# Update by ID
client.update("Account", "001XXXXXXXXXXXX", {
"Phone": "555-9999",
"Industry": "Manufacturing"
})
# Upsert (requires External ID field)
account_id = client.upsert(
"Account",
"External_Key__c",
"EXT-12345",
{"Name": "ACME Corp", "Industry": "Tech"}
)client.delete("Account", "001XXXXXXXXXXXX")from salesforce_toolkit import FieldMapper
mapper = FieldMapper({
"first_name": "FirstName",
"last_name": "LastName",
"email": "Email"
})
source = {"first_name": "John", "last_name": "Doe", "email": "john@example.com"}
target = mapper.transform(source)
# Result: {"FirstName": "John", "LastName": "Doe", "Email": "john@example.com"}mapper = FieldMapper({
# Simple rename
"customer_name": "Name",
# With transformation
"email": ("Email", lambda x: x.lower()),
# With default value
"status": ("Status__c", None, "Active"),
# With both transformation and default
"created_at": (
"CreatedDate",
lambda x: x.strftime("%Y-%m-%d") if x else None,
datetime.now().strftime("%Y-%m-%d")
),
# Nested field access
"address.city": "BillingCity",
"address.state": "BillingState"
})# Available via YAML configuration
transforms = [
"lowercase", # Convert to lowercase
"uppercase", # Convert to uppercase
"strip", # Strip whitespace
"int", # Convert to integer
"float", # Convert to float
"bool", # Convert to boolean
"date_iso", # Format date as YYYY-MM-DD
"datetime_iso" # Format datetime as ISO 8601
]from salesforce_toolkit import SyncPipeline, SyncMode
pipeline = SyncPipeline(
client=client,
sobject="Account",
mapper=mapper,
mode=SyncMode.INSERT,
batch_size=200,
stop_on_error=False
)
result = pipeline.sync(source_data)def on_record_success(record, salesforce_id):
print(f"β Synced: {record['name']} -> {salesforce_id}")
def on_record_error(record, error):
print(f"β Failed: {record['name']} - {error}")
def on_batch_complete(batch_num, total_batches, result):
print(f"Batch {batch_num}/{total_batches} done")
pipeline = SyncPipeline(
client=client,
sobject="Account",
mapper=mapper,
mode=SyncMode.INSERT,
callbacks={
"on_record_success": on_record_success,
"on_record_error": on_record_error,
"on_batch_complete": on_batch_complete
}
)# sync_config.yaml
source:
type: json
path: data/accounts.json
pipeline:
sobject: Account
mode: upsert
external_id_field: External_Key__c
batch_size: 200
mapping:
customer_name: Name
customer_email: Email
industry_code:
target: Industry
transform: uppercaseimport yaml
from salesforce_toolkit import SyncPipeline
with open("sync_config.yaml") as f:
config = yaml.safe_load(f)
pipeline = SyncPipeline.from_config(config["pipeline"], client)
result = pipeline.sync(source_data)from salesforce_toolkit.logging import setup_logger
import logging
logger = setup_logger(
name="my_app",
log_dir="./logs",
log_level=logging.INFO,
console_colors=True
)
logger.info("Application started")
logger.error("An error occurred", exc_info=True)from salesforce_toolkit.logging import ContextLogger, setup_logger
base_logger = setup_logger("my_app")
context_logger = ContextLogger(base_logger, context={
"transaction_id": "TX-12345",
"user_id": "user@example.com"
})
context_logger.info("Processing record")
# Logs: "Processing record [transaction_id=TX-12345, user_id=user@example.com]"from salesforce_toolkit.utils import (
sanitize_soql,
build_soql_query,
validate_salesforce_id,
format_datetime_for_sf,
generate_external_id,
batch_records
)
# Sanitize SOQL
safe_name = sanitize_soql("O'Brien & Associates")
# Build SOQL query
query = build_soql_query(
sobject="Account",
fields=["Id", "Name", "Industry"],
where="Industry = 'Technology'",
limit=100
)
# Validate Salesforce ID
if validate_salesforce_id("001XXXXXXXXXXXXXXX"):
print("Valid ID")
# Format datetime
sf_datetime = format_datetime_for_sf(datetime.now())
# Generate external ID
ext_id = generate_external_id("CUST", timestamp=True)
# Returns: "CUST-20251205-103000-abc123"
# Batch records
batches = batch_records(records, batch_size=200)The examples/ directory contains comprehensive examples:
- 01_basic_authentication.py - Authentication methods
- 02_crud_operations.py - CRUD operations
- 03_data_sync_pipeline.py - Data synchronization
Run an example:
cd examples
python 01_basic_authentication.pyCopy config/.env.example to .env and configure:
# Salesforce
SF_CLIENT_ID=your_consumer_key
SF_USERNAME=user@example.com
SF_PRIVATE_KEY_PATH=/path/to/server.key
SF_LOGIN_URL=https://test.salesforce.com
# Logging
LOG_DIR=./logs
LOG_LEVEL=INFOSee config/sync_config_example.yaml for pipeline configuration.
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run tests with coverage
pytest --cov=salesforce_toolkit --cov-report=html
# Run linter
flake8 salesforce_toolkit/
# Run type checker
mypy salesforce_toolkit/
# Format code
black salesforce_toolkit/SalesforceSession- Authenticated session objectSalesforceClient- Main API client for CRUD operationsJWTAuthenticator- JWT Bearer Flow authenticationOAuthAuthenticator- OAuth Password Flow authenticationFieldMapper- Field mapping and transformation engineSyncPipeline- ETL pipeline for data synchronization
salesforce_toolkit.auth- Authentication providerssalesforce_toolkit.core- Core client and session managementsalesforce_toolkit.mapping- Field mapping enginesalesforce_toolkit.pipeline- Sync pipeline frameworksalesforce_toolkit.logging- Logging systemsalesforce_toolkit.utils- Utility functions
- Support for Bulk API 2.0 (async bulk operations)
- Metadata API support (deploy/retrieve)
- Streaming API (PushTopic, Generic Streaming)
- Built-in retry mechanism with exponential backoff
- Dry-run mode for pipelines
- Performance monitoring and metrics
- Integration with popular ORMs (SQLAlchemy, Django)
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
git clone https://github.com/yourusername/salesforce-toolkit.git
cd salesforce-toolkit
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -e ".[dev]"This project is licensed under the MIT License - see the LICENSE file for details.
Antonio Trento
- GitHub: @antoniotrento
- LinkedIn: Antonio Trento
- Portfolio: Salesforce Toolkit Case Study
- Inspired by Simple Salesforce
- Built with Requests
- Powered by PyJWT
Made with β€οΈ by Antonio Trento
