In [8]:
# !pip install gpxpy

In [9]:
import re, os, sys, gpxpy, folium
from datetime import datetime, timedelta
import pandas as pd
import numpy as np

dir_path = './'
# in_dir_name = 'rawdata'
# out_dir_name = 'result'
in_dir_name = 'in_files'
out_dir_name = 'out_files'
time_tolerance = 1000000

In [10]:
def init_dir():
    dir_list = os.listdir(dir_path)
    is_init = False
    if in_dir_name not in dir_list:
        print('作業ディレクトリが存在しません')
        os.mkdir(dir_path + in_dir_name)
        print('make: /' + in_dir_name)
        is_init = True
    if out_dir_name not in dir_list:
        if not is_init:
            print('作業ディレクトリが存在しません')
        os.mkdir(dir_path + out_dir_name)
        print('make: /' + out_dir_name)
        is_init = True
    if is_init:
        print('初期化が完了しました')
        sys.exit(1)

In [11]:
def get_valid_num(prompt='', get_int=True, max=100000):
    while True:
        if prompt != '':
            print(prompt)
        num = input('>')
        try:
            num = float(num)
            if num > max - 1 or num < 0:
                print(f'0以上{max}未満の数を入力してください')
                continue
            else:
                break
        except ValueError:
            print('数字を入力してください')
            continue
    if get_int:
        return int(num)
    else:
        return num

In [12]:
# author: https://aotoshiro.jpn.org/2021/2376
def calc_dist(rlat_a, rlat_b, rlon_a, rlon_b):
    A = 6378137.000  # WGS84測地系の楕円体の長半径a
    B = 6356752.314  # WGS84測地系の楕円体の短半径b
    E = np.sqrt((A ** 2 - B ** 2) / A ** 2)  # 離心率
    Dy = rlat_a - rlat_b  # 2点の緯度(latitude)の差 [rad]
    Dx = rlon_a - rlon_b  # 2点の経度(longitude)の差 [rad]
    P = (rlat_a + rlat_b) / 2  # 2点の緯度(latitude)の平均 [rad]
    W = np.sqrt(1 - E ** 2 * np.sin(P) ** 2)
    M = A * (1 - E ** 2) / W ** 3  # 子午線曲率半径
    N = A / W  # 卯酉線曲線半径
    D = np.sqrt((Dy * M) ** 2 + (Dx * N * np.cos(P)) ** 2)
    return D  # 2点間の距離 [m]

In [13]:
def parse_ct(file_path):
    column = ['date', 'time_str', 'time1', 'time2', 'time_passed', 'event', 'elev_spec']
    data_list = []

    prev_datetime = None

    with open(file_path, 'r') as ct_file:
        date_obj = None
        pattern_date_title = r'(\d{4}/\d{1,2}/\d{1,2}.*)'
        pattern_date = r'(\d{4}/\d{1,2}/\d{1,2})'
        pattern_event = r'(\d{1,2}:\d{2},.*)'
        pattern_event_rest = r'(\d{1,2}:\d{2}~\d{1,2}:\d{2},\w+)'

        time_passed = 0

        for line in ct_file:
            parts = line.split(',')
            if len(parts) == 3:
                time_str, event, elev = parts
                elev = elev.strip()
            elif len(parts) == 2:
                time_str, event = parts
                event = event.strip()
                elev = -1019

            if re.match(pattern_date_title, line):
                date_str = re.match(pattern_date, line)[0]
                date_obj = datetime.strptime(date_str, '%Y/%m/%d')

            elif re.match(pattern_event, line):
                time_obj = datetime.strptime(time_str, '%H:%M')
                date_obj = datetime(date_obj.year, date_obj.month, date_obj.day, time_obj.hour, time_obj.minute)
                if prev_datetime is not None:
                    time_passed = (date_obj - prev_datetime).total_seconds() / 60
                data_list.append([date_obj, time_str, time_str, None, time_passed, event, elev])
                prev_datetime = date_obj

            elif re.match(pattern_event_rest, line):
                time1, time2 = time_str.split('~')
                time_obj = datetime.strptime(time1, '%H:%M')
                date_obj = datetime(date_obj.year, date_obj.month, date_obj.day, time_obj.hour, time_obj.minute)
                time2_obj = datetime.strptime(time2, '%H:%M')
                if prev_datetime is not None:
                    time_passed = (date_obj - prev_datetime).total_seconds() / 60
                data_list.append([date_obj, time_str, time1, time2, time_passed, event, elev])
                prev_datetime = datetime(date_obj.year, date_obj.month, date_obj.day, time2_obj.hour, time2_obj.minute)

    time_event_df = pd.DataFrame(data_list, columns=column)

    return time_event_df


