In [1]:
import numpy as np
import pandas as pd
import os
import os
import threading
import psutil
import warnings
warnings.filterwarnings('ignore')

In [2]:
def set_df(df):
    df.drop(df[df['Operating_Area']=='9999999999'].index,inplace=True)
    df = df.dropna(axis=0)
    df.axis=0

    #지운 행렬이 존재하므로 인덱스 행 재정렬
    df=df.reset_index(drop=True)


    #정상적인 좌표값 계산을 위해 현재 .(점) 없이 기재되어있는 좌표 숫자를 정상적으로 인식할 수 있게 .을 붙여줍니다.
    df['LAT'] = df['LAT'].astype(float).div(1000000).round(6)
    df['LNG'] = df['LNG'].astype(float).div(1000000).round(6)

    return df

In [3]:
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix


def get_orderby_car_number(df, route, threshold=0.00030):
    """
    벡터화 방식으로 각 버스 위치가 인접한 정류장을 찾는 함수
    df     : 버스 데이터프레임 (LNG, LAT 열 포함)
    route  : 정류장 정보가 담긴 데이터프레임 (LNG, LAT 열 포함)
    threshold : 인접 판단 임계값 (0.00030 이내면 도착으로 간주)
    """

    # 정류장과 버스 위치 좌표 추출
    bus_stops = route[['LNG', 'LAT']].values  # shape = (M, 2)
    coords = df[['LNG', 'LAT']].values        # shape = (N, 2)

    # (N×M) 크기의 거리 행렬 생성
    dist_mat = distance_matrix(coords, bus_stops)  # scipy.spatial.distance_matrix

    # 각 버스 위치(행)에 대해 가장 가까운 정류장까지의 거리 계산
    min_dist_array = dist_mat.min(axis=1)

    # 임계값 이하인 경우를 bus_stop=True로 표시
    df['bus_stop'] = min_dist_array <= threshold

    # 버스정류장에 도착한 행만 필터링
    trueBusStop = df[df['bus_stop']].reset_index(drop=True)

    # 필요한 열만 반환 (원하시는 열 구성에 맞춰 조정)
    return trueBusStop[['LNG', 'LAT', 'Information_Occurrence', 'Car_RegistrationNumber']]

In [4]:
def calculate_chunk_size(file_path, memory_limit):
    """
    파일에서 샘플 데이터를 읽어 한 행당 메모리 크기를 계산하고
    주어진 메모리 제한에 맞는 청크 크기를 반환합니다.
    """
    # 샘플 데이터 읽기
    sample = pd.read_csv(file_path, sep='|', header=None, nrows=100, encoding='utf-8')
    sample_memory = sample.memory_usage(deep=True).sum()  # 샘플 데이터 메모리 사용량
    row_memory = sample_memory / len(sample)  # 한 행당 평균 메모리 크기
    chunk_size = int(memory_limit / row_memory)  # 청크 크기 계산
    return chunk_size

In [5]:
def run_with_chunks(bus_path, busro, chunk_size):
    columns = ['Trip_Key', 'Recorder_Model', 'Car_RentalNumber', 'Car_Type', 'Car_RegistrationNumber', 
               'Carrier_RegistrationNumber', 'Driver_Code', 'Day_Drive', 'Total_Drive', 'Car_Speed', 
               'Engine_Rotation', 'Break_Signal', 'LNG', 'LAT', 'GIS_Azimuth', 'Acceleration_Vx', 
               'Acceleration_Vy', 'Status_Code', 'Operating_Area', 'Information_Occurrence']
    result_columns = ['LNG', 'LAT', 'Information_Occurrence']

    result = pd.DataFrame(columns=result_columns)

    # 청크 단위로 데이터 읽기
    chunk_num = 1
    for chunk in pd.read_csv(bus_path, sep='|', header=None, names=columns, chunksize=chunk_size, encoding='utf-8'):
        print(f"Processing chunk {chunk_num} in {os.path.basename(bus_path)}")
        chunk = set_df(chunk)  # 사용자 정의 함수로 데이터 전처리
        processed_chunk = get_orderby_car_number(chunk, busro)  # 사용자 정의 함수로 데이터 처리
        result = pd.concat([result, processed_chunk], ignore_index=True)
        chunk_num += 1

    return result

