# Predicción de la temperatura en Morelia, Michoacán, usando Dask

In [1]:
import os
import glob
import zipfile
import pandas as pd
import time
import re
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask_ml.linear_model import LinearRegression
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import MinMaxScaler
from dask_ml.metrics import mean_absolute_error
# Requires the dask-ml package (install with: %pip install dask-ml)

In [2]:
# Directory that holds the CSV files loaded by the read_csv cell.
# It can be overridden with the RUOA_DATA_DIR environment variable so the notebook
# is not tied to one user's home directory; an unset or empty variable falls back
# to the original location. os.path.join(..., "") guarantees a trailing separator,
# which the later `Path+'*.csv'` concatenation relies on.
Path = os.path.join(
    os.environ.get("RUOA_DATA_DIR") or "/home/michell21/Datasets/Ruoa_Dask_minuto/",
    "",
)

In [3]:
#client = Client(n_workers=8)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 41521 instead


In [3]:
# Build one Dask DataFrame from every CSV file matched by the glob under Path.
df = dd.read_csv(Path+'*.csv')

In [4]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 10 entries, TIMESTAMP to Rad_Avg
dtypes: object(1), float64(9)

In [4]:
df["Temp_Avg"].describe().compute()

count    3.225764e+06
mean     1.745081e+01
std      4.972629e+00
min      5.080000e-01
25%      1.571000e+01
50%      1.924000e+01
75%      2.479000e+01
max      3.369000e+01
Name: Temp_Avg, dtype: float64

In [11]:
df["TIMESTAMP"].describe().compute()

unique                3225764
count                 3225764
top       2015-08-18 15:02:00
freq                        1
Name: TIMESTAMP, dtype: object

In [4]:
# Drop the rows that have no value in the Temp_Avg column (the column later used as target).
df = df.dropna(subset=["Temp_Avg"])  # use Rad_Avg (original author's note; intent unclear)

In [5]:
# Replace every remaining missing value, in any column, with 0.
# NOTE(review): 0 is not a neutral value for every column (Press_Avg is roughly
# 806-810 in the rows shown by df.head()), so this may distort the features —
# confirm that a zero fill is intended.
df = df.fillna(0)

In [6]:
df.npartitions

76

In [7]:
def month(TimeStamp):
    """Return the month number encoded in a timestamp string.

    The value is taken from the first ``-NN`` group found in ``TimeStamp``
    (for ``"2015-08-18 15:02:00"`` that is ``-08``) and returned as an int.

    Raises:
        IndexError: if the string contains no ``-NN`` group.
    """
    dashed_groups = re.findall(r'-\d{2}', TimeStamp)
    first_group = dashed_groups[0]
    # The match is always "-" followed by two digits; skip the dash.
    return int(first_group[1:])

def day_night(TimeStamp):
    """Classify a timestamp string by the half of the day it falls in.

    The hour is read from the first space followed by two digits
    (for ``"2015-08-18 15:02:00"`` that is ``" 15"``).

    Returns:
        0 when the hour is before 12 (AM), 1 otherwise (PM).

    Raises:
        IndexError: if the string contains no space-plus-two-digits group.
    """
    first_group = re.findall(r' \d{2}', TimeStamp)[0]
    # The match is always a space followed by two digits; skip the space.
    hour_of_day = int(first_group[1:])
    return 0 if hour_of_day < 12 else 1

In [8]:
def find_month(df):
    """Return ``df`` with the ``Mes`` and ``Dia/Noche`` columns derived from ``TIMESTAMP``.

    Intended to be applied to each pandas partition via ``map_partitions``.

    * ``Mes`` is the integer taken from the first ``-NN`` group of the timestamp
      string (the month in ``YYYY-MM-DD HH:MM:SS``).
    * ``Dia/Noche`` is 0 when the hour (first space followed by two digits) is
      before 12 and 1 otherwise.

    The extraction is vectorized with ``Series.str.extract`` instead of a
    row-wise ``DataFrame.apply``, and the input frame is not modified: a new
    frame is returned. Existing ``Mes`` / ``Dia/Noche`` columns are overwritten
    in place (column order preserved); otherwise they are appended.

    Args:
        df: pandas DataFrame with a string ``TIMESTAMP`` column.

    Returns:
        A new DataFrame with int64 ``Mes`` and ``Dia/Noche`` columns.

    Raises:
        ValueError: if a timestamp contains no month or hour group (the missing
            value cannot be cast to an integer).
    """
    stamps = df["TIMESTAMP"]
    # Same patterns as month()/day_night(), with the digits captured in a group.
    month_number = stamps.str.extract(r'-(\d{2})', expand=False).astype("int64")
    hour_of_day = stamps.str.extract(r' (\d{2})', expand=False).astype("int64")
    is_pm = (hour_of_day >= 12).astype("int64")
    # "Dia/Noche" is not a valid keyword name, hence the dict unpacking.
    return df.assign(**{"Mes": month_number, "Dia/Noche": is_pm})

