## Cleaning the sensor reads data

In this notebook, we are going to clean the previously loaded data and store it in a convenient format.

As we stated previously, this dataset is composed of two file formats:

- CSV files
- JSON lines files

Fortunately, all files have the same structure, meaning that they have the same column names. The column names are:

- sensor: The name of the sensor which is performing the reads
- value: The read value
- time: Time when the read was performed

This fact is going to make the cleaning much simpler.
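The claim that every file shares the same columns can be checked before cleaning. The following is a minimal sketch using only the standard library; the helper name `column_names`, the `EXAMPLE` file names, and the sample values are made up for illustration and are not part of the dataset.

```python
import csv
import json
import tempfile
from pathlib import Path


def column_names(path: Path) -> list:
    """Return the sorted column names found in the first record of a file."""
    with path.open(encoding='utf-8', newline='') as fh:
        if path.suffix == '.csv':
            # The first CSV row is the header
            return sorted(next(csv.reader(fh)))
        # JSON lines: the keys of the first object are the column names
        return sorted(json.loads(fh.readline()))


# Demo on two throwaway files with made-up sample values
with tempfile.TemporaryDirectory() as tmp:
    csv_file = Path(tmp) / 'T-EXAMPLE-measures.csv'
    json_file = Path(tmp) / 'H-EXAMPLE-measures.json'
    csv_file.write_text(
        'sensor,value,time\nT-EXAMPLE,27.0,2017-12-22T11:22:11Z\n',
        encoding='utf-8')
    json_file.write_text(
        '{"sensor": "H-EXAMPLE", "value": 31, "time": "2017-12-22T11:22:11Z"}\n',
        encoding='utf-8')
    columns = {f.name: column_names(f) for f in (csv_file, json_file)}

print(columns)
```

If any file reported a different list, the joins later in this notebook would need per-file handling.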

Before starting, we copy the helper function defined in the previous notebook.

In [1]:
import json
import shutil
from pathlib import Path
from functools import partial

import pandas as pd

In [2]:
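def read_json_lines(f: str) -> pd.DataFrame:
    # One JSON object per line; blank lines are skipped.
    # The encoding is set when reading the file: json.loads no longer
    # accepts an `encoding` argument (removed in Python 3.9).
    lines = Path(f).read_text(encoding='utf-8').split('\n')
    json_data = [json.loads(o) for o in lines if o]
    return pd.DataFrame(json_data)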
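As a possible alternative to the hand-written helper, pandas can parse JSON lines files directly through `pd.read_json` with `lines=True`. This is only a sketch on made-up sample records; it has not been checked against the real measures files, whose dtypes may be inferred differently than with the helper.

```python
import tempfile
from pathlib import Path

import pandas as pd

# Two made-up records in JSON lines format (one JSON object per line)
sample = (
    '{"sensor": "H-EXAMPLE", "value": 31, "time": "2017-12-22T11:22:11Z"}\n'
    '{"sensor": "H-EXAMPLE", "value": 32, "time": "2017-12-22T11:22:16Z"}\n'
)

with tempfile.TemporaryDirectory() as tmp:
    fname = Path(tmp) / 'H-EXAMPLE-measures.json'
    fname.write_text(sample, encoding='utf-8')
    # lines=True tells pandas to treat each line as a separate JSON object
    df = pd.read_json(fname, lines=True)

print(df.shape)
```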

In [3]:
RAW_DATA_PATH = Path('../../data/raw/measures')
PROCESSED_DATA_PATH = Path('../../data/processed/measures')
INTERIM_DATA_PATH = Path('../../data/interim/measures')

INTERIM_DATA_PATH.mkdir(exist_ok=True, parents=True)
PROCESSED_DATA_PATH.mkdir(exist_ok=True, parents=True)

To obtain cleaned data, we are going to:

1. Convert all JSON files to CSV files
2. Join all reads coming from the same sensor into a single file. For example, we are going to generate a single file from these two: `H-DHT11-measures.json`, `T-DHT11-measures.csv`

## JSON lines to CSV

We first list all JSON lines files inside the `measures` directory.

In [4]:
json_lines_fnames = list(RAW_DATA_PATH.glob('*.json'))
json_lines_fnames

[WindowsPath('../../data/raw/measures/H-DHT11-measures.json'),
 WindowsPath('../../data/raw/measures/P-DM280-measures.json'),
 WindowsPath('../../data/raw/measures/T-DM280-measures.json'),
 WindowsPath('../../data/raw/measures/T-HTU21-measures.json')]

For each JSON lines file, we load it as a `pandas.DataFrame` with the `read_json_lines` function and then export it with the `to_csv` method to the interim data folder.

In [5]:
def json_lines_to_csv(fname):
    fname_out = INTERIM_DATA_PATH / f'{fname.stem}.csv'
    df = read_json_lines(fname)
    df.to_csv(fname_out, index=False)

for f in json_lines_fnames:
    json_lines_to_csv(f)

Done!

To make the next steps simpler, we are going to copy all the remaining CSV files to the `interim` data folder.

In [6]:
csv_fnames = RAW_DATA_PATH.glob('*.csv')
for f in csv_fnames:
    fname_out = INTERIM_DATA_PATH / f'{f.stem}.csv'
    shutil.copy(str(f), str(fname_out))

At this point, we have all the CSV files under the `interim` data directory.

In [7]:
list(map(str, INTERIM_DATA_PATH.iterdir()))

['..\\..\\data\\interim\\measures\\H-DHT11-measures.csv',
 '..\\..\\data\\interim\\measures\\H-DHT22-measures.csv',
 '..\\..\\data\\interim\\measures\\H-HTU21-measures.csv',
 '..\\..\\data\\interim\\measures\\P-BMP280-measures.csv',
 '..\\..\\data\\interim\\measures\\P-DM280-measures.csv',
 '..\\..\\data\\interim\\measures\\T-BMP280-measures.csv',
 '..\\..\\data\\interim\\measures\\T-DHT11-measures.csv',
 '..\\..\\data\\interim\\measures\\T-DHT22-measures.csv',
 '..\\..\\data\\interim\\measures\\T-DM280-measures.csv',
 '..\\..\\data\\interim\\measures\\T-HTU21-measures.csv']

## Join sensor reads

Taking a look at the files, we can easily come up with a naming pattern. The files follow the naming convention described below:

```
{MEASURE}-{SENSOR_NAME}-measures.csv
```

Where MEASURE can be T (temperature), H (humidity) or P (pressure).

Thanks to this naming convention, we will be able to group files by sensor name and make each output file contain all the reads coming from the same sensor. Note that this is possible because the sensor reads were made in approximately the same time interval.
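The naming convention can be turned into a small parser that groups files by sensor. This is a minimal sketch: the helper `parse_fname` and the `MEASURES` mapping are illustrative names, and the file list is a hand-picked subset of the interim files listed earlier.

```python
from collections import defaultdict
from pathlib import Path

# Measure codes used in the file names
MEASURES = {'T': 'temperature', 'H': 'humidity', 'P': 'pressure'}


def parse_fname(path: Path):
    """Split '{MEASURE}-{SENSOR_NAME}-measures' into (measure, sensor)."""
    measure, sensor, _ = path.stem.split('-')
    return MEASURES[measure], sensor


fnames = [Path(n) for n in ('H-DHT11-measures.csv',
                            'T-DHT11-measures.csv',
                            'P-BMP280-measures.csv',
                            'T-BMP280-measures.csv')]

# Collect the measures available for each sensor
by_sensor = defaultdict(list)
for f in fnames:
    measure, sensor = parse_fname(f)
    by_sensor[sensor].append(measure)

print(dict(by_sensor))
```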

To show how our method works, we first do it for a single sensor. For instance, we are going to do it for the sensor `DHT11`.

In [8]:
df_h = pd.read_csv(INTERIM_DATA_PATH / 'H-DHT11-measures.csv', index_col='time')
df_t = pd.read_csv(INTERIM_DATA_PATH / 'T-DHT11-measures.csv', index_col='time')

df_h.head()

time,sensor,value
2017-12-22T11:22:11Z,H-DHT11,31
2017-12-22T11:22:16Z,H-DHT11,31
2017-12-22T11:22:20Z,H-DHT11,31
2017-12-22T11:22:24Z,H-DHT11,31
2017-12-22T11:22:28Z,H-DHT11,31


Since we are working with time series, having the time as the `DataFrame` index may be handy for some data transformations.
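As one example of such a transformation, the string timestamps can be converted to a `DatetimeIndex` and the reads averaged per minute. This sketch uses made-up values and is not a step of the cleaning pipeline itself; the variable `per_minute` is an illustrative name.

```python
import pandas as pd

# Made-up reads indexed by ISO 8601 timestamps, as in the measures files
df = pd.DataFrame(
    {'value': [31, 31, 33]},
    index=pd.Index(['2017-12-22T11:22:11Z',
                    '2017-12-22T11:22:16Z',
                    '2017-12-22T11:23:20Z'], name='time'),
)

# Parse the strings into timezone-aware timestamps (UTC)
df.index = pd.to_datetime(df.index, utc=True)

# With a DatetimeIndex, time-based operations such as resampling are available
per_minute = df['value'].resample('1min').mean()
print(per_minute)
```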

Now we join the datasets by `time`.

In [9]:
df_ht = df_h.merge(df_t, how='left', on='time', suffixes=('_h', '_t'))
df_ht.head()

time,sensor_h,value_h,sensor_t,value_t
2017-12-22T11:22:11Z,H-DHT11,31,T-DHT11,27.0
2017-12-22T11:22:16Z,H-DHT11,31,T-DHT11,28.0
2017-12-22T11:22:20Z,H-DHT11,31,T-DHT11,28.0
2017-12-22T11:22:24Z,H-DHT11,31,T-DHT11,28.0
2017-12-22T11:22:28Z,H-DHT11,31,T-DHT11,28.0


We can drop the redundant `sensor_h` column, because we already know which sensor the reads come from.

Also, we are going to rename `sensor_t` to `sensor`, and `value_h` and `value_t` to `humidity` and `temperature`.

In [10]:
df_ht.drop('sensor_h', inplace=True, axis='columns')

In [11]:
df_ht.rename(columns=dict(
    sensor_t='sensor',
    value_h='humidity',
    value_t='temperature'), inplace=True)
df_ht.head()

time,humidity,sensor,temperature
2017-12-22T11:22:11Z,31,T-DHT11,27.0
2017-12-22T11:22:16Z,31,T-DHT11,28.0
2017-12-22T11:22:20Z,31,T-DHT11,28.0
2017-12-22T11:22:24Z,31,T-DHT11,28.0
2017-12-22T11:22:28Z,31,T-DHT11,28.0


After performing the left join, we end up with some rows containing NaNs. These NaNs appear because the sensor was off during some time intervals; in this case, they show up in the temperature column. Since a NaN means *no sensor activity*, we can safely set them to 0.
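To see where the NaNs come from, here is a toy left join on made-up reads in which one humidity timestamp has no matching temperature read. The `indicator=True` option of `merge` is used here only to count unmatched rows; it is not part of the notebook's pipeline, and `time` is kept as a regular column to keep the sketch simple.

```python
import pandas as pd

# Made-up reads: the temperature sensor missed the 11:22:16 read
humidity = pd.DataFrame({
    'time': ['2017-12-22T11:22:11Z', '2017-12-22T11:22:16Z', '2017-12-22T11:22:20Z'],
    'value': [31, 31, 32],
})
temperature = pd.DataFrame({
    'time': ['2017-12-22T11:22:11Z', '2017-12-22T11:22:20Z'],
    'value': [27.0, 28.0],
})

# indicator=True adds a `_merge` column telling where each row came from
joined = humidity.merge(temperature, how='left', on='time',
                        suffixes=('_h', '_t'), indicator=True)

# Rows present only on the left side are the ones that get NaN
missing = (joined['_merge'] == 'left_only').sum()

# Replace the NaNs with 0, as done in the notebook
filled = joined.drop(columns='_merge').fillna(0.)
print(missing)
print(filled)
```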

In [12]:
print('Rows containing NaN:', df_ht.isna().any(axis=1).sum())
df_ht.isna().any(axis=0)

Rows containing NaN: 81566


humidity       False
sensor          True
temperature     True
dtype: bool

In [13]:
df_ht.sensor = 'DHT11'
df_ht.fillna(0., inplace=True)

At this point, we know how to join two files from the same sensor. So we have to repeat it for each sensor.

In [14]:
sensors = set(o.stem.split('-')[1] for o in INTERIM_DATA_PATH.iterdir())
sensors

{'BMP280', 'DHT11', 'DHT22', 'DM280', 'HTU21'}

In [15]:
units_mapper = dict(H='humidity', T='temperature', P='pressure')
read_sensor =  partial(pd.read_csv, 
                       index_col='time', 
                       usecols=['time', 'value'])

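def join_files(sensor, files):
    # Pair each DataFrame with the measure code (H, T or P) from its file name
    reads = [(read_sensor(o), o.stem.split('-')[0]) 
             for o in files]
    dfs, units = zip(*reads)

    # Each sensor has exactly two files, so a single left join is enough
    df = pd.merge(*dfs, how='left', on='time')
    df.rename(columns=dict(
        value_x=units_mapper[units[0]],
        value_y=units_mapper[units[1]]), inplace=True)

    # Use the function argument instead of the global loop variable `s`
    df['sensor'] = sensor
    df.fillna(0., inplace=True)

    return df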

for s in list(sensors):
    print(f'Joining sensor {s} files...', end='')
    # Sort so the left side of the join does not depend on directory order
    files = sorted(INTERIM_DATA_PATH.glob(f'*-{s}-*'))
    s_df = join_files(s, files)
    s_df.to_csv(PROCESSED_DATA_PATH / f'{s}.csv')
    print(' done')

Joining sensor BMP280 files... done
Joining sensor HTU21 files... done
Joining sensor DHT22 files... done
Joining sensor DHT11 files... done
Joining sensor DM280 files... done
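The `join_files` function relies on each sensor having exactly two files. If a sensor ever reported more measures, the same left join could be chained. The sketch below is a hypothetical generalization, not the method used above: `join_reads` and `make_frame` are illustrative names, and the values are made up.

```python
from functools import reduce

import pandas as pd


def join_reads(reads: dict) -> pd.DataFrame:
    """Left-join any number of single-measure frames on their time index.

    `reads` maps a measure name (e.g. 'humidity') to a DataFrame that is
    indexed by time and has a single 'value' column.
    """
    # Rename 'value' up front so no '_x' / '_y' suffixes are needed
    frames = [df.rename(columns={'value': measure})
              for measure, df in reads.items()]
    # DataFrame.join aligns on the index; the first frame is the left side
    joined = reduce(lambda left, right: left.join(right, how='left'), frames)
    return joined.fillna(0.)


def make_frame(times, values):
    """Build a small frame shaped like the output of `read_sensor`."""
    return pd.DataFrame({'value': values}, index=pd.Index(times, name='time'))


t1, t2, t3 = ('2017-12-22T11:22:11Z',
              '2017-12-22T11:22:16Z',
              '2017-12-22T11:22:20Z')
reads = {
    'humidity': make_frame([t1, t2, t3], [31, 31, 32]),
    'temperature': make_frame([t1, t3], [27.0, 28.0]),
    'pressure': make_frame([t2], [1013.0]),
}
joined = join_reads(reads)
print(joined)
```

As in the two-file case, the first frame decides which timestamps are kept, so the order of `reads` matters.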
