#### Loading Libraries

In [None]:
# !pip install alive_progress

In [None]:
import os
import shutil
import tarfile
import gzip
import random

import pandas as pd
import numpy as np
from alive_progress import alive_bar
import multiprocessing

---

#### Gathering Data from `.gz` Files

In [None]:
def gzip_to_dataframe(gz_file):
    """Parse one gzipped fixed-width station file into a cleaned DataFrame.

    Processing steps:

    * rows without a ``YEARMODA`` value are discarded;
    * cells that consist solely of a coded missing value are blanked, columns
      that end up entirely missing are dropped, and the remaining blanks are
      filled with ``0.0``;
    * ``Unnamed`` columns (per-field observation counts) are removed;
    * ``YEARMODA`` is split into integer ``year`` / ``month`` / ``day``;
    * ``FRSHTT`` is split into six 0/1 indicator columns;
    * ``PRCP`` is split into ``precip_in`` (float) and ``precip_flag``;
    * ``*`` markers are stripped from ``MAX`` / ``MIN``;
    * the source columns are dropped and the rest renamed.

    Parameters
    ----------
    gz_file : str
        Path to the gzip-compressed fixed-width file.

    Returns
    -------
    pandas.DataFrame or None
        The cleaned frame, or ``None`` when the file cannot be read or lacks
        a required column (callers treat ``None`` as "skip this station").
    """
    flag_cols = ['fog', 'rain', 'snow', 'hail', 'thunder', 'tornado']

    try:
        # read in file as fixed-width file (FWF); the context manager makes
        # sure the gzip handle is closed again
        with gzip.open(gz_file) as handle:
            df = pd.read_fwf(handle)

        # focus on observations with recorded date
        df = df[df['YEARMODA'].notna()]

        # Blank out coded missing values. With a non-string replacement pandas
        # swaps the WHOLE cell as soon as the pattern matches anywhere in it,
        # so the pattern must be anchored; otherwise ordinary readings such as
        # "39.5" or "1009.3" (they contain "9.") are wiped as well.
        # NOTE(review): sentinel set assumed to be 999.9 / 9999.9 / 99.99 -
        # confirm against the dataset's format description.
        df_without_year = (
            df.drop(columns='YEARMODA')
            .astype(str)
            .astype(object)
            .replace(to_replace=r'^(?:9{3,}\.9|99\.99)$', value=np.nan, regex=True)
            .dropna(axis=1, how='all')
        )
        df = pd.concat([df_without_year, df['YEARMODA']], axis=1)

        # remove attributes with unnamed columns (these columns strictly count
        # how many observations were used when calculating the observed mean)
        unnamed_cols = df.columns[df.columns.str.startswith('Unnamed')]
        df = df.drop(columns=unnamed_cols).fillna(0.0)

        # extract date information and save into separate columns
        dates = pd.to_datetime(
            df['YEARMODA'].astype('int64').astype(str), format='%Y%m%d'
        )
        df['year'] = dates.dt.year.astype('int64')
        df['month'] = dates.dt.month.astype('int64')
        df['day'] = dates.dt.day.astype('int64')

        # FRSHTT is a six-character 0/1 indicator string, but it is parsed as
        # a number and therefore loses its leading zeros ("001000" -> 1000).
        # Pad it back to six digits so every flag is read from its own slot.
        indicator = df['FRSHTT'].astype('int64').astype(str).str.zfill(6)
        for position, flag_name in enumerate(flag_cols):
            df[flag_name] = indicator.str[position].astype(int)

        # extract precipitation values and flags and store in respective
        # columns; the amount may have any number of leading digits
        precip = df['PRCP'].str.extract(
            r'(?P<precip_in>\d+\.\d+)(?P<precip_flag>[A-Z])'
        )
        df['precip_in'] = precip['precip_in'].astype('float64').fillna(0.0)
        df['precip_flag'] = precip['precip_flag']

        # strip the '*' markers from the temperature columns
        for source, target in (('MAX', 'max_temp_frnht'), ('MIN', 'min_temp_frnht')):
            df[target] = (
                df[source].astype(str).str.replace('*', '', regex=False).astype('float64')
            )

        # remove columns we just extracted data from and rename columns
        df = df.drop(columns=['FRSHTT', 'YEARMODA', 'MAX', 'MIN', 'PRCP', 'WBAN'])
        df = df.rename(columns={
            'STN---': 'station_num',
            'TEMP': 'temp_ft',
            'DEWP': 'dewpt_ft',
            'SLP': 'slp_mb',
            'VISIB': 'visib_mi',
            'GUST': 'max_gust_knt',
            'SNDP': 'snow_depth_in',
            'WDSP': 'wind_knt',
            'MXSPD': 'maxwind_knt',
        })

        return df

    except Exception:
        # Best effort: an unreadable or incomplete file is skipped by the
        # caller. Only ``Exception`` is caught so that Ctrl-C still interrupts
        # a long-running batch.
        return None


---

#### File Management

In [None]:
def spew_out_year_directory(year):
    """Extract the tar archive for ``year`` into ``./{year}_data``.

    The current working directory is scanned (in sorted order, so the choice
    is deterministic) for the first regular file whose name contains the year
    and which is a valid tar archive. Directories and non-archive files that
    merely contain the year in their name are skipped.

    Parameters
    ----------
    year : int or str
        Year to look for in the archive file name.

    Raises
    ------
    FileNotFoundError
        If no tar archive for ``year`` exists in the working directory.

    Notes
    -----
    NOTE(review): members are extracted without any path filtering - only use
    with trusted archives.
    """
    gsod_dir = '.'
    year_token = str(year)

    for entry in sorted(os.listdir(gsod_dir)):
        candidate = f'{gsod_dir}/{entry}'
        # is_tarfile() raises on directories, so test for a regular file first
        if year_token not in entry or not os.path.isfile(candidate):
            continue
        if tarfile.is_tarfile(candidate):
            with tarfile.open(candidate) as tar:
                tar.extractall(path=f'./{year}_data')
            return

    raise FileNotFoundError(f'no tar archive for {year} found in {gsod_dir}')

