In [1]:
import uuid
import random
import math

import dask.dataframe as dd
from glob import glob
import numpy as np
import pandas as pd

from dask.diagnostics import ProgressBar

from helpers import *

from tqdm import tqdm_notebook

pbar = ProgressBar()
pbar.register()

  data = yaml.load(f.read()) or {}


# Generate fake events/features

## Configuration

In [2]:
number_of_cores = 8
devices_fake_path = '../data/device_geolocations_fake/100_devices_with_geolocation.parquet'

# real data
weather_real_path = '../data/weather_real/*/*/*.parquet'

# save to
fake_features_dir_path = '../data/features_fake/' # slash in the end

## Faked devices

In [3]:
devices_raw = pd.read_parquet(devices_fake_path)

In [4]:
devices_raw.head()

Unnamed: 0,gateway_uuid,geo_lat,geo_lng,zip_code
0,45bdde5d-d718-46c0-9f40-2fb7611f299e,52.4119,11.5299,39326
1,3634f970-3acb-4143-8fa0-1d2828bace63,51.0148,7.0729,51375
2,5d5f372e-bc86-48c6-af87-77e590e11061,50.2436,9.0757,63654
3,8dadf510-6ecd-424f-b89e-6660907682a1,48.534,11.6614,84072
4,9ee3850f-5eb8-47d6-8e27-3047a795663c,51.2969,13.8929,1945


## Real weather data

In [5]:
weather_raw = dd.read_parquet(glob(weather_real_path))

In [9]:
locations = weather_raw[['zip_code', 'timestamp', 'temp']]
locations['temp'] = locations['temp'].astype(float)
locations['timestamp'] = locations['timestamp'].astype(str)
locations['zip_code'] = locations['zip_code'].astype(str)
locations['zip_code_prefix'] = locations['zip_code'].str[:1]
locations['timestamp_prefix'] = locations['timestamp'].str[:13]
locations['month'] = locations['timestamp'].str[5:7]
locations['day'] = locations['timestamp'].str[8:10]
locations['hour'] = locations['timestamp'].str[11:13]
locations['month'] = locations['month'].astype(int)
locations['day'] = locations['day'].astype(int)
locations['hour'] = locations['hour'].astype(int)

### Basic temperature for generating outside temp events

In [10]:
locations_grouped = locations.groupby(['zip_code_prefix', 'month', 'day', 'hour'])['temp'].aggregate(['mean', 'std'])
locations_grouped = locations_grouped.reset_index()
temp_in_time = locations_grouped.compute()

[                                        ] | 0% Completed |  0.0s

  return pd.MultiIndex(levels=levels, labels=labels, names=idx.names)


