In [32]:
from google.colab import drive
import os
import sys

drive.mount('/content/drive')

shared_folder = '/content/drive/MyDrive/Ariya Narayanasamy/Data'
os.chdir(shared_folder)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [33]:
import csv
import pandas as pd
import numpy as np
import pickle

In [34]:
!pip install pykakasi



In [35]:
import pykakasi
import string
from datetime import datetime
import multiprocessing

In [36]:
def to_romaji(text, preprocess = "都道府県市町村区"):
    # Text preprocessing
    if preprocess:
        text = ''.join([c for c in text if c not in preprocess])

    to_replace = [('Kakuekiteisha', 'Local'), ('Kaisoku', 'Rapid'), ('Tokyou', 'Tokyo'), ('Taishi', 'Daishi'),
    ('Keiyou (1)', 'Keiyo'), ('Marunouchi (1)', 'Marunouchi'), ('Tokaidou', 'Tokaido'), ('Yuurakucho', 'Yurakucho')]

    kks = pykakasi.kakasi()
    result = kks.convert(text)

    romaji = ' '.join([el['passport'] for el in result]).title().strip()
    for replace_stuff in to_replace:
        romaji = romaji.replace(*replace_stuff)

    if romaji.split()[-1] == 'Hon': # '本線' check
        romaji = ''.join(romaji.split()[:-1]) + ' Main'

    romaji = ''.join([c for c in romaji if c in string.ascii_lowercase + string.ascii_uppercase + ' '])

    return romaji.strip()

In [37]:
train_companies = {
    "東日本旅客鉄道": "JR East",
    "東京地下鉄": "Tokyo Metro",
    "東武鉄道": "Tobu Railway",
    "西武鉄道": "Seibu Railway",
    "京成電鉄": "Keisei Electric Railway",
    "京浜急行電鉄": "Keikyu Corporation",
    # "東京臨海高速鉄道": "Tokyo Waterfront Area Rapid Transit",
    # "東京モノレール": "Tokyo Monorail",
    "小田急電鉄": "Odakyu Electric Railway",
    "相鉄": "Sagami Railway",
    "東急電鉄": "Tokyu Corporation",
    "京王電鉄": "Keio Corporation",
    "東京都交通局": "Toei Subway"
}

In [38]:
def translate_station(station):
    if station in train_companies.keys():
        return train_companies[station]

In [39]:
trip_cols = ['entry',
    'enter_company', 'enter_train_name', 'enter_station_name', 'enter_pref', 'enter_ward', 'enter_time',
    'exit_company', 'exit_train_name', 'exit_station_name', 'exit_pref', 'exit_ward', # Exit time is not a column header
    'time_taken', 'num_people'
]

In [40]:
trip_data = pd.DataFrame(columns=trip_cols)

In [41]:
tokyo_metro_trip_data = pd.DataFrame(columns=trip_cols)

In [None]:
process = 0