def correct_ct(time_event_df, new_dir_name):
    crt_ct_file_path = f'{dir_path}{out_dir_name}/{new_dir_name}/courseTime_{time_event_df.loc[0, "date"].date()}-{time_event_df.loc[len(time_event_df) - 1, "date"].date()}.txt'
    with open(crt_ct_file_path, 'w', encoding='shift-jis') as ccf:  # ccf: correct_courseTime_file
        prev_datetime = datetime(1910, 3, 17, 0, 0)
        for index, row in time_event_df.iterrows():
            if prev_datetime.date() != row['date'].date():
                ccf.write(f'\n{row["date"].strftime("%Y/%m/%d")}コースタイム\n')
            line_str = f'{row["time_str"]},{row["event"].strip()},{round(int(row["elev_calc"]), -1)}'
            if row['elev_spec'] != -1019:
                line_str = f'{line_str}({row["elev_spec"]})'
            ccf.write(line_str + '\n')
            prev_datetime = row['date']


def analyze_ct(time_event_df, new_dir_name):
    analytics_file_path = f'{dir_path}{out_dir_name}/{new_dir_name}/courseTimeAnalytics_{time_event_df.loc[0, "date"].date()}-{time_event_df.loc[len(time_event_df) - 1, "date"].date()}.csv'
    to_csv_df = time_event_df[
        ['date', 'time_str', 'time_passed', 'event', 'elev_spec', 'elev_calc', 'lat', 'lon', 'dist']]
    to_csv_df.to_csv(analytics_file_path, index=True, encoding='utf-8')
    return 0


class GPXDataAnalyzer:
    def __init__(self, gpx_file_path):
        self.gpx_file_path = gpx_file_path
        self.gpx_data = self.load_gpx_data()

    def load_gpx_data(self):
        with open(self.gpx_file_path, 'r', encoding='utf-8') as gpx_file:
            gpx = gpxpy.parse(gpx_file)
            return gpx

    # def linear_interpolation(self, point1, point2, target_time):
    #     time1 = point1.time.timestamp()
    #     time2 = point2.time.timestamp()
    #     value1 = point1.elevation  # 例: 高度データ
    #     value2 = point2.elevation  # 例: 高度データ
    #
    #     interpolated_value = value1 + (value2 - value1) * ((target_time - time1) / (time2 - time1))
    #     return interpolated_value

    def search_gpx_data_by_time(self, target_time_jst, time_tolerance):
        closest_point = None
        closest_time_diff = None

        jst_offset = timedelta(hours=9)  # JSTとUTCの固定オフセット
        target_time_utc = target_time_jst - jst_offset  # JSTからUTCに変換
        for track in self.gpx_data.tracks:
            for segment in track.segments:
                for point in segment.points:
                    if point.time:
                        time_diff = abs((point.time.replace(tzinfo=None) - target_time_utc).total_seconds())
                        if closest_point is None or time_diff < closest_time_diff:
                            closest_point = point
                            closest_time_diff = time_diff

        if closest_point and closest_time_diff <= time_tolerance:
            return closest_point
        else:
            return None


