In [1]:
!pip install pyarrow
!pip install fastcore
!pip install fastprogress

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [2]:
import pandas as pd

def choose_stations():
    first_year_data = "https://noaa-ghcn-pds.s3.amazonaws.com/csv/1950.csv"
    second_year_data = "https://noaa-ghcn-pds.s3.amazonaws.com/csv/2019.csv"

    first_df = pd.read_csv(first_year_data, header=None, usecols=[0], names=['station_id'])
    second_df = pd.read_csv(second_year_data, header=None, usecols=[0], names=['station_id'])

    stations = pd.read_fwf('http://noaa-ghcn-pds.s3.amazonaws.com/ghcnd-stations.txt',
                           colspecs=[(0, 11), (12, 20), (21, 30), (31, 37), (38, 40), (41, 71), (72, 75), (76, 79), (80, 85)],
                           header=None)
    stations.columns=['id', 'lat', 'lon', 'elevation', 'state', 'name', 'gsn_flag', 'hcn', 'wmo_id']

    persistent_stations = set(first_df.station_id).intersection(second_df.station_id)
    intl_airport_stations = set(stations[stations.name.str.contains('INTL')].id)
    southern_stations = set(stations[stations.lat<-55].sample(10).id)
    mountain_stations = set(stations[stations.elevation>3000].id)
    stations_to_keep  = persistent_stations.union(intl_airport_stations).union(southern_stations).union(mountain_stations)
    potential_keeps = stations[stations.id.apply(lambda s: s in stations_to_keep)]
    potential_keeps.loc['rounded_lat', :] = potential_keeps.lat.round(1)
    potential_keeps.loc['rounded_lon', :] = potential_keeps.lon.round(1)
    keeps = potential_keeps[['id', 'lat', 'lon', 'elevation', 'state', 'name', 'gsn_flag', 'wmo_id']]
    return keeps

stations_to_keep = choose_stations()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [3]:
def read_data_file(year):
    url = f"https://noaa-ghcn-pds.s3.amazonaws.com/csv/{year}.csv"
    df = pd.read_csv(url, 
                     header=None,
                     usecols=[0, 1, 2, 3, 4],
                     names=['station_id', 'date', 'measurement', 'value', 'measurement_flag'])
    df = df[df.station_id.isin(stations_to_keep.id)]
    df.set_index(['station_id', 'date'], inplace=True)
    
    max_temps = df.query('measurement == "TMAX"').value
    min_temps = df.query('measurement == "TMIN"').value
    precip = df.query('measurement == "PRCP"').value
    snow_depth = df.query('measurement == "SNWD"').value
    snowfall = df.query('measurement == "SNOW"').value
    out = pd.DataFrame({'max_temp_c': max_temps/10,
                   'min_temp_c': min_temps/10,
                   'precip_mm': precip,
                   'snow_depth_mm': snow_depth,
                   'snowfall': snowfall
                    })
    out.dropna(subset=['max_temp_c', 'min_temp_c'], inplace=True)
    return out

In [10]:
from fastcore.parallel import parallel
from time import time
a=time()
big_df = pd.concat(parallel(f=read_data_file,
                         items=range(1900, 2020),
                         n_workers=20,
                         progress=True)).reset_index()
b=time()
print(f"Data munging time was {int(b-a)} seconds")
big_df.to_parquet('weather_1900_to_2020.parquet', index=False)
big_df.to_csv('weather_1900_to_2020.csv', index=False)

Data munging time was 799 seconds


In [11]:
import sagemaker

s3_path_to_parquet_data = sagemaker.Session().upload_data(bucket='weather-bucket', 
                                                          path='weather_1900_to_2020.parquet',
                                                          key_prefix='weather_1900_to_2020.parquet')

s3_path_to_parquet_data = sagemaker.Session().upload_data(bucket='weather-bucket', 
                                                          path='weather_1900_to_2020.csv',
                                                          key_prefix='weather_1900_to_2020.csv')

