# Scaling up Data Science Workflow/Pipeline with Ray Environment Through NOAA-GHCN Dataset

In this tutorial, we will be using primarily NumS, Modin, Sklearn, and Ray to show how we can **scale** dataframe manipulation and classical machine learning algorithms to analyze a big dataset. 

Once the autoscaler script has finished running and initialzing, we can start running cells to demonstrate the scalability. We'll explain the scalability as well as the high interpretability we can gain from classical machine learning models.

We will split up the notebook into 5 main sections:
* Setting Up and Loading Datasets with Modin and S3
* Dataframe Manipulation and Cleaning with Modin
* Visualizations and Understanding the Data
* Modeling Weather by Space within Ray Environment (NumS, XGBoost, Sklearn)
* Modeling Weather by Time with Time Series (NumS)

## Setting Up and Loading Datasets with Modin and S3
Here are the imports for our libraries. The main imports are NumS, Modin, Sklearn, and Ray. There are also additional libraries such as NumPy and Pandas to compare performance as well as to support other libraries aren't compatibile with NumS and Modin. We will have to use intermediate calls to transform them into the correct type/data structure (e.g. Sklearn only accepts NumPy arrays).

The most important thing after setting up Ray is to make sure that the Ray Python package can also recognize and connect to the cluster setup if there is one. Uncommenting the second line will enable single node setup, while uncommenting the third line will enable cluster setup. Additionally, the cell below where NumS cluster shape is configured will also confirm whether or not there are head and worker nodes.

In [1]:
import ray
import os
ray.init(ignore_reinit_error=True, num_cpus=32, _temp_dir="/home/brian/external/aws-asdi/ray_temp"); # ray.init() config for single node setup
#ray.init(ignore_reinit_error=True, address="auto", _redis_password='5241590000000000'); # ray.init() config for cluster setup
import modin.pandas as pd
import pandas
from nums import numpy as nps
from nums.core import settings
from nums.experimental import nums_modin
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import plotly.express as px
import plotly.graph_objects as go
import os
import warnings
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import gzip
import sys
from io import StringIO
#warnings.filterwarnings("ignore")

2021-09-04 08:58:54,430	INFO services.py:1265 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [2]:
# NumS settings
if len(ray.nodes()) > 1:
    settings.cluster_shape = (len(ray.nodes())-1, 1)
settings.cluster_shape

(1, 1)

Next, we will load some datasets to give an idea how fast Modin compared to Pandas. Not only will we be comparing on a single node, we will also show how much more speed you can achieve at scale in Modin.

