# RISE Camp Code Snippet

In [None]:
# TODO: Filter out any imports that are not necessary
import ray
ray.init(ignore_reinit_error=True, num_cpus=32, _temp_dir="/home/brian/external/aws-asdi/ray_temp"); # ray.init() config for single node setup
#ray.init(ignore_reinit_error=True, address="auto", _redis_password='5241590000000000'); # ray.init() config for cluster setup
import modin.pandas as pd
from nums import numpy as nps

from nums.core import settings
from nums.core import linalg
from nums.core import application_manager
from nums.experimental.nums_modin import from_modin
nps_app_inst = application_manager.instance()

from tqdm.auto import tqdm
import matplotlib.pyplot as plt;
import plotly.express as px
import plotly.graph_objects as go
import os
import warnings
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import gzip
import sys
warnings.filterwarnings("ignore");

In [None]:
# NumS cluster setting
if len(ray.nodes()) > 1:
    settings.cluster_shape = (len(ray.nodes())-1, 1)
settings.cluster_shape

In [None]:
#s3://noaa-ghcn-pds/
inventory = pd.read_fwf('../ghcnd-inventory.txt', widths=[12, 9, 10, 4, 5, 5], header=None, names=["ID", "LATITUDE", "LONGITUDE", "ELEMENT", "FIRSTYEAR", "LASTYEAR"])
inventory.head()

In [None]:
#s3://noaa-ghcn-pds/
stations = pd.read_fwf('../ghcnd-stations.txt', widths=[12, 9, 10, 7, 3, 31, 4, 4, 6], header=None, names=["ID", "LATITUDE", "LONGITUDE", "ELEVATION", "STATE", "NAME", "GSN FLAG", "HCN/CRN FLAG", "WMO ID"])
stations.head()

In [None]:
# Global variables
elements = ["PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]
years = list(range(1763, 2022))
local = True

Working implementation

In [None]:
def df_loader(year, local=False):
    if local:
        df = pd.read_csv('../data/' + str(year) + '.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
    else:
        df = pd.read_csv('s3://noaa-ghcn-pds/csv/' + str(year) + '.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
    df["YEAR/MONTH/DAY"] = pd.to_datetime(df["YEAR/MONTH/DAY"], format="%Y%m%d")
    return df

In [None]:
def design_matrix(years, elements, target=None, convert_nps=False, local=False):
    """
    Set target to your "y" predictor. If y has NaNs or missing values, we will drop the data row.
    """
    df_design = pd.DataFrame()
    
    for year in tqdm(years):
        if local:
            df = df_loader(year, local=local)
        else:
            df = df_loader(year)
            
        if target[0] not in df["ELEMENT"].unique():
            continue

        df = df[df['ELEMENT'].isin(elements)]
        df = pd.pivot_table(df, index=["ID", "YEAR/MONTH/DAY"], columns="ELEMENT", values="DATA VALUE").reset_index(level=[0,1])
        df = df.merge(stations[["ID", "LATITUDE", "LONGITUDE", "ELEVATION"]], how='inner', on='ID')
        
        if target:
            df = df.dropna(subset=target)
        df = df.dropna()
        
        
        df["YEAR/MONTH/DAY"] = df["YEAR/MONTH/DAY"].apply(lambda x: pd.Period(x, freq='D').day_of_year)
        df["TMAX"] = df["TMAX"] / 10
        df["TMIN"] = df["TMIN"] / 10
        df["TAVG"] = (df["TMAX"] + df["TMIN"]) / 2
        df["TRANGE"] = df["TMAX"] - df["TMIN"]
        
        if df_design.empty:
            df_design = df
        else:
            df_design = df_design.append(df)
    
    if convert_nps:
        return from_modin(result)
    return df_design


In [None]:
df = design_matrix([2020], ['PRCP', 'TMAX', 'TMIN'], target=['PRCP'], local=True)

In [None]:
df = df[['YEAR/MONTH/DAY', 'PRCP', 'TMAX', 'TMIN', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'TAVG', 'TRANGE']].astype(float)
df.head()

In [None]:
from nums import api

In [None]:
df.to_csv("2020.csv", index_label=False)

In [None]:
%%time
import modin.pandas as pd
pd.read_csv("2020.csv")

In [None]:
%%time
api.read_csv("2020.csv", has_header=True)

Notes on data transformation


CSVs by year. Data is scattered as logs -> (with modin) -> data clean to a dataframe of features with merging and adding location as well -> shuffle by station ids to prepare for modelling -> (from_modin) -> convert to NumS in a convenient fasion, split on train test for cross validation. -> run a large scale model of choice (GLM), xgboost...


In [None]:
X = from_modin(df[['YEAR/MONTH/DAY', 'TMAX', 'TMIN', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'TAVG', 'TRANGE']]).astype(float)
y = from_modin(df[['PRCP']]).astype(float).reshape(-1)

In [None]:
split = int(X.shape[0] * 0.8)
X_train = X[:split]
X_test = X[split:]
y_train = y[:split]
y_test = y[split:]

In [None]:
X = nps.random.rand(1000, 1000)
X.touch();

In [None]:
from nums.models.glms import LinearRegression

model = LinearRegression()

In [None]:
%%time
model.fit(X_train, y_train)

In [None]:
%%time
training_results = model.predict(X_train).get()

In [None]:
%%time
test_results = model.predict(X_test).get()

In [None]:
print("Training RMSE:", rmse(training_results, y_train.get()))
print("Testing RMSE", rmse(test_results, y_test.get()))

In [None]:
split = int(X.shape[0] * 0.8)
X_train_np = X_train.get()
X_test_np = X_test.get()
y_train_np = y_train.get()
y_test_np = y_test.get()

In [None]:
from sklearn.linear_model import LinearRegression

model = LinearRegression()

In [None]:
%%time
model.fit(X_train_np, y_train_np)

In [None]:
%%time
training_results = model.predict(X_train_np)

In [None]:
%%time
test_results = model.predict(X_test_np)

In [None]:
print("Training RMSE:", rmse(training_results, y_train_np))
print("Testing RMSE", rmse(test_results, y_test_np))