In [None]:
def gather_year_directory_data(year, n_samples):
    """Extract a year's archive and parse a random sample of its station files.

    Every sampled file is deleted from ``./{year}_data`` after the parse
    attempt, whether or not it succeeded.

    Parameters
    ----------
    year : int or str
        Year whose archive is extracted into ``./{year}_data``.
    n_samples : int
        Maximum number of files to sample; capped at the number of files in
        the extracted directory.

    Returns
    -------
    dict
        Maps the part of each file name before the first ``-`` to the parsed
        DataFrame. Files that could not be parsed are left out. Files sharing
        the same prefix overwrite one another, so only the last one sampled
        is kept.

    Raises
    ------
    NotADirectoryError
        If ``./{year}_data`` does not exist after extraction.
    """
    spew_out_year_directory(year)

    # explicit check instead of ``assert`` so it survives ``python -O``
    if not os.path.isdir(f'./{year}_data'):
        raise NotADirectoryError(f'./{year}_data not valid directory')

    df_dict = {}
    dir_files = os.listdir(f'./{year}_data')
    n_samples = min(n_samples, len(dir_files))

    with alive_bar(n_samples, title=f'Working on {year} ...') as bar:
        for gz_file in random.sample(dir_files, k=n_samples):
            gz_path = f'./{year}_data/{gz_file}'

            # best effort: one bad file must not abort the whole year
            try:
                station_df = gzip_to_dataframe(gz_path)
            except Exception:
                station_df = None

            try:
                os.remove(gz_path)
            except OSError:
                # a leftover archive is harmless; keep going
                pass

            # advance the bar for every file handled, parsed or not
            bar()

            if station_df is not None:
                df_dict[gz_file.partition('-')[0]] = station_df

    return df_dict

In [None]:
def write_to_csv(year, n_samples):
    """Build the per-year station list and combined CSV in ``./{year}_data``.

    Gathers up to ``n_samples`` station DataFrames for ``year``, removes any
    remaining ``.gz`` files from the year directory, writes the station keys
    that produced data to ``{year}stations.txt`` (one per line), writes the
    concatenated data to ``{year}weatherdata.csv`` and prints a short summary.

    Parameters
    ----------
    year : int or str
        Year to process.
    n_samples : int
        Maximum number of station files to sample.
    """
    # gather dataframes for given year
    agg_df_dict = gather_year_directory_data(year, n_samples)

    # clear out the station archives that were not sampled
    for file in os.listdir(f'./{year}_data'):
        if file.endswith('.gz'):
            os.remove(f'./{year}_data/{file}')

    # only stations that actually yielded a dataframe count as valid
    station_frames = {
        st: frame for st, frame in agg_df_dict.items() if frame is not None
    }
    total_stations = len(station_frames)

    stations_path = f'./{year}_data/{year}stations.txt'
    csv_path = f'./{year}_data/{year}weatherdata.csv'

    # write valid stations to text file for retrieval later on
    with open(stations_path, 'w') as st_file:
        st_file.writelines(f'{st}\n' for st in station_frames)

    # write the concatenated dataframe to the corresponding year folder;
    # pd.concat raises on an empty collection, so skip it when nothing parsed
    csv_written = False
    if station_frames:
        pd.concat(list(station_frames.values())).to_csv(csv_path)
        csv_written = os.path.isfile(csv_path)

    # log results
    if os.path.isfile(stations_path):
        print(f'>> Total stations: {total_stations}/{n_samples}')
    if csv_written:
        print(f'>> Success: {year}weatherdata.csv written to ./{year}_data/')
    else:
        print(f'!! Failure: {year}weatherdata.csv NOT written to ./{year}_data/')


---

#### Running (+ Trusting) the Process

In [None]:
start_year, end_year = 2001, 2007
n_samples = 10000

# Process each year of the inclusive range except 2002-2004, always starting
# from a fresh output directory.
for year in range(start_year, end_year + 1):
    if year not in [2002, 2003, 2004]:
        # drop the output of any earlier run for this year
        if f'{year}_data' in os.listdir():
            shutil.rmtree(f'{year}_data')
        write_to_csv(year, n_samples)

Working on 2001 ... |████████████████████████████████████████| 9008/9008 [100%] in 18:42.6 (8.02/s) 
>> Total stations: 8999/10000
>> Success: 2001weatherdata.csv written to ./2001_data/
Working on 2005 ... |████████████████████████████████████████| 10000/10000 [100%] in 22:07.1 (7.54/s) 
>> Total stations: 9009/10000
>> Success: 2005weatherdata.csv written to ./2005_data/
Working on 2006 ... |████████████████████████████████████████| 9463/9463 [100%] in 21:36.7 (7.30/s) 
>> Total stations: 9409/10000
>> Success: 2006weatherdata.csv written to ./2006_data/
Working on 2007 ... |████████████████████████████████████████| 9766/9766 [100%] in 22:05.5 (7.37/s) 
>> Total stations: 9634/10000
>> Success: 2007weatherdata.csv written to ./2007_data/
