작성자: 김혜인 (서울시립대학교 도시공학과 석사과정), hye2in@uos.ac.kr / hyein2kim@gmail.com, 2025

# 수도권 생활이동 데이터 해커톤 교육

In [None]:
# 기본 시스템/환경 관련 모듈
import os
import warnings
# 데이터 처리
import numpy as np
import polars as pl
import pandas as pd
import geopandas as gpd
from shapely.geometry import LineString, Point, Polygon
# 시각화
import matplotlib as mpl
import matplotlib.lines as mlines
import matplotlib.pyplot as plt
from matplotlib import cm, colors
from matplotlib.cm import ScalarMappable
import seaborn as sns

In [None]:
root = 'root'
data = os.path.join(root, 'data')
raw_data = os.path.join(data, 'raw')
processed_data = os.path.join(data, 'processed')
results = os.path.join(root, 'results')

## Create a 250m empty grid shapefile 250m 격자 Shapefile 생성

In [None]:
# 사전 전처리된 이동 데이터 필요 (base_ldf)

In [None]:
# Grid size definition 격자 크기 정의
grid_size = 250
half_grid_size = grid_size / 2

# 1. Extract unique origin grid information from base data 출발지 격자 정보 추출
origin_grids_df = base_ldf.select(['o_cell_id', 'o_cell_x', 'o_cell_y']).unique().collect().to_pandas()
origin_grids_df = origin_grids_df.rename(columns={'o_cell_id': 'cell_id', 'o_cell_x': 'cell_x', 'o_cell_y': 'cell_y'})

# 2. Extract unique destination grid information from base data 도착지 격자 정보 추출
destination_grids_df = base_ldf.select(['d_cell_id', 'd_cell_x', 'd_cell_y']).unique().collect().to_pandas()
destination_grids_df = destination_grids_df.rename(columns={'d_cell_id': 'cell_id', 'd_cell_x': 'cell_x', 'd_cell_y': 'cell_y'})

# 3. Merge both datasets and remove duplicates 출발지 도착지 격자 정보 결합 및 중복 제거
all_grids_df = pd.concat([origin_grids_df, destination_grids_df], ignore_index=True)
all_grids_df = all_grids_df.drop_duplicates(subset=['cell_id']).reset_index(drop=True)

# 4. Create Polygon geometry (250m x 250m) centered on (cell_x, cell_y) 격자 중심좌표 기준 사각형 폴리곤 생성
geometry_all = [
    Polygon([
        (x - half_grid_size, y - half_grid_size),
        (x + half_grid_size, y - half_grid_size),
        (x + half_grid_size, y + half_grid_size),
        (x - half_grid_size, y + half_grid_size),
        (x - half_grid_size, y - half_grid_size)
    ])
    for x, y in zip(all_grids_df['cell_x'], all_grids_df['cell_y'])
]

# 5. Create GeoDataFrame GeodataFrame 생성 및 좌표계 설정
all_grids_gdf = gpd.GeoDataFrame(all_grids_df, geometry=geometry_all)
all_grids_gdf.crs = "EPSG:5179"  # UTM-K

# 6. Define output path and save to shapefile
processed_data = './output'
os.makedirs(processed_data, exist_ok=True)
all_shp_path = os.path.join(processed_data, 'grid_250m.shp')
all_grids_gdf.to_file(all_shp_path, driver='ESRI Shapefile')

## Visualization

### OD MATRIX

In [None]:
# Load processed OD movement data (base_ldf) 사전 전처리된 이동 데이터 (base_ldf)
# o_cell_id, d_cell_id, total_cnt가 포함되어야 함

# Load 250m Grid shapefile 앞서 생성한 250m 격자 shapefile (grid_250m)

# Remove self-loops (movements within the same grid)
# # In this script, movements within the same grid cell are excluded to focus on overall movement flow patterns.
# 본 스크립트에서는 이동 흐름 추세를 확인하기 위해 동일 격자 내 이동은 제거하였음
base_ldf_no_self_loops = base_ldf.filter(pl.col('o_cell_id') != pl.col('d_cell_id'))

# Sum total_cnt for each origin-destination pair 출발지-도착지 쌍별 total_cnt 합산
od_summed_ldf = base_ldf_no_self_loops.group_by(['o_cell_id', 'd_cell_id']).agg(
    pl.sum('total_cnt').alias('total_movement_summed')
)