In [3]:
%%time
climate_2020 = pd.read_csv('data/2020.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
climate_2020["YEAR/MONTH/DAY"] = pd.to_datetime(climate_2020["YEAR/MONTH/DAY"], format="%Y%m%d")
del climate_2020 # delete from memory

CPU times: user 262 ms, sys: 47 ms, total: 309 ms
Wall time: 1.79 s


In [4]:
%%time
climate_2020 = pandas.read_csv('data/2020.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
climate_2020["YEAR/MONTH/DAY"] = pd.to_datetime(climate_2020["YEAR/MONTH/DAY"], format="%Y%m%d")
del climate_2020 # delete from memory

CPU times: user 14.7 s, sys: 3.06 s, total: 17.8 s
Wall time: 17.2 s


Additionally, we can also directly download from S3 bucket links. But given the connection speeds, performance may be inconsistent. It is reccomended to be near `us-east-1` server, as the public S3 bucket is located there.

In [5]:
%%time
climate_2020 = pd.read_csv('s3://noaa-ghcn-pds/csv/2020.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
climate_2020["YEAR/MONTH/DAY"] = pd.to_datetime(climate_2020["YEAR/MONTH/DAY"], format="%Y%m%d")
del climate_2020 # delete from memory

CPU times: user 2.1 s, sys: 447 ms, total: 2.55 s
Wall time: 15.2 s


In [6]:
%%time
climate_2020 = pandas.read_csv('s3://noaa-ghcn-pds/csv/2020.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
climate_2020["YEAR/MONTH/DAY"] = pd.to_datetime(climate_2020["YEAR/MONTH/DAY"], format="%Y%m%d")
del climate_2020 # delete from memory

CPU times: user 22.5 s, sys: 3.58 s, total: 26.1 s
Wall time: 1min 1s


## Dataframe Manipulation and Cleaning with Modin
Next, we will show how we can do dataframe manipulation to understand the data we are dealing with. We have downloaded and loaded onto memory the NOAA Global Historical Climatology Network Daily (GHCN-D) dataset which containes over 200+ years worth of historical weather data. Data consists of `csv` files dating back to 1763, majority of logs containing temperature maximum, temperature minimum, precipitation, and snowfall. We can first start off by showing the inventory log of each station by reading `noaa-ghcn-pds/ghcnd-inventory.txt` directly from S3, showing the years of data as well as the element of weather a station has recorded.

Sources:
* [Data](https://registry.opendata.aws/noaa-ghcn/)
* [Documentation](https://registry.opendata.aws/noaa-ghcn/)

In [7]:
inventory = pd.read_fwf('s3://noaa-ghcn-pds/ghcnd-inventory.txt', widths=[12, 9, 10, 4, 5, 5], header=None, names=["ID", "LATITUDE", "LONGITUDE", "ELEMENT", "FIRSTYEAR", "LASTYEAR"])
inventory

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEMENT,FIRSTYEAR,LASTYEAR
0,ACW00011604,17.1167,-61.7833,TMAX,1949,1949
1,ACW00011604,17.1167,-61.7833,TMIN,1949,1949
2,ACW00011604,17.1167,-61.7833,PRCP,1949,1949
3,ACW00011604,17.1167,-61.7833,SNOW,1949,1949
4,ACW00011604,17.1167,-61.7833,SNWD,1949,1949
...,...,...,...,...,...,...
705035,ZI000067983,-20.2000,32.6160,PRCP,1951,2020
705036,ZI000067983,-20.2000,32.6160,TAVG,1962,2020
705037,ZI000067991,-22.2170,30.0000,TMAX,1951,1990
705038,ZI000067991,-22.2170,30.0000,TMIN,1951,1990


We can also read `noaa-ghcn-pds/ghcnd-stations.txt` to output the stations to get more metadata such as elevation and the name of the station.

In [8]:
stations = pd.read_fwf('s3://noaa-ghcn-pds/ghcnd-stations.txt', widths=[12, 9, 10, 7, 3, 31, 4, 4, 6], header=None, names=["ID", "LATITUDE", "LONGITUDE", "ELEVATION", "STATE", "NAME", "GSN FLAG", "HCN/CRN FLAG", "WMO ID"])
stations



Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN FLAG,HCN/CRN FLAG,WMO ID
0,ACW00011604,17.1167,-61.7833,10.1,,ST JOHNS COOLIDGE FLD,,,
1,ACW00011647,17.1333,-61.7833,19.2,,ST JOHNS,,,
2,AE000041196,25.3330,55.5170,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0
3,AEM00041194,25.2550,55.3640,10.4,,DUBAI INTL,,,41194.0
4,AEM00041217,24.4330,54.6510,26.8,,ABU DHABI INTL,,,41217.0
...,...,...,...,...,...,...,...,...,...
118488,ZI000067969,-21.0500,29.3670,861.0,,WEST NICHOLSON,,,67969.0
118489,ZI000067975,-20.0670,30.8670,1095.0,,MASVINGO,,,67975.0
118490,ZI000067977,-21.0170,31.5830,430.0,,BUFFALO RANGE,,,67977.0
118491,ZI000067983,-20.2000,32.6160,1132.0,,CHIPINGE,GSN,,67983.0


Additionally, the ID per each station encodes its country in `noaa-ghcn-pds/ghcnd-countries.txt`. This will be useful later on if we want to do an analysis of data per country.

In [9]:
country_codes = pd.read_csv("s3://noaa-ghcn-pds/ghcnd-countries.txt", delimiter="\n", header=None)[0].str.extract('(?P<code>.{2})(?P<country>.{0,})')
country_codes

To request implementation, send an email to feature_requests@modin.org.


Unnamed: 0,code,country
0,AC,Antigua and Barbuda
1,AE,United Arab Emirates
2,AF,Afghanistan
3,AG,Algeria
4,AJ,Azerbaijan
...,...,...
214,WI,Western Sahara
215,WQ,Wake Island [United States]
216,WZ,Swaziland
217,ZA,Zambia


In [10]:
def df_loader(year, local=False):
    if local:
        df = pd.read_csv('data/' + str(year) + '.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
    else:
        df = pd.read_csv('s3://noaa-ghcn-pds/csv/' + str(year) + '.csv', header=None, names=["ID", "YEAR/MONTH/DAY", "ELEMENT", "DATA VALUE", "M-FLAG", "Q-FLAG", "S-FLAG", "OBS-TIME"], quoting=3)
    df["YEAR/MONTH/DAY"] = pd.to_datetime(df["YEAR/MONTH/DAY"], format="%Y%m%d")
    return df

def df_filter(df, _id, element):
    return df.loc[(df["ID"] == _id) & (df["ELEMENT"] == element)][["YEAR/MONTH/DAY", "DATA VALUE"]].set_index("YEAR/MONTH/DAY").sort_index()

# Same as df_filter(), but a vector of ALL the data
def df_filter_vector(_id, element, local=False, custom_years=None):
    df_vector = pd.DataFrame(columns=["DATA VALUE"])
    if custom_years:
        years = range(custom_years[0], custom_years[1] + 1)
    for year in tqdm(years):
        if local:
            df = df_filter(dfs[year], _id, element)
        else:
            df = df_filter(df_loader(year), _id, element)
            
        if df_vector.empty:
            df_vector = df
        else:
            df_vector = df_vector.append(df)
    return df_vector

def rmse(actual, expected):
    """
    Computes the root mean squared error to evaluate models/predictions. It can accept all the datatypes used
    in this notebook
    """
    if type(actual) != type(expected):
        raise TypeError("actual and expected must be the same types")
    if type(actual) == np.ndarray:
        return np.sqrt(np.mean((expected - actual) ** 2))
    elif type(actual) == type(nps.array([])):
        return nps.sqrt(nps.mean((expected - actual) ** 2)).get()
    elif type(actual) == modin.pandas.dataframe.DataFrame or type(actual) == pandas.core.frame.DataFrame:
        raise NotImplementedError
    else:
        raise TypeError
        
def design_matrix(years, elements, target=None, convert_nps=False, local=False):
    """
    Set target to your "y" predictor. If y has NaNs or missing values, we will drop the data row.
    """
    df_design = pd.DataFrame()
    
    for year in tqdm(years):
        if local:
            df = dfs[year]
        else:
            df = df_loader(year)
            

        if target[0] not in df["ELEMENT"].unique():
            continue

        df = df[df['ELEMENT'].isin(elements)]
        df = pd.pivot_table(df, index=["ID", "YEAR/MONTH/DAY"], columns="ELEMENT", values="DATA VALUE").reset_index(level=[0,1])
        df = df.merge(stations[["ID", "LATITUDE", "LONGITUDE", "ELEVATION"]], how='inner', on='ID')
        
        if target:
            df = df.dropna(subset=target)
        df = df.dropna() #TODO add a flag to toggle this
        
        
        df["YEAR/MONTH/DAY"] = df["YEAR/MONTH/DAY"].apply(lambda x: pd.Period(x, freq='D').day_of_year)
        df["TMAX"] = df["TMAX"] / 10
        df["TMIN"] = df["TMIN"] / 10
        df["AVG"] = (df["TMAX"] + df["TMIN"]) / 2
        df["RANGE"] = df["TMAX"] - df["TMIN"]
        #df = df.drop(["ID"], axis=1)
        #df = df.astype(float)
        
        if df_design.empty:
            df_design = df
        else:
            df_design = df_design.append(df)
        
        
    
    if convert_nps:
        return nps.array(result.to_numpy().astype(np.double))
    return df_design

def design_matrix_time_series_stack(_id, element, years, convert_nps=True, local=False):
    """
    Inputs are station ID and element
    Output is a design matrix of time series per year stacked on top of each other.
    rows are year of data collected, 
    cols are day of the year
    
    returns NumS array or Pandas DataFrame
    """
    df_design = pd.DataFrame(columns=pd.date_range(start="2020-01-01", end="2020-12-31").strftime('%m-%d'))
    station_name = stations.loc[stations['ID'] == _id]["NAME"].item()
    
    for year in tqdm(years):
        if local:
            df = dfs[year]
        else:
            try:
                df = df_loader(year)
            except ClientError:
                tqdm.write(str(year) + ".csv doesn't exist on remote, addition to design matrix is skipped.")
                continue
            
        df = df_filter(df, _id, element)
        if df.empty:
            tqdm.write(element + " data on " + str(year) + " for " + station_name + " with id: " + _id + " is empty. Addition to design matrix is skipped.")
            continue
        df.index = df.index.strftime('%m-%d')
        df.columns = [year]
        df = df.T
        df_design = df_design.append(df)


    df_design.index.name = None
    if convert_nps:
        #return nums_modin.from_modin(df_design) # Experimental version has some bugs, use manual conversion for now
        return nps.array(df_design.to_numpy().astype(np.double))
    return df_design

In [11]:
# Global variables
elements = ["PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]
all_elements = list(inventory["ELEMENT"].unique())
years = list(range(1763, 2022))
local = True

Loading all the dataframes from local storage to memory gives us these times for a single node:

**modin**:
```
CPU times: user 35.5 s, sys: 7.8 s, total: 43.3 s
Wall time: 5min 20s
```

**pandas**
```
CPU times: user 24min 22s, sys: 5min 39s, total: 30min 2s
Wall time: 29min 32s
```

As we can see, Modin gives us ~6x speedup, which is impressive considering that ~100GB is being transferred and loaded onto memory.

In the Ray cluster setup, the speed up is even more with ~15x speedup (1 head, 4 workers):
```
CPU times: user 2min 50s, sys: 42.3 s, total: 3min 33s
Wall time: 1min 59s
```

In [None]:
%%time
dfs = [pd.DataFrame() for _ in range(2022)]
for year in tqdm(years):
    dfs[year] = df_loader(year, local=local)



  0%|          | 0/259 [00:00<?, ?it/s]

Now that we have quickly loaded our dataframes, we can further our analysis of the data through Pandas API calls through Modin. Modin is able to abstract away all the workings between distributed memory and communication between worker nodes to give us high performance in dataframe manipulation. In the next cell, we can plot a dataframe that stores the countries in columns and the year in the rows. Each cell contains the number of data points each country has collected in it's weather stations (regardless of what element it is).

In [None]:
station_data_freq = pd.DataFrame()
years.reverse() # Reverse, so NaNs can be filled in to missing/old data

for year in tqdm(years):
    station_data_freq[year] = dfs[year].groupby(dfs[year]["ID"].str.slice(stop=2))["DATA VALUE"].sum()

years.reverse()
station_data_freq = station_data_freq.fillna(0).T
station_data_freq

In [None]:
country_codes.iterrows()

We can also use this dataframe to plot out the growth of data points over the years. As earth becomes more civilized and technologically advanced, there is more data created and recorded. We see this observation here, especially for US.

In [None]:
for index, row in tqdm(country_codes.iterrows()): 
    if row[0] in station_data_freq.T.columns:
        fig = plt.figure(figsize=(10, 10))
        ax = station_data_freq.T[row[0]].plot();
        ax.set_xlabel("Years")
        ax.set_ylabel("Number of Data Points")

## Visualizations and Understanding the Data
In addition to fast and scalable dataframe manipulation and wrangling, we can also use Matplotlib to seamlessly plot directly from Modin dataframes. The benefit of plotting and using all the data points rather than sampling is that we can see more detail and have definitive view of our data.
We can also do a yearly global analysis of how many data points has been collected. Already, we can see near the 1950s is when weather data collection rapidly increased. The only downside to this visualization is that Matplotlib is not scalable and will run on the head node.

In [None]:
sample_points = []
for year in tqdm(years):
    sample_points.append(len(dfs[year].index))

In [None]:
plt.figure(figsize=(9, 3))
plt.plot(years, sample_points);
plt.title("Number of Data Points Collected Per Year")
plt.ticklabel_format(style='plain')
plt.xlabel("Year")
plt.ylabel("Number of Data Points");

We can also observe what type or element of data has been recorded using simple dataframe calls. Here we plot the top 25 most popular ones, most popular being the precipitation.

In [None]:
inventory["ELEMENT"].value_counts().head(25).plot(kind="bar");
plt.title("Top Weather Data Type Collected")
plt.xlabel("Type")
plt.ylabel("Count")

We can also use the longitude and latitude provided to show the spatial frequency of where data is being collected, which reveals a very similar outline of our earth!

We see that more developed countries and cities have higher spatial frequency of weather stations. It's also interesting to observe that inhabitable areas of land have lower or no weather stations. Some developed countries also have less scattered weather stations, such as China.

In [None]:
plt.figure(figsize=(20, 10))
plt.title("Scatterplot of Weather Stations")
plt.xlabel("Longitude")
plt.xlabel("Latitude")
plt.scatter(x=stations['LONGITUDE'], y=stations['LATITUDE'], s=0.9, alpha=0.7);

Although Matplotlib gives us a simple API to quickly plot graphs, with Modin seamlessly integrating with it, more advanced plotting libraries such as Plotly can give use better visualizations. We can use the metadata in our dataframes to our advantage and plot highly interpretable and visual plots on top of geographical maps provided by OpenStreetMap. First let's replicate the simple spatial plot we did in Matplotlib to Plotly on an overlay of the map.

Due to data constraints, we will save it to a file called `station_density.html`. Again, Plotly is also not scalable, so it'll run on the head node. We wish to see more scalable visualization tools for data science someday!

In [None]:
fig = px.density_mapbox(stations._to_pandas(), lat='LATITUDE', lon='LONGITUDE', radius=5,
                        center=dict(lat=0, lon=180), zoom=0,
                        mapbox_style="stamen-terrain")
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.write_html("station_density.html")

In [None]:
df = design_matrix([2010], ["TMAX", "TMIN"], target=["TMAX"], local=True)
df["TMAX"] = df["TMAX"]
df["TMIN"] = df["TMIN"]
df

## Modeling Weather by Space within Ray Environment (NumS, XGBoost, Sklearn)
After exploratory data analysis, our next step is to answer or find another question to our problem and offer explanation and reasoning behind a solution or answer, primarily through modeling. With NumS, it's possible to do generalized linear modeling (GLM) to give us interpretable predictions and modeling at scale. We can also use Modin's experimental package of XGBoost to directly run XGBoost off of a Modin dataframe, a popular library for gradient boosted decision trees. And lastly, we can use SKlearn's Random Forest model with Ray by replacing the joblib backend with Ray actors.

Before we start, let's define a few problems. We know that inhabitable areas have little to no stations or data recorded. Is it possible to "learn" the features of the earth's geographical location and time to predict weather? 

Let's first create a design matrix. Here, we will make a design matrix that one hot encodes the day to the day of the year, precipitation, temperature maximum, temperature minimum, temperature average, temperature range, latitude, longitude, and elevation.

In [None]:
%%time
test_elements = ["PRCP", "TMAX", "TMIN"]
data = design_matrix(years[-10:], test_elements, target=["PRCP"], convert_nps=False, local=local)
data = data[data["PRCP"] >= 0]
data

Using our previous techniques of EDA, we can also plot the graph. Let's take a day such as Januaary 1st, 2020 and plot the precipitation onto our map.

In [None]:
plot = design_matrix([2020], test_elements, target=["PRCP"], convert_nps=False, local=local)
plot = plot[plot["PRCP"] >= 0]
plot = plot[(plot["YEAR/MONTH/DAY"] == 1) & (plot["PRCP"] > 0)]
fig = go.Figure(go.Densitymapbox(lat=plot.LATITUDE, lon=plot.LONGITUDE, z=plot.PRCP,
                                 radius=10))
fig.update_layout(mapbox_style="stamen-terrain", mapbox_center_lon=180)
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.write_html("prcp_dmatrix_density.html")

Next, we can perform a train-test split on the data. Because the data is so big, and there are multiple logs of repetitive data, doing a train-test split on raw data logs might take too long. Instead, we will train-test split based on station ID. This allows us to hold out a few stations for training, and allows us to do cross-validation to evaluate whether our model has "learned" the weather patterns of other stations from spatial and temporal locality.

In [None]:
ids = np.random.permutation(data["ID"].unique())
split = int(ids.shape[0] * .8)
train_ids = ids[:split]
test_ids = ids[split:]

In [None]:
train = data[data["ID"].isin(train_ids)]
test = data[data["ID"].isin(test_ids)]
#'TMAX', 'TMIN','AVG', 'RANGE',
X_train = train[['YEAR/MONTH/DAY', 'TMAX', 'TMIN','AVG', 'RANGE', 'LATITUDE', 'LONGITUDE', 'ELEVATION']].to_numpy().astype(np.double)
y_train = train['PRCP'].to_numpy().astype(np.double)
X_test = test[['YEAR/MONTH/DAY', 'TMAX', 'TMIN','AVG', 'RANGE', 'LATITUDE', 'LONGITUDE', 'ELEVATION']].to_numpy().astype(np.double)
y_test = test['PRCP'].to_numpy().astype(np.double)

# NumS arrays must maintain block shapes, code below ensures they are shaped correctly.
X_train = nps.array(X_train)
y_train = nps.array(y_train)
X_test = nps.array(X_test)
y_test = nps.array(y_test)
y_train = y_train.reshape(block_shape=(X_train.block_shape[0],))
y_test = y_test.reshape(block_shape=(X_test.block_shape[0],))

We can start off with something simple, such as Linear Regression to see if any of the features have a linear relationship. NumS provides a Sklearn-ish API through the `nums.models` package.

In [None]:
from nums.models.glms import LinearRegression

model = LinearRegression()
model.fit(X_train, y_train)

In [None]:
training_results = model.predict(X_train).get()
test_results = model.predict(X_test).get()

print("Training RMSE:", rmse(training_results, y_train.get()))
print("Testing RMSE", rmse(test_results, y_test.get()))

As we can see from the RMSE values, they do not look promising. We can also use Logistic Regression to model the precipitation in our earth. First, it'll require us to change our prediciton values to prediction labels for binary classification.

In [None]:
# convert y_train and y_test to binary labels
y_train_block_shape = y_train.block_shape
y_train = y_train.get()
y_train[y_train > 0] = 1.0
y_train = nps.array(y_train)
y_train = y_train.reshape(block_shape=y_train_block_shape)


y_test_block_shape = y_test.block_shape
y_test = y_test.get()
y_test[y_test > 0] = 1.0
y_test = nps.array(y_test)
y_test = y_test.reshape(block_shape=y_test_block_shape)

We will also see that if our matrix is singular. As a fix, we will add a small mask of floating point numbers to make it invertible without drastically changing the numbers of our design matrix.

In [None]:
from nums.models.glms import LogisticRegression

# since matrix could be singular, a mask is added on to X_train to help invert it
mask = nps.random.rand(X_train.shape[0], X_train.shape[1]) * 0.00001

model = LogisticRegression()
model.fit(X_train + mask, y_train)

We get some values. 

In [None]:
training_results = model.predict(X_train).get()
test_results = model.predict(X_test).get()

print("Training Accuracy:", np.sum(training_results == y_train.get()) / y_train.shape[0])
print("Test Accuracy:", np.sum(test_results == y_test.get()) / y_test.shape[0])

Another highly interpretable model we can use is decision trees. But the problem with that is that it fails to perform at scale. It's a recursive tree algorithm, which are not embarassingly parallel. But we can use boosting methods to parallelize it, such as XGBoost.

In [None]:
import modin.experimental.xgboost as xgb

dtrain = xgb.DMatrix(pd.DataFrame(X_train.get()), pd.DataFrame(y_train.get()))
dtest = xgb.DMatrix(pd.DataFrame(X_test.get()), pd.DataFrame(y_test.get()))

In [None]:
xgb_params = {
    "eta": 0.3,
    "max_depth": 3,
    "objective": "multi:softprob",
    "num_class": 2,
    "eval_metric": "mlogloss",
}
steps = 20

# Create dict for evaluation results
evals_result = dict()

# Run training
model = xgb.train(
    xgb_params,
    dtrain,
    steps,
    evals=[(dtrain, "train")],
    evals_result=evals_result
)

# Print evaluation results
print(f'Evals results:\n{evals_result}')

# Predict results
prediction = model.predict(dtest)

# Print prediction results
print(f'Prediction results:\n{prediction}')

In [None]:
y_result = prediction[1].to_numpy()
y_result[y_result >= 0.5] = 1
y_result[y_result < 0.5] = 0
np.sum(y_result == y_test.get()) / y_result.shape[0]

We can also perform random forests, another embarassingly parallel algorithm that can utilize running decision trees per Ray Actors across a cluster.

In [None]:
from sklearn.ensemble import RandomForestClassifier
import joblib
from ray.util.joblib import register_ray
register_ray()

In [None]:


with joblib.parallel_backend('ray'):
    model = RandomForestClassifier(n_jobs=32)


In [None]:
model.fit(X_train.get(), y_train.get())

In [None]:
with joblib.parallel_backend('ray'):
    print("Training Accuracy:", model.score(X_train.get(), y_train.get()))
    print("Test Accuracy:", model.score(X_test.get(), y_test.get()))

Recall the question we asked earlier, if we can "learn" weather patterns as well, especially in places where it is inhabitable and not much weather stations availiable. Could we generalize these sparse areas of data? The features of our previous design matrix involved day of year, precipitation, temperature maximum, temperature minimum, temperature average, temperature range, latitude, longitude, and elevation. But looking at new/unknown locations limits the amount of features we can deal with. As a starting point, we can create a meshgrid of longitudes and latitudes. Then train and predict precipitation based purely by location and day of year. We lose some motivating features such as temperature and elevation. Given terrain data for elevation, we could easily merge it via Modin and maybe get better preicitons.

In [None]:
nps.unique(y_test)

## Modeling Weather by Time with Time Series (NumS)
Next, we can model and predict our data to figure out what it has to do with time. Weather often has seasonal patterns. Let's focus in on a specific weather station for this tutorial, which will be station USC00040693. This weather station is located on UC Berkeley campus [(Google Maps)](https://www.google.com/maps/place/37°52'27.8%22N+122°15'38.2%22W/@37.8744024,-122.2618614,17z/data=!3m1!4b1!4m5!3m4!1s0x0:0x0!8m2!3d37.8744!4d-122.2606). Through Modin, we can quickly parse out what type of data this weather station records. Let's revisit our exploratory data analysis skills to unreveal what the data is able to provide us in terms of time.

In [None]:
inventory[inventory["ID"] == 'USC00040693']

Data will vary a lot between stations, some stations may record as little as just precipitation. Coincidentally, this weather station is located on UC Berkeley campus and has been recording data of various elements for quite a while. The more interesting data values recorded in this station other than temperature and precipitation are the WT** data types. According to the [documentation](https://docs.opendata.aws/noaa-ghcn-pds/readme.html) they are mapped to:

* **01 = Fog, ice fog, or freezing fog (may include heavy fog)**
* 02 = Heavy fog or heaving freezing fog (not always distinguished from fog)
* **03 = Thunder**
* **04 = Ice pellets, sleet, snow pellets, or small hail**
* **05 = Hail (may include small hail)**
* 06 = Glaze or rime
* 07 = Dust, volcanic ash, blowing dust, blowing sand, or blowing obstruction
* **08 = Smoke or haze**
* 09 = Blowing or drifting snow
* 10 = Tornado, waterspout, or funnel cloud
* **11 = High or damaging winds**
* 12 = Blowing spray
* 13 = Mist
* **14 = Drizzle**
* 15 = Freezing drizzl
* 16 = **Rain (may include freezing rain, drizzle, and freezing drizzle)**
* 17 = Freezing rain
* 18 = Snow, snow pellets, snow grains, or ice crystals
* 19 = Unknown source of precipitation
* 21 = Ground fog
* 22 = Ice fog or freezing fog

Unfortunately, plotting these special elements out shows us there is nothing interesting going on.

In [None]:
berkeley_weather_elements = inventory[inventory["ID"] == 'USC00040693'][["ELEMENT", "FIRSTYEAR", "LASTYEAR"]]
berkeley_weather_elements = berkeley_weather_elements[berkeley_weather_elements["ELEMENT"].isin(["TMAX", "TMIN", "PRCP"])] #override to save time
berkeley_time_series = {}

for _, rows in berkeley_weather_elements.iterrows():
    element, firstyear, lastyear = rows
    berkeley_time_series[element] = df_filter_vector('USC00040693', element, local=local, custom_years=(lastyear-20, lastyear - 1)) #only grabbing the last 20 years for effiency

We can plot the data in the last 3 years (Plotting all of the data will take a long time due to the bottleneck of Matplotlib's serial performance).

In [None]:
berkeley_time_series["TMAX"].tail(365 * 3).plot();

In [None]:
berkeley_time_series["TMIN"].tail(365 * 3).plot();

In [None]:
berkeley_time_series["PRCP"].tail(365 * 3).plot();

We can start modeling with simple linear regression. Although it wont give us the most accurate model and predictions due to weather data being seasonal and sinusoidal, it is at least a good starting point and baseline for other model's performance and accuracy.

In [None]:
from nums.models.glms import LinearRegression
y_train = berkeley_time_series["TMIN"].iloc[-4000:-1000].to_numpy().reshape(-1) 
y_test = berkeley_time_series["TMIN"].iloc[-1000:].to_numpy().reshape(-1) 

#convert to NumS arrays, assigning to y since we want X to be time, y to be TMIN (data value)
y_train = nps.array(y_train)
y_test = nps.array(y_test)

In [None]:
X_train = nps.arange(0, y_train.shape[0], 1).reshape(-1, 1).astype(np.double)
X_test = nps.arange(y_train.shape[0], y_train.shape[0] + y_test.shape[0], 1).reshape(-1, 1).astype(np.double)

In [None]:
plt.plot(X_train.get(), y_train.get());

In [None]:
model = LinearRegression()
model.fit(X_train, y_train)

print("Training RMSE:", rmse(y_train, model.predict(X_train)))
print("Test RMSE:", rmse(y_test, model.predict(X_test)))

Model also reveals to use that there is only a slight positive trend in data with the slope obtained from, indicating that there is indeed climate change.

In [None]:
model._beta.get()

We see that temperature has a sinusoudal pattern. Before doing more modeling, let's stack all the years on top of each other to get a design matrix of temperature data where the rows are year, columns are day of year.

In [None]:
berkeley_tmax_design_matrix = design_matrix_time_series_stack('USC00040693', 'TMAX', years, local=local)

In [None]:
for i in range(berkeley_tmax_design_matrix.shape[0]):
    plt.plot(berkeley_tmax_design_matrix[i].get());

With this design matrix, we can run Principal Component Analysis (PCA) with NumS's SVD solver to observe abnormalities in the data. Ideally, it should be reducable to one dimension since all the rows should be nearly identical, with some variance, as shown in the scree plot.

In [None]:
from nums.core import linalg
from nums.core import apaplication_manager
nps_app_inst = application_manager.instance()

U, S, V = linalg.svd(nps_app_inst, nps.nan_to_num(berkeley_tmax_design_matrix.T))

In [None]:
plt.plot(S[:25].get())

But when plotting the projection vectors, we observe something different. We see that near day 50, there is a sharp spike. This is usually the time that leap days occur, indicating that there is missing data or inconsistesies. We will see how this affects us later. (***TODO: fix after time series model are done***)

In [None]:
for i in range(25):
    plt.plot(U.T[i, :])

In [None]:
# TODO: (Daniel) Factor in autoregressive models once done.

## End
To stop and shutdown the notebook properly, we can free memory and shutdown Ray.

In [None]:
del dfs

In [None]:
!ray stop