In [6]:
def process_file(file_path, busro, memory_limit):
    """
    파일 처리 함수: 주어진 메모리 제한에 맞춰 동적으로 청크 크기를 계산하여 처리.
    """
    print(f"Processing file: {file_path} in Thread: {threading.current_thread().name}")
    chunk_size = calculate_chunk_size(file_path, memory_limit)
    print(f"Calculated chunk size: {chunk_size} rows")
    result = run_with_chunks(file_path, busro, chunk_size)

    # 처리 결과 저장
    output_path = file_path + '_processed.csv'
    print(f"Saving file to: {output_path}")
    result.to_csv(output_path, index=False, encoding="cp949")
    print(f"File saved: {output_path}")

In [None]:
def main():
    # 버스 정보 데이터 파일 읽기
    busro = pd.read_csv("./bus_route_specific_13.csv", encoding='cp949')

    # 루트 디렉토리 정의
    root = "./data"
    files = [os.path.join(path, name) for path, _, files in os.walk(root) for name in files]

    # 사용 가능한 메모리 계산
    total_memory = psutil.virtual_memory().available
    thread_memory_limit = total_memory * 0.05  # 각 스레드에 20% 메모리 할당

    # 쓰레드 리스트 생성
    threads = []
    for file_path in files:
        thread = threading.Thread(target=process_file, args=(file_path, busro, thread_memory_limit))
        threads.append(thread)
        thread.start()

    # 모든 쓰레드가 완료될 때까지 대기
    for thread in threads:
        thread.join()

    print("모든 파일 처리 완료")

if __name__ == "__main__":
    main()


Exception in thread Thread-9 (process_file):
Traceback (most recent call last):
  File "parsers.pyx", line 1120, in pandas._libs.parsers.TextReader._convert_tokens
Exception in thread Thread-10 (process_file):
Traceback (most recent call last):
  File "parsers.pyx", line 1120, in pandas._libs.parsers.TextReader._convert_tokens
Exception in thread Thread-11 (process_file):
Traceback (most recent call last):
  File "parsers.pyx", line 1120, in pandas._libs.parsers.TextReader._convert_tokens
Exception in thread Thread-12 (process_file):
Traceback (most recent call last):
  File "parsers.pyx", line 1120, in pandas._libs.parsers.TextReader._convert_tokens
  File "parsers.pyx", line 1272, in pandas._libs.parsers.TextReader._convert_with_dtype
  File "parsers.pyx", line 1272, in pandas._libs.parsers.TextReader._convert_with_dtype
  File "parsers.pyx", line 1272, in pandas._libs.parsers.TextReader._convert_with_dtype
  File "parsers.pyx", line 1272, in pandas._libs.parsers.TextReader._convert_

Processing file: ./data\DTG-r-00000 in Thread: Thread-5 (process_file)
Processing file: ./data\DTG-r-00001 in Thread: Thread-6 (process_file)
Processing file: ./data\DTG-r-00002 in Thread: Thread-7 (process_file)
Processing file: ./data\DTG-r-00003 in Thread: Thread-8 (process_file)
Processing file: ./data\data\DTG-r-00000_processed.csv in Thread: Thread-9 (process_file)
Processing file: ./data\data\DTG-r-00001_processed.csv in Thread: Thread-10 (process_file)
Processing file: ./data\data\DTG-r-00002_processed.csv in Thread: Thread-11 (process_file)
Processing file: ./data\data\DTG-r-00003_processed.csv in Thread: Thread-12 (process_file)
Calculated chunk size: 2697149 rows
Calculated chunk size: 2697149 rows
Calculated chunk size: 2697149 rows
Calculated chunk size: 2697149 rows
Processing chunk 1 in DTG-r-00002Processing chunk 1 in DTG-r-00000

Processing chunk 1 in DTG-r-00003
Processing chunk 1 in DTG-r-00001
Processing chunk 2 in DTG-r-00002
Processing chunk 2 in DTG-r-00003
Proce