# Convert LazyFrame to Pandas DataFrame LazyFrame을 Pandas DataFrame으로 변환
od_summed_df = od_summed_ldf.collect().to_pandas()

# Join Grid Geometries 격자 지오메트리 결합

# Prepare origin grid geometries 출발 격자 지오메트리
grid_origin = grid[['cell_id', 'geometry']].rename(columns={'cell_id': 'o_cell_id', 'geometry': 'geometry_origin'})
# Merge origin geometries 출발 격자 지오메트리 결합
od_summed_with_origin_geom = od_summed_df.merge(grid_origin, on='o_cell_id', how='left')

# Prepare destination grid geometries 도착 격자 지오메트리 준비
grid_destination = grid[['cell_id', 'geometry']].rename(columns={'cell_id': 'd_cell_id', 'geometry': 'geometry_destination'})
# Merge destination geometries 도착 격자 지오메트리 결합
od_summed_with_geoms = od_summed_with_origin_geom.merge(grid_destination, on='d_cell_id', how='left')

# Calculate centroids 중심점 계산
od_summed_with_geoms['centroid_origin'] = od_summed_with_geoms['geometry_origin'].centroid  # 출발지 중심점
od_summed_with_geoms['centroid_destination'] = od_summed_with_geoms['geometry_destination'].centroid  # 도착지 중심점

# Filter out rows with null centroids 중심점이 null인 행 제거
od_summed_filtered_centroids = od_summed_with_geoms[
    od_summed_with_geoms['centroid_origin'].notna() &
    od_summed_with_geoms['centroid_destination'].notna()
].copy()

# Create LineString geometries for flows 출발지-도착지 라인스트링 생성
geometries_summed = [
    LineString([origin, destination])
    for origin, destination in zip(
        od_summed_filtered_centroids['centroid_origin'],
        od_summed_filtered_centroids['centroid_destination']
    )
]

# Create GeoDataFrame with flow lines 플로우 라인으로 GeoDataFrame 생성
od_lines_summed = gpd.GeoDataFrame(od_summed_filtered_centroids, geometry=geometries_summed)

# Calculate top 1% threshold of total movements 이동량 상위 1% 임계값 계산
total_movement_summed_series = od_lines_summed['total_movement_summed']
threshold_summed_top1 = total_movement_summed_series.quantile(0.99, interpolation='linear')  # 99 분위수

# Filter for top 1% flows 상위 1% 플로우 필터링
od_lines_summed_top1_filtered = od_lines_summed[
    od_lines_summed['total_movement_summed'] >= threshold_summed_top1
].copy()

#  Visualization 플롯 생성
fig, ax = plt.subplots(1, 1, figsize=(15, 15))
max_linewidth_summed = 5  # 최대 라인 두께 설정
od_lines_summed_top1_filtered.plot(
    ax=ax,
    linewidth=od_lines_summed_top1_filtered['total_movement_summed'] / od_lines_summed_top1_filtered['total_movement_summed'].max() * max_linewidth_summed,
    alpha=0.6, # 투명도 조절
    color="#403bc3" # 색상 선택
)

ax.set_title('Title', fontsize=15)
ax.set_axis_off()
plt.show()

### Bivariate Choropleth Map 이변량 맵 생성

In [None]:
# This script visualizes a bivariate choropleth map using two selected variables (Variable 1 & Variable 2)
# 두 개 변수(변수 1, 변수 2)를 사용한 이변량 맵 시각화
# Variables can be customized for different analysis contexts.  # 분석 목적에 맞는 변수 조정 필요

import matplotlib.patches as mpatches # 범례 색상 칸 생성에 필요한 모듈 추가 로드
from mpl_toolkits.axes_grid1.inset_locator import inset_axes # 범례 삽입을 위한 모듈 추가 로드

# Aggregate Variable 1 per Grid Cell  # 격자 셀별 변수 1 집계
origin_var1_ldf = base_ldf.group_by('o_cell_id').agg(
    pl.sum('variable1_cnt').alias('total_origin_var1')
)
destination_var1_ldf = base_ldf.group_by('d_cell_id').agg(
    pl.sum('variable1_cnt').alias('total_destination_var1')
)

