# ETL

Será feito por ano, gerando cada ano como um arquivo separado pra memória poder aguentar

In [1]:
import numpy as np
import pandas as pd
import json
import os
import matplotlib.pyplot as plt
import seaborn as sns
import re
from datetime import datetime
from tqdm.notebook import tqdm

In [2]:
forecast_path = 'forecasts/2.5'
gridpp_path = 'forecasts/1'
observation_path = 'observation'

In [3]:
def is_csv(x):
    if x[-4:] == '.csv':
        return True
    return False

def read_all_files(path, files):
    df = pd.DataFrame()
    for arquivo in tqdm(files):
        df_aux = pd.read_csv(f'{path}/{arquivo}')
        df = df.append(df_aux)

    return df

def filter_observation_list_by_year(year):
    observation_list = os.listdir(observation_path)
    return list(filter(lambda x: re.findall(f'\_{year}\_', x), observation_list))

def kelvin_to_celsius(k):
    return k - 273

def print_status(df):
    print(df.shape)
    print('='*10)
    print(df.info())

def str_to_datime(x):
    return datetime.strptime(x[:-5].replace('T', ' '), '%Y-%m-%d %H:%M:%S')

In [4]:
desired_columns = ['station_id', 'lat', 'long', 'forecast', 'gridpp', 'observations', 'year', 'month', 'day', 'hour']

In [5]:
filter_observation_list_by_year(2021)

['observation_2021_1.csv',
 'observation_2021_0.csv',
 'observation_2021_2.csv',
 'observation_2021_3.csv']

## 2019

In [6]:
year = 2021

In [7]:
to_transform = ['year', 'month', 'day', 'hour'] # to int

In [8]:
forecast_files = os.listdir(forecast_path)
forecast_files = list(filter(is_csv, forecast_files))

forecasts = read_all_files(forecast_path, forecast_files)
forecasts = forecasts[forecasts.year == year]

forecasts.reset_index(drop=True, inplace=True)
forecasts['forecast'] = forecasts['forecast'].apply(lambda x: kelvin_to_celsius(x))

for item in to_transform:
    forecasts = forecasts.astype({item: 'int32'})

print('transforming datetime')
forecasts['datetime'] = forecasts.apply(lambda x: datetime(x['year'], x['month'], x['day'], x['hour']), axis=1)
print('Done!')
print_status(forecasts)

  0%|          | 0/22 [00:00<?, ?it/s]

