# Data Cleaning and Preparation

## Raw JSON to CSV

Converts raw JSON files containing the vehiclePositions from STIB to a single CSV file
**Reads from**: raw JSON files in `data/raw` folder (`data/raw/vehiclePosition*.json`)
**Writes to**: Single CSV file containing all the vehicle positions in `data` folder (`data/processed/vehiclePositions.csv`)

In [115]:
import csv
import json

In [116]:
def convert_json_into_csv(csv_file: csv.writer, json_path: str) -> None:
    with open(json_path, 'r') as file:
        data = json.load(file)['data']
        for time in data:
            timestamp = time['time']
            for response in time['Responses']:
                if response is None:
                    continue
                for line in response['lines']:
                    line_id = line['lineId']
                    for vehiclePosition in line['vehiclePositions']:
                        csv_file.writerow([
                            timestamp,
                            line_id,
                            vehiclePosition['directionId'],
                            vehiclePosition['distanceFromPoint'],
                            vehiclePosition['pointId'],
                        ])

In [117]:
def convert_json_files_into_single_csv() -> None:
    files = [
        '../data/raw/vehicleTimestamp/vehiclePosition01.json',
        '../data/raw/vehicleTimestamp/vehiclePosition02.json',
        '../data/raw/vehicleTimestamp/vehiclePosition03.json',
        '../data/raw/vehicleTimestamp/vehiclePosition04.json',
        '../data/raw/vehicleTimestamp/vehiclePosition05.json',
        '../data/raw/vehicleTimestamp/vehiclePosition06.json',
        '../data/raw/vehicleTimestamp/vehiclePosition07.json',
        '../data/raw/vehicleTimestamp/vehiclePosition08.json',
        '../data/raw/vehicleTimestamp/vehiclePosition09.json',
        '../data/raw/vehicleTimestamp/vehiclePosition10.json',
        '../data/raw/vehicleTimestamp/vehiclePosition11.json',
        '../data/raw/vehicleTimestamp/vehiclePosition12.json',
        '../data/raw/vehicleTimestamp/vehiclePosition13.json'
    ]
    with write_csv('../data/processed/vehiclePositions.csv') as writer:
        writer.writerow(['Timestamp', 'LineId', 'DirectionId', 'DistanceFromPoint', 'PointId'])
        for path in tqdm(files):
            convert_json_into_csv(writer, path)

## Shapefile to routes

Converts raw Stops Shapefiles from STIB to a single CSV file containing line routes
**Reads from**:
 - Shapefiles in `data/raw/shapefiles` folder (`data/raw/shapefiles/ACTU_STOPS.*`)
 - `stops.txt` GTFS file in `data/raw/gtfs` folder

**Writes to**: Single CSV file containing all the line routes in `data` folder (`data/line_stops.csv`)

In [118]:
from typing import List, Dict
import shapefile

In [119]:
def group_shape_stops_by_line(stops: List[shapefile._Record]) -> Dict[str, List[shapefile._Record]]:
    lines = {}
    for stop in stops:
        line_id = f'{stop["Code_Ligne"]}{stop["Variante"]}'
        if line_id not in lines:
            lines[line_id] = []
        lines[line_id].append(stop)
    for stops in lines.values():
        stops.sort(key=lambda stop: stop['succession'])
    return lines