def plot_ct():
    print('=' * 50)
    print('*** コースタイム・軌跡分析 ***')

    in_datas = os.listdir(dir_path + '/' + in_dir_name)
    gpx_datas = [file for file in in_datas if file.endswith(".gpx")]
    ct_datas = [file for file in in_datas if file.endswith(".txt")]
    if len(gpx_datas) == 0:
        print(in_dir_name + 'に .gpx ファイルが存在しません')
        sys.exit(3)
    if len(ct_datas) == 0:
        print(in_dir_name + 'に .txt ファイルが存在しません')
        sys.exit(3)

    print('使用するGPXデータファイルを選択')
    for i in range(len(gpx_datas)):
        print(f'{i}) {gpx_datas[i]}')
    gpx_filename = gpx_datas[get_valid_num(max=len(gpx_datas))]
    gpx_file_path = dir_path + in_dir_name + '/' + gpx_filename
    print('-' * 50)
    print('使用するコースタイムファイルを選択')
    for i in range(len(ct_datas)):
        print(f'{i}) {ct_datas[i]}')
    ct_filename = ct_datas[get_valid_num(max=len(ct_datas))]
    ct_file_path = dir_path + in_dir_name + '/' + ct_filename
    print('-' * 50)

    time_event_df = parse_ct(ct_file_path)
    print(f'{ct_file_path} は適切に読み込まれました')
    analyzer = GPXDataAnalyzer(gpx_file_path)
    print(f'{gpx_file_path} は適切に読み込まれました')
    print('-' * 50)

    lat_list = []
    lon_list = []
    elev_list = []
    for event in time_event_df['date']:
        closest_point = analyzer.search_gpx_data_by_time(event, time_tolerance)
        lat_list.append(closest_point.latitude)
        lon_list.append(closest_point.longitude)
        elev_list.append(closest_point.elevation)
    time_event_df['elev_calc'] = elev_list
    time_event_df['lat'] = lat_list
    time_event_df['lon'] = lon_list

    dist = []
    rlat_a, rlon_a = 0, 0
    for index, row in time_event_df.iterrows():
        if index == 0:
            dist.append(0)
            rlat_a = np.radians(row['lat'])
            rlon_a = np.radians(row['lon'])
        else:
            dist.append(calc_dist(rlat_a, np.radians(row['lat']), rlon_a, np.radians(row['lon'])))
            rlat_a = np.radians(row['lat'])
            rlon_a = np.radians(row['lon'])
    time_event_df['dist'] = dist

    # pd.set_option('display.max_rows', None)
    # pd.set_option('display.max_columns', None)
    # print(time_event_df)

    map_center = [time_event_df.loc[0, 'lat'], time_event_df.loc[0, 'lon']]
    gpx_map = folium.Map(tiles='http://cyberjapandata.gsi.go.jp/xyz/std/{z}/{x}/{y}.png', attr='© GSI Japan',
                         location=map_center, zoom_start=15)

    for track in analyzer.gpx_data.tracks:
        for segment in track.segments:
            folium.PolyLine(
                locations=[(point.latitude, point.longitude) for point in segment.points],
                color='blue'
            ).add_to(gpx_map)

    for index, row in time_event_df.iterrows():
        marker = folium.Marker(
            location=[row['lat'], row['lon']],
            tooltip=f'{row["date"].strftime("%m/%d")} {row["time_str"]} {row["event"].strip()}: {round(int(row["elev_calc"]), -1)}m',
            icon=folium.Icon(color='blue')
        )
        marker.add_to(gpx_map)

    new_dir_name = f'{time_event_df.loc[0, "date"].date()}-{time_event_df.loc[len(time_event_df) - 1, "date"].date()}'
    if new_dir_name not in os.listdir(dir_path + out_dir_name):
        os.mkdir(f'{dir_path}{out_dir_name}/{new_dir_name}')

    map_file_path = f'{dir_path}{out_dir_name}/{new_dir_name}/courseTimeMap_{time_event_df.loc[0, "date"].date()}-{time_event_df.loc[len(time_event_df) - 1, "date"].date()}.html'
    gpx_map.save(map_file_path)
    correct_ct(time_event_df, new_dir_name)
    analyze_ct(time_event_df, new_dir_name)

    print(f'{dir_path}{out_dir_name}/{new_dir_name} に結果を保存しました\n')

In [14]:
try:
    init_dir()

    plot_ct()

    print('正常に終了しました')
    print('=' * 50)
except SystemExit:
    print('\nプログラムを終了します')

    print('=' * 50)

作業ディレクトリが存在しません
make: /rawdata
make: /result
初期化が完了しました

プログラムを終了します