transforming datetime
Done!
(1077462, 12)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1077462 entries, 0 to 1077461
Data columns (total 12 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   nearest_forecast_long  1077462 non-null  float64       
 1   nearest_forecast_lat   1077462 non-null  float64       
 2   long                   1077462 non-null  float64       
 3   station_id             1077462 non-null  object        
 4   lat                    1077462 non-null  float64       
 5   forecast               1077462 non-null  float64       
 6   indexes                1077462 non-null  object        
 7   year                   1077462 non-null  int32         
 8   month                  1077462 non-null  int32         
 9   day                    1077462 non-null  int32         
 10  hour                   1077462 non-null  int32         
 11  datetime               1077462 non-null  dateti

In [9]:
gridpp_files = os.listdir(gridpp_path)
gridpp_files = list(filter(is_csv, gridpp_files))

gridpp = read_all_files(gridpp_path, gridpp_files)
gridpp = gridpp[gridpp.year == year]

gridpp.reset_index(drop=True, inplace=True)
gridpp['forecast'] = gridpp['forecast'].apply(lambda x: kelvin_to_celsius(x))

for item in to_transform:
    gridpp = gridpp.astype({item: 'int32'})
print('transforming datetime')
gridpp['datetime'] = gridpp.apply(lambda x: datetime(x['year'], x['month'], x['day'], x['hour']), axis=1)
print('Done!')
# Only for gridpp
gridpp.rename(columns={'forecast': 'gridpp'}, inplace=True)

print_status(gridpp)

  0%|          | 0/22 [00:00<?, ?it/s]

transforming datetime
Done!
(1075245, 14)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1075245 entries, 0 to 1075244
Data columns (total 14 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   long                   1075245 non-null  float64       
 1   lat                    1075245 non-null  float64       
 2   gridpp                 1075245 non-null  float64       
 3   station_id             1075245 non-null  object        
 4   nearest_forecast_lat   0 non-null        float64       
 5   indexes                1075245 non-null  object        
 6   nearest_forecast_long  0 non-null        float64       
 7   nearest_gridpp_lat     1075245 non-null  float64       
 8   nearest_gridpp_long    1075245 non-null  float64       
 9   year                   1075245 non-null  int32         
 10  month                  1075245 non-null  int32         
 11  day                    1075245 non-null  int32 

In [10]:
observation_files = filter_observation_list_by_year(year)
observation = read_all_files(observation_path, observation_files)

observation['observations'] = observation['observations'].apply(lambda x: x.split(':'))
re_to_extract_numbers = r'\-*\d+\.*\d*'
observation['observations'] = observation['observations'].apply(lambda x: float(re.findall(re_to_extract_numbers, x[-1])[0]))

print('transforming datetime')
observation['datetime'] = observation.referenceTime.apply(lambda x: str_to_datime(x))
print('Done!')
print('transforming Station Id')
observation['sourceId'] = observation['sourceId'].apply(lambda x: x.split(':')[0])
print('Done!')

print_status(observation)


  0%|          | 0/4 [00:00<?, ?it/s]

transforming datetime
Done!
transforming Station Id
Done!
(19937735, 4)
<class 'pandas.core.frame.DataFrame'>
Int64Index: 19937735 entries, 0 to 3219402
Data columns (total 4 columns):
 #   Column         Dtype         
---  ------         -----         
 0   sourceId       object        
 1   referenceTime  object        
 2   observations   float64       
 3   datetime       datetime64[ns]
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 760.6+ MB
None


### Merge

In [11]:
final_data = forecasts.merge(gridpp[['station_id', 'datetime', 'gridpp']], how='inner', on=['station_id', 'datetime'])
final_data.shape

(1073767, 13)

In [12]:
final_data = final_data.merge(observation, how='inner', left_on=['station_id', 'datetime'], right_on=['sourceId', 'datetime'])
final_data.shape

(923376, 16)

In [13]:
final_data = final_data[desired_columns]
final_data

Unnamed: 0,station_id,lat,long,forecast,gridpp,observations,year,month,day,hour
0,SN18700,59.9423,10.7200,0.496643,-4.427063,-4.7,2021,1,8,0
1,SN80740,66.9035,13.6460,-12.165466,-4.732391,-7.5,2021,1,8,0
2,SN61630,62.2583,8.2000,-19.095139,-21.671463,-20.4,2021,1,8,0
3,SN78910,64.6933,12.3295,-15.207458,-12.741394,-12.4,2021,1,8,0
4,SN8140,61.4255,11.0803,-4.668396,-13.406799,-14.7,2021,1,8,0
...,...,...,...,...,...,...,...,...,...,...
923371,SN58900,61.9157,6.5585,13.668213,14.045288,12.6,2021,6,8,6
923372,SN84380,68.3710,17.2438,4.283936,13.258179,12.7,2021,6,8,6
923373,SN10380,62.5773,11.3518,15.978271,11.468811,9.7,2021,6,8,6
923374,SN97120,69.7563,26.1457,12.851807,10.440582,13.5,2021,6,8,6


In [14]:
final_data.to_csv(f'../data/final_data_{year}.csv', index=False)

In [15]:
observation[observation.sourceId == 'SN61630']

Unnamed: 0,sourceId,referenceTime,observations,datetime
61318,SN61630,2021-01-01T00:00:00.000Z,-2.8,2021-01-01 00:00:00
61319,SN61630,2021-01-01T01:00:00.000Z,-3.0,2021-01-01 01:00:00
61320,SN61630,2021-01-01T02:00:00.000Z,-3.3,2021-01-01 02:00:00
61321,SN61630,2021-01-01T03:00:00.000Z,-3.1,2021-01-01 03:00:00
61322,SN61630,2021-01-01T04:00:00.000Z,-3.2,2021-01-01 04:00:00
...,...,...,...,...
70070,SN61630,2021-12-31T19:00:00.000Z,-2.4,2021-12-31 19:00:00
70071,SN61630,2021-12-31T20:00:00.000Z,-2.5,2021-12-31 20:00:00
70072,SN61630,2021-12-31T21:00:00.000Z,-2.2,2021-12-31 21:00:00
70073,SN61630,2021-12-31T22:00:00.000Z,-3.5,2021-12-31 22:00:00


In [16]:
observation.shape

(19937735, 4)