"""def find_day(df):
    df["Dia/Noche"] = df.apply(lambda Row: day_night(Row["TIMESTAMP"]), axis=1)
    df = dd.from_pandas(df, npartitions=76)
    return df"""

'def find_day(df):\n    df["Dia/Noche"] = df.apply(lambda Row: day_night(Row["TIMESTAMP"]), axis=1)\n    df = dd.from_pandas(df, npartitions=76)\n    return df'

In [34]:
df1 = df.copy()

In [9]:
# Pre-create the two derived columns with placeholder zeros so that `df` can be
# passed as `meta` to map_partitions in the next cell (its schema then already
# contains the columns that find_month fills in).
df["Mes"] = 0
df["Dia/Noche"] = 0

In [10]:
# Apply find_month to every partition using the 'processes' scheduler.
# .compute() materialises the whole result in memory; the following cell wraps
# it back into a Dask DataFrame with dd.from_pandas.
%time df = df.map_partitions(find_month, meta=df).compute(scheduler='processes')

CPU times: user 1.42 s, sys: 942 ms, total: 2.37 s
Wall time: 25 s


In [11]:
# Wrap the materialised frame back into a Dask DataFrame; 76 matches the value
# reported by df.npartitions before the transformation.
df = dd.from_pandas(df, npartitions=76)

In [12]:
type(df)

dask.dataframe.core.DataFrame

In [13]:
len(df.index)

3225764

In [14]:
df.head()

Unnamed: 0,TIMESTAMP,Temp_Avg,RH_Avg,WSpeed_Avg,WSpeed_Max,WDir_Avg,WDir_SD,Rain_Tot,Press_Avg,Rad_Avg,Mes,Dia/Noche
0,2020-06-01 00:00:00,16.15,70.69,1.004,1.116,100.3,4.682,0.0,806.0,-1.337,6,0
0,2020-11-01 00:00:00,16.89,87.8,1.702,1.946,242.5,16.94,0.0,809.553,-1.429,11,0
0,2016-03-01 00:00:00,14.39,75.55,1.002,0.0,262.0,8.47,0.0,806.0,0.0,3,0
0,2021-02-01 00:00:00,14.17,56.79,2.154,2.529,267.8,7.473,0.0,808.148,-1.106,2,0
0,2020-12-01 00:00:00,16.63,70.85,2.391,3.379,2.498,5.615,0.0,807.0,-1.475,12,0


In [15]:
# Regression target: the Temp_Avg column.
y = df["Temp_Avg"]

In [22]:
y

Dask Series Structure:
npartitions=76
0        float64
574          ...
          ...   
43677        ...
44639        ...
Name: Temp_Avg, dtype: float64
Dask Name: getitem, 152 tasks

In [23]:
#X = df.copy()

In [16]:
# Feature matrix: every column except the raw timestamp string and the target.
X = df.drop(['TIMESTAMP', "Temp_Avg"], axis=1)

In [17]:
X.head()

Unnamed: 0,RH_Avg,WSpeed_Avg,WSpeed_Max,WDir_Avg,WDir_SD,Rain_Tot,Press_Avg,Rad_Avg,Mes,Dia/Noche
0,70.69,1.004,1.116,100.3,4.682,0.0,806.0,-1.337,6,0
0,87.8,1.702,1.946,242.5,16.94,0.0,809.553,-1.429,11,0
0,75.55,1.002,0.0,262.0,8.47,0.0,806.0,0.0,3,0
0,56.79,2.154,2.529,267.8,7.473,0.0,808.148,-1.106,2,0
0,70.85,2.391,3.379,2.498,5.615,0.0,807.0,-1.475,12,0


## Escalado de características (MinMaxScaler)

In [18]:
# Rescale every feature column with MinMaxScaler.
# NOTE(review): the scaler is fitted on the full dataset, before the train/test
# split performed later in the notebook, so test rows influence the scaling —
# consider fitting on the training split only.
scaler = MinMaxScaler() 
X_scaler = scaler.fit_transform(X)