[########################################] | 100% Completed | 40.8s


In [11]:
temp_in_time.head()

Unnamed: 0,zip_code_prefix,month,day,hour,mean,std
0,0,10,1,0,6.613445,2.008861
1,0,10,1,1,6.163866,2.064853
2,0,10,1,2,5.420168,2.248727
3,0,10,1,3,4.785714,2.002179
4,0,10,1,4,4.693277,1.830282


### Timeframe for generated events

In [12]:
weather_timeframes = locations.groupby(['zip_code_prefix'])['timestamp_prefix'].aggregate(['min', 'max']).reset_index().compute()

[########################################] | 100% Completed | 23.0s


In [13]:
weather_timeframes

Unnamed: 0,zip_code_prefix,min,max
0,0,2018-10-01T00,2018-11-30T23
1,1,2018-10-01T00,2018-11-30T23
2,2,2018-10-01T00,2018-11-30T23
3,3,2018-10-01T00,2018-11-30T23
4,4,2018-10-01T00,2018-11-30T23
5,5,2018-10-01T00,2018-11-30T23
6,6,2018-10-01T00,2018-11-30T23
7,7,2018-10-01T00,2018-11-30T23
8,8,2018-10-01T00,2018-11-30T23
9,9,2018-10-01T00,2018-11-30T23


### All days as month, day df

In [14]:
timeframes = temp_in_time.groupby(['month', 'day']).first().reset_index()[['month', 'day']]

## Prepare events

In [15]:
devices = devices_raw[['gateway_uuid', 'zip_code']]
devices['zip_code_prefix'] = devices['zip_code'].str[:1]
ddata = dd.from_pandas(devices, npartitions=number_of_cores)
devices['factor'] = ddata.apply(lambda x: random.uniform(-1, 1), axis=1, meta=('x', float)).compute()
devices['pressure_base_factor'] = ddata.apply(lambda x: np.random.choice([0.2, 0.5, 1.5, 2.5, 4.0], 1, p=[0.01, 0.02, 0.95, 0.01, 0.01])[0], axis=1, meta=('x', float)).compute()

[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.1s


In [16]:
devices['key'] = 0
timeframes['key'] = 0

In [17]:
events_base = pd.merge(devices, timeframes, on='key')[['gateway_uuid', 'zip_code', 'zip_code_prefix', 'factor', 'pressure_base_factor', 'month', 'day']]

In [18]:
events_base.head()

Unnamed: 0,gateway_uuid,zip_code,zip_code_prefix,factor,pressure_base_factor,month,day
0,45bdde5d-d718-46c0-9f40-2fb7611f299e,39326,3,-0.478963,1.5,10,1
1,45bdde5d-d718-46c0-9f40-2fb7611f299e,39326,3,-0.478963,1.5,10,2
2,45bdde5d-d718-46c0-9f40-2fb7611f299e,39326,3,-0.478963,1.5,10,3
3,45bdde5d-d718-46c0-9f40-2fb7611f299e,39326,3,-0.478963,1.5,10,4
4,45bdde5d-d718-46c0-9f40-2fb7611f299e,39326,3,-0.478963,1.5,10,5


# Generate events (day by day)

## Remove old files

In [19]:
%rm -rf ../data/features_fake/*

## Processing (save day by day)

In [20]:
for row in tqdm_notebook(timeframes.iterrows(), total=61):
    month = row[1]['month']
    day = row[1]['day']
    day_events = events_base[(events_base.month == month) & (events_base.day == day)]
    day_events = day_events[['gateway_uuid', 'zip_code_prefix', 'factor', 'pressure_base_factor', 'month', 'day']].reset_index(drop=True)
    day_temps = temp_in_time[(temp_in_time.month == month) & (temp_in_time.day == day)].reset_index(drop=True)
    
    
    # device.temperature.outside/value
    temp_outside_events = prepareOutsideTemperatureEvents(day_events, day_temps, month, day, 120, number_of_cores)
    
    # device.temperature.room/value
    temp_room_events = prepareRoomTemperatureEvents(day_events, month, day, 60, number_of_cores)
    
    # device.water.pressure/value
    water_pressure_events = prepareWaterPressureEvents(day_events, month, day, 900, number_of_cores)
    
   
    all_day_events = temp_outside_events
    all_day_events = all_day_events.append(
        [
            temp_room_events,
            water_pressure_events
        ]).sort_values(['timestamp'], ascending=[1])
    
    all_day_events.to_parquet(fake_features_dir_path, index=True, partition_cols=['month', 'day'])

HBox(children=(IntProgress(value=0, max=61), HTML(value='')))

[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.5s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.7s
[########################################] | 100% Completed |  0.1s
[########################################] | 100

[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1.2s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1.3s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100

[########################################] | 100% Completed |  0.7s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1.3s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1.3s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.7s
[########################################] | 100% Completed |  0.1s
[########################################] | 100

[########################################] | 100% Completed |  0.6s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  1.4s
[########################################] | 100% Completed |  0.1s
[########################################] | 100% Completed |  0.3s
[########################################] | 100% Completed |  0.1s



In [22]:
df = pd.read_parquet('../data/features_fake/')

In [25]:
df.head()

Unnamed: 0,gateway_uuid,timestamp,property,value,__index_level_0__,month,day
0,45bdde5d-d718-46c0-9f40-2fb7611f299e,2018-10-01T00:00:00Z,device.temperature.outside/value,6.1,0,10,1
1,f236cbfa-6d5a-4b2d-b2ee-897ca26c39bb,2018-10-01T00:00:00Z,device.water.pressure/value,1.448305,27,10,1
2,3405cabf-1354-4faf-871a-c1ee66ccc8d8,2018-10-01T00:00:00Z,device.water.pressure/value,1.439627,28,10,1
3,689fbf80-c3e3-4f38-bb65-e897a8d90ba5,2018-10-01T00:00:00Z,device.water.pressure/value,1.403195,29,10,1
4,bf93b78b-aa01-4ed5-80b4-344ae96b07f4,2018-10-01T00:00:00Z,device.water.pressure/value,1.517292,30,10,1
