In [1]:
import pandas as pd
import numpy as np
from collections import defaultdict, Counter
from shapely.geometry import Point
from shapely.geometry import LineString
from shapely import wkt
from datetime import datetime, timedelta
# from Data_Preprocessing.Bus_Routes.generate_routes import transform_coordinates

pd.set_option('display.max_columns', 500)
UNASSIGNED = -1
MIN_NUM_OF_PINGS = 5 # before that we can't conclude on a drive or close the drive
MAX_NUM_OF_PINGS = 10 # number of most updated pings to be considered in deciding on a route
MAX_PINGS_ERROR = 50 # max allowed avg distance from route-linestring (meters)
MAX_TIME_BETWEEN_PINGS = 300 # seconds
MAX_UNASSIGNED_PINGS = 50
MAX_DISTANCE_TO_END_STOP = 50 # meters

In [2]:
# def adjust_stream_data(stream):
#     stream["timestamp"] = pd.to_datetime(stream["timestamp"]).apply(lambda elem:elem.replace(second=0))
#     stream["point"] = stream.apply(
#         lambda row: transform_coordinates(row["longitude"],row["latitude"]),axis=1)
#     return stream

stream = pd.read_csv("..//Data//Samples//3days3lines_adjusted_small.csv",dtype={"lineId":str,"vehicleId":str})
stream["timestamp"] = pd.to_datetime(stream["timestamp"]).apply(lambda elem:elem.replace(second=0))
stream["point"] = stream["point"].apply(lambda elem : wkt.loads(elem))
stream

Unnamed: 0,longitude,latitude,lineId,timestamp,journeyPatternId,vehicleId,point
0,-6.367692,53.353845,25,2017-07-03 10:56:00,025B0002,33401,POINT (708666.7984612143 734978.8023896469)
1,-6.260080,53.346720,25,2017-07-03 10:56:00,025A0002,33352,POINT (715849.5037407882 734355.274681642)
2,-6.295959,53.325905,150,2017-07-03 10:56:00,01501001,33493,POINT (713516.0332653577 731981.8164871124)
3,-6.316828,53.316637,150,2017-07-03 10:56:00,01500001,33518,POINT (712150.2444403638 730917.7067627693)
4,-6.256600,53.348150,25,2017-07-03 10:56:00,025B1002,33364,POINT (716077.3078691751 734520.0255593333)
...,...,...,...,...,...,...,...
495,-6.458023,53.337860,25,2017-07-03 11:10:00,025B1002,33407,POINT (702692.0767034456 733066.6135791161)
496,-6.424870,53.347156,25,2017-07-03 11:10:00,025A1002,33399,POINT (704877.0159858828 734149.0787458537)
497,-6.316827,53.320858,150,2017-07-03 11:10:00,01501001,33519,POINT (712139.2438399385 731387.330514383)
498,-6.348072,53.347270,25,2017-07-03 11:10:00,025A0002,33352,POINT (709989.7736232018 734277.3119445775)


In [3]:
class Route:
# Represents a general route of a bus line
    def __init__(self,route_df):
        self.direction_ = route_df.iloc[0]["direction_id"]
        self.line_ = route_df.iloc[0]["route_short_name"]
        self.route_ = route_df[["stop_sequence","stop_id","stop_point","dist_traveled"]]
        self.linestring_ = LineString(list(self.route_["stop_point"]))
        self.stops_dist_from_beg_ = [self.linestring_.project(Point(coords)) for coords in self.linestring_.coords]
    
    def getProjectedPoints(self,points,max_pings = MAX_NUM_OF_PINGS):
        proj_points = defaultdict(lambda :list())
        last_points = points[-max_pings:]
        for point in last_points:
            proj_points["dist_to_line"].append(self.linestring_.distance(point))
            proj_points["dist_traveled"].append(self.linestring_.project(point))
            proj_points["proj_point"].append(self.linestring_.interpolate(self.linestring_.project(point)))
        return proj_points
    
    # Score - the lower the better
    def getDriveRouteScore(self,points):
        proj_points = self.getProjectedPoints(points)
        # same direction
        # TODO: enable some errors in the direction decision making and adjust the score accordingly
