A simple PySpark pipeline that transforms insurance contracts and claims into transaction records.
Takes CSV files with contracts and claims data, joins them together, and outputs transaction records with some business logic applied (like mapping claim types and generating hash IDs).
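Here is a rough plain-Python sketch of the join and claim-type mapping. It works on lists of dicts rather than Spark DataFrames. The column names (`contract_id`, `claim_id`, `claim_type`, `amount`) and the code-to-label mapping are hypothetical placeholders, not the project's actual schema or business rules.

```python
from typing import Dict, List

# Placeholder mapping: the real claim-type codes and labels are not documented here.
CLAIM_TYPE_MAP: Dict[str, str] = {"1": "Private", "2": "Corporate"}


def to_transactions(contracts: List[dict], claims: List[dict]) -> List[dict]:
    """Inner-join claims to contracts on a hypothetical contract_id key
    and map each claim's type code to a label."""
    # Index contracts by key so each claim lookup is a dict access
    contracts_by_id = {c["contract_id"]: c for c in contracts}
    transactions = []
    for claim in claims:
        contract = contracts_by_id.get(claim["contract_id"])
        if contract is None:
            continue  # inner join: drop claims without a matching contract
        transactions.append({
            "contract_id": contract["contract_id"],
            "claim_id": claim["claim_id"],
            # Unknown codes fall back to "Unknown" instead of raising
            "claim_type": CLAIM_TYPE_MAP.get(claim["claim_type"], "Unknown"),
            "amount": claim["amount"],
        })
    return transactions
```

In PySpark, the same idea would typically be a `DataFrame.join` followed by a column expression (for example `pyspark.sql.functions.when`) for the mapping. The dictionary version just keeps the logic easy to test in isolation.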
```bash
# Copy environment template
cp .env.example .env

# Run everything with Docker
make docker-run

# Run tests
make docker-test

# Development commands
make docker-build   # Build the image
make docker-shell   # Interactive shell for debugging
make docker-logs    # View container logs
make docker-clean   # Clean up containers and images
```

```bash
# If you want to run without Docker
cp .env.example .env
make run    # Run pipeline locally
make test   # Run tests locally
```

Requirements:

- Docker
- Input files: `data/input/Contract.csv` and `data/input/Claim.csv`
Edit `.env` to configure basic settings such as file paths and the API URL.
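Here is a minimal sketch of how settings from `.env` might be read once they are in the environment. The variable names (`CONTRACT_PATH`, `CLAIM_PATH`, `OUTPUT_PATH`, `HASH_API_URL`) and the `PipelineConfig` class are hypothetical. The defaults only mirror the paths mentioned in this README.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class PipelineConfig:
    contract_path: str
    claim_path: str
    output_path: str
    hash_api_url: Optional[str]


def load_config(env: Mapping[str, str] = os.environ) -> PipelineConfig:
    """Build the config from environment variables.
    Variable names are hypothetical; defaults mirror the README paths."""
    return PipelineConfig(
        contract_path=env.get("CONTRACT_PATH", "data/input/Contract.csv"),
        claim_path=env.get("CLAIM_PATH", "data/input/Claim.csv"),
        output_path=env.get("OUTPUT_PATH", "data/output/TRANSACTIONS/"),
        # No default: the real API URL is project-specific
        hash_api_url=env.get("HASH_API_URL"),
    )
```

This sketch only reads variables that are already set. Getting `.env` into the environment is usually handled by Docker or a package such as `python-dotenv`. Passing a plain dict as `env` makes the function easy to test.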
The pipeline writes transaction records to the `data/output/TRANSACTIONS/` folder. Invalid records are saved separately so they don't break the pipeline.
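A sketch of routing invalid rows aside instead of failing the run might look like the following. The rule helpers (`require`, `is_number`) and the `_errors` column are hypothetical names. The project's actual validation likely operates on Spark DataFrames. Spark's CSV reader also offers `mode="PERMISSIVE"` with `columnNameOfCorruptRecord` for capturing rows that fail to parse.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, str]
# A rule returns an error message, or None when the row passes
Rule = Callable[[Row], Optional[str]]


def require(field: str) -> Rule:
    # Fails when the field is absent or empty
    return lambda row: None if row.get(field) else f"missing {field}"


def is_number(field: str) -> Rule:
    def check(row: Row) -> Optional[str]:
        try:
            float(row.get(field, ""))
            return None
        except ValueError:
            return f"{field} is not numeric"
    return check


def split_valid(rows: List[Row], rules: List[Rule]) -> Tuple[List[Row], List[Row]]:
    """Route each row to valid or rejected instead of failing the whole run."""
    valid, rejected = [], []
    for row in rows:
        errors = [msg for rule in rules if (msg := rule(row)) is not None]
        if errors:
            # Keep the original data plus the reasons, for later inspection
            rejected.append({**row, "_errors": "; ".join(errors)})
        else:
            valid.append(row)
    return valid, rejected
```

Collecting every failed rule per row, rather than stopping at the first one, makes the rejected output more useful when fixing source data.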
The base DataPipeline class defines the standard flow: extract → validate → transform → load. Each concrete pipeline (like ContractClaimPipeline) implements the specific steps while following the same structure.
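The extract → validate → transform → load structure is the template method pattern. The following minimal sketch keeps the `DataPipeline` name from this README, but its method signatures are assumptions. `ToyPipeline` is a hypothetical stand-in for a concrete pipeline such as `ContractClaimPipeline` and uses plain lists so the call order is easy to see.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class DataPipeline(ABC):
    """Template method: run() fixes the order, subclasses supply the steps."""

    def run(self) -> Any:
        raw = self.extract()
        valid = self.validate(raw)
        transformed = self.transform(valid)
        return self.load(transformed)

    @abstractmethod
    def extract(self) -> Any: ...

    @abstractmethod
    def validate(self, data: Any) -> Any: ...

    @abstractmethod
    def transform(self, data: Any) -> Any: ...

    @abstractmethod
    def load(self, data: Any) -> Any: ...


class ToyPipeline(DataPipeline):
    """Hypothetical subclass used only to show the call order."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def extract(self):
        self.calls.append("extract")
        return [1, -2, 3]

    def validate(self, data):
        self.calls.append("validate")
        return [x for x in data if x > 0]  # drop "invalid" values

    def transform(self, data):
        self.calls.append("transform")
        return [x * 10 for x in data]

    def load(self, data):
        self.calls.append("load")
        return data
```

Because `run()` lives only in the base class, every concrete pipeline gets the same step order. Marking the steps abstract means a subclass that forgets one cannot be instantiated.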
Factories create the right components based on configuration:

- `DataExtractorFactory.create_extractor()`: handles different file formats (CSV, JSON, etc.)
- `LoaderFactory.create_loader()`: supports different output destinations
- `TransformerFactory`: provides business-specific transformers
This makes it easy to add new formats or change behavior without touching existing code.
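Here is a registry-based factory sketch. It reuses the `DataExtractorFactory.create_extractor()` name, but the format-string argument, the `register()` method, and the text-in/records-out extractor signature are assumptions chosen for testability. Real extractors would presumably read files through Spark instead.

```python
import csv
import io
import json
from typing import Callable, Dict, List

Records = List[dict]
Extractor = Callable[[str], Records]


def extract_csv(text: str) -> Records:
    return list(csv.DictReader(io.StringIO(text)))


def extract_json(text: str) -> Records:
    return json.loads(text)


class DataExtractorFactory:
    """Registry-based factory; format keys and signatures are assumptions."""

    _extractors: Dict[str, Extractor] = {"csv": extract_csv, "json": extract_json}

    @classmethod
    def register(cls, fmt: str, extractor: Extractor) -> None:
        # New formats plug in here without editing existing extractors
        cls._extractors[fmt.lower()] = extractor

    @classmethod
    def create_extractor(cls, fmt: str) -> Extractor:
        try:
            return cls._extractors[fmt.lower()]
        except KeyError:
            raise ValueError(f"unsupported format: {fmt}") from None
```

With this design, adding a format (for example TSV) is a single `register()` call, and existing extractors stay untouched. Note that the registry is shared class-level state, so registrations apply process-wide.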
- Uses an external API to generate hash IDs, with retry logic
- Handles malformed data gracefully
- Configurable via environment variables
- Includes basic tests for reliability
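The retry logic around the hash ID API could follow a pattern like this exponential-backoff sketch. The helper name `with_retry`, its parameters, and the choice of which exceptions count as transient are assumptions. The actual HTTP call is left abstract as a zero-argument callable, because the API's endpoint and client library are not documented here.

```python
import time
from typing import Callable, Tuple, Type


def with_retry(
    call: Callable[[], str],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call `call`, retrying transient errors with exponential backoff
    (base_delay, 2 * base_delay, ...). Re-raises after the last attempt."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

A call site might look like `with_retry(lambda: client.get_hash(value))`, where `client.get_hash` is a hypothetical wrapper around the hash API. Which exceptions should be retried depends on the HTTP client in use, so `retry_on` is a parameter. Injecting `sleep` lets tests run without real delays.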