# Heart of Helsinki using MS Azure tools

### Import needed libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow.keras
import requests
import json
from pandas.io.json import json_normalize
import functools
import asyncio
import requests
import functools
from ast import literal_eval
import time
from datetime import date

import nest_asyncio
nest_asyncio.apply()

### API Station request

In [None]:
import os

url_station = "https://api.hypr.cl/station/"
# The API key is read from the HYPR_API_KEY environment variable instead of
# being embedded in the notebook; the value committed here was a redacted
# placeholder. Set HYPR_API_KEY before running this cell.
headers_station = {
    'x-api-key': os.environ["HYPR_API_KEY"],
    'command': "list",
    'Accept': "*/*",
    'Cache-Control': "no-cache",
    'Host': "api.hypr.cl",
    'Accept-Encoding': "gzip, deflate",
    'Content-Length': "0",
    'Connection': "keep-alive",
}
# Without a timeout, requests may wait indefinitely on an unresponsive server;
# 30 s is an arbitrary upper bound.
station = requests.request("POST", url_station, headers=headers_station, timeout=30)
# Surface HTTP errors here rather than as a KeyError when the body is parsed.
station.raise_for_status()

In [None]:
# Decode the station response body and flatten its 'list' entries into a table.
station_json = json.loads(station.text)
station_records = station_json['list']
station_df = json_normalize(station_records)

In [None]:
# Preview the first five station rows.
station_df.head(n=5)

### API Raw request

In [None]:
import os

url = "https://api.hypr.cl/raw/"

# The API key is read from the HYPR_API_KEY environment variable instead of
# being hard-coded. The key previously committed in this cell should be
# treated as leaked and rotated.
headers = {
    'x-api-key': os.environ["HYPR_API_KEY"],
    'time_start': "2019-08-01T12:00:00Z",
    'time_stop': "2019-08-01T12:01:00Z",
    'Accept': "*/*",
    'Cache-Control': "no-cache",
    'Host': "api.hypr.cl",
    'Accept-Encoding': "gzip, deflate",
    'Content-Length': "0",
    'Connection': "keep-alive",
}

# 30 s timeout so an unresponsive server cannot hang the cell forever.
raw = requests.request("POST", url, headers=headers, timeout=30)
print(raw.status_code)

In [None]:
# Decode the raw-data response and show its first record.
raw_json = json.loads(raw.text)
first_raw_record = raw_json['raw'][0]
first_raw_record

In [None]:
# Flatten the raw records into a DataFrame and parse the 'time' column.
raw_records = raw_json['raw']
raw_df = json_normalize(raw_records)
parsed_times = pd.to_datetime(raw_df['time'])
raw_df['time'] = parsed_times

In [None]:
# Number of distinct (non-missing) hashes in the sample.
raw_df['hash'].nunique(dropna=True)

### Define functions

Asynchronous loop that fetches one hour of raw API data, running the HTTP requests concurrently in a thread pool

In [None]:
async def get_hour_data(hour, day, month, year, batch_size=10, api_key=None):
    """Fetch one hour of raw HYPR API data, one request per minute.

    The hour is split into 60 one-minute windows (``hh:mm:00`` to
    ``hh:mm:59``). Requests are submitted ``batch_size`` at a time to the
    running event loop's default executor and awaited together before the
    next batch starts.

    Args:
        hour, day, month, year: Date and hour to fetch; formatted into the
            ``time_start``/``time_stop`` headers with a trailing ``Z``.
        batch_size: Number of minute requests run concurrently per batch.
            Defaults to 10 (the previously hard-coded value). Values that do
            not divide 60 are handled; the last batch is simply shorter.
        api_key: HYPR API key. When None, it is read from the
            ``HYPR_API_KEY`` environment variable.

    Returns:
        DataFrame produced by ``json_normalize`` from the concatenated
        ``'raw'`` lists of all 60 responses.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
        KeyError: If ``api_key`` is None and ``HYPR_API_KEY`` is unset, or a
            response body has no ``'raw'`` key.
        requests.HTTPError: If any request returns an error status.
    """
    import os

    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if api_key is None:
        api_key = os.environ["HYPR_API_KEY"]

    url = "https://api.hypr.cl/raw/"
    hour_str = "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{}Z"

    # Accumulates the 'raw' records from every response.
    result = []

    loop = asyncio.get_running_loop()
    loop_end = time.time()  # defined even if no batch runs

    for batch_no, first_minute in enumerate(range(0, 60, batch_size)):

        print("Loop:", batch_no + 1)

        loop_start = time.time()

        futures = []
        for minute in range(first_minute, min(first_minute + batch_size, 60)):
            header_str = {
                'x-api-key': api_key,
                'time_start': hour_str.format(year, month, day, hour, minute, "00"),
                'time_stop': hour_str.format(year, month, day, hour, minute, "59"),
                'Accept': "*/*",
                'Cache-Control': "no-cache",
                'Host': "api.hypr.cl",
                'Accept-Encoding': "gzip, deflate",
                'Content-Length': "0",
                'Connection': "keep-alive",
            }
            # requests is blocking, so each call runs on the executor's threads;
            # the 30 s timeout keeps a stalled request from hanging the batch.
            futures.append(loop.run_in_executor(
                None,
                functools.partial(requests.request, "POST", url,
                                  headers=header_str, timeout=30)))

        # gather preserves submission order, so records stay in minute order.
        responses = await asyncio.gather(*futures)

        for response in responses:
            response.raise_for_status()
            result.extend(json.loads(response.text)['raw'])

        loop_end = time.time()

        print("Loop lasted {} seconds".format(round(loop_end - loop_start, 2)))

    output_df = json_normalize(result)

    print("Time to read JSON to DataFrame:", round(time.time() - loop_end, 2))

    return output_df


Function for extracting the net movements of people from one station to another, tracked by hash

In [None]:
def exctract_flow_directions(df):
    """Return the net people flow between each pair of stations.

    For every ordered station pair ``(serial, serial_lead)``, count the
    distinct ``hash`` values that moved from ``serial`` to ``serial_lead``.
    Flows in opposite directions are then netted against each other, so each
    unordered pair appears once. Each pair is oriented so that ``count`` is
    non-negative. A pair with zero net flow is kept, with count 0.

    Rows where ``serial == serial_lead`` are ignored. Rows with a missing
    ``serial_lead`` are also ignored, because groupby drops NaN keys.

    Args:
        df: DataFrame with at least the columns ``serial``, ``serial_lead``
            and ``hash``.

    Returns:
        DataFrame with columns ``serial`` (origin), ``serial_lead``
        (destination) and ``count`` (net number of distinct hashes).
    """
    # Distinct hashes per ordered (origin, destination) pair.
    pair_counts = df.groupby(['serial', 'serial_lead'])['hash'].nunique()

    # Net count per unordered pair, keyed by the orientation encountered first.
    net = {}
    for (origin, destination), count in pair_counts.items():
        if origin == destination:
            continue
        if (destination, origin) in net:
            net[(destination, origin)] -= int(count)
        else:
            net[(origin, destination)] = int(count)

    # Orient each pair so the reported count is non-negative. As before, a
    # zero net flow is reported in the reversed orientation.
    rows = [
        (origin, destination, count) if count > 0
        else (destination, origin, -count)
        for (origin, destination), count in net.items()
    ]
    return pd.DataFrame(rows, columns=['serial', 'serial_lead', 'count'])

### Run functions in loops

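Extract the people-flow streams between stations for each hour. When people moved in both directions between two stations, the count for one direction is subtracted from the other. The result is a positive number representing the net flow from the origin (serial) station to the destination (serial_lead) station.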

In [None]:
# Fetch each hour of the chosen day and extract net station-to-station flows.
day = 14
month = 8
year = 2019
loop_start = time.time()

# One flow DataFrame per hour; concatenated once after the loop.
hourly_flows = []

for hour in range(0, 12):

    print(hour, round(time.time() - loop_start, 2))

    loop = asyncio.get_event_loop()

    test_df = loop.run_until_complete(get_hour_data(hour, day, month, year))
    if test_df.empty:
        # json_normalize([]) returns a frame without a 'time' column.
        print("No data for hour", hour)
        continue

    test_df['datetime'] = pd.to_datetime(test_df['time'])
    # Next station visited by the same hash, in time order. The shifted Series
    # keeps the original row labels, so assigning it directly aligns values to
    # the right rows. The previous `.reset_index()` discarded those labels and
    # misaligned rows whenever the data was not already time-sorted.
    test_df['serial_lead'] = (test_df.sort_values(by=['datetime'], kind='mergesort')
                              .groupby(['hash'])['serial'].shift(-1))

    test_df = exctract_flow_directions(test_df)
    test_df['hour'] = hour
    test_df['date'] = date(year=year, month=month, day=day)
    hourly_flows.append(test_df)

# DataFrame.append was removed in pandas 2.0; concatenate once instead.
extracted = pd.concat(hourly_flows) if hourly_flows else pd.DataFrame()

print("Ready! Loop lasted: ", round(time.time()-loop_start,2), "seconds.")

Count how many unique hashes were seen at each station during each hour.

In [None]:
from datetime import datetime
from datetime import date

# Fetch each hour and count distinct hashes seen per station.
day = 14
month = 8
year = 2019
loop_start = time.time()

# One per-hour count DataFrame; concatenated once after the loop.
hourly_counts = []

# NOTE(review): only hours 9-11 are fetched here, while the flow cell above
# fetches hours 0-11 — confirm this range is intended.
for hour in range(9, 12):

    print(hour, round(time.time() - loop_start, 2))

    loop = asyncio.get_event_loop()
    test_df = loop.run_until_complete(get_hour_data(hour, day, month, year))
    if test_df.empty:
        # json_normalize([]) returns a frame without 'time'/'serial' columns.
        print("No data for hour", hour)
        continue

    test_df['datetime'] = pd.to_datetime(test_df['time'])

    serial_df_temp = test_df.groupby(['serial'])['hash'].nunique().reset_index(name='nunique')
    serial_df_temp['date'] = date(year=year, month=month, day=day)
    serial_df_temp['hour'] = hour
    hourly_counts.append(serial_df_temp)

# DataFrame.append was removed in pandas 2.0; concatenate once instead.
serial_df = pd.concat(hourly_counts) if hourly_counts else pd.DataFrame()

print("Ready! Loop lasted: ", round(time.time()-loop_start,2), "seconds.")

Save the results to .csv files and upload them to the Azure Machine Learning default datastore (Azure Notebooks / Machine Learning Services)

In [None]:
# NOTE(review): `extracted` holds the per-hour station-to-station flows, but the
# file is named "statdata", while the per-station unique-hash counts are saved
# as "flowdata". Confirm the two names are not swapped before relying on them.
statdata_path = 'statdata_{:04d}-{:02d}-{:02d}.csv'.format(year, month, day)
extracted.to_csv(statdata_path)

In [None]:
# NOTE(review): `serial_df` holds unique-hash counts per station and hour, yet
# the file is named "flowdata" — confirm the name is intended.
flowdata_path = 'flowdata_{:04d}-{:02d}-{:02d}.csv'.format(year, month, day)
serial_df.to_csv(flowdata_path)

In [None]:
# Load the workspace here: `ws` is otherwise first defined in the Azure setup
# cell further down, so running the notebook top to bottom raised NameError.
from azureml.core import Workspace

ws = Workspace.from_config()

# NOTE(review): only the "flowdata" CSV is uploaded; the "statdata" CSV stays
# local — confirm that is intended.
datastore = ws.get_default_datastore()
datastore.upload_files(files = ['./flowdata_{:04d}-{:02d}-{:02d}.csv'.format(year,month,day)],
                       target_path = 'train-dataset/tabular/',
                       overwrite = True,
                       show_progress = True)

In [None]:
# Check core SDK version number
import azureml.core

print('SDK version:', azureml.core.VERSION)


from azureml.core import Workspace

ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\n')

experiment_name = 'train-with-datasets'

from azureml.core import Experiment
exp = Experiment(workspace=ws, name=experiment_name)

from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
import os

# choose a name for your cluster
compute_name = os.environ.get('AML_COMPUTE_CLUSTER_NAME', 'cpu-cluster')
# Environment variables are strings; convert so the provisioning config always
# receives integers, whether the value comes from the environment or the default.
compute_min_nodes = int(os.environ.get('AML_COMPUTE_CLUSTER_MIN_NODES', 0))
compute_max_nodes = int(os.environ.get('AML_COMPUTE_CLUSTER_MAX_NODES', 4))

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
vm_size = os.environ.get('AML_COMPUTE_CLUSTER_SKU', 'STANDARD_D2_V2')


if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if isinstance(compute_target, AmlCompute):
        print('found compute target. just use it. ' + compute_name)
    else:
        # Previously this case was accepted silently.
        print('existing compute target is not an AmlCompute cluster: ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                                min_nodes=compute_min_nodes,
                                                                max_nodes=compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)

    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)