In [120]:
def create_line_stops_csv():
    stops = shapefile.Reader('../data/raw/shapefiles/ACTU_STOPS.shp')
    grouped_stops = group_shape_stops_by_line(stops.records())
    gtfs_stops_file = open('../data/raw/gtfs/stops.csv', 'r', encoding='utf8')
    gtfs_stops = {stop['stop_id']: stop for stop in csv.DictReader(gtfs_stops_file)}
    with open('../data/processed/line_stops.csv', 'w', newline='', encoding='utf8') as file:
        writer = csv.writer(file)
        writer.writerow(['lineId', 'direction', 'stop_id', 'stop_id_int', 'name', 'name_ascii', 'lat', 'long', 'order'])
        for line, line_stops in sorted(grouped_stops.items()):
            for stop in line_stops:
                stop_lat = gtfs_stops[stop['stop_id']]['stop_lat'] if stop['stop_id'] in gtfs_stops else None
                stop_lon = gtfs_stops[stop['stop_id']]['stop_lon'] if stop['stop_id'] in gtfs_stops else None
                stop_id = int(stop['stop_id'].strip('qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM'))
                writer.writerow([stop['Code_Ligne'], stop['Variante'], stop['stop_id'], stop_id, stop['alpha_fr'],
                                 stop['descr_fr'], stop_lat, stop_lon, stop['succession']])


create_line_stops_csv()

## Drop incomplete data from CSV

Cleans `vehiclePositions.csv` file created in previous section
**Reads from**: CSV file containing all the vehicle positions in `data` folder (`data/processed/vehiclePositions.csv`)
**Writes to**: CSV file containing filtered vehicle positions in `data` folder (`data/processed/vehiclePositionsClean.csv`)

In [121]:
from scripts.helpers import *
from functools import reduce
from operator import add

In [122]:
def drop_positions_with_unknown_stop_or_direction():
    total_count = {}
    deletion_count = {}
    stops = {f'{int(stop[0][:-1])}-{stop[3]}' for stop in read_csv_stream('../data/processed/line_stops.csv')}
    positions = read_csv_stream('../data/processed/vehiclePositions.csv', skip_first=False)
    with write_csv('../data/processed/vehiclePositionsClean.csv') as writer:
        writer.writerow(next(positions))
        for position in tqdm(positions):
            line_id = position[1]
            if line_id not in total_count:
                total_count[line_id] = 0
            if line_id not in deletion_count:
                deletion_count[line_id] = 0
            direction_id = position[2]
            stop_id = position[4]
            total_count[line_id] += 1
            if f'{line_id}-{direction_id}' not in stops or f'{line_id}-{stop_id}' not in stops:
                deletion_count[line_id] += 1
            else:
                writer.writerow(position)
    for line in sorted(total_count):
        deleted = deletion_count[line]
        total = total_count[line]
        print(f'\tLine {line}: {deleted} rows deleted out of {total} ({(deleted / total) * 100:.2f}%)')
    total_rows = reduce(add, total_count.values())
    total_deletions = reduce(add, deletion_count.values())
    print(f'\tTotal: {total_deletions} rows deleted out of {total_rows} ({(total_deletions / total_rows) * 100:.2f}%)')

## Add direction to CSV

Adds direction to `vehiclePositionsClean.csv` file created in previous section
**Reads from**: CSV file containing filtered vehicle positions in `data` folder (`data/processed/vehiclePositionsClean.csv`)
**Writes to**: CSV file containing filtered vehicle positions with direction in `data` folder (`data/processed/vehiclePositionsCleanDirected.csv`)

In [123]:
def get_direction_from_line_stop_and_destination(line: Tuple[List[List[str]], List[List[str]]], stop_id: str,
                                                 destination_id: str) -> int:
    # Simple case 1 -> destination_id is in one direction but not in the other:
    stops = ({stop[3] for stop in line[0]}, {stop[3] for stop in line[1]})
    if destination_id in stops[0] and destination_id not in stops[1]:
        return 0
    if destination_id not in stops[0] and destination_id in stops[1]:
        return 1

    # Simple case 2 -> destination_id is the last stop of a direction:
    if destination_id == line[0][-1][3]:
        return 0
    if destination_id == line[1][-1][3]:
        return 1

    # Simple case 3 -> stop_id is in one direction but not in the other:
    if stop_id in stops[0] and stop_id not in stops[1]:
        return 0
    if stop_id not in stops[0] and stop_id in stops[1]:
        return 1

    # Complex case 1 -> if stop_id != destination_id, return the direction in which the stop with
    # id destination_id is after the stop with id stop_id
    if stop_id != destination_id:
        index_of_destination_0 = next(int(stop[8]) for stop in line[0] if stop[3] == destination_id)
        index_of_stop_0 = next(int(stop[8]) for stop in line[0] if stop[3] == stop_id)
        return 0 if index_of_stop_0 < index_of_destination_0 else 1
    # Complex case 2 -> if stop_id == destination_id, return the direction in which the stop with
    # id destination_id is further down the direction
    else:
        index_of_destination_0 = next(int(stop[8]) for stop in line[0] if stop[3] == destination_id)
        index_of_destination_1 = next(int(stop[8]) for stop in line[1] if stop[3] == destination_id)
        return 0 if index_of_destination_0 > index_of_destination_1 else 1