In [19]:
# Convert the scaled features and the target to Dask arrays.
# NOTE(review): no `lengths` argument is passed here; the split cell calls
# compute_chunk_sizes() on both arrays before using them.
X_scaler = X_scaler.to_dask_array()
y = y.to_dask_array()

In [20]:
# Shuffled 70/30 train/test split. A fixed random_state makes the split — and
# therefore the score and MAE reported further down — reproducible across
# kernel restarts. compute_chunk_sizes() resolves any unknown chunk lengths
# on the arrays before they are split.
X_train, X_test, y_train, y_test = train_test_split(
    X_scaler.compute_chunk_sizes(),
    y.compute_chunk_sizes(),
    test_size=0.3,
    shuffle=True,
    random_state=42,
)

In [21]:
X_train

Unnamed: 0,Array,Chunk
Bytes,172.27 MiB,2.27 MiB
Shape,"(2258000, 10)","(29754, 10)"
Count,608 Tasks,76 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 172.27 MiB 2.27 MiB Shape (2258000, 10) (29754, 10) Count 608 Tasks 76 Chunks Type float64 numpy.ndarray",10  2258000,

Unnamed: 0,Array,Chunk
Bytes,172.27 MiB,2.27 MiB
Shape,"(2258000, 10)","(29754, 10)"
Count,608 Tasks,76 Chunks
Type,float64,numpy.ndarray


In [22]:
lr = LinearRegression()

In [23]:
# Fit the linear model on the training split. ProgressBar reports each Dask
# computation triggered during fitting (hence the many bars in the output).
with ProgressBar():
    lr.fit(X_train, y_train)

[########################################] | 100% Completed |  1.3s
[########################################] | 100% Completed | 10.3s
[########################################] | 100% Completed | 12.2s
[########################################] | 100% Completed | 11.9s
[########################################] | 100% Completed | 11.9s
[########################################] | 100% Completed | 11.4s
[########################################] | 100% Completed | 12.0s
[########################################] | 100% Completed | 12.3s
[########################################] | 100% Completed | 12.0s
[########################################] | 100% Completed | 11.3s
[########################################] | 100% Completed | 11.2s
[########################################] | 100% Completed | 11.7s
[########################################] | 100% Completed | 12.4s
[########################################] | 100% Completed | 12.0s
[########################################] | 100

In [24]:
# Predict the target for the held-out test features.
y_pred = lr.predict(X_test)

In [32]:
y_pred

Unnamed: 0,Array,Chunk
Bytes,7.38 MiB,99.62 kiB
Shape,"(967758,)","(12752,)"
Count,761 Tasks,76 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.38 MiB 99.62 kiB Shape (967758,) (12752,) Count 761 Tasks 76 Chunks Type float64 numpy.ndarray",967758  1,

Unnamed: 0,Array,Chunk
Bytes,7.38 MiB,99.62 kiB
Shape,"(967758,)","(12752,)"
Count,761 Tasks,76 Chunks
Type,float64,numpy.ndarray


In [25]:
y_pred_df = y_pred.to_dask_dataframe()

In [26]:
y_pred_df.head()

0    13.470033
1    14.790290
2    16.708427
3    12.621100
4    13.936094
dtype: float64

In [27]:
y_test.compute_chunk_sizes()

Unnamed: 0,Array,Chunk
Bytes,7.38 MiB,99.62 kiB
Shape,"(967758,)","(12752,)"
Count,608 Tasks,76 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.38 MiB 99.62 kiB Shape (967758,) (12752,) Count 608 Tasks 76 Chunks Type float64 numpy.ndarray",967758  1,

Unnamed: 0,Array,Chunk
Bytes,7.38 MiB,99.62 kiB
Shape,"(967758,)","(12752,)"
Count,608 Tasks,76 Chunks
Type,float64,numpy.ndarray


In [28]:
y_test_df = y_test.to_dask_dataframe()
y_test_df.head()

0    14.19
1    20.04
2    16.28
3     3.47
4     9.75
dtype: float64

In [29]:
# Model score on the held-out split.
# NOTE(review): presumably R^2, the usual regression score — confirm against the dask-ml docs.
lr.score(X_test, y_test)

0.6325708831502763

In [30]:
# Mean absolute error between the held-out targets and the predictions
# (expressed in the same units as Temp_Avg).
mean_absolute_error(y_test, y_pred)

2.3587257394940804