In [None]:
import pandas as pd
import numpy as np
import craftai.pandas
import os
from multiprocessing import Pool

## 1. Load data

In [2]:
PATH = "data/"  # input directory; file names are appended directly, so keep the trailing slash

In [3]:
# Load the raw yellow-taxi counts (presumably one row per pickup hour and taxi zone — confirm).
yellow = pd.read_csv(f"{PATH}yellow.csv")

In [4]:
# Index the trips by pickup time, expressed in New York local time.
# The conversion must happen exactly once: after `set_index`, `pickup_datetime`
# is no longer a column, so looking it up again fails. The guard keeps the
# cell safe to re-run.
if 'pickup_datetime' in yellow.columns:
    yellow['pickup_datetime'] = pd.to_datetime(yellow['pickup_datetime'], utc=True)
    yellow = yellow.set_index('pickup_datetime', drop=True)
    yellow.index = yellow.index.tz_convert('America/New_York')

# NOTE(review): a constant '-05:00' is New York standard time and ignores
# daylight saving time (-04:00 in summer) — confirm this is intended.
yellow['timezone'] = '-05:00'

In [5]:
yellow.head(n=5)  # peek at the indexed, timezone-converted frame

Unnamed: 0_level_0,taxi_zone,trip_counter,timezone
pickup_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2017-01-01 00:00:00-05:00,1,2.0,-05:00
2017-01-01 00:00:00-05:00,4,54.0,-05:00
2017-01-01 00:00:00-05:00,7,66.0,-05:00
2017-01-01 00:00:00-05:00,10,1.0,-05:00
2017-01-01 00:00:00-05:00,12,3.0,-05:00


In [6]:
def build_agent_df(taxi_zone, trips_df=None, index=None):
    """Build the hourly operations frame for one taxi zone's craft ai agent.

    Parameters
    ----------
    taxi_zone
        Value of the ``taxi_zone`` column whose rows are selected.
    trips_df : pandas.DataFrame, optional
        Frame indexed by pickup time with ``taxi_zone`` and ``trip_counter``
        columns. Defaults to the notebook-level ``yellow`` (looked up at call time).
    index : pandas.DatetimeIndex, optional
        Hours to emit, in order. Defaults to the notebook-level ``time_index``
        (looked up at call time, so it may be defined after this cell).

    Returns
    -------
    tuple
        ``(taxi_zone, {'data': frame})``. ``frame`` is indexed by ``index`` and
        has two columns:

        - ``trip_counter``: the first recorded count for each hour, or 0 when
          the zone has no row for that hour.
        - ``timezone``: a constant string.

        The pair shape lets ``dict(pool.map(build_agent_df, zones))`` build a
        zone -> agent-data mapping directly.
    """
    if trips_df is None:
        trips_df = yellow
    if index is None:
        index = time_index

    counts = trips_df[trips_df.taxi_zone == taxi_zone].trip_counter
    # Vectorized replacement for matching every hour against every row
    # (O(hours x rows)). Keep the first count per timestamp, as the per-hour
    # lookup did, then align on the full hourly grid with 0 for missing hours.
    counts = counts[~counts.index.duplicated(keep='first')]
    counts = counts.reindex(index, fill_value=0)

    print('--| Zone', taxi_zone, 'computed')
    # NOTE(review): '-05:00' is New York standard time; hours that fall in daylight
    # saving time are actually at -04:00 — confirm this constant is intended.
    return taxi_zone, {'data': pd.DataFrame(data={'trip_counter': counts.values, 'timezone': '-05:00'}, index=index)}

In [7]:
%%time 
time_index = pd.date_range("2017-01-01 00:00", "2017-12-31 23:00", freq="h", tz='America/New_York')


p = Pool(10)
full = dict(p.map(build_agent_df, yellow.taxi_zone.unique()[:20]))

