This project implements a comprehensive data pipeline for processing large-scale meteorological data using Hadoop and Spark on cloud infrastructure.
The pipeline processes weather datasets (e.g., Kaggle Weather History, NOAA data) through the following stages:
- Data Ingestion: Download and upload datasets to cloud storage (S3, GCS) or HDFS
- Preprocessing: Clean and transform raw weather data
- Batch Analytics: Statistical analysis using Hadoop MapReduce and Spark
- Advanced Analytics: Machine learning predictions and anomaly detection with Spark MLlib
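For orientation, stages 1-3 can be exercised locally in a few lines of PySpark. This is only a sketch: the column names ("Temperature (C)", "Humidity") are assumed from the Kaggle Weather History CSV and may need adjusting for other datasets.

# Minimal local walk-through of ingestion -> preprocessing -> batch analytics.
# Column names are assumed from the Kaggle Weather History dataset.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("weather-pipeline-sketch").getOrCreate()

raw = spark.read.csv("data/weather.csv", header=True, inferSchema=True)      # ingestion
clean = raw.dropDuplicates().dropna(subset=["Temperature (C)", "Humidity"])  # preprocessing

clean.agg(F.avg("Temperature (C)").alias("avg_temp"),                        # batch analytics
          F.min("Temperature (C)").alias("min_temp"),
          F.max("Temperature (C)").alias("max_temp"),
          F.avg("Humidity").alias("avg_humidity")).show()

spark.stop()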
# Create EMR cluster using AWS CLI
aws emr create-cluster \
--name "Weather Data Pipeline" \
--release-label emr-6.15.0 \
--instance-type m5.xlarge \
--instance-count 3 \
--applications Name=Hadoop Name=Spark Name=Hive \
--ec2-attributes KeyName=your-key-pair \
--log-uri s3://your-bucket/logs/

# Create Dataproc cluster
gcloud dataproc clusters create weather-pipeline \
--region=us-central1 \
--num-workers=2 \
--worker-machine-type=n1-standard-4 \
--master-machine-type=n1-standard-4 \
--image-version=2.1-debian11 \
--project=your-project-id

# Create HDInsight cluster via Azure Portal or CLI
az hdinsight create \
--resource-group your-resource-group \
--name weather-pipeline \
--type Spark \
--location eastus \
--cluster-tier Standard

# Install Hadoop and Spark
# Download Hadoop: https://hadoop.apache.org/releases.html
# Download Spark: https://spark.apache.org/downloads.html
# Set environment variables
export HADOOP_HOME=/path/to/hadoop
export SPARK_HOME=/path/to/spark
export PATH=$PATH:$HADOOP_HOME/bin:$SPARK_HOME/bin

- Create Kaggle Account (if needed): https://www.kaggle.com/account/login
- Install Kaggle API:
pip install kaggle
- Download Dataset:
# Configure Kaggle API (requires API token from Kaggle account settings)
mkdir -p ~/.kaggle
# Place kaggle.json in ~/.kaggle/
# Download weather dataset
kaggle datasets download -d budincsevity/szeged-weather --unzip
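If you would rather script the download than use the CLI, the same dataset can be fetched with the Kaggle Python API (this assumes kaggle.json is already in ~/.kaggle/ as described above):

# Programmatic alternative to the kaggle CLI command above.
from kaggle.api.kaggle_api_extended import KaggleApi

api = KaggleApi()
api.authenticate()                  # reads ~/.kaggle/kaggle.json
api.dataset_download_files(
    "budincsevity/szeged-weather",  # same dataset as above
    path="data",                    # download target directory
    unzip=True,
)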
# Use the download script
python scripts/download_noaa_data.py --start-date 2020-01-01 --end-date 2023-12-31
python scripts/download_weather_data.py
├── README.md
├── requirements.txt
├── config/
│ ├── core-site.xml # Hadoop core configuration
│ ├── hdfs-site.xml # HDFS configuration
│ ├── spark-defaults.conf # Spark configuration
│ └── application.properties # Application settings
├── scripts/
│ ├── download_weather_data.py # Dataset download script
│ ├── upload_to_hdfs.sh # HDFS upload script
│ ├── upload_to_s3.sh # S3 upload script
│ └── setup_cluster.sh # Cluster setup script
├── src/
│ ├── ingestion/
│ │ └── data_ingestion.py # Data ingestion module
│ ├── preprocessing/
│ │ └── data_preprocessing.py # Data cleaning and transformation
│ ├── mapreduce/
│ │ ├── temperature_stats.py # MapReduce temperature analysis
│ │ ├── humidity_analysis.py # MapReduce humidity analysis
│ │ └── weather_mapper.py # MapReduce utilities
│ ├── spark/
│ │ ├── batch_analytics.py # Spark batch processing
│ │ ├── data_analysis.py # Spark DataFrame analytics
│ │ └── streaming_analytics.py # Spark Streaming (optional)
│ └── ml/
│ ├── weather_prediction.py # ML weather prediction
│ └── anomaly_detection.py # Anomaly detection
├── jobs/
│ ├── mapreduce_jobs.sh # Submit MapReduce jobs
│ └── spark_jobs.sh # Submit Spark jobs
└── results/
└── (analysis outputs will be saved here)
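The commands in the following sections exercise the preprocessing, MapReduce, Spark, and prediction modules; src/ml/anomaly_detection.py has no example of its own, so here is a purely illustrative sketch of one possible approach (a simple per-month z-score rule; the real module may work differently, and the column names are assumed from the Kaggle dataset):

# Hypothetical sketch for src/ml/anomaly_detection.py: flag temperature readings
# more than 3 standard deviations from their monthly mean. Column names are
# assumed from the Kaggle Weather History dataset.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.appName("anomaly-sketch").getOrCreate()
df = spark.read.csv("data/weather.csv", header=True, inferSchema=True)

month = F.substring("Formatted Date", 6, 2)          # "YYYY-MM-DD ..." -> month
w = Window.partitionBy(month)

flagged = (df
    .withColumn("mean_t", F.avg("Temperature (C)").over(w))
    .withColumn("std_t", F.stddev("Temperature (C)").over(w))
    .withColumn("is_anomaly",
                F.abs(F.col("Temperature (C)") - F.col("mean_t")) > 3 * F.col("std_t")))

flagged.filter("is_anomaly").select("Formatted Date", "Temperature (C)").show(20, truncate=False)
spark.stop()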
python scripts/download_weather_data.py --source kaggle --output data/weather.csv

# Start Hadoop services first
hdfs namenode -format # Only first time
start-dfs.sh
start-yarn.sh
# Upload data
bash scripts/upload_to_hdfs.sh data/weather.csv /user/hadoop/weather/

aws s3 cp data/weather.csv s3://your-bucket/weather-data/weather.csv
# Or use the script:
bash scripts/upload_to_s3.sh data/weather.csv s3://your-bucket/weather-data/

gsutil cp data/weather.csv gs://your-bucket/weather-data/weather.csv

# Using Spark for preprocessing
spark-submit \
--master yarn \
--deploy-mode cluster \
src/preprocessing/data_preprocessing.py \
--input s3://your-bucket/weather-data/weather.csv \
--output s3://your-bucket/weather-data/processed/

# Temperature statistics
hadoop jar target/weather-analysis-1.0.jar \
TemperatureStats \
/user/hadoop/weather/processed \
/user/hadoop/weather/results/temperature_stats
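The jar above is the compiled Java job. The repository also ships Python MapReduce code under src/mapreduce/, which could be run via Hadoop Streaming instead; the sketch below is a hypothetical, simplified version of temperature_stats.py that assumes the date is in column 0 and the temperature in column 3 of a comma-separated record (adjust the indices to your schema).

#!/usr/bin/env python3
# Hypothetical Hadoop Streaming equivalent of the TemperatureStats job.
# Example submission (paths and indices are assumptions, not project defaults):
#   hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
#     -files temperature_stats.py \
#     -mapper "python3 temperature_stats.py map" \
#     -reducer "python3 temperature_stats.py reduce" \
#     -input /user/hadoop/weather/processed \
#     -output /user/hadoop/weather/results/temperature_stats
import sys

def mapper():
    for line in sys.stdin:
        fields = line.rstrip("\n").split(",")
        try:
            month_key = fields[0][:7]          # "YYYY-MM" from an ISO-style date (assumed)
            temp = float(fields[3])            # temperature column (assumed index)
        except (IndexError, ValueError):
            continue                           # skip header / malformed rows
        print(f"{month_key}\t{temp}")

def reducer():
    key, total, count = None, 0.0, 0
    for line in sys.stdin:                     # input arrives sorted by key
        k, v = line.rstrip("\n").split("\t")
        if key is not None and k != key:
            print(f"{key}\tavg={total / count:.2f}\tcount={count}")
            total, count = 0.0, 0
        key, total, count = k, total + float(v), count + 1
    if key is not None:
        print(f"{key}\tavg={total / count:.2f}\tcount={count}")

if __name__ == "__main__":
    mapper() if sys.argv[1:] == ["map"] else reducer()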
# View results
hdfs dfs -cat /user/hadoop/weather/results/temperature_stats/part-00000

# Spark batch analytics
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 4 \
--executor-memory 4g \
src/spark/batch_analytics.py \
--input s3://your-bucket/weather-data/processed \
--output s3://your-bucket/weather-data/analytics/

# Weather prediction model
spark-submit \
--master yarn \
--packages org.apache.spark:spark-mllib_2.12:3.5.0 \
src/ml/weather_prediction.py \
--input s3://your-bucket/weather-data/processed \
--output s3://your-bucket/weather-data/models/

Option 1: Run Complete Pipeline (Recommended)
# Install dependencies
pip install -r requirements.txt
# Run complete pipeline with MongoDB integration
python scripts/run_full_pipeline.py \
--input data/weather.csv \
--mongodb "mongodb+srv://Cloud:cloud09@cluster0.rgpbs3r.mongodb.net/?appName=Cluster0"Option 2: Step-by-Step
# 1. Download dataset (or use your existing dataset)
python scripts/download_weather_data.py
# 2. Run preprocessing
spark-submit --master local[*] src/preprocessing/data_preprocessing.py \
--input data/weather.csv \
--output data/processed \
--format csv \
--output-format parquet
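In outline, data_preprocessing.py performs the usual clean-and-convert pass; the sketch below shows the general shape of that step (the column names and derived fields are assumptions, not the module's exact logic):

# Hypothetical core of the preprocessing step: clean the raw CSV and write
# Parquet, mirroring the --input / --output / --output-format flags above.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("preprocess-sketch").getOrCreate()

raw = spark.read.csv("data/weather.csv", header=True, inferSchema=True)

clean = (raw
         .dropDuplicates()
         .dropna(subset=["Temperature (C)", "Humidity"])                    # assumed columns
         .withColumn("year", F.substring("Formatted Date", 1, 4).cast("int"))
         .withColumn("month", F.substring("Formatted Date", 6, 2).cast("int")))

clean.write.mode("overwrite").parquet("data/processed")
spark.stop()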
# 3. Run analytics with MongoDB export
spark-submit --master local[*] src/spark/batch_analytics.py \
--input data/processed \
--output results/analytics \
--format parquet \
--mongodb "mongodb+srv://Cloud:cloud09@cluster0.rgpbs3r.mongodb.net/?appName=Cluster0"
# 4. Train ML models with MongoDB export
spark-submit --master local[*] \
--packages org.apache.spark:spark-mllib_2.12:3.5.0 \
src/ml/weather_prediction.py \
--input data/processed \
--output results/models \
--format parquet \
--model all \
--mongodb "mongodb+srv://Cloud:cloud09@cluster0.rgpbs3r.mongodb.net/?appName=Cluster0"
# 5. Export full dataset to MongoDB (optional)
spark-submit --master local[*] src/database/mongodb_exporter.py \
--input data/processed \
--collection weather_data \
--format parquet \
--connection-string "mongodb+srv://<username>:<password>@cluster0.rgpbs3r.mongodb.net/?appName=Cluster0"

# Submit job to EMR cluster
aws emr add-steps \
--cluster-id j-XXXXXXXXXXXXX \
--steps Type=Spark,Name="Weather Analytics",\
ActionOnFailure=CONTINUE,\
Args=[--deploy-mode,cluster,--class,WeatherAnalytics,\
s3://your-bucket/jars/weather-analysis.jar,\
s3://your-bucket/weather-data/processed,\
s3://your-bucket/weather-data/analytics]

Edit config/application.properties to customize:
- Input/output paths
- Cluster settings
- Spark configuration
- Database connections (if needed)
Analytics results are saved in:
- HDFS: /user/hadoop/weather/results/
- S3: s3://your-bucket/weather-data/results/
- Local: results/ directory
Output formats:
- CSV files for tabular results
- JSON for structured data
- Parquet for optimized storage
- Partitioning: Partition data by date/location for better performance (see the PySpark sketch after this list)
- Caching: Cache frequently used DataFrames in Spark
- Cluster Sizing: Adjust executor memory and cores based on data size
- Storage Format: Use Parquet instead of CSV for better compression and query performance
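Taken together, the partitioning, caching, and Parquet tips look like this in PySpark (the year/month partition columns are assumed from preprocessing; the output path is illustrative):

# Combined illustration of the tuning tips above (paths and partition columns
# are assumptions, not project defaults).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tuning-sketch").getOrCreate()
df = spark.read.parquet("data/processed")

df.cache()        # cache a DataFrame reused across several analyses
df.count()        # materialize the cache

(df.write
   .mode("overwrite")
   .partitionBy("year", "month")                # partition by date fields
   .parquet("results/weather_partitioned"))     # Parquet: columnar, compressed

spark.stop()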
- HDFS Connection Error: Ensure Namenode is running and accessible
- Memory Issues: Increase executor memory in Spark configuration
- Permission Errors: Check HDFS permissions with hdfs dfs -ls
- Cloud Storage Access: Verify AWS/GCP credentials and bucket permissions
MIT License - Feel free to use and modify for your projects.
Contributions are welcome! Please feel free to submit a Pull Request.