In [124]:
def add_direction_to_csv():
    positions = read_csv_stream('../data/processed/vehiclePositionsClean.csv', skip_first=False)
    directed_positions, output_file = get_csv_writer('../data/processed/vehiclePositionsCleanDirected.csv')
    directed_positions.writerow([*next(positions), 'Direction'])
    grouped_lines = group_line_stops(read_csv_stream('../data/processed/line_stops.csv'))
    memory = {}
    for position in tqdm(positions):
        line_id = position[1]
        stop_id = position[4]
        destination_id = position[2]
        tuple_id = f'{line_id}-{stop_id}-{destination_id}'
        if tuple_id not in memory:
            memory[tuple_id] = get_direction_from_line_stop_and_destination(grouped_lines[line_id], stop_id,
                                                                            destination_id)
        directed_positions.writerow([*position, memory[tuple_id]])
    output_file.close()

## Split CSV into lines

Splits `vehiclePositionsCleanDirected.csv` file created in previous section into separate CSV (one per line)
**Reads from**: CSV file containing filtered vehicle positions with direction in `data` folder (`data/processed/vehiclePositionsCleanDirected.csv`)
**Writes to**: CSV file per line containing filtered vehicle positions with direction in `data/processed/assignment1/vehiclePositionsPerLine` folder (`data/processed/assignment1/vehiclePositionsPerLine/vehiclePositions*.csv`)

In [125]:
def split_csv_by_lines():
    files = {}
    positions = read_csv_stream('../data/processed/vehiclePositionsCleanDirected.csv', skip_first=False)
    header = next(positions)
    for line in tqdm(positions):
        line_id = line[1]
        if line_id not in files:
            files[line_id] = get_csv_writer(f'../data/processed/assignment1/vehiclePositionsPerLine/vehiclePositions{line_id}.csv')
            files[line_id][0].writerow(header)
        files[line_id][0].writerow(line)
    for _, file in files.values():
        file.close()

## Vehicle Matching

Tries to link several vehicle positions belonging to same physical vehicle
**Reads from**: CSV file per line containing filtered vehicle positions with direction in `data/processed/assignment1/vehiclePositionsPerLine` folder (`data/processed/assignment1/vehiclePositionsPerLine/vehiclePositions*.csv`)
**Writes to**: CSV file per line containing vehicle positions with `bus_id` in `data/processed/assignment1/csv_lines_linked` folder (`data/processed/assignment1/csv_lines_linked/vehiclePositions*.csv`)

In [126]:

from enum import Enum
from itertools import count

In [127]:
def split_positions_by_direction(positions: Iterable[List[str]]) -> Tuple[List[List[str]], List[List[str]]]:
    line = ([], [])
    for position in positions:
        line[int(position[-1])].append(position)
    return line

In [128]:
def get_index_of_stop_in_line(line, direction, stop_id):
    return get_index_of_stop_in_line_direction(line[direction], stop_id)


def get_index_of_stop_in_line_direction(line, stop_id):
    return next((int(stop[8]) for stop in line if stop[3] == stop_id), -1)


