A real-time ETL pipeline that processes visitor kiosk interactions from the Liverpool Museum of Natural History (LMNH), validates the data, and stores it in a PostgreSQL database with accompanying Tableau dashboards for stakeholder analysis.
This project implements an end-to-end data pipeline that:
- Consumes live kiosk interaction data from a Kafka stream
- Validates and cleans data based on museum business rules
- Stores processed data in AWS RDS PostgreSQL database
- Provides interactive Tableau dashboards for different stakeholder needs
Kafka Stream (live kiosks) → ETL Pipeline on EC2 (data validation) → PostgreSQL on RDS (clean storage) → Tableau Dashboards (stakeholder insights)
pipeline_live/
├── .env # Environment variables (not in repo)
├── consumer.py # Standalone Kafka consumer for testing
├── pipeline.py # Main ETL pipeline script
├── load_master_data.py # One-time exhibition data loader
├── schema.sql # Database schema definition
├── reset_database.sh # Script to reset transactional data
├── data/ # Exhibition JSON files from S3
└── .terraform/ # Terraform infrastructure files
All required Python dependencies are listed in requirements.txt and can be installed with:
pip3 install -r requirements.txt
- AWS account with S3, RDS and EC2 access
- Terraform for infrastructure management
- PostgreSQL database
- Kafka cluster access (Confluent Cloud)
- Tableau Online account
Create a .env file in the project root with the following variables:
# Kafka Configuration
BOOTSTRAP_SERVERS=your-kafka-cluster-endpoint
SECURITY_PROTOCOL=SASL_SSL
SASL_MECHANISM=PLAIN
USERNAME=your-kafka-username
PASSWORD=your-kafka-password
# Database Configuration
DATABASE_NAME=museum
DATABASE_USERNAME=postgres
DATABASE_PASSWORD=your-rds-password
DATABASE_IP=your-rds-endpoint.rds.amazonaws.com
DATABASE_PORT=5432
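For reference, a minimal sketch of how the DATABASE_* variables might be consumed at runtime, using python-dotenv and psycopg2 from the dependency list (the exact wiring in pipeline.py may differ):

# Illustrative only: loading the DATABASE_* variables and opening a connection
import os

import psycopg2
from dotenv import load_dotenv

load_dotenv()  # reads the .env file in the project root

conn = psycopg2.connect(
    dbname=os.environ["DATABASE_NAME"],
    user=os.environ["DATABASE_USERNAME"],
    password=os.environ["DATABASE_PASSWORD"],
    host=os.environ["DATABASE_IP"],
    port=os.environ["DATABASE_PORT"],
)
print("Connected to", os.environ["DATABASE_NAME"])
conn.close()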
# Deploy AWS infrastructure
terraform init
terraform plan
terraform apply
# Create database schema
psql -h your-rds-endpoint -U postgres -d postgres -f schema.sql
# Load master data (exhibitions)
python3 load_master_data.py
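load_master_data.py performs the one-time load of the exhibition JSON files in data/. A rough sketch of such a loader is shown below; the JSON field names and table columns are assumptions for illustration, not taken from schema.sql:

# Illustrative loader: reads exhibition JSON files from data/ and upserts them.
# JSON fields and column names below are assumptions, not taken from schema.sql.
import json
import os
from pathlib import Path

import psycopg2
from dotenv import load_dotenv

load_dotenv()
conn = psycopg2.connect(
    dbname=os.environ["DATABASE_NAME"],
    user=os.environ["DATABASE_USERNAME"],
    password=os.environ["DATABASE_PASSWORD"],
    host=os.environ["DATABASE_IP"],
    port=os.environ["DATABASE_PORT"],
)

with conn, conn.cursor() as cur:  # "with conn" commits the transaction on success
    for path in sorted(Path("data").glob("*.json")):
        exhibition = json.loads(path.read_text())
        cur.execute(
            "INSERT INTO exhibitions (exhibition_id, exhibition_name) "
            "VALUES (%s, %s) ON CONFLICT DO NOTHING;",
            (exhibition["exhibition_id"], exhibition["name"]),
        )
conn.close()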
# Test Kafka connection
python3 consumer.py
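consumer.py is a standalone client for checking that messages arrive from the stream. A minimal equivalent using confluent-kafka looks roughly like this; the topic name and group id are placeholders rather than values from the repo:

# Illustrative standalone consumer: prints raw kiosk messages from the stream.
import json
import os

from confluent_kafka import Consumer
from dotenv import load_dotenv

load_dotenv()
consumer = Consumer({
    "bootstrap.servers": os.environ["BOOTSTRAP_SERVERS"],
    "security.protocol": os.environ["SECURITY_PROTOCOL"],
    "sasl.mechanisms": os.environ["SASL_MECHANISM"],
    "sasl.username": os.environ["USERNAME"],
    "sasl.password": os.environ["PASSWORD"],
    "group.id": "lmnh-consumer-test",   # placeholder group id
    "auto.offset.reset": "latest",
})
consumer.subscribe(["lmnh"])  # placeholder topic name

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print(json.loads(msg.value().decode("utf-8")))
finally:
    consumer.close()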
# Run pipeline locally
python3 pipeline.py
# SSH into EC2 instance
ssh -i your-key.pem ec2-user@ec2-ip-address
# Install dependencies
sudo yum update -y
sudo yum install python3 python3-pip git postgresql15-devel gcc python3-devel -y
pip3 install confluent-kafka psycopg2-binary python-dotenv
# Transfer files to EC2
scp -i your-key.pem pipeline.py .env ec2-user@ec2-ip-address:~
# Run pipeline in background
nohup python3 pipeline.py > pipeline.log 2>&1 &
# Start the ETL pipeline locally
python3 pipeline.py
# On EC2 instance, run in background
nohup python3 pipeline.py > pipeline.log 2>&1 &
# Monitor logs
tail -f pipeline.log
# Check process status
ps aux | grep pipeline
The pipeline validates incoming kiosk interactions against these business rules:
- Operating Hours: Only interactions between 8:45 AM and 6:15 PM are accepted
- Rating Values: Must be between -1 and 4 (where -1 = assistance request)
- Button Types: 0 (assistance) or 1 (emergency)
- Valid Exhibitions: Only exhibitions 0-5 (cross-referenced with master data)
- Data Integrity: Required fields (timestamp, site, value) must be present
Invalid data is logged and rejected, ensuring only clean data reaches the database.
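As a sketch, the rules above translate into a check along these lines. Field names follow the required fields listed above; the type field for button presses, the ISO 8601 timestamp format, and the link between a -1 value and a button press are assumptions:

# Illustrative validation mirroring the business rules above.
from datetime import datetime, time

OPENING, CLOSING = time(8, 45), time(18, 15)
VALID_EXHIBITIONS = {0, 1, 2, 3, 4, 5}

def is_valid(message: dict) -> bool:
    """Return True if a kiosk interaction passes the business rules."""
    try:
        at = datetime.fromisoformat(message["timestamp"])  # assumes ISO 8601
        site = int(message["site"])
        value = int(message["value"])
    except (KeyError, TypeError, ValueError):
        return False  # missing or malformed required field

    if not OPENING <= at.time() <= CLOSING:
        return False  # outside operating hours
    if site not in VALID_EXHIBITIONS:
        return False  # unknown exhibition
    if not -1 <= value <= 4:
        return False  # impossible rating value
    # Assumption: value == -1 marks a button press, which must be
    # 0 (assistance) or 1 (emergency)
    if value == -1 and message.get("type") not in (0, 1):
        return False
    return True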
The schema (schema.sql) defines four tables:
- exhibitions - Master data for museum exhibitions
- kiosk_interactions - Visitor interaction records
- rating_types - Lookup table for rating descriptions
- button_types - Lookup table for assistance/emergency types
- Referential integrity with foreign keys
- Check constraints for data validation
- Separation of ratings vs assistance requests (NULL handling)
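That NULL handling keeps ratings and assistance requests in one table but never in the same columns: a rating row stores a rating and a NULL button type, while an assistance/emergency row stores a button type and a NULL rating. A sketch of the corresponding insert, with assumed column names (schema.sql defines the real ones):

# Illustrative insert showing the rating / assistance split via NULLs.
# Column names (exhibition_id, rating_id, button_id, happened_at) are
# assumptions; schema.sql defines the real ones.
def store_interaction(cur, message: dict) -> None:
    """Insert one validated kiosk interaction using a psycopg2 cursor."""
    value = int(message["value"])
    rating_id = value if value >= 0 else None                 # NULL for button presses
    button_id = message.get("type") if value == -1 else None  # NULL for ratings
    cur.execute(
        "INSERT INTO kiosk_interactions "
        "(exhibition_id, rating_id, button_id, happened_at) "
        "VALUES (%s, %s, %s, %s);",
        (int(message["site"]), rating_id, button_id, message["timestamp"]),
    )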
Three specialized dashboards serve different stakeholder needs:
- Overview Dashboard - General museum metrics and KPIs
- Exhibition Rating Data - Visitor satisfaction analysis for Angela (Exhibition Manager)
- Exhibition Security Data - Assistance/emergency patterns for Rita (Security Manager)
- Real-time data updates from live pipeline
- Interactive filters for date ranges and exhibitions
- Drill-down capabilities by time and location
./reset_database.sh
# Check if pipeline is running
ps aux | grep pipeline
# View recent logs
tail -n 100 pipeline.log
# Monitor database row count
psql -h rds-endpoint -U postgres -d museum -c "SELECT COUNT(*) FROM kiosk_interactions;"
This pipeline was developed as part of a data engineering coursework project, demonstrating real-world ETL pipeline implementation with modern cloud technologies. It showcases integration of Kafka streaming, PostgreSQL databases, AWS infrastructure, and business intelligence tools.