# Data Engineering Project Roadmap

In [None]:
import requests
import urllib.parse

base_url = "https://coastwatch.pfeg.noaa.gov/erddap/griddap/erdMWcflhmday.json"
params = {
    "latitude": "32<=latitude<=49",
    "longitude": "235<=longitude<=243",
    "time": "2023-01-01<=time<=2023-01-02"
}

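# NOTE: urlencode yields "latitude=32<=latitude<=49&...", which ERDDAP is unlikely
# to accept; griddap normally expects bracketed ranges such as var[(start):(stop)].
query_string = urllib.parse.urlencode(params, safe="<=,")
url = f"{base_url}?time,latitude,longitude,flh&{query_string}"

print(f"Query URL: {url}")

# A timeout keeps the request from hanging indefinitely.
response = requests.get(url, timeout=60)
if response.status_code != 200:
    print(f"Error: {response.status_code} - {response.text}")
else:
    print(response.json())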
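
ERDDAP's griddap service normally selects data with bracketed ranges, one per dimension, appended to the variable name, rather than the `min<=dim<=max` form used in the query above. A minimal sketch of building such a URL follows. The helper name `build_griddap_url` is hypothetical, and the dimension order (time, latitude, longitude) is an assumption that should be checked against the dataset's own page, since some griddap datasets include an extra altitude dimension.

```python
from urllib.parse import quote

def build_griddap_url(base_url, variable, dim_ranges):
    """Build a griddap query; dim_ranges is an ordered list of (start, stop) pairs."""
    # Each dimension becomes [(start):(stop)], in the dataset's dimension order.
    selector = "".join(f"[({start}):({stop})]" for start, stop in dim_ranges)
    # Percent-encode brackets, parentheses, and colons so the URL is safe to send.
    return f"{base_url}?{quote(variable + selector, safe='')}"

url = build_griddap_url(
    "https://coastwatch.pfeg.noaa.gov/erddap/griddap/erdMWcflhmday.json",
    "flh",  # variable name taken from the query above
    [
        ("2023-01-01", "2023-01-02"),  # time (assumed first dimension)
        (32, 49),                      # latitude
        (235, 243),                    # longitude
    ],
)
print(url)
```

Passing the ranges as an ordered list keeps the helper independent of any one dataset's layout; adding an altitude range is just one more tuple in the right position.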


### Project Structure

In [None]:
""" 
📁 noaa_pipeline/
│
├── 📂 .docker/
│   ├── 📄 Dockerfile.airflow
│   ├── 📄 Dockerfile.graphql
│   └── 📄 Dockerfile.spark
│
├── 📂 dags/
│   ├── 📄 noaa_etl_dag.py
│   ├── 📄 __init__.py
│   └── 📂 scripts/
│       ├── 📄 extract_noaa_data.py
│       ├── 📄 transform_data.py
│       └── 📄 upload_to_gcp.py
│
├── 📂 graphql/                       # Renamed from graphql_server
│   ├── 📂 resolvers/
│   │   ├── 📄 __init__.py
│   │   ├── 📄 pmn_resolver.py
│   │   ├── 📄 buoy_resolver.py
│   │   └── 📄 climate_resolver.py
│   │
│   ├── 📂 spark_utils/
│   │   ├── 📄 __init__.py
│   │   ├── 📄 spark_session.py
│   │   └── 📄 hdfs_helpers.py
│   │
│   ├── 📂 utils/
│   │   ├── 📄 __init__.py
│   │   ├── 📄 api_requests.py
│   │   └── 📄 logging_setup.py
│   │
│   ├── 📄 schema.py
│   ├── 📄 app.py
│   └── 📄 requirements.txt
│
├── 📂 spark_jobs/
│   ├── 📄 transform_pmn.py
│   ├── 📄 transform_buoy.py
│   └── 📄 transform_climate.py
│
├── 📂 data/
│   ├── 📂 raw/
│   ├── 📂 transformed/
│   ├── 📂 postgres_data/
│   └── 📂 hadoop_data/
│
├── 📄 docker-compose.yaml
├── 📄 .dockerignore
├── 📄 .env
├── 📄 requirements.txt
├── 📄 .gitignore
└── 📄 README.md
 """

## Phase 1: Project Setup and Environment Preparation


### Goal: Set up the tools, environment, and infrastructure needed to run your project.

1. **Set Up AWS Resources:**
   - **S3 Buckets**:
     - Create two S3 storage locations (separate buckets, or two prefixes in one bucket as in these examples):
       - **Raw Data Storage** (e.g., `s3://your-bucket/raw-data/`)
       - **Processed Data Storage** (e.g., `s3://your-bucket/processed-data/`)
   - **IAM Roles and Policies**:
     - Create an **IAM Role** with:
       - **S3 read/write access**
       - **Glue and Redshift permissions**
     - Attach the role to your **Glue jobs** and **Redshift** COPY operations.

2. **Set Up PostgreSQL (Optional for Metadata Storage)**:
   - Install PostgreSQL or use **Amazon RDS** to store metadata or staging data.

3. **Install and Configure Airflow**:
   - Run Airflow in **Docker** or install it **locally** with pip:


In [None]:
!pip install apache-airflow boto3


4. **Install Required Packages**:


In [None]:
!pip install boto3 pandas sqlalchemy
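
The S3 part of the IAM role from step 1 can be sketched as a policy document. The example below only builds and prints the JSON and does not call AWS. The bucket name `your-bucket` and the helper name `build_s3_policy` are placeholders, and the action list is an assumed minimal set for read/write access, not a vetted production policy.

```python
import json

def build_s3_policy(bucket):
    """Return an IAM policy dict granting list/read/write on one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Listing applies to the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {   # Object reads and writes apply to keys inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }

policy = build_s3_policy("your-bucket")
print(json.dumps(policy, indent=2))
```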

## Phase 2: Data Extraction and Loading


### Goal: Ingest data into S3 and set up the initial pipeline to load data into PostgreSQL and Redshift.

Below is a Python code example for extracting data from S3 using **Boto3**:


In [None]:

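import os

import boto3

def extract_data_from_s3():
    # Download one object from S3 to local storage.
    # The object key looks like a placeholder; replace it with a real key.
    os.makedirs('./data', exist_ok=True)  # download_file does not create directories
    s3 = boto3.client('s3')
    s3.download_file('noaa-nexrad-level2', '2022/01/01/sample.csv', './data/sample.csv')
    print("Data downloaded from S3.")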


## Phase 3: AWS Glue Development and ETL Pipeline


### Goal: Set up AWS Glue for data transformation and write PySpark code for ETL tasks.

Below is a PySpark code example for temperature conversion.


In [None]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NOAA_ETL").getOrCreate()
df = spark.read.csv("s3://your-bucket/raw-data/sample.csv", header=True, inferSchema=True)

# Assumes the 'temperature' column holds Fahrenheit values.
df_transformed = df.withColumn(
    'temp_celsius', (df['temperature'] - 32) * 5.0 / 9.0
)
# Overwrite so that re-running the job does not fail on an existing output path.
df_transformed.write.mode("overwrite").parquet("s3://your-bucket/processed-data/")
print("Data transformed and written to S3.")
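
The expression applied to the `temperature` column is the usual Fahrenheit-to-Celsius formula, C = (F - 32) × 5/9. A plain-Python version (the function name is hypothetical) makes it easy to sanity-check the arithmetic without starting a Spark session:

```python
def fahrenheit_to_celsius(temp_f):
    """Convert a Fahrenheit reading to Celsius."""
    return (temp_f - 32) * 5.0 / 9.0

# Freezing and boiling points of water serve as reference values.
for temp_f in (32, 212):
    print(temp_f, "->", fahrenheit_to_celsius(temp_f))
```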


## Phase 4: Data Transformation and Redshift Integration


### Goal: Write transformed data to Redshift and create analytical tables for Tableau.

Below is the SQL COPY command for loading data from S3 to Redshift.


In [None]:

COPY noaa_weather
FROM 's3://your-bucket/processed-data/'
IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftCopyRole'
FORMAT AS PARQUET;
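
If the load is triggered from Python (for example from an Airflow task), the same statement can be assembled as a string and handed to whichever Redshift client the project uses. This is only a sketch: `build_copy_statement` is a hypothetical helper, and the table, bucket, and role ARN are the placeholders from the command above.

```python
def build_copy_statement(table, s3_path, iam_role_arn):
    """Assemble a Redshift COPY statement for Parquet files under s3_path."""
    # Values are interpolated directly, so pass only trusted configuration.
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )

sql = build_copy_statement(
    "noaa_weather",
    "s3://your-bucket/processed-data/",
    "arn:aws:iam::your-account-id:role/RedshiftCopyRole",
)
print(sql)
```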


## Phase 5: Data Validation and Monitoring


### Goal: Automate data validation and error handling for your pipeline.

Example Python validation function, which can be run as an Airflow task:


In [None]:

import pandas as pd

def validate_data():
    df = pd.read_csv('./data/sample.csv')
    missing_count = df['temperature'].isnull().sum()
    if missing_count > 0:
        raise ValueError(f"{missing_count} missing values found!")
    print("Data validation passed.")
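
Beyond missing values, a range check can catch corrupted readings. The sketch below uses only the standard library and an inline CSV so it runs without any data files. The column name `temperature` comes from the example above, while the helper name, the bounds, and the sample rows are illustrative assumptions.

```python
import csv
import io

def find_invalid_rows(csv_text, column="temperature", low=-100.0, high=150.0):
    """Return (row_number, reason) pairs for missing or out-of-range values."""
    problems = []
    reader = csv.DictReader(io.StringIO(csv_text))
    for row_number, row in enumerate(reader, start=1):
        raw = (row.get(column) or "").strip()
        if not raw:
            problems.append((row_number, "missing"))
            continue
        try:
            value = float(raw)
        except ValueError:
            problems.append((row_number, "not numeric"))
            continue
        if not low <= value <= high:
            problems.append((row_number, "out of range"))
    return problems

# Made-up sample data: row 2 has no reading, row 3 is implausible.
sample = "station,temperature\nA,71.5\nB,\nC,999\n"
print(find_invalid_rows(sample))
```

Returning the list of problems, instead of raising on the first one, lets the calling task decide whether to fail the run or just log a warning.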


## Phase 6: LLM Integration for Documentation and Insights


Below is a Python function that uses the OpenAI API to generate data documentation.


In [None]:

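from openai import OpenAI

def generate_documentation():
    # Requires openai>=1.0; the client reads the OPENAI_API_KEY environment variable.
    client = OpenAI()
    # gpt-3.5-turbo is a chat model, so it is called through the chat completions endpoint.
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Generate documentation for the NOAA dataset..."}],
        max_tokens=200
    )
    print(response.choices[0].message.content)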


## Phase 7: Tableau Integration and Analysis


### Goal: Visualize the transformed data using Tableau dashboards.

1. **Export Transformed Data**:
   - Use the following code to export data from Redshift to CSV for Tableau.


In [None]:

import pandas as pd

def export_data_to_csv(redshift_connection):
    # redshift_connection: a SQLAlchemy engine or DB-API connection to Redshift.
    df = pd.read_sql("SELECT * FROM noaa_weather", con=redshift_connection)
    df.to_csv("noaa_weather.csv", index=False)
    print("Data exported to CSV.")


## Phase 8: Deployment and Automation


### Goal: Deploy the entire pipeline and set up automation for production.

Example Airflow DAG snippet for automating the pipeline.


In [None]:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2+ path; python_operator is deprecated

# extract_data_from_s3 and validate_data are the functions from Phases 2 and 5.
with DAG(
    'noaa_pipeline',
    start_date=datetime(2024, 10, 17),
    schedule_interval='@daily',
    catchup=False,  # skip backfilling runs between start_date and today
) as dag:
    extract_task = PythonOperator(task_id='extract_data', python_callable=extract_data_from_s3)
    validate_task = PythonOperator(task_id='validate_data', python_callable=validate_data)
    extract_task >> validate_task


## Phase 9: Testing, Optimization, and Future Planning


### Goal: Test the entire pipeline and plan for future extensions.

1. **Test End-to-End Execution**: Verify that each step works as expected.
2. **Optimize Performance**: Partition datasets and tune queries.
3. **Explore Databricks**: Consider Databricks for future ML-based projects.
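
One common way to partition the processed data is by date, using Hive-style `key=value` prefixes that Spark and Glue can prune on. The helper below is a hypothetical sketch of that layout; the base path reuses the placeholder bucket from Phase 1, and the year/month/day scheme is an assumption, not something the pipeline already defines.

```python
from datetime import date

def partition_prefix(base, day):
    """Build a Hive-style year/month/day prefix for one date."""
    return f"{base.rstrip('/')}/year={day.year}/month={day.month:02d}/day={day.day:02d}/"

print(partition_prefix("s3://your-bucket/processed-data/", date(2023, 1, 2)))
```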


## 1. NOAA PMN (Phytoplankton Monitoring Network) Dataset

API Access (ERDDAP Server):

- PMN Dataset (in JSON): https://coastwatch.pfeg.noaa.gov/erddap/tabledap/noaa_pmn.json
- PMN Dataset Metadata and Parameter Documentation: https://coastwatch.pfeg.noaa.gov/erddap/tabledap/noaa_pmn.html

Description of Key Parameters:

- **time**: The timestamp of the observation.
- **latitude / longitude**: Geographic location of the sample.
- **chlorophyll**: Chlorophyll concentration in the water (mg/m³).
- **temperature**: Sea surface temperature at the time of observation (°C).
- **salinity**: Salinity levels (PSU, Practical Salinity Units).
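
tabledap requests take a comma-separated list of variables followed by `&`-separated constraints. The following is a minimal sketch of building such a URL for the PMN endpoint. `build_tabledap_url` is a hypothetical helper, and the variable names are the ones listed above, which should be confirmed against the dataset's metadata page before use.

```python
from urllib.parse import quote

def build_tabledap_url(base_url, variables, constraints):
    """Build a tabledap query from variable names and constraint strings."""
    query = ",".join(variables)
    for constraint in constraints:
        # Percent-encode the comparison operators; keep '=' readable.
        query += "&" + quote(constraint, safe="=")
    return f"{base_url}?{query}"

url = build_tabledap_url(
    "https://coastwatch.pfeg.noaa.gov/erddap/tabledap/noaa_pmn.json",
    ["time", "latitude", "longitude", "chlorophyll"],
    ["time>=2023-01-01", "time<=2023-01-02"],
)
print(url)
```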

## 2. NOAA Buoy Data (NDBC)

API Access:

- Station Data (plain text, one file per station): https://www.ndbc.noaa.gov/data/realtime2/
- Example: https://www.ndbc.noaa.gov/data/realtime2/41009.txt
- Buoy Metadata and Documentation: https://www.ndbc.noaa.gov/measdes.shtml

Description of Key Parameters:

- **YY / MM / DD / hh / mm**: Year, month, day, hour, and minute of observation.
- **WDIR**: Wind direction (° from true north).
- **WSPD**: Wind speed (m/s).
- **GST**: Wind gust (m/s).
- **WVHT**: Wave height (meters).
- **DPD**: Dominant wave period (seconds).
- **APD**: Average wave period (seconds).
- **PRES**: Atmospheric pressure at sea level (hPa).
- **ATMP**: Air temperature (°C).
- **WTMP**: Sea surface temperature (°C).
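
The realtime2 files are whitespace-delimited text with two `#`-prefixed header lines (column names, then units), and `MM` marks a missing value. A minimal parsing sketch under those assumptions is shown below. The function name is hypothetical, and the sample rows are made up and trimmed to a few columns for illustration.

```python
def parse_ndbc_text(text):
    """Parse NDBC realtime2 text into a list of dicts (missing values become None)."""
    lines = text.strip().splitlines()
    # The first header line holds the column names; the second holds units.
    columns = lines[0].lstrip("#").split()
    records = []
    for line in lines[1:]:
        if line.startswith("#"):
            continue  # skip the units line
        values = [None if v == "MM" else v for v in line.split()]
        records.append(dict(zip(columns, values)))
    return records

# Made-up rows in the realtime2 layout, trimmed to a few columns.
sample = """\
#YY  MM DD hh mm WDIR WSPD WVHT  ATMP
#yr  mo dy hr mn degT m/s     m  degC
2024 01 01 00 00  120  5.0  1.2  21.4
2024 01 01 00 10  130   MM  1.3    MM
"""
records = parse_ndbc_text(sample)
print(records[1])
```

Values are kept as strings so the caller can decide how to cast each column; the month column is also named `MM`, which is why the missing-value check is applied to the values only.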

## 3. NOAA Climate Data (NCDC)

API Access:

- NOAA Climate Data Online (CDO) API: https://www.ncdc.noaa.gov/cdo-web/webservices/v2
- API Token Registration: to access the NOAA CDO API, register for a token at https://www.ncdc.noaa.gov/cdo-web/token

Endpoints:

- Datasets: https://www.ncdc.noaa.gov/cdo-web/api/v2/datasets
- Locations: https://www.ncdc.noaa.gov/cdo-web/api/v2/locations

Description of Key Parameters:

- **DATE**: Date of observation.
- **TMAX / TMIN**: Maximum and minimum temperatures (°C).
- **PRCP**: Precipitation (mm).
- **SNOW**: Snowfall (mm).
- **WIND**: Wind speed (m/s).
- **EVAP**: Evaporation (mm).
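
The CDO v2 API expects the registered token in a request header named `token`. The sketch below prepares such a request for the datasets endpoint without sending it. `build_cdo_request` is a hypothetical helper, `YOUR_TOKEN` is a placeholder, and the `limit` parameter is used only as an example.

```python
import urllib.request
from urllib.parse import urlencode

def build_cdo_request(endpoint, token, params=None):
    """Prepare (but do not send) a CDO API request carrying the token header."""
    url = f"https://www.ncdc.noaa.gov/cdo-web/api/v2/{endpoint}"
    if params:
        url += "?" + urlencode(params)
    return urllib.request.Request(url, headers={"token": token})

request = build_cdo_request("datasets", "YOUR_TOKEN", {"limit": 5})
print(request.full_url)
# To send it: urllib.request.urlopen(request, timeout=30)
```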

ArcGIS may also be useful for visualizing the region covered by the analysis.