class Match(Enum):
    WRONG = 1
    OK = 2
    TOO_FAR = 3

In [142]:
def group_positions_by_timestamp(positions: Iterable[List[str]]) -> List[Tuple[int, List[List[str]]]]:
    grouped_positions = []
    old_timestamp = -1
    current_timestamp_positions = []
    for position in positions:
        current_timestamp = int(position[0])
        if current_timestamp != old_timestamp:
            assert current_timestamp > old_timestamp
            grouped_positions.append((old_timestamp, current_timestamp_positions))
            old_timestamp = current_timestamp
            current_timestamp_positions = []
        current_timestamp_positions.append(position)
    grouped_positions.append((old_timestamp, current_timestamp_positions))
    return grouped_positions[1:]

In [130]:
def possible_match(first_position: List[str], second_position: List[str],
                   line: List[List[str]]) -> Match:
    first_stop_id = first_position[4]
    second_stop_id = second_position[4]
    # Both positions are in the same stop -> Compare using distance from that stop
    if first_stop_id == second_stop_id:
        first_distance = int(first_position[3])
        second_distance = int(second_position[3])
        return Match.OK if first_distance <= second_distance else Match.WRONG
    # Positions are in different stops -> Compare using order of stops in direction
    else:
        # Assert both positions have the same direction
        first_stop_index = get_index_of_stop_in_line_direction(line, first_stop_id)
        second_stop_index = get_index_of_stop_in_line_direction(line, second_stop_id)
        if second_stop_index - first_stop_index > 3:
            return Match.TOO_FAR
        return Match.OK if first_stop_index < second_stop_index else Match.WRONG

In [131]:
def find_bus_matched_of_line_direction(positions: Iterable[List[str]], line: List[List[str]], line_id: str,
                                       direction: int, writer):
    grouped_positions = group_positions_by_timestamp(positions)
    previous_positions = []
    bus_id = (f'{line_id}-{direction}-{i:06d}' for i in count())
    sorting_key = lambda vehicle_position: get_index_of_stop_in_line_direction(line, vehicle_position[-2])

    for timestamp, current_positions in grouped_positions:
        sorted_positions = sorted(current_positions, key=sorting_key)
        while len(sorted_positions) > 0 and get_index_of_stop_in_line_direction(line, sorted_positions[0][-2]) == -1:
            sorted_positions.pop(0)
        current_previous_position_index = 0
        current_position_index = 0
        while current_position_index < len(sorted_positions) and current_previous_position_index < len(
                previous_positions):
            previous_position = previous_positions[current_previous_position_index]
            current_position = sorted_positions[current_position_index]
            result = possible_match(previous_position, current_position, line)
            if result == Match.OK:
                current_position.append(previous_position[-1])
                current_previous_position_index += 1
                current_position_index += 1
            elif result == Match.WRONG:
                current_position.append(next(bus_id))
                current_position_index += 1
            elif result == Match.TOO_FAR:
                current_previous_position_index += 1
        for position in sorted_positions:
            if len(position) == 6:
                position.append(next(bus_id))
            writer.writerow(position)
        previous_positions = sorted_positions

In [132]:
def find_bus_matches_of_line(file_path: str, output_path: str, line_id: str,
                             line: Tuple[List[List[str]], List[List[str]]]) -> None:
    positions = read_csv_stream(file_path, skip_first=False)
    with write_csv(output_path) as linked_positions:
        linked_positions.writerow([*next(positions), 'BusId'])
        direction1, direction2 = split_positions_by_direction(positions)
        find_bus_matched_of_line_direction(direction1, line[0], line_id, 0, linked_positions)
        find_bus_matched_of_line_direction(direction2, line[1], line_id, 1, linked_positions)


