# GSOD ETL Harvester Framework - Download Station Data

This notebook focuses on initial ETL techniques for downloading various data sets, starting with weather data.

## Weather Station Data - Global Surface Summary of the Day (GSOD)

Here, I use GSOD_directory.txt (which is just copied from the web directory HTML page at NOAA) and NOAA_GSOD_stations_clean.txt, an additional file that I found on NOAA’s website, which has a list of all of the weather stations in the GSOD database. Based on some earlier versions of this script, I also exclude a few specific stations because they have a lot of missing data or other issues.

The station location list is a fixed-width file, which makes reading it very tedious. Nevertheless, we end up with a great plot of all of the GSOD weather stations in the world!
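
Fixed-width parsing gets less tedious once the column boundaries are written down in one place. The sketch below slices each line by named character ranges. The layout in `STATION_COLUMNS`, the helper names, and the sample line are all hypothetical; the real offsets would have to be read off the header of NOAA_GSOD_stations_clean.txt.

```python
# Hypothetical column layout: name -> (start, end) character offsets.
# These offsets are assumptions for illustration, not the real NOAA layout.
STATION_COLUMNS = {
    "usaf": (0, 6),
    "wban": (7, 12),
    "name": (13, 42),
    "lat": (43, 50),
    "lon": (51, 59),
}


def parse_station_line(line, columns=STATION_COLUMNS):
    """Slice one fixed-width line into a dict of stripped strings."""
    return {name: line[start:end].strip() for name, (start, end) in columns.items()}


def parse_station_file(lines, columns=STATION_COLUMNS):
    """Parse an iterable of fixed-width lines, skipping blank ones."""
    return [parse_station_line(line, columns) for line in lines if line.strip()]


# Build a made-up sample line that matches the assumed layout above.
sample = f"{'008415':<6} {'99999':<5} {'EXAMPLE STATION':<29} {'+12.345':<7} {'-067.890':<8}"
station = parse_station_line(sample)
print(station["name"], float(station["lat"]), float(station["lon"]))
```

Keeping the offsets in a single dict means a layout change only touches one place, and the same function works for any open file object passed to `parse_station_file`.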

We will be gathering data from the NOAA Global Surface Summary of the Day (GSOD).

The data is downloaded from https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/2020

# Changelog / To-Do  

 * **2020-06-26**: Set up MongoDB, storing some raw GSOD records
 * **2020-06-11**: Initial creation of framework
 * **2020-06-05**: Initial download of GSOD data file list for 2020
 * **2020-05-15**: Project started - covid fusion

**To-do**

* Create a separate gsod library
* Don't re-download gsod data if it is already in the directory (check file list size against directory list size)
* Print a message every 100 or 500 downloads; also print the number of lines in the file list
* Put a pause in the download (DON'T DDoS the site)
* Ultimately, the goal is to create a gsod_etl_harvester framework
* Clean up checking for a file in a path: define one function
* Check to see if you can clean up and use global variables
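
Several of the items above (skipping files that are already present, pausing between requests, and periodic progress messages) could be combined in one loop. The following is a minimal sketch under stated assumptions, not the notebook's current implementation: `download_missing` and its parameters are hypothetical names, and `fetch` is any callable that returns the file content as bytes, or `None` on failure.

```python
import os
import time


def download_missing(urls, dest_dir, fetch, pause_seconds=1.0, progress_every=100, log=print):
    """Download each URL into dest_dir unless the file is already there.

    fetch is a callable taking a URL and returning bytes, or None on
    failure (a hypothetical interface chosen for this sketch).
    Returns a (downloaded, skipped) tuple.
    """
    os.makedirs(dest_dir, exist_ok=True)
    downloaded = skipped = 0
    log(f"{len(urls)} files in file list")
    for index, url in enumerate(urls, start=1):
        local_path = os.path.join(dest_dir, url.rsplit("/", 1)[-1])
        if os.path.exists(local_path):
            skipped += 1  # already on disk, no request made
        else:
            content = fetch(url)
            if content is not None:
                with open(local_path, "wb") as fp:
                    fp.write(content)
                downloaded += 1
            time.sleep(pause_seconds)  # pause after every request to go easy on the server
        if index % progress_every == 0:
            log(f"processed {index}/{len(urls)} files")
    return downloaded, skipped
```

Passing `fetch` in as an argument keeps the loop testable without network access; in this notebook it could wrap `requests.get` and return `r.content` when `r.ok` is true.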


In [None]:
import requests
from bs4 import BeautifulSoup
import csv
import os.path
from os import path
import logging
import sys
import pymongo
import pandas as pd
import json

covid_fusion_year = ""
gsod_noaa_url = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/2020/"
gsod_file_ext_type = "csv"
gsod_data_dir = "../../data/interim/weather/gsod/"
gsod_directory_list_file = "gsod-url-file-list-2020.txt"
gsod_file_number_from_filelist = 0
gsod_file_number_from_dir = 0
gsod_mongo_collection_name = "covid-fusion-gsod-data"

########################################################    
#
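def setup_custom_logger(name):
    formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(funcName)s - %(module)s - %(message)s')
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    # Attach the handler only once; re-running the cell would otherwise
    # add another handler and repeat every log line.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    # Do not pass records on to the root logger, which would print them again.
    logger.propagate = False
    return logger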

########################################################    
#
def initialize_etl_harvester():
    # Configure the root logger; records at INFO and above go to the console.
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)

    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(module)s %(funcName)s %(message)s',
                        handlers=[stream_handler])

########################################################    
#
def shutdown_etl_harvester():
    # remember to close the handlers
    for handler in logger.handlers:
        handler.close()

def get_gsod_station_location_file(url_pathname, local_pathname):
    r = requests.get(url_pathname)

    # Write the file only on a successful response, not on any non-404 status.
    if r.ok:
        with open(local_pathname, 'wb') as fp:
            fp.write(r.content)
    else:
        logger.error("Response %d: Extracting gsod station location file: (%s)", r.status_code, url_pathname)


########################################################    
# This function reads a list of files with a specific extension from a
# remote server.
#
def get_url_paths(url, ext='', params=None):
    response = requests.get(url, params=params)
    # Raise an HTTPError on a failed request instead of returning None.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    # href=True skips anchors that have no href attribute.
    return [url + node['href'] for node in soup.find_all('a', href=True) if node['href'].endswith(ext)]

########################################################    
#
def get_gsod_directory_filelist(url, ext, pathname):
    logger.info('get_gsod_directory_filelist: Extracting gsod file data list from: ' + url + ' file extension: ' + ext)
    result = get_url_paths(url, ext)
    # print(result)
    with open(pathname, 'w') as fp:
        fp.writelines("%s\n" % url for url in result)
    return

########################################################    
#
def gsod_filelist_exists(pathname):
    return path.exists(pathname)

        
########################################################    
#
def gsod_station_location_file_exists(pathname):
    return path.exists(pathname)
        
########################################################    
#
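def count_gsod_files_in_dir(gsod_dir):
    # Count only data files, so the file list stored in the same directory is not included.
    with os.scandir(gsod_dir) as entries:
        return sum(1 for entry in entries
                   if entry.is_file() and entry.name.endswith('.' + gsod_file_ext_type))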
        
########################################################    
#
def are_gsod_files_extracted(pathname, gsod_dir):
    global gsod_file_number_from_filelist
    global gsod_file_number_from_dir

    logger.info('Checking if gsod files have been downloaded already')
    # Use a context manager so the file list is closed after counting.
    with open(pathname) as fp:
        gsod_file_number_from_filelist = sum(1 for line in fp)
    gsod_file_number_from_dir = count_gsod_files_in_dir(gsod_dir)

    logger.info('filelist count: %d', gsod_file_number_from_filelist)
    logger.info('dirlist count: %d', gsod_file_number_from_dir)

    return gsod_file_number_from_filelist == gsod_file_number_from_dir
    
        
########################################################    
#
def get_gsod_file(gsod_file_url, gsod_dir):
    head, gsod_filename = os.path.split(gsod_file_url)
    logger.debug('Extracting gsod data file: (%s)', gsod_file_url)
    r = requests.get(gsod_file_url)

    # Save only successful responses; any error status is logged and skipped.
    if r.ok:
        with open(os.path.join(gsod_dir, gsod_filename), 'wb') as fp:
            fp.write(r.content)
        return True
    else:
        logger.warning('Response %d: Extracting gsod data file: (%s)', r.status_code, gsod_file_url)
        return False
        
########################################################    
#
def extract_gsod_files(gsod_filelist_pathname, gsod_dir):
    logger.info('Extracting gsod data to dir: ' + gsod_dir)
    count = 0
    with open(gsod_filelist_pathname) as fp:
        for gsod_file_url in fp:
            if get_gsod_file(gsod_file_url.strip(), gsod_dir):
                count += 1

    logger.info('Extracted file count: %d', count)

########################################################    
# TODO: associate gsod data with counties using lat/lon
# (use a hash per county, only using one station, or average all station data???)
#

########################################################    
#
def store_normalized_gsod_data(gsod_dir, gsod_collection_name):
    # NOTE: gsod_collection_name is currently unused; records go to 'gsod_raw_data'.
    mg_client = pymongo.MongoClient()  # defaults to localhost:27017
    mg_db = mg_client['covid_fusion']
    collection_name = 'gsod_raw_data'
    db_cm = mg_db[collection_name]
    logger.info('Load gsod data from: ' + gsod_dir + ' into datastore: ' + collection_name)

    # For now only a single station file is loaded.
    data = pd.read_csv(os.path.join(gsod_dir, "00841599999.csv"))
    data_json = json.loads(data.to_json(orient='records'))
    # Clear the collection first so re-running the cell does not duplicate records.
    result = db_cm.delete_many({})
    logger.info('delete_many: %s', result.acknowledged)
    logger.info('delete_many: num docs deleted: %s', result.deleted_count)
    result = db_cm.insert_many(data_json)
    logger.info('insert_many: %s', result.acknowledged)
    logger.info('insert_many: num docs inserted: %d', len(result.inserted_ids))

    logger.info('Finished loading gsod data')
    
    
#    mng_client = pymongo.MongoClient('localhost', 27017)
#    mng_db = mng_client['mongodb_name'] // Replace mongo db name
#    collection_name = 'collection_name' // Replace mongo db collection name
#    db_cm = mng_db[collection_name]
#    cdir = os.path.dirname(__file__)
#    file_res = os.path.join(cdir, filepath)

#    data = pd.read_csv(file_res)
#    data_json = json.loads(data.to_json(orient='records'))
#    db_cm.remove()
#    db_cm.insert(data_json)


    

########################################################    
#
# GSOD ETL Harvester POC
#  
########################################################
gsod_pathname = "../../data/interim/weather/gsod/"

logger = setup_custom_logger('GSOD-ETL-Station-Metadata')
initialize_etl_harvester()
logger.info('Weather GSOD ETL - Started')

gsod_filelist_pathname = gsod_data_dir + gsod_directory_list_file

# Get the file list from the NOAA directory listing, if it doesn't exist yet.
# The check must use the file list pathname, not the data directory.
if not gsod_filelist_exists(gsod_filelist_pathname):
    get_gsod_directory_filelist(gsod_noaa_url, gsod_file_ext_type, gsod_filelist_pathname)
else:
    logger.info('file exists: ' + gsod_filelist_pathname)
    
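# Get the station location file, if it doesn't exist.
# TODO: this step still reuses the file list pathname and the directory URL;
# it needs its own station location URL and local pathname.
if not gsod_station_location_file_exists(gsod_filelist_pathname):
    get_gsod_station_location_file(gsod_noaa_url, gsod_filelist_pathname)
else:
    logger.info('station location file exists: ' + gsod_filelist_pathname)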

# setup to extract the data files
gsod_extract_dir = gsod_data_dir + covid_fusion_year

# see if files have been extracted
if not are_gsod_files_extracted(gsod_filelist_pathname, gsod_extract_dir):
    # extract the files
    logger.info('gsod files not extracted filelist count is: %d', gsod_file_number_from_filelist)
    extract_gsod_files(gsod_filelist_pathname, gsod_extract_dir)
else:
    logger.info('gsod files already extracted')
    
# move file data into mongo to prep for transformation to county lat/lon
store_normalized_gsod_data(gsod_extract_dir, gsod_mongo_collection_name)

logger.info('Weather GSOD ETL - Finished')

shutdown_etl_harvester()

2024-10-27 21:49:12,547 - INFO - <module> - 1145773803 - Weather GSOD ETL - Started
2024-10-27 21:49:12,547 INFO 1145773803 <module> Weather GSOD ETL - Started
2024-10-27 21:49:12,553 - INFO - <module> - 1145773803 - file exists: ../../data/interim/weather/gsod/

Save the list of file URLs first (if the file already exists, don't fetch it again, for now).
Then read in each URL and write its data into the MongoDB database.
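
One way the second step might look, once the files are on disk, is to walk the download directory and insert every CSV file's rows rather than a single station file. This is a sketch only: `load_csv_records` and `store_gsod_dir` are hypothetical helpers, and `collection` is assumed to be any object with an `insert_many(list_of_dicts)` method, as a pymongo collection has. Note that `csv.DictReader` yields every value as a string, so numeric fields would still need converting.

```python
import csv
import os


def load_csv_records(csv_path):
    """Read one CSV file into a list of dicts, one per row (values stay strings)."""
    with open(csv_path, newline="") as fp:
        return list(csv.DictReader(fp))


def store_gsod_dir(gsod_dir, collection, ext=".csv"):
    """Insert the rows of every CSV file in gsod_dir into collection.

    collection is assumed to expose insert_many(list_of_dicts).
    Returns the number of rows passed to it.
    """
    total = 0
    for name in sorted(os.listdir(gsod_dir)):
        if not name.endswith(ext):
            continue  # skip the file list and anything else that is not data
        records = load_csv_records(os.path.join(gsod_dir, name))
        if records:  # pymongo's insert_many rejects an empty list
            collection.insert_many(records)
            total += len(records)
    return total
```

Inserting one file at a time keeps memory use bounded by the largest station file instead of the whole year of data.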