In [None]:
import pandas as pd
import numpy as np
import xml.etree.ElementTree as ET
import os
import json
import warnings
from multiprocessing import Pool
from datetime import datetime
import gc

warnings.filterwarnings('ignore')

pd.set_option('display.max_columns', None)
output_path = "./../output/population/"

# Loading events into separate tables
* agents: people
* vehicles:
    * cars
    * subway
    * buses
    * trams
    * trains
    * ferry
    * funicular

In [None]:
events = pd.DataFrame()

# Column dtypes for the event CSVs. Every file is read in its own worker, so id
# columns are pinned to str instead of being inferred per file; that keeps the
# dtype identical across files and keeps numeric-looking ids such as "123" as text.
dtypes = {
    "Unnamed: 0": np.float64,
    "time": np.float64,
    "type": str,
    "driverId": str,
    "vehicleId": str,
    "transitLineId": str,
    "transitRouteId": str,
    "departureId": str,
    "person": str,
    "link": str,
    "legMode": 'category',
    "vehicle": str,
    "networkMode": str,  # candidate for 'category'
    "relativePosition": np.float64,
    "facility": str,
    "delay": np.float64,
    "x": np.float64,
    "y": np.float64,
    "actType": str,
    "computationalRoutingMode": str,
    "distance": np.float64,
    "mode": str,
    "agent": str,
    "atStop": str,
    # Read by process_row() for transit vehicles; typed like atStop so stop ids
    # do not depend on per-file inference.
    "destinationStop": str,
}

# Reading all events at once will be an issue for 100k+ populations.
def read_csv(path):
    """Read one event CSV with the shared ``dtypes`` mapping.

    Module-level so it can be pickled and sent to ``multiprocessing.Pool`` workers.

    Parameters
    ----------
    path : str
        Path of the CSV file.

    Returns
    -------
    pandas.DataFrame
    """
    return pd.read_csv(path, dtype=dtypes)


# Every regular, non-hidden file in output_path, in a fixed (sorted) order.
# os.listdir order is arbitrary, which would make the concatenated `events`
# order differ between runs. Directories and hidden entries (e.g.
# .ipynb_checkpoints) would make read_csv fail, so they are skipped.
args = [
    os.path.join(output_path, name)
    for name in sorted(os.listdir(output_path))
    if not name.startswith(".") and os.path.isfile(os.path.join(output_path, name))
]

print("Files prepared:", len(args), "files")

In [None]:
# Leaving the `with` block shuts the pool down, so no explicit close()/join() is needed.
with Pool(os.cpu_count()) as pool:
    results = pool.map(read_csv, args)
print("Files loaded.")
# read_csv is called without index_col, so each file keeps its own 0..n-1 index
# and labels repeat across files. A stable sort keeps rows that share a label in
# file (`args`) order instead of an arbitrary one.
events = pd.concat(results).sort_index(kind="mergesort")

In [None]:
events.info()
# Release the per-file frames now that they are concatenated into `events`.
# Rebinding instead of `del` keeps the cell re-runnable (a second `del` raises NameError).
results = None

In [None]:
# Drop the pool handle; leaving its `with` block already terminated the workers.
# Rebinding instead of `del` keeps the cell re-runnable.
pool = None
gc.collect()

In [None]:
# Events per leg mode (value_counts drops missing values by default).
# Alternative check: events["facility"].value_counts()
events["legMode"].value_counts()

In [None]:
np.where(events["person"].eq("8").to_numpy())  # row positions of person "8"

In [None]:
# `person` is read with dtype str, so compare against a string id (the float 1.0
# never matches). A boolean mask also avoids passing np.where() positions to the
# label-based .loc, which picks wrong rows once concat has produced duplicate labels.
person = events[events["person"] == "1"]
#display(person.head(10))

## Saving each person's events to separate JSON files

In [None]:
"""
people = [                      (one JSON line per agent)
        {id: 1,
        events: [
                {event_id: ...,     (first CSV column)
                 time: ...,
                 type: ...,
                 link: ...,
                 vehicle_id: ...,
                 delay: ...,
                 actType: ...,
                 legMode: ...,
                 coords_x: ...,
                 coords_y: ...}
        ]}
]
"""