--| Zone 20 computed
--| Zone 18 computed
--| Zone 21 computed
--| Zone 14 computed
--| Zone 1 computed
--| Zone 26 computed
--| Zone 10 computed
--| Zone 17 computed
--| Zone 35 computed
--| Zone 12 computed
--| Zone 13 computed
--| Zone 39 computed
--| Zone 7 computed
--| Zone 4 computed
--| Zone 24 computed
--| Zone 25 computed
--| Zone 36 computed
--| Zone 33 computed
--| Zone 37 computed
--| Zone 40 computed
CPU times: user 138 ms, sys: 93.5 ms, total: 232 ms
Wall time: 23 s


Process ForkPoolWorker-6:
Process ForkPoolWorker-7:
Process ForkPoolWorker-2:
Traceback (most recent call last):
Process ForkPoolWorker-10:
Traceback (most recent call last):
Process ForkPoolWorker-4:
Process ForkPoolWorker-8:
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Process ForkPoolWorker-3:
Process ForkPoolWorker-5:
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
Process ForkPoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/queues.py", line 354, in get
    return _ForkingPickler.loads(res)
Traceback (most recent

In [8]:
# Preview the first rows of every zone's operations frame, in ascending zone order.
for zone, zone_entry in sorted(full.items()):
    print(zone)
    print(zone_entry['data'].head())
    print('\n\n')

1
                           trip_counter timezone
2017-01-01 00:00:00-05:00           2.0   -05:00
2017-01-01 01:00:00-05:00           2.0   -05:00
2017-01-01 02:00:00-05:00           2.0   -05:00
2017-01-01 03:00:00-05:00           0.0   -05:00
2017-01-01 04:00:00-05:00           0.0   -05:00



4
                           trip_counter timezone
2017-01-01 00:00:00-05:00          54.0   -05:00
2017-01-01 01:00:00-05:00          36.0   -05:00
2017-01-01 02:00:00-05:00          28.0   -05:00
2017-01-01 03:00:00-05:00          36.0   -05:00
2017-01-01 04:00:00-05:00          34.0   -05:00



7
                           trip_counter timezone
2017-01-01 00:00:00-05:00          66.0   -05:00
2017-01-01 01:00:00-05:00          54.0   -05:00
2017-01-01 02:00:00-05:00          42.0   -05:00
2017-01-01 03:00:00-05:00          26.0   -05:00
2017-01-01 04:00:00-05:00          33.0   -05:00



10
                           trip_counter timezone
2017-01-01 00:00:00-05:00           1.0   -05:00
20

## 2. Connect to the craft ai API


In [9]:
# Read the API token from the environment rather than hard-coding it. Fail fast
# with a clear message: a missing variable would otherwise be passed on as None
# and presumably only surface later as a less obvious API error.
CRAFT_TOKEN = os.environ.get("CRAFT_TOKEN")
if not CRAFT_TOKEN:
    raise RuntimeError("Set the CRAFT_TOKEN environment variable to your craft ai token.")

client = craftai.pandas.Client({
  "token": CRAFT_TOKEN
})

## 3. Create agents


In [10]:
# craft ai agent configuration. The context declares calendar properties
# (month, weekday, time of day), a timezone property and the trip count;
# the trip count is also the agent's output.
CONFIGURATION = dict(
    context=dict(
        month_of_year=dict(type="month_of_year"),
        day_of_week=dict(type="day_of_week"),
        time=dict(type="time_of_day"),
        timezone=dict(type="timezone"),
        trip_counter=dict(type="continuous"),
    ),
    output=["trip_counter"],  # the output is continuous
)

def create_agent(taxi_zone):
    """Create (or recreate) the craft ai agent for one taxi zone.

    Any existing agent with the same id is deleted first. A fresh agent is then
    created from CONFIGURATION and stored under ``full[taxi_zone]['agent']``.
    Returns None.
    """
    zone_agent_id = "taxi_zone{:0>3}".format(taxi_zone)  # zero-padded, e.g. 1 -> "taxi_zone001"

    client.delete_agent(zone_agent_id)  # drop any previous version of this agent
    new_agent = client.create_agent(CONFIGURATION, zone_agent_id)

    full[taxi_zone]['agent'] = new_agent
    print("Agent", new_agent["id"], "has successfully been created")
    

### Create one agent per taxi zone

In [19]:
# Create one agent per zone. A plain loop states the side effect directly.
# `any(map(...))` only reached every zone because create_agent returns None,
# and would silently stop at the first truthy return value.
for taxi_zone in full:
    create_agent(taxi_zone)

Agent taxi_zone001 has successfully been created
Agent taxi_zone004 has successfully been created
Agent taxi_zone007 has successfully been created
Agent taxi_zone010 has successfully been created
Agent taxi_zone012 has successfully been created
Agent taxi_zone013 has successfully been created
Agent taxi_zone014 has successfully been created
Agent taxi_zone017 has successfully been created
Agent taxi_zone018 has successfully been created
Agent taxi_zone020 has successfully been created
Agent taxi_zone021 has successfully been created
Agent taxi_zone024 has successfully been created
Agent taxi_zone025 has successfully been created
Agent taxi_zone026 has successfully been created
Agent taxi_zone033 has successfully been created
Agent taxi_zone035 has successfully been created
Agent taxi_zone036 has successfully been created
Agent taxi_zone037 has successfully been created
Agent taxi_zone039 has successfully been created
Agent taxi_zone040 has successfully been created


False

## 4. Add operations to the agents

In [20]:
def add_operations(taxi_zone):
    """Upload one zone's operations frame to its agent and print the API message. Returns None."""
    zone = full[taxi_zone]
    response = client.add_operations(zone['agent']['id'], zone['data'])
    print(response['message'])
    

In [21]:
# Upload every zone's operations. A plain loop states the side effect directly.
# `any(map(...))` only reached every zone because add_operations returns None.
for taxi_zone in full:
    add_operations(taxi_zone)

Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone001" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone004" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone007" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone010" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone012" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone013" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone014" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone017" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone018" context.
Successfully added 8760 operation(s) to the agent "yrieix.leprince/Taxi/taxi_zone020" context.
Successfully added 8760 operation(s) to the agent 

False

# NOTE(review): a stray `client.add_operations(agent_id, yellow)` call stood here.
# It cannot run: `agent_id` only exists inside create_agent. It also sends
# `yellow`, which mixes every taxi zone, whereas each agent should receive only
# its own zone's frame. Operations are uploaded per zone by add_operations above.

## 5. Get the latest decision trees

In [34]:
# POSIX timestamp (seconds) of the last hour in the operations data. Every
# zone's frame is built on the same hourly index, so any zone can be used;
# taking the first one avoids a KeyError when zone 1 is not among those modelled.
ts = int(next(iter(full.values()))['data'].index[-1].timestamp())


def get_DT(taxi_zone, timestamp=None):
    """Fetch the decision tree of a zone's agent into ``full[taxi_zone]['tree']``.

    Parameters
    ----------
    taxi_zone
        Key of ``full`` whose ``'agent'`` entry holds the agent id.
    timestamp : int, optional
        POSIX time (seconds) at which the tree is requested. Defaults to the
        notebook-level ``ts`` (looked up at call time).
    """
    if timestamp is None:
        timestamp = ts
    full[taxi_zone]['tree'] = client.get_decision_tree(full[taxi_zone]['agent']['id'], timestamp=timestamp)
    print('DT', taxi_zone, '--> ok')

In [36]:
# Fetch, one zone at a time, the decision tree of every zone's agent.
for taxi_zone in list(full):
    get_DT(taxi_zone)

DT 1 --> ok
DT 4 --> ok
DT 7 --> ok
DT 10 --> ok
DT 12 --> ok
DT 13 --> ok
DT 14 --> ok


KeyboardInterrupt: 

In [35]:
#any(map(get_DT, full.keys()))

DT 1 --> ok


KeyboardInterrupt: 

Process ForkPoolWorker-46:
Process ForkPoolWorker-47:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/yrieix/.pyenv/versions/3.7.0/lib/python3.7/multiprocessing/queues.py", line 

## 6. Decide

# Decision context: the 24 hours of 2018-01-01 in New York local time. That date
# is in standard time (EST), so the constant '-05:00' offset holds for every row.
decisions_df = pd.DataFrame(
    {'timezone': '-05:00'},  # scalar broadcast over the index
    index=pd.date_range("2018-01-01 00:00", periods=24, freq="h").tz_localize("America/New_York"),
)

decisions_df.head()