In [143]:
def find_bus_matches_of_lines():
    path = '../data/processed/assignment1/vehiclePositionsPerLine'
    output_path = '../data/processed/assignment1/csv_lines_linked'
    lines = group_line_stops(read_csv_stream('../data/processed/line_stops.csv'))
    for file in tqdm(os.listdir(path)):
        line_id = file[16:-4]
        line = lines[line_id]
        find_bus_matches_of_line(f'{path}/{file}', f'{output_path}/{file}', line_id, line)

find_bus_matches_of_lines()

  0%|          | 0/74 [00:00<?, ?it/s]


AssertionError: 

# Calculate average time between stops

In [134]:
from scripts.helpers import *
import datetime
from tqdm import tqdm
import os

In [135]:
def group_positions_by_vehicle(positions: List[List[str]]) -> Dict[str, List[List[List[str]]]]:
    grouped_positions = {}
    for position in positions:
        bus_id = position[-1]
        if bus_id not in grouped_positions:
            grouped_positions[bus_id] = []
        grouped_positions[bus_id].append(position)
    return grouped_positions

In [136]:
def get_hour_from_timestamp(timestamp: int) -> int:
    return datetime.datetime.fromtimestamp(timestamp // 1000).hour

In [137]:
def get_vehicle_times_between_stops(vehicle_positions: List[List[str]],
                                    line: Tuple[List[List[str]], List[List[str]]]) -> List:
    times = []
    previous_timestamp = int(vehicle_positions[0][0])
    previous_stop = vehicle_positions[0][4]
    for position in vehicle_positions:
        current_timestamp = int(position[0])
        current_stop = position[4]
        if current_stop != previous_stop:
            time_difference = (current_timestamp - previous_timestamp) // 1000
            first_hour = get_hour_from_timestamp(previous_timestamp)
            last_hour = get_hour_from_timestamp(current_timestamp)
            times.append([first_hour, previous_stop, current_stop, time_difference])
            if first_hour != last_hour:
                times.append([last_hour, previous_stop, current_stop, time_difference])
            previous_stop = current_stop
            previous_timestamp = current_timestamp
    return times


In [138]:
def calculate_average_time_between_stops_of_line(positions: List[List[str]], line_id: str,
                                                 line: Tuple[List[List[str]], List[List[str]]], output):
    grouped_positions = group_positions_by_vehicle(positions)
    times = []
    for vehicle_id, vehicle_positions in grouped_positions.items():
        times += get_vehicle_times_between_stops(vehicle_positions, line)
    for direction in [0, 1]:
        for fromStop, toStop in zip(line[direction][:-1], line[direction][1:]):
            time_sum = [0 for _ in range(24)]
            time_sum_filtered = [0 for _ in range(24)]
            time_count = [0 for _ in range(24)]
            time_count_filtered = [0 for _ in range(24)]
            for time in times:
                if time[1] == fromStop[3] and time[2] == toStop[3]:
                    time_sum[time[0]] += time[3]
                    time_count[time[0]] += 1
                    if time[3] < 6000:  # Times larger than 10 minutes are likely anomalies that shouldn't be counter
                        time_sum_filtered[time[0]] += time[3]
                        time_count_filtered[time[0]] += 1
            output[0].writerow(
                [line_id, fromStop[3], toStop[3],
                 *[f'{total / amount:.2f}' if amount > 0 else 0 for total, amount in zip(time_sum, time_count)]])
            output[1].writerow([line_id, fromStop[3], toStop[3], *time_count])
            output[2].writerow(
                [line_id, fromStop[3], toStop[3], *[f'{total / amount:.2f}' if amount > 0 else 0 for total, amount in
                                                    zip(time_sum_filtered, time_count_filtered)]])
            output[3].writerow([line_id, fromStop[3], toStop[3], *time_count_filtered])



In [139]:
def calculate_average_time_between_stops():
    source_path = '../data/processed/assignment1/csv_lines_linked'
    output_path = '../data/processed/average_time_between_stops.csv'
    output_path_filtered = '../data/processed/average_time_between_stops_filtered.csv'
    output_path_count = '../data/processed/average_time_between_stops_count.csv'
    output_path_filtered_count = '../data/processed/average_time_between_stops_filtered_count.csv'
    lines = group_line_stops(read_csv_stream('../data/processed/line_stops.csv'))
    with write_csv(output_path) as output, write_csv(output_path_filtered) as output_filtered, write_csv(
            output_path_count) as output_count, write_csv(output_path_filtered_count) as output_filtered_count:
        header = ['LineId', 'FromStop', 'ToStop', *[f'{i}' for i in range(24)]]
        output.writerow(header)
        output_filtered.writerow(header)
        output_count.writerow(header)
        output_filtered_count.writerow(header)
        for file in tqdm(os.listdir(source_path)):
            line_id = file[16:-4]
            line = lines[line_id]
            positions = read_csv_list(f'{source_path}/{file}')[1:]
            calculate_average_time_between_stops_of_line(positions, line_id, line,
                                                         (output, output_count, output_filtered, output_filtered_count))


calculate_average_time_between_stops()

0it [00:00, ?it/s]


# Main script

Runs all previous functions in succession

In [140]:
# TODO: @Victor recompose whole_process to work with fix_data if statement.

def whole_process(fix_data=False):
    # Convert JSON files to csv with vehicle positions
    print('Converting JSON files to CSV')
    convert_json_files_into_single_csv()

    # Create vehicle routes from shapefiles
    print('Creating vehicle routes from shapefiles')
    create_line_stops_csv()

    # Fix/drop vehicle positions directions
    if fix_data:
        print('Fixing vehicle positions with unknown data')
        # TODO: finish fix code
        pass
    else:
        print('Dropping vehicle positions with unknown data')
        drop_positions_with_unknown_stop_or_direction()

    print('Adding direction to positions')
    add_direction_to_csv()

    # Split positions by line
    print('Creating CSVs for each line')
    split_csv_by_lines()

    # Match positions by vehicle
    print('Matching positions')
    find_bus_matches_of_lines()

    # Calculate average time between stops per hour/line
    print('Calculating average time between stops')
    calculate_average_time_between_stops()


whole_process()

Converting JSON files to CSV


100%|██████████| 13/13 [00:40<00:00,  3.11s/it]


Creating vehicle routes from shapefiles
Dropping vehicle positions with unknown data


19421883it [00:36, 533599.91it/s]


	Line 1: 63100 rows deleted out of 305454 (20.66%)
	Line 12: 2091 rows deleted out of 238655 (0.88%)
	Line 13: 12387 rows deleted out of 231422 (5.35%)
	Line 14: 26554 rows deleted out of 272554 (9.74%)
	Line 17: 393 rows deleted out of 93979 (0.42%)
	Line 19: 9177 rows deleted out of 374100 (2.45%)
	Line 2: 4129 rows deleted out of 215114 (1.92%)
	Line 20: 12947 rows deleted out of 240316 (5.39%)
	Line 21: 5420 rows deleted out of 196146 (2.76%)
	Line 25: 349123 rows deleted out of 387235 (90.16%)
	Line 27: 34005 rows deleted out of 222168 (15.31%)
	Line 28: 13419 rows deleted out of 188596 (7.12%)
	Line 29: 37447 rows deleted out of 352595 (10.62%)
	Line 3: 162263 rows deleted out of 383543 (42.31%)
	Line 33: 0 rows deleted out of 69363 (0.00%)
	Line 34: 77757 rows deleted out of 271519 (28.64%)
	Line 36: 8227 rows deleted out of 243214 (3.38%)
	Line 37: 33673 rows deleted out of 163857 (20.55%)
	Line 38: 8697 rows deleted out of 346695 (2.51%)
	Line 39: 8136 rows deleted out of 2112

16237429it [00:31, 520080.83it/s]


Creating CSVs for each line


16237429it [00:26, 604895.09it/s]


Matching positions


  0%|          | 0/74 [00:00<?, ?it/s]


AssertionError: 