## Imports

In [1]:
import pandas as pd
import polars as pl
import plotnine as pn
import dask.dataframe as dd
from dask_ml.xgboost import XGBRegressor
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
from dask_ml.metrics import mean_squared_error



In [3]:
# Create a Dask distributed client with default arguments and keep a handle to it.
# NOTE(review): with no scheduler address given, this presumably starts a local
# cluster on the current machine — confirm that is what is wanted on the HPC node.
from dask.distributed import Client
client = Client()

## Data 1

In [4]:
# One parquet file per year, 2015 through 2022 (the range end, 2023, is exclusive).
years = range(2015, 2023)
files = [f'./data/{year}.gz.parquet' for year in years]

# Open all yearly files as one Dask DataFrame.
df = dd.read_parquet(files)

In [5]:
# Keep only rows whose 'Vehicle Year' lies in the closed interval [1970, 2022].
vehicle_year = df['Vehicle Year']
year_in_range = (vehicle_year >= 1970) & (vehicle_year <= 2022)
df = df[year_in_range]

# Predictor columns for the first model.
feature_columns = ['Vehicle Year', 'cafe_count', 'Closest_school_dist']
X = df[feature_columns]

# Regression target.
y = df['Violation Price']

In [6]:
# Hold out a test set from the feature matrix and the target.
# `lengths=True` makes Dask compute the partition lengths so the resulting
# arrays have known chunk sizes.
# A fixed `random_state` makes the split, and every metric computed from it,
# reproducible across kernel restarts; without it each run draws a new split.
X_train, X_test, y_train, y_test = train_test_split(
  X.to_dask_array(lengths=True),
  y.to_dask_array(lengths=True),
  random_state=42,
)
X_train

Unnamed: 0,Array,Chunk
Bytes,236.10 MiB,34.43 MiB
Shape,"(10315521, 3)","(1504087, 3)"
Count,56 Tasks,8 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 236.10 MiB 34.43 MiB Shape (10315521, 3) (1504087, 3) Count 56 Tasks 8 Chunks Type float64 numpy.ndarray",3  10315521,

Unnamed: 0,Array,Chunk
Bytes,236.10 MiB,34.43 MiB
Shape,"(10315521, 3)","(1504087, 3)"
Count,56 Tasks,8 Chunks
Type,float64,numpy.ndarray


## First model (Vehicle year, cafe count, closest school distance -> Violation Price)

In [7]:
# Baseline: fit a linear model on a zero-column slice of the training features,
# so no predictor is available to it.
# NOTE(review): presumably this leaves only the intercept, i.e. a constant
# prediction to compare the real model against — confirm.
baseline = LinearRegression()
no_feature_columns = []
baseline.fit(X_train[:, no_feature_columns], y_train)

In [8]:
# Score the baseline on the test set, using the same zero-column slice it was
# fitted on.
X_test_no_features = X_test[:, []]
y_baseline = baseline.predict(X_test_no_features)
mse_baseline = mean_squared_error(y_test, y_baseline)
mse_baseline

1040.199571078144

In [9]:
# First real model: linear regression on all three feature columns of X_train
# (the baseline above was fitted on an empty column selection instead).
lr = LinearRegression()
lr.fit(X_train, y_train)

In [10]:
# Predict on the held-out features, then materialise the predictions with
# `.compute()` before scoring them against the (still lazy) test targets.
test_predictions = lr.predict(X_test)
y_predict = test_predictions.compute()
mse = mean_squared_error(y_test, y_predict)
mse

1038.2641562161646

## Data 2

In [11]:
# `df` was rebound to its 'Vehicle Year'-filtered version in the first data
# section, so the yearly files (2015-2022) are opened again here.
# NOTE(review): presumably the second model is meant to start from the
# unfiltered rows — confirm.
files = [f'./data/{year}.gz.parquet' for year in range(2015, 2023)]
df = dd.read_parquet(files)

In [12]:
# Weather predictors plus the target column used by the second model.
weather_features = ['temp', 'humidity', 'snowdepth']
target_column = 'Violation Price'
data = df[weather_features + [target_column]]

In [13]:
# 80/20 random split into training and test frames.
# The fixed `random_state` makes the partitioning reproducible across kernel
# restarts; without it every run trains and evaluates on different rows.
train, test = data.random_split([0.8, 0.2], random_state=42)

In [14]:
# Pull the target column out of each split; these series are the labels passed
# to the model for fitting and to the metric for scoring.
train_labels = train['Violation Price']
test_labels = test['Violation Price']

In [15]:
# Remove the target from the feature frames so the model cannot see it.
# `drop(..., errors='ignore')` returns a new frame instead of mutating in place
# and does not fail when the column is already gone, so this cell can be re-run
# safely (the original `del` raised KeyError on a second execution).
train = train.drop(columns=['Violation Price'], errors='ignore')
test = test.drop(columns=['Violation Price'], errors='ignore')

## Second model (Temperature, humidity, snow depth -> Violation Price, XGBoost regression)

In [16]:
# Distributed XGBoost regressor; only these three hyperparameters are set
# explicitly, everything else is left at the library defaults.
xgb_params = {'max_depth': 30, 'learning_rate': 0.05, 'n_estimators': 300}
est = XGBRegressor(**xgb_params)
# NOTE(review): the recorded output of this cell shows dask_xgboost's tracker
# thread failing with AttributeError ('Thread' object has no attribute
# 'isAlive') under Python 3.9 — verify that training completed as intended.
est.fit(train, train_labels)

Exception in thread Thread-9:
Traceback (most recent call last):
  File "/cvmfs/sling.si/modules/el7/software/Anaconda3/2021.11/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/cvmfs/sling.si/modules/el7/software/Anaconda3/2021.11/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/d/hpc/home/jg0665/.local/lib/python3.9/site-packages/dask_xgboost/tracker.py", line 365, in join
    while self.thread.isAlive():
AttributeError: 'Thread' object has no attribute 'isAlive'



[19:13:04] Tree method is automatically selected to be 'approx' for distributed training.
[19:13:04] Tree method is automatically selected to be 'approx' for distributed training.
[19:13:04] Tree method is automatically selected to be 'approx' for distributed training.



In [17]:
# Predict 'Violation Price' for the test features with the fitted XGBoost model.
prediction = est.predict(test)

In [18]:
# Test-set error of the XGBoost model. `mean_squared_error` returns the mean
# squared error (no square root is taken), so the value is stored under an
# accurate name; this keeps it directly comparable with `mse_baseline` and `mse`.
mse_test = mean_squared_error(test_labels.to_dask_array(), prediction)
# Backward-compatible alias: later cells still refer to the old, misleading
# name. It holds the MSE, not the RMSE.
rmse_test = mse_test



In [19]:
# Display the test-set score. NOTE: despite the `rmse_` prefix, this value comes
# straight from `mean_squared_error`, i.e. it is a mean squared error, not its root.
rmse_test

1123.0348900618067