# Introduction

This notebook illustrates the steps I took to obtain, format, and store nine climate normal datasets in a PostgreSQL database.

These climate normals were calculated by the National Centers for Environmental Information (NCEI) for weather stations located across the United States. Climate normals act as a baseline for comparing present weather and future forecasts against climatological conditions. The normals are generated every 10 years, each covering the preceding 30-year period. The periods used here are the 30-year normals for 1981-2010 and 1991-2020, and the 15-year normals for 2006-2020.

For each of these time periods, hourly, daily, and monthly datasets were generated. In total, nine datasets were obtained and stored in a PostgreSQL database.

The climate normals can be located at the following links:

* 1981-2010
    * [hourly](https://www.ncei.noaa.gov/data/normals-hourly/1981-2010/archive/), [daily](https://www.ncei.noaa.gov/data/normals-daily/1981-2010/archive/), [monthly](https://www.ncei.noaa.gov/data/normals-monthly/1981-2010/archive/)
    
* 1991-2020
    * [hourly](https://www.ncei.noaa.gov/data/normals-hourly/1991-2020/archive/), [daily](https://www.ncei.noaa.gov/data/normals-daily/1991-2020/archive/), [monthly](https://www.ncei.noaa.gov/data/normals-monthly/1991-2020/archive/)
    
* 2006-2020
    * [hourly](https://www.ncei.noaa.gov/data/normals-hourly/2006-2020/archive/), [daily](https://www.ncei.noaa.gov/data/normals-daily/2006-2020/archive/), [monthly](https://www.ncei.noaa.gov/data/normals-monthly/2006-2020/archive/)

In [5]:
import os
from pathlib import Path
import importlib
import multiprocessing
from functools import partial

import numpy as np
import pandas as pd
pd.set_option('display.max_colwidth', None)
pd.set_option('colheader_justify', 'left')
pd.set_option('display.max_rows', 20)

import psycopg2 as psql

import climate_normal_scripts as cns
importlib.reload(cns) # reloads script if script was changed after notebook kernel started

# Folder prefixes taken from the helper module: `dataFolder` prefixes the per-period download
# locations and `txtFiles_Folder` prefixes the inventory, variables and header text files used below.
dataFolder = cns.dataFolder
txtFiles_Folder = cns.txtFiles_Folder
# Alternative kept for reference: reuse the dict exposed by the helper module.
# normals_dict = cns.normals_dict
# One empty metadata dict per (normal period, normal type) pair; later cells fill these in
# with file locations, header files and station counts.
normals_dict = {
    period: {
        kind: {}
        for kind in ('normals-hourly', 'normals-daily', 'normals-monthly')
    }
    for period in ('1981-2010', '1991-2020', '2006-2020')
}

In [2]:
def combine_Files(combined_file, files_toCombine):
    """
    Concatenate several CSV files that share the same header into one file.

    The header line is written once (taken from the first non-empty input file); the
    first line of every other input is treated as a repeated header and dropped.

    Parameters
    ----------
    combined_file : path of the output file. If it already exists nothing is written.
    files_toCombine : iterable of input file paths, consumed in iteration order.

    Returns
    -------
    None (the function prints a status message).
    """
    if os.path.exists(combined_file):
        return print(f'{combined_file} exists...')

    header_written = False
    with open(combined_file, 'w') as outfile:
        for file in files_toCombine:
            with open(file, 'r') as infile:
                header = infile.readline()
                if not header:
                    # Completely empty input: skip it without consuming the "header written" flag.
                    continue
                if not header_written:
                    outfile.write(header if header.endswith('\n') else header + '\n')
                    header_written = True
                rows = infile.read()
                if rows:
                    # Guarantee a line break between files; without it the last row of a file
                    # lacking a trailing newline would be fused with the next file's first row.
                    outfile.write(rows if rows.endswith('\n') else rows + '\n')
    return print(f'{combined_file} created...')

In [6]:
# get climate normals files
normal_periods = ['1981-2010','1991-2020','2006-2020']
normal_types = ['normals-hourly','normals-daily', 'normals-monthly']
main_url = 'https://www.ncei.noaa.gov/data/'

# Iterate over the two lists declared above (they match the keys of `normals_dict`) instead of
# unpacking `normals_dict.items()` into a variable named `normal_types`, which would overwrite
# the list with a dict.
for normal_period in normal_periods:
    for normal_type in normal_types:
        files_location = f'{dataFolder}{normal_period}/{normal_type}/'
        cns.get_climate_normals(main_url, normal_type, normal_period, files_location)
        # Replace the empty placeholder with the file paths later cells rely on.
        normals_dict[normal_period][normal_type] = {'files_location': files_location,
                                                    'inventory_file': f'{txtFiles_Folder}station_inventory/station-inventory-{normal_period}_{normal_type}.csv',
                                                    'variables_file': f'{txtFiles_Folder}variables/variables-{normal_period}-{normal_type}.csv'}

cns.generate_header_files()

Getting 1981-2010 normals...
1981-2010 normals-hourly already downloaded...
Getting 1981-2010 normals...
1981-2010 normals-daily already downloaded...
Getting 1981-2010 normals...
1981-2010 normals-monthly already downloaded...
Getting 1991-2020 normals...
1991-2020 normals-hourly already downloaded...
Getting 1991-2020 normals...
1991-2020 normals-daily already downloaded...
Getting 1991-2020 normals...
1991-2020 normals-monthly already downloaded...
Getting 2006-2020 normals...
2006-2020 normals-hourly already downloaded...
Getting 2006-2020 normals...
2006-2020 normals-daily already downloaded...
Getting 2006-2020 normals...
2006-2020 normals-monthly already downloaded...
Headers files generated at ./txt_files/csv_headers/


In [None]:
def format_date(df, normal_type):
    '''
    Expand a station record to the full expected time range for its normal type.

    The DATE column is parsed and compared against the complete range of hours, days or
    months. Missing time steps are added as rows whose other columns are NaN. DATE is then
    rewritten as a formatted string and the month, day and hour columns are populated from
    the full range.

    Years are dummy values. Daily files use 1904, a leap year, so that Feb 29 is part of
    the range. Hourly and monthly ranges use 1900, which is not a leap year, so they
    contain no Feb 29.

    Parameters
    ----------
    df : DataFrame with a DATE column in the raw format of the given normal type.
    normal_type : 'normals-hourly', 'normals-daily' or 'normals-monthly'.

    Returns
    -------
    A new DataFrame with one row per time step of the full range. The input frame is
    not modified.

    Raises
    ------
    ValueError if normal_type is not one of the three supported values.
    '''

    if normal_type == 'normals-hourly':
        # NOTE(review): recent pandas releases deprecate the 'H' alias in favour of 'h' - confirm against the pinned pandas version.
        full_range = pd.date_range(start= '1900-01-01 00:00:00', end = '1900-12-31 23:00:00', freq = 'H') # timeindex for all hours
        dateOUT_format = '%b-%d %H:%M'
        parsed_dates = pd.to_datetime(df['DATE'], format = '%m-%dT%X')

    elif normal_type == 'normals-daily':
        full_range = pd.date_range(start= '1904-01-01', end = '1904-12-31', freq = 'D')
        dateOUT_format = '%b-%d'
        # The explicit leap year is appended so that 02-29 parses without error.
        parsed_dates = pd.to_datetime(df['DATE'] + '-1904', format = '%m-%d-%Y')

    elif normal_type == 'normals-monthly':
        full_range = pd.date_range(start= '1900-01', end = '1900-12', freq = 'MS')
        dateOUT_format = '%b'
        parsed_dates = pd.to_datetime(df['DATE'], format = '%m')

    else:
        raise ValueError(f'Unsupported normal_type: {normal_type!r}')

    # assign() builds a new frame, so the caller's DATE column keeps its raw values.
    df = df.assign(DATE=parsed_dates).set_index('DATE').reindex(full_range)  # adds rows for missing dates
    df = df.reset_index(drop=True)                                           # back to a default index, dropping the datetime index
    df['DATE'] = full_range.strftime(dateOUT_format)

    # Item assignment (not attribute assignment) so the columns are created when absent.
    df['month'] = full_range.month
    df['day'] = full_range.day
    df['hour'] = full_range.hour

    return df

In [None]:
def format_variables(df, headers):
    """
    Rescale and relabel variable columns according to the unit named in each header entry.

    Each header entry is a comma-separated string whose first field is a column name and
    whose second field is a unit tag. Columns tagged 'Tenths' are divided by 10, columns
    tagged 'Hundredths' by 100, and columns tagged 'Wind_Direction' have the codes 1-8
    replaced by compass labels. Other columns are left untouched.

    The frame is modified in place and also returned.
    """
    grouped = {'Tenths': [], 'Hundredths': [], 'Wind_Direction': []}
    for entry in headers:
        fields = entry.split(',')
        column, unit = fields[0], fields[1]
        if unit in grouped:
            grouped[unit].append(column)

    compass = dict(zip(
        (1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0),
        ('N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'),
    ))

    tenth_cols = grouped['Tenths']
    hundredth_cols = grouped['Hundredths']
    direction_cols = grouped['Wind_Direction']

    df[tenth_cols] = df[tenth_cols] / 10
    df[hundredth_cols] = df[hundredth_cols] / 100
    df[direction_cols] = df[direction_cols].replace(compass)
    return df

def format_file(raw_file,headers,normal_type, output_dir=None):
    """
    Standardize one raw station CSV and write the result into a ``formatted_files`` folder.

    Steps: align the columns with the header list, turn sentinel codes and blank strings
    into NaN, expand the record to the full time range (format_date), rescale/relabel the
    variable columns (format_variables), and back-fill the STATION column for rows that
    were added. Files that were already formatted are skipped.

    Parameters
    ----------
    raw_file : path of the raw station file.
    headers : list of comma-separated header entries; the first field of each is the column name.
    normal_type : 'normals-hourly', 'normals-daily' or 'normals-monthly'.
    output_dir : optional target folder. Defaults to the ``formatted_files`` folder next to
        the folder containing raw_file, so the function no longer reads the notebook-level
        ``files_location`` variable (which is not reliably visible inside worker processes).

    Returns
    -------
    None
    """
    raw_file = Path(raw_file)
    if output_dir is None:
        # Raw files are globbed from <files_location>/station_files/, so the output folder is a sibling of the parent.
        output_dir = raw_file.parent.parent / 'formatted_files'
    output_dir = Path(output_dir)
    formatted_File = output_dir / raw_file.name
    if formatted_File.exists():
        return

    output_dir.mkdir(parents=True, exist_ok=True)
    df = pd.read_csv(raw_file)

    colsName = [x.split(',')[0] for x in headers]

    ## standardize headers: adds missing columns(if any) and reorders columns based off headers
    df = df.reindex(columns=colsName)
    df = df.replace(to_replace=[-9999.0, -7777.0, -6666.0, -4444.0, ' '], value= np.nan)
    df = format_date(df, normal_type)
    # NOTE(review): the first five header entries are assumed to be the non-variable columns - confirm against the header files.
    df = format_variables(df, headers[5:])
    df = df.reindex(columns=colsName) # ensures columns in correct order after formatting file
    df = df.replace(to_replace=[' ',''], value= np.nan)

    # Rows added by format_date have no station id; fill them with the most frequent one.
    # An all-NaN STATION column has an empty mode, in which case nothing can be filled.
    station_mode = df['STATION'].mode()
    if not station_mode.empty:
        df = df.fillna(value= {'STATION': station_mode.iloc[0]})

    # Write to a temporary name first: an interrupted run must not leave a truncated file
    # behind, because existing files are skipped on the next run.
    tmp_file = formatted_File.with_name(formatted_File.name + '.tmp')
    df.to_csv(tmp_file,index=False)
    os.replace(tmp_file, formatted_File)

In [None]:
%%time
# standardizing normal files
for normal_period, normal_types in normals_dict.items():
    for normal_type in normal_types:
        headerFile = f'{txtFiles_Folder}csv_headers/headers-{normal_period}-{normal_type}.txt'
        normals_dict[normal_period][normal_type]['headerFile'] = headerFile
        # One header entry per line of the header file, without the trailing newline.
        with open(headerFile, 'r') as hfile:
            headers = [line.strip('\n') for line in hfile]

        files_location = f'{normals_dict[normal_period][normal_type]["files_location"]}'
        # exist_ok avoids the check-then-create race and is a no-op on re-runs.
        os.makedirs(f'{files_location}formatted_files/', exist_ok=True)

        print(f'Reformatting files: {normal_period} - {normal_type}...')
        normal_files = list(Path(f'{files_location}station_files/').glob('*.csv'))
        normals_dict[normal_period][normal_type]['Total_Stations'] = len(normal_files)

        part_format = partial(format_file, headers = headers, normal_type = normal_type) # make into partial function to be applied to map
        # The context manager tears the worker processes down even when map() raises,
        # so a failing file does not leave an orphaned pool behind for every iteration.
        with multiprocessing.Pool() as pool:
            pool.map(part_format, normal_files)
        
        # print(f'{normal_period} - {normal_type} files formatted...\n')

Next, combine all the formatted station files of the same normal type and time period into a single file.

In [None]:
%%time
for normal_period, normal_types in normals_dict.items():
    for normal_type in normal_types:
        files_location = f'{normals_dict[normal_period][normal_type]["files_location"]}'
        # glob() yields files in filesystem order; sorting makes the row order of the
        # combined file reproducible across runs and machines.
        files_toCombine = sorted(Path(f'{files_location}formatted_files').glob('*.csv'))
        combined_file = f'{files_location}combined_{normal_period}-{normal_type}.csv'
        combine_Files(combined_file, files_toCombine)
        normals_dict[normal_period][normal_type]['combined_file'] = combined_file


Connect to the PostgreSQL server and create a database for the climate normals: `climate_normals_db`.

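The tables will be as follows (the labels are shorthand; the code cells below build lowercase table names of the form `si_<hly|dly|mly>_<period>`, `var_<hly|dly|mly>_<period>` and `normals_<hly|dly|mly>_<period>`):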
- states
- HSI_1981_2010      : 1981-2010 hourly station inventory
- HVAR_1981_2010     : 1981-2010 hourly variables
- HNORMALS_1981_2010 : 1981-2010 hourly normals
- DSI_1981_2010      : 1981-2010 daily station inventory
- DVAR_1981_2010     : 1981-2010 daily variables
- DNORMALS_1981_2010 : 1981-2010 daily normals
- MSI_1981_2010      : 1981-2010 monthly station inventory
- MVAR_1981_2010     : 1981-2010 monthly variables
- MNORMALS_1981_2010 : 1981-2010 monthly normals
- HSI_1991_2020      : 1991-2020 hourly station inventory
- HVAR_1991_2020     : 1991-2020 hourly variables
- HNORMALS_1991_2020 : 1991-2020 hourly normals
- DSI_1991_2020      : 1991-2020 daily station inventory
- DVAR_1991_2020     : 1991-2020 daily variables
- DNORMALS_1991_2020 : 1991-2020 daily normals
- MSI_1991_2020      : 1991-2020 monthly station inventory
- MVAR_1991_2020     : 1991-2020 monthly variables
- MNORMALS_1991_2020 : 1991-2020 monthly normals
- HSI_2006_2020      : 2006-2020 hourly station inventory
- HVAR_2006_2020     : 2006-2020 hourly variables
- HNORMALS_2006_2020 : 2006-2020 hourly normals
- DSI_2006_2020      : 2006-2020 daily station inventory
- DVAR_2006_2020     : 2006-2020 daily variables
- DNORMALS_2006_2020 : 2006-2020 daily normals
- MSI_2006_2020      : 2006-2020 monthly station inventory
- MVAR_2006_2020     : 2006-2020 monthly variables
- MNORMALS_2006_2020 : 2006-2020 monthly normals

In [None]:
from configparser import ConfigParser

def config(filename='./sql/database.ini', section='postgresql'):
    """
    Read one section of an INI file and return its options as a dict.

    Parameters
    ----------
    filename : path of the INI file.
    section : name of the section to read (defaults to 'postgresql').

    Returns
    -------
    dict mapping each option name in the section to its string value.

    Raises
    ------
    Exception if the section is not present in the file.
    """
    ini = ConfigParser()
    ini.read(filename)

    # Guard clause: fail before building anything when the section is absent.
    if not ini.has_section(section):
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))

    return {option: value for option, value in ini.items(section)}

def connect_db(db_Name):
    """
    Connect to the database, creating it first if the initial connection attempt fails.

    Connection parameters come from config(). If the first attempt raises
    psycopg2.OperationalError, create_db(db_Name) is called and the connection is retried
    exactly once; a second failure propagates to the caller instead of recursing forever
    (OperationalError also covers problems such as an unreachable server or bad
    credentials, which creating the database cannot fix).

    Parameters
    ----------
    db_Name : name of the database to connect to.

    Returns
    -------
    An open psycopg2 connection.
    """
    # read connection parameters
    params = config()
    # db_Name takes precedence over any 'database' entry in the ini file.
    connect_args = {**params, 'database': db_Name}

    # connect to the PostgreSQL server
    print(f'Connecting to the {db_Name} database...')
    try:
        conn = psql.connect(**connect_args)
    except psql.OperationalError:
        # No connection object exists at this point, so there is nothing to close.
        print(f'There is no database named {db_Name}...')
        create_db(db_Name)
        print(f'Connecting to the {db_Name} database...')
        conn = psql.connect(**connect_args)

    print(f'***Connected: {db_Name}***')
    return conn
        
def create_db(db_Name):
    """
    Create the database db_Name by connecting to the 'postgres' maintenance database.

    Connection parameters come from config(). The connection is switched to autocommit
    before CREATE DATABASE is issued. A duplicate-database error (SQLSTATE 42P04) is
    reported and swallowed; any other error propagates after the connection is closed.

    Parameters
    ----------
    db_Name : name of the database to create. It is quoted as an SQL identifier, so it is
        created with exactly this spelling.

    Returns
    -------
    None (the function prints a status message).
    """
    # psycopg2.errors and psycopg2.sql are submodules; import them explicitly rather than
    # relying on `import psycopg2` having loaded them as a side effect.
    from psycopg2 import errors, sql

    # read connection parameters
    params = config()
    print(f'Connecting to postgres database to create new database:{db_Name}')
    conn = psql.connect(**{**params, 'database': 'postgres'})
    try:
        conn.autocommit = True
        cursor = conn.cursor()
        try:
            # Compose the statement with a quoted identifier instead of string interpolation.
            cursor.execute(sql.SQL('CREATE DATABASE {}').format(sql.Identifier(db_Name)))
        except errors.lookup('42P04'): #psql error code for duplicatedatabase
            return print(f"Database: [{db_Name}] already exists....")
        finally:
            cursor.close()
    finally:
        # Always release the maintenance connection, including on unexpected errors.
        conn.close()
    return print(f"***Database: {db_Name} created successfully***")

def execute_query(connection, query):
    """
    Execute a statement that returns no rows, with autocommit enabled on the connection.

    Parameters
    ----------
    connection : open database connection; its autocommit attribute is set to True.
    query : SQL text to execute.

    Returns
    -------
    None. A psycopg2.OperationalError is printed rather than raised; other errors propagate.
    """
    connection.autocommit = True
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("Query executed successfully")
    # The exception class must be qualified: a bare `OperationalError` is not defined in
    # this notebook and would turn every database error into a NameError.
    except psql.OperationalError as e:
        print(f"The error '{e}' occurred")
    finally:
        cursor.close()

def execute_read_query(connection, query):
    """
    Execute a query and return all of its rows.

    Parameters
    ----------
    connection : open database connection.
    query : SQL text to execute.

    Returns
    -------
    The list returned by cursor.fetchall(), or None when a psycopg2.OperationalError
    occurred (the error is printed). Other errors propagate.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        return cursor.fetchall()
    # The exception class must be qualified: a bare `OperationalError` is not defined in
    # this notebook and would turn every database error into a NameError.
    except psql.OperationalError as e:
        print(f"The error '{e}' occurred")
        return None
    finally:
        cursor.close()

In [None]:
# Open the connection used by every table-creation cell below; connect_db falls back to
# create_db when the first connection attempt fails.
conn = connect_db('climate_normals_db')

In [None]:
# (Re)create the states lookup table. IF EXISTS keeps the cell runnable on a fresh
# database, where a plain DROP TABLE would fail because the table does not exist yet.
create_state_table = """
DROP TABLE IF EXISTS states;
CREATE TABLE IF NOT EXISTS states(
    statelong TEXT NOT NULL,
    stateabbr CHAR(2) PRIMARY KEY
)
"""
execute_query(conn, create_state_table)

cur = conn.cursor()
try:
    with open(f'{txtFiles_Folder}states.csv', 'r') as states_csv:
        next(states_csv) # skips header
        cur.copy_from(states_csv, 'states', sep=',')
    conn.commit()
finally:
    # Release the cursor even when the file is missing or the copy fails.
    cur.close()

In [None]:
# creation of variable tables
# Explicit mapping from normal type to table-name infix. An unexpected type raises KeyError
# instead of silently reusing the infix left over from the previous iteration.
type_infix = {'normals-hourly': 'hly', 'normals-daily': 'dly', 'normals-monthly': 'mly'}

for normal_period, normal_types in normals_dict.items():
    for normal_type in normal_types:
        temp = type_infix[normal_type]
        tableName = f'var_{temp}_{normal_period}'.replace('-','_')   # Replaces - with _ to play nicer with postgres.

        create_variable_table = f"""
        DROP TABLE IF EXISTS {tableName};
        CREATE TABLE IF NOT EXISTS {tableName}(
            variable TEXT PRIMARY KEY,
            description TEXT NOT NULL,
            units TEXT NOT NULL
        )
        """
        execute_query(conn, create_variable_table)

        cur = conn.cursor()
        try:
            # NOTE(review): no header line is skipped here, unlike the states loader - confirm the variables files have none.
            with open(normals_dict[normal_period][normal_type]['variables_file'], 'r') as variables_csv:
                cur.copy_from(variables_csv, tableName, sep=',')
            conn.commit()
        finally:
            # Release the cursor even when the file is missing or the copy fails.
            cur.close()

In [None]:
# creation of station inventory tables
# NOTE(review): every statement that would execute the DDL or load the inventory CSV is
# commented out below, so as written this cell only builds `tableName` and the
# `create_SI_tables` string for each (period, type) pair. Confirm whether that is intentional.
for normal_period, normal_types in normals_dict.items():
    for normal_type in normal_types:
        # Table-name infix derived from the normal type.
        if normal_type == 'normals-hourly': temp = 'hly'
        elif normal_type == 'normals-daily': temp = 'dly'
        elif normal_type == 'normals-monthly': temp = 'mly'        
        tableName = f'si_{temp}_{normal_period}'.replace('-','_')
        
        # create_SI_tables = f"""
        # DROP TABLE IF EXISTS {tableName};"""
        # execute_query(conn, create_SI_tables)

        # DDL for one station inventory table, keyed by "stationID".
        create_SI_tables = f"""
        DROP TABLE IF EXISTS {tableName} CASCADE;
        CREATE TABLE IF NOT EXISTS {tableName}
        (
            "stationID" TEXT PRIMARY KEY,
            latitude  NUMERIC NOT NULL,
            longitude NUMERIC NOT NULL,
            elevation NUMERIC NOT NULL,
            state CHAR(2) NOT NULL,
            "stationName" TEXT NOT NULL,
            network TEXT,
            "wmoID" TEXT
        )
        """

        # execute_query(conn, create_SI_tables)
        # cur = conn.cursor()
        # with open(normals_dict[normal_period][normal_type]['inventory_file'], 'r') as csv:
        #     next(csv) #skips header
        #     cur.copy_from(csv, tableName, sep=',')

        # conn.commit()

        # NOTE(review): the disabled statements below use GEOMETRY / ST_MakePoint, which
        # presumably require the PostGIS extension in the target database - confirm.
        # alter_SI_tables = f"""
        # ALTER TABLE {tableName} ADD COLUMN coordinates GEOMETRY(POINT, 4326);
        # UPDATE {tableName} SET coordinates = ST_SetSRID(ST_MakePoint(longitude, latitude), 4326);
        # """
        # execute_query(conn, alter_SI_tables)
        # cur.close()

In [None]:
# creation of normals tables
# NOTE(review): the statements that would execute the DDL and load the combined CSV are
# commented out at the end of the loop body, so as written this cell only reads the header
# files and builds the `create_normals_tables` string. Confirm whether that is intentional.
for normal_period, normal_types in normals_dict.items():
    for normal_type in normal_types:
        # Table-name infix derived from the normal type.
        if normal_type == 'normals-hourly': temp = 'hly'
        elif normal_type == 'normals-daily': temp = 'dly'
        elif normal_type == 'normals-monthly': temp = 'mly'        
        tableName = f'normals_{temp}_{normal_period}'.replace('-','_')

        # Fixed leading columns; the variable columns are appended below.
        normals_list = '"stationID" TEXT NOT NULL, date TEXT NOT NULL, month SMALLINT NOT NULL, day SMALLINT NOT NULL, hour SMALLINT NOT NULL,'

        # Skip the first five header lines (one per fixed column above); the rest name the variable columns.
        # Requires the 'headerFile' entry, which is set by the standardizing cell.
        with open(normals_dict[normal_period][normal_type]['headerFile'], 'r') as header:
            lines = header.readlines()[5:]

        # Column types are assigned by position in the header file.
        # NOTE(review): presumably this mirrors how value and flag columns alternate in the
        # source files - verify against the header files for each period.
        for count, line in enumerate(lines):
            var = line.strip('\n').split(',')[0]
            # Wind-direction columns are always TEXT (`count` still advances for them).
            if '1STDIR' in var or '2NDDIR' in var:
                normals_list += f'"{var}" TEXT,'
                continue

            # 1981-2010: even positions are NUMERIC, odd positions are TEXT.
            if normal_period == '1981-2010':
                if count % 2 == 0:
                    normals_list += f'"{var}" NUMERIC,'
                else:
                    normals_list += f'"{var}" TEXT,'
                continue

            # Other periods: within each group of four, positions 0 and 3 are NUMERIC, 1 and 2 are TEXT.
            if count % 4 == 0 or count % 4 == 3:
                normals_list += f'"{var}" NUMERIC,'
            else:
                normals_list += f'"{var}" TEXT,'
                
        normals_list = normals_list.rstrip(',')
            
        # The foreign key points at the station inventory table of the same period and type.
        parentTable = f'si_{temp}_{normal_period}'.replace('-','_')
        create_normals_tables = f"""
        DROP TABLE IF EXISTS {tableName};
        CREATE TABLE IF NOT EXISTS {tableName}
        (
            {normals_list},
            CONSTRAINT "fk_stationID"
                FOREIGN KEY("stationID")
                    REFERENCES {parentTable}("stationID")
                    ON DELETE CASCADE
        )
        """
        # execute_query(conn, create_normals_tables)

        # cur = conn.cursor()
        # with open(normals_dict[normal_period][normal_type]['combined_file'], 'r') as csv:
        #     next(csv) #skips header
        #     cur.copy_from(csv, tableName, sep=',', null="")
        # print(f'{tableName} created and filled...')
        # conn.commit()
        # cur.close()

In [None]:
# Display the metadata entry for whichever (period, type) pair the last executed loop left
# bound in `normal_period` / `normal_type`.
normals_dict[normal_period][normal_type]

In [None]:
# Closes whichever cursor `cur` was last bound to by a loading cell.
# NOTE(review): the loading cells already close their own cursor - confirm this cell is still needed.
cur.close()

In [None]:
# Close the database connection opened with connect_db earlier in the notebook.
conn.close()