def load_events(row):
    """Convert one event row into the dict stored in an agent's ``events`` list.

    Parameters
    ----------
    row : pandas.Series
        One row of the events table, as yielded by ``DataFrame.iterrows``.

    Returns
    -------
    dict
        ``event_id`` (value of the row's first column), ``time``, ``type``,
        ``link``, ``vehicle_id`` (from ``vehicle``), ``delay``, ``actType``,
        ``legMode``, ``coords_x`` and ``coords_y`` (from ``x`` / ``y``).
    """
    return {
        # .iloc makes the positional lookup explicit; row[0] on a label-indexed
        # Series relies on pandas' deprecated positional fallback.
        "event_id": row.iloc[0],
        "time": row["time"],
        "type": row["type"],
        "link": row["link"],
        "vehicle_id": row["vehicle"],
        "delay": row["delay"],
        "actType": row["actType"],
        "legMode": row["legMode"],
        "coords_x": row["x"],
        "coords_y": row["y"],
    }

def save_agents(persons, json_file, chunk_size=-1):
    """Write each agent's time-ordered events as JSON lines (sequential version).

    Parameters
    ----------
    persons : iterable of (agent_id, DataFrame)
        Typically ``events.groupby("person")``.
    json_file : str
        Output directory.
    chunk_size : int, default -1
        -1 writes every agent to ``<json_file>/agent.json``. Otherwise agents are
        written ``chunk_size`` per file to ``<json_file>/agent/<i>.json``, the same
        layout ``save_agents_parallel`` produces.
    """
    agent_dir = json_file + "/agent"

    def _write_chunk(records, chunk_i):
        # Write one JSON-lines chunk file, creating the directory if needed.
        os.makedirs(agent_dir, exist_ok=True)
        pd.DataFrame(records, columns=["id", "events"]).to_json(
            agent_dir + "/" + str(chunk_i) + ".json", lines=True, orient='records')

    # Collect plain records and build each DataFrame once. DataFrame.append was
    # removed in pandas 2.0, and growing a frame row by row is quadratic.
    records = []
    chunk_i = 0
    for agent_id, agent_events in persons:
        # A stable sort keeps the input order of events that share a timestamp.
        agent = pd.DataFrame(agent_events).sort_values("time", kind="mergesort")
        event_dicts = [load_events(row) for _, row in agent.iterrows()]
        records.append({"id": agent_id, "events": event_dicts})

        # Flush exactly chunk_size agents per file.
        if chunk_size != -1 and len(records) == chunk_size:
            _write_chunk(records, chunk_i)
            records = []
            chunk_i += 1

    if chunk_size == -1:
        pd.DataFrame(records, columns=["id", "events"]).to_json(
            json_file + "/agent.json", lines=True, orient='records')
    elif records or chunk_i == 0:
        # Write the partial last chunk. Skip it only when it would be empty
        # because the previous chunk was filled exactly.
        _write_chunk(records, chunk_i)

    print("People save to:", json_file)

In [None]:
# seq solution
#agents = pd.DataFrame()
#agents =  events[pd.to_numeric(events['person'], errors='coerce').notnull()] # removes drivers
#agents.head()
#agents.vehicle.unique()
# #save_agents(agents.groupby("person"), json_file = "./../output/events", chunk_size=500)

In [None]:
def process_chunk(args):
    """Pool worker for save_agents_parallel: write one chunk of agents as JSON lines.

    Parameters
    ----------
    args : sequence
        ``(persons, json_file, chunk_i)``. ``persons`` is a list of DataFrames, one
        per agent (each with a ``person`` column). ``json_file`` is the output
        directory. ``chunk_i`` names the output file
        ``<json_file>/agent/<chunk_i>.json``.
    """
    persons, json_file, chunk_i = args

    # Collect plain records and build the DataFrame once (DataFrame.append was
    # removed in pandas 2.0).
    records = []
    for person in persons:
        agent_id = person.person.unique()[0]
        # A stable sort keeps the input order of events that share a timestamp.
        agent = person.sort_values("time", kind="mergesort")
        records.append({
            "id": agent_id,
            "events": [load_events(row) for _, row in agent.iterrows()],
        })

    agent_dir = json_file + "/agent"
    # Several workers can reach this concurrently. With exist_ok a concurrent
    # creation is harmless, whereas exists() followed by makedirs() can raise
    # FileExistsError.
    os.makedirs(agent_dir, exist_ok=True)

    people = pd.DataFrame(records, columns=["id", "events"])
    people.to_json(agent_dir + "/" + str(chunk_i) + ".json", lines=True, orient='records')
    del people, records
    gc.collect()
    return