In [42]:
def process_chunk(chunk):
    global process

    aggregate_trip_data = pd.DataFrame(columns=trip_cols)
    aggregate_tokyo_metro_trip_data = pd.DataFrame(columns=trip_cols)

    print('Chunk process starting')

    for entry, row in chunk.iterrows():
        print(entry)

        if (
            row['【入場】圏域'] == '1.首都圏' and row['【出場】圏域'] == '1.首都圏' and
            translate_station(row['【入場】事業者名']) and translate_station(row['【出場】事業者名'])
        ):
            row_df = pd.DataFrame({
                'entry'               : [entry],
                'enter_train_name'    : [to_romaji(row['【入場】路線名'], preprocess="線")],
                'enter_company'       : [translate_station(row['【入場】事業者名'])],
                'enter_station_name'  : [to_romaji(row['【入場】駅名'], preprocess=False)],
                'enter_pref'          : [to_romaji(row['【入場】都道府県'])],
                'enter_ward'          : [to_romaji(row['【入場】市町村区'])],
                'enter_time'          : [row['【入場】時間帯']],
                'exit_company'        : [translate_station(row['【出場】事業者名'])],
                'exit_train_name'     : [to_romaji(row['【出場】路線名'], preprocess="線")],
                'exit_station_name'   : [to_romaji(row['【出場】駅名'], preprocess=False)],
                'exit_pref'           : [to_romaji(row['【出場】都道府県'])],
                'exit_ward'           : [to_romaji(row['【出場】市町村区'])],
                'time_taken'          : [row['所要時間（５分単位）']],
                'num_people'          : [row['人数']],
            })
            aggregate_trip_data = pd.concat([aggregate_trip_data, row_df], ignore_index=True)

            pickle_file = 'Pickle Saves/aggregate_dynamic_chunk.pkl'

            if os.path.exists(pickle_file):
                existing_data = pd.read_pickle(pickle_file)
            else:
                existing_data = pd.DataFrame()

            combined_data = pd.concat([existing_data, row_df], axis=0)
            combined_data.to_pickle(pickle_file)

            if (
                translate_station(row['【入場】事業者名']) == 'Tokyo Metro' and
                translate_station(row['【出場】事業者名']) == 'Tokyo Metro'
            ):
                row_df2 = pd.DataFrame({
                    'entry'               : [entry],
                    'enter_train_name'    : [to_romaji(row['【入場】路線名'], preprocess="線")],
                    'enter_company'       : [translate_station(row['【入場】事業者名'])],
                    'enter_station_name'  : [to_romaji(row['【入場】駅名'], preprocess=False)],
                    'enter_pref'          : [to_romaji(row['【入場】都道府県'])],
                    'enter_ward'          : [to_romaji(row['【入場】市町村区'])],
                    'enter_time'          : [row['【入場】時間帯']],
                    'exit_company'        : [translate_station(row['【出場】事業者名'])],
                    'exit_train_name'     : [to_romaji(row['【出場】路線名'], preprocess="線")],
                    'exit_station_name'   : [to_romaji(row['【出場】駅名'], preprocess=False)],
                    'exit_pref'           : [to_romaji(row['【出場】都道府県'])],
                    'exit_ward'           : [to_romaji(row['【出場】市町村区'])],
                    'time_taken'          : [row['所要時間（５分単位）']],
                    'num_people'          : [row['人数']],
                })
                aggregate_tokyo_metro_trip_data = pd.concat([aggregate_tokyo_metro_trip_data, row_df2], ignore_index=True)

                pickle_file2 = 'Pickle Saves/tm_dynamic_chunk.pkl'

                if os.path.exists(pickle_file2):
                    existing_data = pd.read_pickle(pickle_file2)
                else:
                    existing_data = pd.DataFrame()

                combined_data = pd.concat([existing_data, row_df2], axis=0)
                combined_data.to_pickle(pickle_file2)

        process += 1
        print(process)

    aggregate_trip_data.to_pickle(f'Pickle Saves/final_chunk_{index}.pkl')
    aggregate_tokyo_metro_trip_data.to_pickle(f'Pickle Saves/tm_final_chunk_{index}.pkl')

    print(aggregate_trip_data.head())
    print('Succesfully saved as pickle')

    return aggregate_trip_data, aggregate_tokyo_metro_trip_data