#         print("points: ", points)
#         print("proj_points: ",proj_points)
        print("direction list: ",[proj_points["dist_traveled"][i] < proj_points["dist_traveled"][i+1] for i in range(len(proj_points["dist_traveled"])-1)])
        print("avg distance: ", np.array(proj_points["dist_to_line"]).mean())
        print()
        if not all([proj_points["dist_traveled"][i] < proj_points["dist_traveled"][i+1] for i in range(len(proj_points["dist_traveled"])-1)]):
            return np.inf
        score = np.array(proj_points["dist_to_line"]).mean()
        return score
    
    def getNextStop(self,point):
        proj_point = self.getProjectedPoints([point])
        return np.searchsorted(self.stops_dist_from_beg_, proj_point["dist_traveled"], side='right') + 1
    
    def isFinalStop(self,point):
        next_stop_num = self.getDriveRouteScore(point)
        # redundant check
        if next_stop_num < 1 or next_stop_num > len(self.linestring_.coords):
            raise Exception("Stop number doesn't exists: {}.".format(next_stop_num))
        return next_stop_num == len(self.linestring_.coords) or point.distance(self.linestring_.coords[-1]) < MAX_DISTANCE_TO_END_STOP
        
    
    
class Routes:
    def __init__(self,routes_filename):
        df_trips = pd.read_csv(routes_filename)
        df_trips["stop_point"] = df_trips["stop_point"].apply(lambda elem : wkt.loads(elem))
        self.routes_ = defaultdict(lambda :list()) # key: line, value: list of routes, on per direction.
        # each route has a single line and 1 or 2 direction.
        for attrs, trip_df in df_trips.groupby(["route_id","route_short_name","direction_id"]):
            (route_id,line,direction) = attrs
            self.routes_[line].append(Route(trip_df))
        print(list(self.routes_.keys()))
        
    def __getitem__(self, line):
        return self.routes_[line]
        

class BusDrive:
    def __init__(self,routes,pings_df):
        self.routes_ = routes
        self.line_ = pings_df.iloc[0]["lineId"]
        self.vehicle_ = pings_df.iloc[0]["vehicleId"]
        self.pings_df_ = pd.DataFrame(columns=pings_df.columns)
        self.route_ = UNASSIGNED
        self.route_score_ = UNASSIGNED
        self.last_ping_ = None
        self.addPings(pings_df)
        
    def __len__(self):
        return self.pings_df_.shape[0]
    
    def lastActivity(self):
        return self.last_ping_["timestamp"] 
    
    def addPings(self,pings_df):
        if not pings_df[(pings_df["lineId"]!=self.line_)|(pings_df["vehicleId"]!=self.vehicle_)].empty:
            raise Exception("""Bus drive {}, {} got wrong pings.
                            {}""".format(self.line_,self.vehicle_,pings_df[(pings_df["lineId"]!=self.line_)|(pings_df["vehicleId"]!=self.vehicle_)]))
        self.last_ping_ = pings_df.iloc[pings_df.shape[0]-1]
        self.pings_df_ = self.pings_df_.append(pings_df)
        self.identifyPath()
        
    def identifyPath(self):
        if self.pings_df_.shape[0] < MIN_NUM_OF_PINGS:
            return
        best_route_score = np.inf
        best_route = None
        print(self.line_,self.routes_[self.line_])
        for route in self.routes_[self.line_]:
            score = route.getDriveRouteScore(self.pings_df_["point"].values)
            if score < best_route_score:
                best_route_score = score
                best_route = route
        self.route_ = best_route
        self.route_score_ = best_route_score
        if best_route_score > MAX_PINGS_ERROR:
            self.route_ = UNASSIGNED
            
    def isDriveEnded(self,curr_time):
        if curr_time - self.lastActivity() > timedelta(seconds=MAX_TIME_BETWEEN_PINGS): # Didn't received a ping for a long time
            return True
        if self.route_ == UNASSIGNED and self.pings_df_.shape[0] > MAX_UNASSIGNED_PINGS:
            return True
        if self.route_ == UNASSIGNED:
            return False
        if self.route_.isFinalStop(self.last_ping_["point"]):
            return True
        return False
    
    def getRouteScore(self):
        return self.route_score_
    
    def isAssignedRoute(self):
        return self.route_ != UNASSIGNED
            

# TODO: - DONE
# Ending active drives (no pings, end trip, new vehicle drive)

# adding ping ping to the busDrive - 
# pings may not be from same drive, redundant pings (bus next stop is the final stop) - 
# being able to identify that the same vehicle started the same line in the other direction - 
# solution: with small enough chunks the ended drive will be close

# change names, get projected coordinates, adding project points!!


# TODO: - still need to do
# identified route, when getting new pings make sure it don't deviate. (ending active drives - bus deviation)

# getting stream pings with points also - adjust stream data format

# enable some errors in the direction decision making and adjust the score accordingly