origin_var1_df = origin_var1_ldf.collect().to_pandas().rename(columns={'o_cell_id': 'cell_id'})  # 데이터 수집 및 컬럼명 통일
destination_var1_df = destination_var1_ldf.collect().to_pandas().rename(columns={'d_cell_id': 'cell_id'})

cell_var1_df = origin_var1_df.merge(destination_var1_df, on='cell_id', how='outer').fillna(0)  # 출발/도착 변수 1 합산 및 결측값 처리
cell_var1_df['total_var1_related_to_cell'] = cell_var1_df['total_origin_var1'] + cell_var1_df['total_destination_var1']  # 변수 1 총합 계산

# Merge Variable 1 with Variable 2 Data  # 변수 1과 변수 2 데이터 결합
grid_bivariate = grid_with_variable2.merge(
    cell_var1_df[['cell_id', 'total_var1_related_to_cell']],
    on='cell_id',
    how='left'
).fillna({'total_var1_related_to_cell': 0})  # 결측값 0 처리

# Quantile Classification for Both Variables  # 두 변수에 대한 분위수 구간화
grid_bivariate['variable2_quantile'] = pd.qcut(
    grid_bivariate['variable2_value'], q=3, labels=[0, 1, 2], duplicates='drop'
)
grid_bivariate['variable1_quantile'] = pd.qcut(
    grid_bivariate['total_var1_related_to_cell'], q=3, labels=[0, 1, 2], duplicates='drop'
)

# Create Bivariate Category  # 이변량 카테고리 생성 (예: '2-1')
grid_bivariate['bivariate_category'] = grid_bivariate.apply(
    lambda row: f"{int(row['variable2_quantile'])}-{int(row['variable1_quantile'])}", axis=1
)

# Define Bivariate Color Mapping (3x3 Matrix)  # 색상 매핑 정의 (3x3 행렬)
bivariate_colors_map = {
    "0-0": "#e8e8e8", "0-1": "#ace4e4", "0-2": "#5ac8c8",
    "1-0": "#dfb0d6", "1-1": "#a5add3", "1-2": "#5698b9",
    "2-0": "#be64ac", "2-1": "#8c62aa", "2-2": "#3b4994"
}
grid_bivariate['bivariate_color'] = grid_bivariate['bivariate_category'].map(bivariate_colors_map)

# Plot Bivariate Choropleth Map  # 시각화
fig, ax = plt.subplots(1, 1, figsize=(15, 15))
grid_bivariate.plot(
    color=grid_bivariate['bivariate_color'],
    ax=ax,
    edgecolor="#6D6D6D", # 격자 테두리 색상
)
ax.set_title('Title', fontsize=15)
ax.set_axis_off()

# Add Bivariate Legend  # 범례 추가
legend_colors_matrix = np.array([
    [bivariate_colors_map["0-0"], bivariate_colors_map["0-1"], bivariate_colors_map["0-2"]],
    [bivariate_colors_map["1-0"], bivariate_colors_map["1-1"], bivariate_colors_map["1-2"]],
    [bivariate_colors_map["2-0"], bivariate_colors_map["2-1"], bivariate_colors_map["2-2"]]
])

axins = inset_axes(ax, width="10%", height="10%", loc='lower left', borderpad=2)  # 범례 위치 및 크기

for i in range(3):
    for j in range(3):
        axins.add_patch(
            mpatches.Rectangle((i, j), 1, 1, facecolor=legend_colors_matrix[j, i], edgecolor='none')
        )

axins.set_xticks([0.5, 1.5, 2.5])  # x축 눈금 위치 설정
axins.set_yticks([0.5, 1.5, 2.5])  # y축 눈금 위치 설정
axins.set_xticklabels(['Low', 'Mid', 'High'], fontsize=8)  # x축 눈금 레이블
axins.set_yticklabels(['Low', 'Mid', 'High'], fontsize=8)  # y축 눈금 레이블
axins.set_xlabel('Variable 1')  # x축 레이블
axins.set_ylabel('Variable 2')  # y축 레이블
axins.set_xlim(0, 3)  # x축 범위
axins.set_ylim(0, 3)  # y축 범위
axins.tick_params(left=False, bottom=False, labelleft=True, labelbottom=True)
plt.show()