In [None]:
for subdir in [f'Tokyo Metro OD Data/Stage {i + 1}' for i in range(1)]: # Only read stage 1 data
    filepath = os.path.join(shared_folder, subdir)
    csv_files = sorted([f for f in os.listdir(filepath) if f.endswith('.csv')])

    for csv_file in csv_files:
        csv_file = os.path.join(filepath, csv_file)

        print(f'Processing {csv_file}')

        num_cores = multiprocessing.cpu_count()
        chunk_size = 20000

        chunks = pd.read_csv(csv_file, chunksize=chunk_size)
        # chunks_with_index = [(index, chunk) for index, chunk in enumerate(chunks)]

        pool = multiprocessing.Pool(processes=num_cores)
        try:
            chunk_trip_data, chunk_tm_trip_data = pool.map(process_chunk, chunks)

            pool.close()
            pool.join()

            trip_data = pd.concat([trip_data, chunk_trip_data], ignore_index=True)
            tokyo_metro_trip_data = pd.concat([tokyo_metro_trip_data, chunk_tm_trip_data], ignore_index=True)

        except Exception as e:
            print(repr(e))

            pool.close()

            current_time = datetime.now()
            tnow = current_time.strftime('%Y-%m-%d %H:%M:%S')

            trip_data.to_pickle(f'Pickle Saves/Error {tnow}.pkl')
            tokyo_metro_trip_data.to_pickle(f'Pickle Saves/TM Error {tnow}.pkl')

        break # Only read one file for now

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
1010532
1010533
1010534
1010535
1010536
1010537
1010538
1010539
1010540
1010541
1010542
1010543
1010544
1010545
1010546
1010547
1010548
1010549
1010550
1010551
1010552
1010553
1010554
1010555
1010556
1010557
1010558
1010559
1010560
1010561
1010562
1010563
1010564
1010565
1010566
1010567
1010568
1010569
1010570
1010571
1010572
1010573
1010574
1010575
1010576
1010577
1010578
1010579
1010580
1010581
1010582
1010583
1010584
1010585
1010586
1010587
1010588
1010589
1010590
1010591
1010592
1010593
1010594
1010595
1010596
1010597
1010598
1010599
1010600
1010601
1010602
1010603
1010604
1010605
1010606
1010607
1010608
1010609
1010610
1010611
1010612
1010613
1010614
1010615
1010616
1010617
1010618
1010619
1010620
1010621
1010622
1010623
1010624
1010625
1010626
1010627
1010628
1010629
1010630
1010631
1010632
1010633
1010634
1010635
1010636
1010637
1010638
1010639
1010640
1010641
1010642
1010643
1010644
1010645
1010646
1010647
1010648

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



EOFError('Ran out of input')
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "<ipython-input-42-340290aca9df>", line 37, in process_chunk
    existing_data = pd.read_pickle(pickle_file)
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/pickle.py", line 208, in read_pickle
    return pickle.load(handles.handle)
EOFError: Ran out of input
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<ipython-input-43-703d9e947adc>", line 18, in <cell line: 1>
    chunk_trip_data, chunk_tm_trip_data = pool.map(process_chunk, chunks_with_index)
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).g

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "<ipython-input-42-340290aca9df>", line 37, in process_chunk
    existing_data = pd.read_pickle(pickle_file)
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/pickle.py", line 208, in read_pickle
    return pickle.load(handles.handle)
EOFError: Ran out of input
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<ipython-input-43-703d9e947adc>", line 18, in <cell line: 1>
    chunk_trip_data, chunk_tm_trip_data = pool.map(process_chunk, chunks_with_index)
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3

In [None]:
trip_data.to_pickle(f'Pickle Saves/FINAL TRIP DATA.pkl')
tokyo_metro_trip_data.to_pickle(f'Pickle Saves/TM FINAL TRIP DATA.pkl')

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-43-7943119cebd3>", line 1, in <cell line: 1>
    trip_data.to_pickle(f'Pickle Saves/FINAL TRIP DATA.pkl')
  File "/usr/local/lib/python3.10/dist-packages/pandas/core/generic.py", line 3064, in to_pickle
    to_pickle(
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/pickle.py", line 97, in to_pickle
    with get_handle(
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/common.py", line 734, in get_handle
    check_parent_directory(str(handle))
  File "/usr/local/lib/python3.10/dist-packages/pandas/io/common.py", line 596, in check_parent_directory
    if not parent.is_dir():
  File "/usr/lib/python3.10/pathlib.py", line 1305, in is_dir
    return S_ISDIR(self.stat().st_mode)
  File "/usr/lib/python3.10/pathlib.py", line 1097, in stat
    return self._acce