In [1]:
import math
import os

import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient
from pyproj import Transformer

#### 데이터베이스 연결

In [2]:
# Database connection. Every connection setting comes from environment variables,
# which load_dotenv() first populates from a local .env file.
load_dotenv()
client = MongoClient(
    host=os.getenv('DB_ADR'),
    username=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    authSource=os.getenv('DB_AuthSource'),
    authMechanism=os.getenv('DB_AuthMechanism'),
)

# NOTE: despite its name, the DB_Collection variable is used as the *database* name here.
database_name = os.getenv('DB_Collection')
db = client.get_database(database_name)

# Coordinate converter EPSG:5174 -> EPSG:4326. always_xy=True fixes the axis order to
# (x, y) in and (lon, lat) out, independent of the CRS definitions' axis order.
transformer = Transformer.from_crs(crs_from="EPSG:5174", crs_to="EPSG:4326", always_xy=True)

#### 사용 함수

In [3]:
# raw_od_uuid 컬렉션에서 시간 필터링('11', '12', '18', '19')
def timefilter(raw_cl, timefiltered_cl):
    target_values=['11', '12', '18', '19']
    regex_pattern = f"^.{{11}}({'|'.join(target_values)}).*$"

    tgt_data=list(raw_cl.find({"time_end": {"$regex":regex_pattern}},{
        '_id':False,
        'uuid':True,
        'time_end':True,
        'destination_lat':True,
        'destination_lng':True}))
    print(len(tgt_data))
    timefiltered_cl.insert_many(tgt_data)