class RTS:
    def __init__(self,routes_filename):
        # Loading routes to identify and monitor
        self.routes_ = Routes(routes_filename)
        self.active_drives_ = {}
        self.system_time_ = datetime(2017, 1, 1, 0, 0) # min system time
    
    def recieveDataStream(self,pings_df):
        self.system_time_ = max(self.system_time_,max(pings_df["timestamp"]))
        for attrs, drive_pings_df in pings_df.groupby(["lineId","vehicleId"]): # todo: sort by timestamp
            (line_id,vehicle_id) = attrs
            if (line_id,vehicle_id) in self.active_drives_.keys():
                self.active_drives_[(line_id,vehicle_id)].addPings(drive_pings_df)
            else:
                self.active_drives_[(line_id,vehicle_id)] = BusDrive(self.routes_,drive_pings_df)
        keys_to_remove = []
        for drive_key, drive in self.active_drives_.items():
            if drive.isDriveEnded(self.system_time_):
                keys_to_remove.append(drive_key)
        self.removeDrives(keys_to_remove)
        self.validateVehicleSingularity()
        self.printSystemState()
                
    def validateVehicleSingularity(self):
        vehicle_dic = defaultdict(lambda:[])
        for (line_id,vehicle_id) in self.active_drives_.keys():
            vehicle_dic[vehicle_id].append(line_id)
        for vehicle_id, line_list in vehicle_dic.items():
            if len(line_list) > 1:
                self.removeDuplicateVehiclesDrives(vehicle_id,line_list)
    
    def removeDuplicateVehiclesDrives(self,vehicle_id,line_list):
        keys_to_remove = []
        last_activity_time = -np.inf
        last_activity_key_drive = None
        for line_id in line_list:
            drive = self.active_drives_[(vehicle_id,line_id)]
            if len(drive) < MIN_NUM_OF_PINGS: # allow for small drives to stay for now 
                continue
            # keep only the most updated drive
            if last_activity_time < drive.lastActivity():
                keys_to_remove.append(last_activity_key_drive) if last_activity_key_drive!=None else None
                last_activity_time = drive.lastActivity()
                last_activity_key_drive = (vehicle_id,line_id)
        self.removeDrives(keys_to_remove)
    
    def removeDrives(self,drive_keys):
        for key in drive_keys:
            del self.active_drives_[key]
    
    def printSystemState(self):
        stats_dic = defaultdict(lambda:[])
        for attrs in sorted(self.active_drives_.keys()):
            drive = self.active_drives_[attrs]
            stats_dic["drive"].append(attrs)
            stats_dic["num_pings"].append(len(drive))
            stats_dic["route_score"].append(drive.getRouteScore())
            stats_dic["found_route"].append(drive.isAssignedRoute())
        print(pd.DataFrame.from_dict(stats_dic))
                

In [4]:
rts = RTS("..//Data_Preprocessing//Bus_Routes//clean_routes.csv")
for i, (time,group_df) in enumerate(stream.groupby(["timestamp"])): # TODO: sort group_df by timestamp
    print(i)
    print(group_df.shape)
    rts.recieveDataStream(group_df)
    print()
    if i == 10:
        break

['1', '11', '116', '118', '122', '123', '130', '14', '140', '142', '14c', '150', '155', '15a', '15b', '15d', '16', '16c', '16d', '25', '25a', '25b', '25d', '25x', '26', '27a', '27x', '29a', '31', '31a', '31b', '31d', '32', '32x', '33', '33b', '33d', '33e', '33x', '37', '38a', '38b', '38d', '39', '39x', '4', '40b', '40e', '41', '41b', '41c', '41d', '42', '42d', '43', '44', '44b', '46e', '49', '51d', '51x', '53', '53a', '54a', '56a', '61', '65b', '66', '66a', '66b', '66e', '67', '68a', '68x', '69', '69x', '7', '70', '70d', '747', '757', '77a', '77x', '79', '79a', '7a', '7b', '7d', '83', '83a', '84a', '9']
0
(7, 7)
          drive  num_pings  route_score  found_route
0  (150, 33493)          1           -1        False
1  (150, 33518)          1           -1        False
2   (25, 33352)          1           -1        False
3   (25, 33358)          1           -1        False
4   (25, 33364)          1           -1        False
5   (25, 33401)          1           -1        False
6   (25, 

TypeError: 'Point' object is not subscriptable

In [None]:
c = Counter()
for time,group_df in stream.groupby(["timestamp","lineId","vehicleId"]):
    c[str(group_df.shape[0])] += 1
print(len(c))
print(c.most_common(100))

In [34]:
df = pd.read_csv("..//Data//Samples//3days3lines.csv")
df = df.sort_values(["lineId","vehicleId","timestamp"])
df["driveId"] = df.apply(lambda row : str(row["lineId"])+"_"+str(row["vehicleId"]),axis=1)
df.to_csv("..//Drafts//3days3lines_bydrive.csv")

In [20]:
a = [1,2,3]
a[-1:]

[3]