# DTSA 5741 Final Project: Modeling Climate Anomalies with Statistical Analysis
 
---

## Introduction

This Jupyter Notebook presents the final project for DTSA 5741, focused on analyzing climate anomalies using statistical methods. The primary objective of this project is to explore and analyze historical climate data for a selected region within the United States, leveraging publicly available datasets to uncover trends and anomalies in air temperature, precipitation, and groundwater levels.

For this analysis, I selected Minnesota as the study area. This region is notable for its cold weather and diverse climate patterns, making it an interesting case study for climate analysis. By examining historical climate data for Minnesota, we aim to identify any significant anomalies or trends that may have occurred over the past few decades.

---

## Data Sources

1. **[NOAA Climate Data Online Portal](https://www.ncei.noaa.gov/cdo-web/):**
   - Provides access to a wide range of historical weather and climate data from observation stations across the United States.
   - Used to retrieve data on air temperature and precipitation for the selected region.

2. **[USGS National Water Dashboard](https://dashboard.waterdata.usgs.gov/):**
   - Offers real-time data and trends related to water resources such as groundwater levels and streamflow.
   - Supplemented NOAA data with hydrological information for the selected region.

---

## Objectives

The goals of this analysis include:  
1. **Data Acquisition:** Importing and preprocessing datasets to ensure consistency and usability.  
2. **Exploratory Data Analysis (EDA):** Understanding historical trends in climate variables and identifying any anomalies.  
3. **Statistical Analysis:** Performing in-depth analysis to interpret patterns in air temperature, precipitation, and groundwater levels.  
4. **Visualization:** Presenting findings through clear and insightful visualizations to support conclusions.  
5. **Reporting:** Documenting the process and results in a structured and reproducible manner.

## Notebook Structure

1. **Data Import and Cleaning:**  
   - Importing datasets 
   - Cleaning and wrangling data to address missing values, inconsistencies, or outliers.

2. **Exploratory Data Analysis:**  
   - Visualizing historical trends in air temperature, precipitation, and groundwater levels.  
   - Identifying seasonal patterns and anomalies.

3. **Statistical Analysis:**  
   - Applying statistical methods to investigate climate anomalies.  
   - Comparing long-term trends across the selected time periods.

4. **Results and Discussion:**  
   - Highlighting significant findings from the analysis.  
   - Discussing the implications of identified climate anomalies on the selected region.

5. **Conclusion:**  
   - Summarizing key insights and potential areas for further study.

---

In [11]:
# All Project Imports
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import gzip
import shutil
import datetime
import time 
from tqdm import tqdm
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sqlalchemy import create_engine
from dotenv import load_dotenv

# NWIS API Call Import
import dataretrieval.nwis as nwis

# 1. **Data Import and Cleaning:**  

The Global Historical Climatology Network-Daily (GHCN-Daily) is an extensive database that compiles daily climate records from over 100,000 land-based stations worldwide. It encompasses more than 40 meteorological elements, including daily maximum and minimum temperatures, precipitation, snowfall, snow depth, evaporation, wind movement, soil temperature, and cloudiness. The dataset aggregates approximately 1.4 billion data values, with records dating back to the 19th century. Where feasible, station records are updated daily and are typically accessible one to two days after observation.

Users can access the data in various formats:
	1.	GHCN-Daily Form (PDF): Provides five core values and, when available, additional elements such as temperature at observation time, evaporation, 24-hour wind movement, and soil temperatures. Units are presented in either metric or standard, based on user preference.
	2.	Custom GHCN-Daily CSV: Offers data optimized for spreadsheet use, allowing users to select preferred units, include flags, station names, geographic locations, and specify desired elements.
	3.	Custom GHCN-Daily ASCII: Delivers data in ASCII text format with options similar to the CSV format, enabling customization of included information.

The GHCN-Daily dataset serves as a replacement for older datasets maintained by the National Climatic Data Center (NCDC) and functions as the official archive for daily data from the Global Climate Observing System (GCOS) Surface Network. It is particularly suited for monitoring and assessing the frequency and magnitude of climate extremes.

Some data are sourced under the World Meteorological Organization’s World Weather Watch Program, adhering to WMO Resolution 40 (Cg-XII). This permits member countries to impose restrictions on the commercial use or re-export of their data outside the receiving country. Consequently, data summaries and products are intended for unrestricted use in research, education, and other non-commercial activities. For non-U.S. locations, data or any derived products should not be provided to other users or used for the re-export of commercial services.

For detailed information on data formats, observation definitions, and access methods, please refer to the GHCN-Daily documentation. ￼

https://www.ncei.noaa.gov/cdo-web/datasets



https://www.ncei.noaa.gov/pub/data/ghcn/daily/by_station/


The "station".csv files contain all daily elements for that GHCN station for its entire period of record. 
Each element-day is provided on a separate line and all files are updated daily for the entire period of record.

The following information serves as a definition of each field for all element-day records. 
Each field described below is separated by a comma ( , ) and follows the order below:

ID = 11 character station identification code
YEAR/MONTH/DAY = 8 character date in YYYYMMDD format (e.g. 19860529 = May 29, 1986)
ELEMENT = 4 character indicator of element type 
DATA VALUE = 5 character data value for ELEMENT 
M-FLAG = 1 character Measurement Flag 
Q-FLAG = 1 character Quality Flag 
S-FLAG = 1 character Source Flag 
OBS-TIME = 4-character time of observation in hour-minute format (i.e. 0700 =7:00 am); if no ob time information 
is available, the field is left empty

See section III of the GHCN-Daily readme.txt file (ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt)
for an explanation of ELEMENT codes and their units as well as the M-FLAG, Q-FLAG and S-FLAG.

The OBS-TIME field is populated with the observation times contained in NOAA/NCEI's HOMR station history database.  


#### Extract the data from the following link: https://www.ncei.noaa.gov/pub/data/ghcn/daily/by_station/


#### NOAA Data Extraction

In [2]:
# Base URL for the dataset
BASE_URL = "https://www.ncei.noaa.gov/pub/data/ghcn/daily/by_station/"

# Directory to save files
DATA_DIR = "ghcn_us_data"
os.makedirs(DATA_DIR, exist_ok=True)

# Output file
OUTPUT_FILE = "ghcn_daily_combined.csv"

#### Inspect the html of the webpage to find the links to the data files

In [3]:
# Find the correct html references for the requests to download the data
def fetch_and_inspect():
    response = requests.get(BASE_URL)
    response.raise_for_status()
    print("HTML Response:")
    print(response.text[:1000]) 

fetch_and_inspect()

HTML Response:
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
 <head>
  <title>Index of /pub/data/ghcn/daily/by_station</title>
 </head>
 <body>
<h1>Index of /pub/data/ghcn/daily/by_station</h1>
  <table>
   <tr><th><a href="?C=N;O=D">Name</a></th><th><a href="?C=M;O=A">Last modified</a></th><th><a href="?C=S;O=A">Size</a></th><th><a href="?C=D;O=A">Description</a></th></tr>
   <tr><th colspan="4"><hr></th></tr>
<tr><td><a href="/pub/data/ghcn/daily/">Parent Directory</a></td><td>&nbsp;</td><td align="right">  - </td><td>&nbsp;</td></tr>
<tr><td><a href="ACW00011604.csv.gz">ACW00011604.csv.gz</a></td><td align="right">2024-12-09 07:28  </td><td align="right">3.9K</td><td>&nbsp;</td></tr>
<tr><td><a href="ACW00011647.csv.gz">ACW00011647.csv.gz</a></td><td align="right">2024-12-09 07:28  </td><td align="right"> 39K</td><td>&nbsp;</td></tr>
<tr><td><a href="AE000041196.csv.gz">AE000041196.csv.gz</a></td><td align="right">2024-12-09 07:28  </td><td align="right">250K</td><t

Based on above the data is in the  'a' tag and the data is in the form of csv files. The data is extracted from the csv files and the data is cleaned and stored in the form of dataframes. The data is then used for further analysis.

#### Function for downloading the daily NOAA data for the US sites

In [None]:
# Worker function to download and extract a single file
def process_file(us_file):
    file_url = BASE_URL + us_file
    file_path = os.path.join(DATA_DIR, us_file)
    extracted_path = file_path.replace(".gz", "")

    # Download the file if not already downloaded
    if not os.path.exists(file_path):
        try:
            response = requests.get(file_url, timeout=30)
            response.raise_for_status()
            with open(file_path, 'wb') as f:
                f.write(response.content)
        except Exception as e:
            return f"Error downloading {file_url}: {e}"

    # Extract the file
    if not os.path.exists(extracted_path):
        try:
            with gzip.open(file_path, 'rb') as f_in:
                with open(extracted_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        except Exception as e:
            return f"Error extracting {file_path}: {e}"

    return extracted_path


# Download and extract files with parallelization
def download_and_extract_us_files():
    # Fetch the directory listing
    response = requests.get(BASE_URL)
    response.raise_for_status()

    # Parse the HTML response
    soup = BeautifulSoup(response.text, 'html.parser')

    # Find all anchor tags
    links = soup.find_all('a')

    # Extract and filter links ending with ".csv.gz"
    us_files = [link.get('href') for link in links if link.get('href') and link.get('href').startswith('US') and link.get('href').endswith('.csv.gz')]

    print(f"Found {len(us_files)} files to process.")

    extracted_files = []
    errors = []

    # Use ThreadPoolExecutor for parallel processing
    with ThreadPoolExecutor(max_workers=8) as executor: 
        futures = {executor.submit(process_file, us_file): us_file for us_file in us_files}

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing files", unit="file"):
            try:
                result = future.result()
                if isinstance(result, str) and result.startswith("Error"):
                    errors.append(result)
                else:
                    extracted_files.append(result)
            except Exception as e:
                errors.append(f"Unexpected error: {e}")

    print(f"Processed {len(extracted_files)} files successfully.")
    print(f"Encountered {len(errors)} errors.")
    
    # Log errors 
    if errors:
        with open("error_log.txt", "w") as log_file:
            for error in errors:
                log_file.write(error + "\n")
        print("Errors logged to 'error_log.txt'.")

    return extracted_files

#### Function for consolidating data from multiple files


In [5]:
# Get all the data in one csv file 
def consolidate_to_csv(files, output_file, batch_size=1000):
    # Open the output file for writing
    with open(output_file, "w") as f_out:
        # Use tqdm for an overall progress bar
        with tqdm(total=len(files), desc="Consolidating files", unit="file") as pbar:
            for i in range(0, len(files), batch_size):
                batch_files = files[i : i + batch_size]
                combined_data = []
                for file in batch_files:
                    try:
                        # Read data into a DataFrame
                        data = pd.read_csv(
                            file,
                            header=None,
                            names=["ID", "DATE", "ELEMENT", "DATA_VALUE", "M_FLAG", "Q_FLAG", "S_FLAG", "OBS_TIME"],
                            dtype={
                                "ID": str,
                                "DATE": str,
                                "ELEMENT": str,
                                "DATA_VALUE": "float64",
                                "M_FLAG": str,
                                "Q_FLAG": str,
                                "S_FLAG": str,
                                "OBS_TIME": str,
                            },
                        )
                        combined_data.append(data)
                    except Exception as e:
                        pbar.write(f"Error processing {file}: {e}")
                    finally:
                        pbar.update(1)

                # Concatenate and write the batch to the output file
                if combined_data:
                    batch_df = pd.concat(combined_data, ignore_index=True)
                    batch_df.to_csv(f_out, index=False, header=(i == 0), mode="a")  # Write header only for the first batch

#### Download and Extract File paths 

In [None]:
if __name__ == "__main__":
    # Download and extract files
    print("Downloading and extracting files...")
    extracted_files = download_and_extract_us_files()

    # Save the list of extracted files to a text file
    with open("extracted_files.txt", "w") as f:
        for file in extracted_files:
            f.write(file + "\n")

    print(f"Extracted file paths saved to 'extracted_files.txt'.")

Downloading and extracting files...
Found 74429 files to process.


Processing files: 100%|██████████| 74429/74429 [00:01<00:00, 52603.01file/s]

Processed 74429 files successfully.
Encountered 0 errors.
Extracted file paths saved to 'extracted_files.txt'.





#### Consolidate Data into a Single CSV

In [None]:
# Add data to one csv file
if __name__ == "__main__":
    # Consolidate data into a single CSV
    if os.path.exists(OUTPUT_FILE):
        print(f"Output file '{OUTPUT_FILE}' already exists. Skipping consolidation.")
    else:
        print("Reading extracted file paths...")
        try:
            with open("extracted_files.txt", "r") as f:
                extracted_files = [line.strip() for line in f.readlines()]
        except FileNotFoundError:
            print("Error: 'extracted_files.txt' not found. Please run Step 1 first.")
            exit(1)

        print(f"Found {len(extracted_files)} files to consolidate.")
        print("Consolidating data into a single CSV file...")

        
        consolidate_to_csv(extracted_files, OUTPUT_FILE)

        print(f"Data consolidated into '{OUTPUT_FILE}'.")

Reading extracted file paths...
Found 74429 files to consolidate.
Consolidating data into a single CSV file...


Consolidating files: 100%|██████████| 74429/74429 [56:54<00:00, 21.80file/s]   


Data consolidated into 'ghcn_daily_combined.csv'.


# Create a database to pull from for the analysis

In [21]:
# keep credentials in .env file
load_dotenv()
file_path = "./ghcn_daily_combined.csv"
# Create a connection to the PostgreSQL database
DATABASE_URI = f"postgresql://{os.environ['PG_USER']}:{os.environ['PG_PASS']}@localhost/us_data"
engine = create_engine(DATABASE_URI)

#### Database Creation

In [25]:
from sqlalchemy import create_engine, text
# Function to create the table
def create_table():
    create_table_query = text("""
    CREATE TABLE IF NOT EXISTS large_table (
        ID TEXT,
        DATE TEXT,
        ELEMENT TEXT,
        DATA_VALUE DOUBLE PRECISION,
        M_FLAG TEXT,
        Q_FLAG TEXT,
        S_FLAG TEXT,
        OBS_TIME TEXT
    );
    """)
    with engine.connect() as conn:
        conn.execute(create_table_query)
        print("Table 'large_table' created or already exists.")

#### Import into postgresql

In [23]:
# Load data into the table
def import_csv_to_postgres(file_path):
    # Read and load in chunks to handle large files
    chunk_size = 1_000_000
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        try:
            # Append data to the table
            chunk.to_sql("large_table", engine, if_exists="append", index=False)
            print(f"Loaded a chunk of {len(chunk)} rows.")
        except Exception as e:
            print(f"Error loading data: {e}")

#### Execute the creation of the database

In [None]:
# Execute the steps
if __name__ == "__main__":
    create_table()
    import_csv_to_postgres(file_path)
    print("CSV file imported successfully into PostgreSQL.")

Table 'large_table' created or already exists.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.
Loaded a chunk of 1000000 rows.


#### Test Connection

In [None]:
# Test connection
try:
    with engine.connect() as conn:
        print("Connection successful!")
except Exception as e:
    print(f"Connection failed: {e}")

# 2. **Exploratory Data Analysis:**  

- Air temperature
- Precipitation
- Groundwater level 

   - Visualizing historical trends in air temperature, precipitation, and groundwater levels.  
   - Identifying seasonal patterns and anomalies.

# 3. **Statistical Analysis:** 


   - Applying statistical methods to investigate climate anomalies.  
   - Comparing long-term trends across the selected time periods.

# 4. **Results and Discussion:**  

   - Highlighting significant findings from the analysis.  
   - Discussing the implications of identified climate anomalies on the selected region.

# 5. **Conclusion:**  
   - Summarizing key insights and potential areas for further study.