In [4]:
def _to_finite_float(value):
    """Return ``value`` as a float, or None if it is missing, non-numeric, NaN or infinite."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def public_docs_append_field_4326(public_cl_list, database=None, coord_transformer=None):
    """Add EPSG:4326 coordinates to every document of the given public collections.

    Reads '좌표정보x(epsg5174)' / '좌표정보y(epsg5174)', converts them with the
    EPSG:5174 -> EPSG:4326 transformer (always_xy=True: (x, y) in, (lon, lat) out) and
    writes them back with ``update_one``:
      - '좌표정보x(epsg4326)' = latitude
      - '좌표정보y(epsg4326)' = longitude
    NOTE: the x/y field names are swapped relative to the usual x=lon convention. The
    lookup pipelines compare the x field with destination_lat and the y field with
    destination_lng, so this layout is intentionally kept.

    Documents whose source coordinates are missing, null, NaN or non-numeric, or whose
    converted coordinates are not finite, get NaN in both 4326 fields. Previously they
    were either skipped (None), which left the fields absent, or transformed as-is (NaN).
    An absent field is not removed by the lookup's ``$nin: [NaN]`` guard, and
    ``$abs`` of a missing operand is null, which compares below the tolerance. Such a
    document would therefore match every destination. NaN is the sentinel that guard
    already excludes.

    Args:
        public_cl_list: iterable of collection names.
        database: pymongo Database to use; defaults to the notebook-level ``db``.
        coord_transformer: object with ``transform(x, y) -> (lon, lat)``; defaults to the
            notebook-level ``transformer``.
    """
    database = db if database is None else database
    coord_transformer = transformer if coord_transformer is None else coord_transformer
    src_x_field, src_y_field = '좌표정보x(epsg5174)', '좌표정보y(epsg5174)'
    nan = float('NaN')

    for public_cl in public_cl_list:
        public_collection = database.get_collection(public_cl)
        # Only the source coordinates (and the implicit _id) are needed.
        for public in public_collection.find({}, {src_x_field: 1, src_y_field: 1}):
            x_5174 = _to_finite_float(public.get(src_x_field))
            y_5174 = _to_finite_float(public.get(src_y_field))

            lat, lng = nan, nan
            if x_5174 is not None and y_5174 is not None:
                lon_out, lat_out = coord_transformer.transform(x_5174, y_5174)
                lng_ok, lat_ok = _to_finite_float(lon_out), _to_finite_float(lat_out)
                if lng_ok is not None and lat_ok is not None:
                    lat, lng = lat_ok, lng_ok

            public_collection.update_one(
                {'_id': public['_id']},
                # x <- latitude, y <- longitude (see docstring).
                {'$set': {'좌표정보x(epsg4326)': lat, '좌표정보y(epsg4326)': lng}}
            )

In [48]:
def build_restaurant_lookup_pipeline(public_collection, tolerance=0.0001):
    """Build the aggregation pipeline that attaches nearby public-data businesses to trips.

    Intended to run on a ``timefiltered_*`` collection. For each trip it looks up documents
    in ``public_collection`` whose converted coordinates are within ``tolerance`` of the
    trip destination on both axes. It keeps only trips with at least one match and
    projects the trip fields plus the matches' '소재지전체주소', '도로명전체주소' and '사업장명'
    (each as an array, one entry per matched document).

    Coordinate pairing: '좌표정보x(epsg4326)' is compared with destination_lat and
    '좌표정보y(epsg4326)' with destination_lng.

    Missing, null and NaN coordinates are excluded on BOTH sides before comparing. With such
    an operand, ``$abs({$subtract: ...})`` evaluates to null/NaN, which compares below
    ``tolerance``, so the document would match everything. The original pipeline only
    excluded NaN on the public side. ``$nin: [None]`` additionally rejects null and absent
    fields.

    Args:
        public_collection: name of the collection to look up (e.g. 'public_open_04').
        tolerance: maximum absolute difference per axis (default 0.0001, as before).

    Returns:
        A list of pipeline stages.
    """
    def _usable_coordinate():
        # Fresh dict each time so stages never share (and accidentally co-mutate) a filter.
        return {'$nin': [None, float('NaN')]}

    return [
        # Skip trips without a usable destination; they would otherwise match every business.
        {
            '$match': {
                'destination_lat': _usable_coordinate(),
                'destination_lng': _usable_coordinate(),
            }
        },
        {
            '$lookup': {
                'from': public_collection,
                'let': {
                    'lat1': '$destination_lat',
                    'lng1': '$destination_lng'
                },
                'pipeline': [
                    {
                        '$match': {
                            '$expr': {
                                '$and': [
                                    {'$lt': [{'$abs': {'$subtract': ['$좌표정보x(epsg4326)', '$$lat1']}}, tolerance]},
                                    {'$lt': [{'$abs': {'$subtract': ['$좌표정보y(epsg4326)', '$$lng1']}}, tolerance]}
                                ]
                            },
                            # Only businesses whose converted coordinates exist and are not null/NaN.
                            '좌표정보x(epsg4326)': _usable_coordinate(),
                            '좌표정보y(epsg4326)': _usable_coordinate()
                        }
                    },
                    {
                        '$project': {
                            '_id': 0,
                            '소재지전체주소': 1,
                            '도로명전체주소': 1,
                            '사업장명': 1
                        }
                    }
                ],
                'as': 'matched_docs'
            }
        },
        {
            '$match': {
                'matched_docs': {'$ne': []}  # keep only trips with at least one matched business
            }
        },
        {
            '$project': {
                '_id': 0,
                'uuid': 1,
                'time_end': 1,
                'destination_lat': 1,
                'destination_lng': 1,
                '소재지전체주소': '$matched_docs.소재지전체주소',
                '도로명전체주소': '$matched_docs.도로명전체주소',
                '사업장명': '$matched_docs.사업장명'
            }
        }
    ]


pipeline_04 = build_restaurant_lookup_pipeline('public_open_04')
pipeline_05 = build_restaurant_lookup_pipeline('public_open_05')

In [None]:
# Diagnostic: number of public_open_04 documents whose converted x coordinate is NaN.
# count_documents counts server-side instead of pulling every matching document into
# memory only to take len().
nan_coord_count = db["public_open_04"].count_documents({'좌표정보x(epsg4326)': float('NaN')})
nan_coord_count

#### 실행 코드

In [None]:
# Step 1: for 2023..2020, hour-filter raw_od_uuid_<year> into timefiltered_<year>.
# NOTE: the filtered collections are appended to, so re-running this cell duplicates data.
years = ('2023', '2022', '2021', '2020')
raw_cls = [f'raw_od_uuid_{year}' for year in years]
timefilterd_cls = [f'timefiltered_{year}' for year in years]

for source_name, target_name in zip(raw_cls, timefilterd_cls):
    timefilter(db.get_collection(source_name), db.get_collection(target_name))


# Step 2: add the converted EPSG:4326 coordinates to the public business collections.
collections_to_check = ["public_open_04", "public_open_05"]
public_docs_append_field_4326(collections_to_check)


##### 필터링 데이터 개수
||2023|2022|2021|2020|
|---|---|---|---|---|
|개수|5,909,489|7,527,799|8,627,310|8,907,064|

In [None]:
# Run pipeline_04 (lookup against public_open_04) over timefiltered_2022 in batches and
# store the trips that matched at least one business.
# Batches are paged by _id (keyset pagination): each round fetches the next `batch_size`
# _ids greater than the last one seen, which walks the _id index once overall. The former
# `$sort` + `$skip` version re-scanned every skipped document on each batch, which is
# quadratic over the whole collection.
# NOTE: re-running this cell inserts duplicates (the pipeline projects out `_id`).
source_collection = db["timefiltered_2022"]
result_collection = db.get_collection("user_destination_restaurant_timefiltering_2022")
batch_size = 500
last_id = None  # _id of the last document in the previous batch (None = start)

total_count = source_collection.count_documents({})
print(total_count)
processed = 0
while True:
    id_filter = {} if last_id is None else {"_id": {"$gt": last_id}}
    batch_ids = [
        doc["_id"]
        for doc in source_collection.find(id_filter, {"_id": 1}).sort("_id", 1).limit(batch_size)
    ]
    if not batch_ids:
        break
    print(f"{processed}번째 문서부터 {processed + batch_size}번째 문서까지 처리 중...")

    batch_pipeline = [
        {"$match": {"_id": {"$in": batch_ids}}},
        {"$sort": {"_id": 1}},
    ] + pipeline_04

    batch_docs = list(source_collection.aggregate(batch_pipeline))

    if batch_docs:
        print(f"{len(batch_docs)}개의 문서 저장")
        result_collection.insert_many(batch_docs)
    else:
        print("해당 구간에는 조건에 맞는 데이터가 없음")

    last_id = batch_ids[-1]
    processed += len(batch_ids)

In [None]:
# Run pipeline_05 (lookup against public_open_05) over timefiltered_2022 in batches and
# store matching trips in the same result collection used for the pipeline_04 run.
# Batches are paged by _id (keyset pagination) instead of `$sort` + `$skip`, which
# re-scanned every skipped document on each batch (quadratic over the whole collection).
# NOTE: re-running this cell inserts duplicates (the pipeline projects out `_id`).
source_collection = db["timefiltered_2022"]
result_collection = db.get_collection("user_destination_restaurant_timefiltering_2022")
batch_size = 100
last_id = None  # _id of the last document in the previous batch (None = start)

total_count = source_collection.count_documents({})
print(total_count)
processed = 0
while True:
    id_filter = {} if last_id is None else {"_id": {"$gt": last_id}}
    batch_ids = [
        doc["_id"]
        for doc in source_collection.find(id_filter, {"_id": 1}).sort("_id", 1).limit(batch_size)
    ]
    if not batch_ids:
        break
    print(f"{processed}번째 문서부터 {processed + batch_size}번째 문서까지 처리 중...")

    batch_pipeline = [
        {"$match": {"_id": {"$in": batch_ids}}},
        {"$sort": {"_id": 1}},
    ] + pipeline_05

    batch_docs = list(source_collection.aggregate(batch_pipeline))

    if batch_docs:
        print(f"{len(batch_docs)}개의 문서 저장")
        result_collection.insert_many(batch_docs)
    else:
        print("해당 구간에는 조건에 맞는 데이터가 없음")

    last_id = batch_ids[-1]
    processed += len(batch_ids)