def save_agents_parallel(persons, json_file, chunk_size=500):
    """Write per-agent event records to JSON lines, one chunk per pool task.

    Parameters
    ----------
    persons : sized iterable of pandas.DataFrame
        One frame per agent (e.g. the groups of ``groupby("person")``).
    json_file : str
        Output directory; chunks go to ``<json_file>/agent/<i>.json``.
    chunk_size : int, default 500
        Agents per output file (the last file may hold fewer).
    """
    # Ceiling division so the reported count is an integer number of chunks.
    chunks_count = -(-len(persons) // chunk_size)
    print("Chunks:", chunks_count)

    # Split persons into consecutive chunks of chunk_size; a chunk's position in
    # `args` is its output file index.
    args = []
    chunk = []
    for person in persons:
        chunk.append(person)
        if len(chunk) == chunk_size:
            args.append([chunk, json_file, len(args)])
            chunk = []
    if chunk:
        args.append([chunk, json_file, len(args)])

    print("Args:", len(args))
    # os.cpu_count() may return None, and halving a single CPU gives 0; Pool needs >= 1.
    processes = max(1, (os.cpu_count() or 1) // 2)
    # Leaving the `with` block shuts the pool down; map() has already returned by then.
    with Pool(processes) as pool:
        pool.map(process_chunk, args)
    print("Agents processed.")

In [None]:
# Keep rows whose `person` parses as a number (this removes the drivers).
agents = events.loc[pd.to_numeric(events["person"], errors="coerce").notna()]
# One frame per person; these are the units save_agents_parallel() distributes.
dfs = [group for _, group in agents.groupby("person")]
del agents
gc.collect()
save_agents_parallel(dfs, json_file="./../output/events", chunk_size=500)

## Saving vehicle events to separate JSON files (one set per vehicle type)

- cars
- subway
- buses
- trams
- trains
- ferry
- funicular


In [None]:
def process_row(row, vehicle_type):
    """Convert one event row into the dict stored in a vehicle's ``events`` list.

    Parameters
    ----------
    row : pandas.Series
        One row of the events table, as yielded by ``DataFrame.iterrows``.
    vehicle_type : str
        ``"car"`` produces only the common fields. Any other value also copies
        ``departure``, ``atStop`` and ``destinationStop``, plus ``transitLine`` /
        ``transitRoute`` for ``TransitDriverStarts`` events.

    Returns
    -------
    dict
    """
    event = {}
    # .iloc makes the positional lookup explicit; row[0] on a label-indexed
    # Series relies on pandas' deprecated positional fallback.
    event["event_id"] = row.iloc[0]
    event["time"] = row["time"]
    event["type"] = row["type"]
    event["link"] = row["link"]
    event["person_id"] = row["person"]
    event["delay"] = row["delay"]
    event["facility"] = row['facility']
    # When a facility id is present, the text after its last ':' replaces the link.
    if isinstance(row['facility'], str):
        event['link'] = row['facility'].split(":")[-1]
    event["networkMode"] = row['networkMode']
    event["relativePosition"] = row['relativePosition']
    event["actType"] = row["actType"]
    event["legMode"] = row["legMode"]
    event["coords_x"] = row["x"]
    event["coords_y"] = row["y"]

    if vehicle_type != "car":
        if event["type"] == "TransitDriverStarts":
            event["transitLine"] = row['transitLineId']
            event["transitRoute"] = row['transitRouteId']
        event["departure"] = row['departureId']
        event["atStop"] = row["atStop"]
        event["destinationStop"] = row["destinationStop"]
    return event



def save_vehicle(vehicle_events, json_file, vehicle_type = "", chunk_size=-1):
    """Write each vehicle's time-ordered events as JSON lines, plus an id -> chunk map.

    Parameters
    ----------
    vehicle_events : iterable of (key, DataFrame)
        Typically ``DataFrame.groupby("vehicle")``; each group is one vehicle.
    json_file : str
        Output directory.
    vehicle_type : str, default ""
        Used in output names. ``"car"`` also turns ids into ints and makes
        process_row() skip the transit fields.
    chunk_size : int, default -1
        -1 writes ``<json_file>/<vehicle_type>.json``. Otherwise vehicles are
        written ``chunk_size`` per file to ``<json_file>/<vehicle_type>/<i>.json``.

    In both modes ``<json_file>/<vehicle_type>_map.json`` maps vehicle id -> chunk index.
    """
    print("Saving vehicle", vehicle_type)
    out_dir = json_file + "/" + vehicle_type

    def _write_chunk(records, chunk_i):
        # Write one JSON-lines chunk file, creating the per-type directory if needed.
        os.makedirs(out_dir, exist_ok=True)
        pd.DataFrame(records, columns=["id", "events"]).to_json(
            out_dir + "/" + str(chunk_i) + ".json", lines=True, orient='records')

    # Collect plain records and build each DataFrame once. DataFrame.append was
    # removed in pandas 2.0, and growing a frame row by row is quadratic.
    records = []
    chunk_i = 0
    ids_in_chunks = {}
    for _, group in vehicle_events:
        # A stable sort keeps the input order of events that share a timestamp.
        vehicle = pd.DataFrame(group).sort_values("time", kind="mergesort")
        vehicle_id = vehicle.vehicle.unique()[0]
        if vehicle_type == "car":
            vehicle_id = int(vehicle_id)

        # Recorded before any flush, so the map matches the file the vehicle lands in.
        ids_in_chunks[vehicle_id] = chunk_i

        event_dicts = [process_row(row, vehicle_type) for _, row in vehicle.iterrows()]
        records.append({"id": vehicle_id, "events": event_dicts})

        # Flush exactly chunk_size vehicles per file.
        if chunk_size != -1 and len(records) == chunk_size:
            _write_chunk(records, chunk_i)
            records = []
            chunk_i += 1

    if chunk_size == -1:
        pd.DataFrame(records, columns=["id", "events"]).to_json(
            json_file + "/" + vehicle_type + ".json", lines=True, orient='records')
    elif records or chunk_i == 0:
        # Write the partial last chunk. Skip it only when it would be empty
        # because the previous chunk was filled exactly.
        _write_chunk(records, chunk_i)

    with open(json_file + "/" + vehicle_type + '_map.json', 'w') as f:
        json.dump(ids_in_chunks, f)
    del records
    gc.collect()
    return

In [None]:


def filter_vehicle_events(vehicle_type, json_path="./../output/events"):
    """Select the events of one vehicle type from the global ``events`` and save them.

    Parameters
    ----------
    vehicle_type : str
        ``"car"`` selects rows whose ``vehicle`` id is numeric. Any other value
        selects rows whose ``vehicle`` id contains it (case-insensitive), plus
        rows whose ``vehicleId`` contains it, with that id copied into ``vehicle``.
    json_path : str
        Output directory passed to save_vehicle().
    """
    print("Started filtering:", vehicle_type, "at", datetime.now())
    if vehicle_type == 'car':
        vehicles = events[pd.to_numeric(events['vehicle'], errors='coerce').notnull()]
    else:
        # na=False: rows without an id simply do not match, instead of putting NaN
        # into the mask (which cannot be used for indexing on object columns).
        vehicles = events.loc[events['vehicle'].str.contains(vehicle_type, case=False, na=False)]
        # .assign returns a new frame rather than writing into a filtered slice.
        driver_events = events.loc[
            events['vehicleId'].str.contains(vehicle_type, case=False, na=False)
        ].assign(vehicle=lambda df: df['vehicleId'])
        # DataFrame.append was removed in pandas 2.0.
        vehicles = pd.concat([vehicles, driver_events])

    save_vehicle(vehicles.groupby("vehicle"), json_file=json_path, vehicle_type=vehicle_type, chunk_size=500)
    print("Saved vehicle type:", vehicle_type, "to", json_path, "at", datetime.now())
    del vehicles
    gc.collect()
    return

In [None]:
#Sequential solution
#for veh_type in ["subway"]:#['bus',"car","funicular"]: #tram, subway
#        
#        filter_vehicle_events(veh_type)

In [None]:
vehicle_types = ["bus"]  # other options: "car", "funicular", "subway"
# Convert vehicle ids to pandas' nullable string dtype before the workers read `events`.
events["vehicle"] = events["vehicle"].astype("string")
# One worker per vehicle type. os.cpu_count() may return None, and Pool needs >= 1 process.
# TODO: chunk vehicles within a type; processing a whole type at once is RAM-heavy.
# Leaving the `with` block shuts the pool down, so no explicit close()/join() is needed.
with Pool(max(1, min(os.cpu_count() or 1, len(vehicle_types)))) as pool:
    pool.map(filter_vehicle_events, vehicle_types)