In [1]:
# 국룰 세팅1
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 40.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=f07a8fc0faf417a405e14e86c915a1cf02a2a99612f789248424a11b49bc8412
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
# 국룰 세팅2
import pandas as pd
from pyspark.sql import SparkSession


spark = SparkSession.builder.getOrCreate()

In [3]:
# 국룰 세팅3
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [8]:
# 이전에 작업했던 일별 이용자수 데이터를 가져왔다.
# 이 dataframe은 필요한 col에 대한 작업은 해 두었지만, 일부 역명처리가 처리되지않았다.
df = spark.read.format("parquet").option("header", True).load("/content/drive/MyDrive/data/중간저장/preprocessing")
df

DataFrame[사용일자: date, 노선명: string, 역명: string, 승차총승객수: int, 하차총승객수: int, 이용객수: int, 코로나: int]

In [9]:
# 동명이역을 처리한다.(소속 노선으로 구분)
df = df.withColumn("역명", F.when((df.역명=='신촌') & (df.노선명=='경의중앙선'), '신촌(경의중앙)').otherwise(df.역명))
df = df.withColumn("역명", F.when((df.역명=='양평') & (df.노선명=='5호선'), '양평(서울)').otherwise(df.역명))

In [10]:
# 우리가 필요한 정보는 해당 시기(코로나 이전/이후)의 역별 총 승차인원이다.
df = df.groupby('`코로나`', '`역명`').agg(F.sum('`승차총승객수`').alias('역별이용자수')).orderBy(F.desc('`역명`'))

In [11]:
# 코로나 이전 이용자수를 새 df에 저장한다.
df_previous = df.filter(df.코로나 == -1)
df_previous = df_previous.withColumnRenamed('역별이용자수', '이전이용자수')
df_previous.show()

+------+--------------------+------------+
|코로나|                역명|이전이용자수|
+------+--------------------+------------+
|    -1|    흑석(중앙대입구)|     6155347|
|    -1|          효창공원앞|     6218741|
|    -1|    회현(남대문시장)|    18777734|
|    -1|                회룡|     8164715|
|    -1|                회기|    18265333|
|    -1|                화정|    12214324|
|    -1|                화전|     1603029|
|    -1|                화서|     5251906|
|    -1|화랑대(서울여대입구)|     8048587|
|    -1|                화곡|    18541001|
|    -1|                화계|     2038078|
|    -1|                홍제|    12084790|
|    -1|            홍대입구|    58440111|
|    -1|              호구포|     2271433|
|    -1|                혜화|    26035834|
|    -1|                행신|     4940103|
|    -1|                행당|     5299875|
|    -1|                합정|    30452244|
|    -1|                한티|     9224318|
|    -1|              한양대|     6886558|
+------+--------------------+------------+
only showing top 20 rows



In [12]:
# 코로나 이후 사용자수 또한 별도의 df에 저장한다.
df_post = df.filter(df.코로나 == 1)
df_post = df_post.withColumnRenamed('역별이용자수', '이후이용자수')
df_post.show()

+------+--------------------+------------+
|코로나|                역명|이후이용자수|
+------+--------------------+------------+
|     1|    흑석(중앙대입구)|     3816069|
|     1|          효창공원앞|     4826514|
|     1|    회현(남대문시장)|    11283181|
|     1|                회룡|     6276987|
|     1|                회기|    12044998|
|     1|                화정|     8525525|
|     1|                화전|      822115|
|     1|                화서|     3647735|
|     1|화랑대(서울여대입구)|     5573961|
|     1|                화곡|    14273259|
|     1|                화계|     1669995|
|     1|                홍제|     8787302|
|     1|            홍대입구|    32801540|
|     1|              호구포|     1813329|
|     1|                혜화|    15545701|
|     1|                행신|     3803644|
|     1|                행당|     4067437|
|     1|                합정|    22145669|
|     1|                한티|     6939102|
|     1|              한양대|     3511069|
+------+--------------------+------------+
only showing top 20 rows



In [13]:
# 이제 두 df를 left join을 이용해 합친다. 같은 df에서 파생된 두 df이므로 데이터 손실은 걱정하지 않아도 된다.
df_joined = df_previous.join(df_post, on='역명', how='left')

In [14]:
# 이제 각 시기의 이용자수 col을 만들었으므로, 두 col을 이용해 코로나 이전 대비 이후 이용자수 변화율을 구해주자.
df_joined = df_joined.withColumn('이용자변화율', F.col('이후이용자수') / F.col('이전이용자수') - 1)

In [15]:
df_joined.show()

+--------------------+------+------------+------+------------+--------------------+
|                역명|코로나|이전이용자수|코로나|이후이용자수|        이용자변화율|
+--------------------+------+------------+------+------------+--------------------+
|    흑석(중앙대입구)|    -1|     6155347|     1|     3816069|-0.38003998799742733|
|          효창공원앞|    -1|     6218741|     1|     4826514|-0.22387602249394212|
|    회현(남대문시장)|    -1|    18777734|     1|    11283181| -0.3991191375913622|
|                회룡|    -1|     8164715|     1|     6276987|-0.23120562077182116|
|                회기|    -1|    18265333|     1|    12044998| -0.3405541525029957|
|                화정|    -1|    12214324|     1|     8525525| -0.3020059890338589|
|                화전|    -1|     1603029|     1|      822115| -0.4871490160190489|
|                화서|    -1|     5251906|     1|     3647735| -0.3054454896945985|
|화랑대(서울여대입구)|    -1|     8048587|     1|     5573961| -0.3074609245076185|
|                화곡|    -1|    18541001|     1|    14273

In [16]:
# 보기편하게 역명 오름차순으로 재정렬
df_joined = df_joined.select(['역명', '이용자변화율']).orderBy('역명')
df_joined.show()

+------------------+--------------------+
|              역명|        이용자변화율|
+------------------+--------------------+
|      4.19민주묘지| -0.2625318589341581|
|              가능| -0.2599903937669372|
|          가락시장| -0.2404176256996079|
|    가산디지털단지| -0.1358371150297102|
|              가양| -0.2190068185714824|
|            가오리|-0.20101607740954452|
|              가좌| -0.2075038932747617|
|            가천대| -0.3859967071050384|
|              가평| -0.3822845094559424|
|              간석|-0.28193791382842914|
|              갈매|-0.10071730504533882|
|              강남| -0.3256539890517869|
|          강남구청|-0.16275754284531496|
|              강동|-0.22425780281891472|
|          강동구청|-0.22524604129526749|
|              강매|-0.15748086306509812|
|강변(동서울터미널)| -0.4220370130679365|
|              강촌| -0.3354668507836245|
|              개롱|-0.25814490432094295|
|              개봉|-0.27270737442298854|
+------------------+--------------------+
only showing top 20 rows



In [17]:
# 역들의 위치를 기록한 데이터를 불러옴
station_locate = spark.read.format("csv").option("header", True).load("/content/drive/MyDrive/data/station_coordinate.csv", encoding='utf-8')
station_locate = station_locate.withColumnRenamed("name", "역명")
station_locate = station_locate.withColumnRenamed("line", "노선명")
station_locate = station_locate.select('노선명', '역명', 'lat', 'lng').distinct()

In [18]:
station_locate.show()

+--------------+------------+---------+----------+
|        노선명|        역명|      lat|       lng|
+--------------+------------+---------+----------+
|        02호선|        아현|37.557345|126.956141|
|        03호선|        원당|37.653324|126.843041|
|        03호선|        대곡|37.631626|126.811024|
|우이신설경전철|        화계|37.634802|127.017519|
|        05호선|        마곡|37.560183|126.825448|
|        05호선|        방이|37.508857|127.126133|
|        경춘선|        신내|37.612887|127.103218|
|        분당선|        모란| 37.43213|127.129087|
|        04호선|        사당| 37.47653|126.981685|
|        06호선|        대흥|37.547771|126.942069|
|        06호선|        증산|37.583876|126.909645|
|        07호선|      보라매|37.499872|126.920428|
|        08호선|        잠실| 37.51395|127.102234|
|        서해선|        신현|     null|      null|
|      신분당선|        판교|37.394761|127.111217|
|        01호선|        덕계|37.818486|127.056486|
|        02호선|        이대|37.556733|126.946013|
|        경의선|        운정|37.725826|126.767257|
|        인천선|경인교대입

안타깝게도 위치데이터를 갱신한 df를 따로 저장하지 않았다.

In [19]:
# null값 채워넣기
# 인천공항1터미널
station_locate = station_locate.withColumn("lat", F.when(station_locate.역명=='인천공항1터미널', '37.447492').otherwise(station_locate.lat))
station_locate = station_locate.withColumn("lng", F.when(station_locate.역명=='인천공항1터미널', '126.452555').otherwise(station_locate.lng))
# 인천공항2터미널
station_locate = station_locate.withColumn("lat", F.when(station_locate.역명=='인천공항2터미널', '37.467407').otherwise(station_locate.lat))
station_locate = station_locate.withColumn("lng", F.when(station_locate.역명=='인천공항2터미널', '126.434142').otherwise(station_locate.lng))
# 경기광주역
station_locate = station_locate.withColumn("lat", F.when(station_locate.역명=='경기광주', '37.398775').otherwise(station_locate.lat))
station_locate = station_locate.withColumn("lng", F.when(station_locate.역명=='경기광주', '127.252308').otherwise(station_locate.lng))

# 신촌역과 양평역은 동일한 이름의 역명이 2개 존재해서 위치 데이터가 잘못기록되어 있었다.
# 신촌역(경의중앙)
station_locate = station_locate.withColumn("lat", F.when((station_locate.역명=='신촌') & (station_locate.노선명=='경의선'), '37.559864').otherwise(station_locate.lat))
station_locate = station_locate.withColumn("lng", F.when((station_locate.역명=='신촌') & (station_locate.노선명=='경의선'), '126.942571').otherwise(station_locate.lng))
# 양평역(서울)
station_locate = station_locate.withColumn("lat", F.when((station_locate.역명=='양평') & (station_locate.노선명=='05호선'), '37.525338').otherwise(station_locate.lat))
station_locate = station_locate.withColumn("lng", F.when((station_locate.역명=='양평') & (station_locate.노선명=='05호선'), '126.886194').otherwise(station_locate.lng))

In [20]:
# 이용자수 df와 다른 역명 처리
# 해당하는 역들로 dictionary를 만들고 그 key값들을 list에 저장
change_name = {'서울': '서울역', '419민주묘지': '4.19민주묘지', '지제': '평택지제'}
change_name_list = []
for key in change_name:
    change_name_list.append(key)

In [21]:
from itertools import chain


mapping_expr = F.create_map([F.lit(x) for x in chain(*change_name.items())])
station_locate = station_locate.withColumn("역명", F.when(station_locate.역명.isin(change_name_list), mapping_expr[F.col("역명")]).otherwise(station_locate.역명))

In [22]:
# 동명이역 처리
# 가짜 신촌역 이름 바꿔줌
station_locate = station_locate.withColumn("역명", F.when((station_locate.역명=='신촌') & (station_locate.노선명=='경의선'), '신촌(경의중앙)').otherwise(station_locate.역명))
# 양평역 이름 바꿔줌
station_locate = station_locate.withColumn("역명", F.when((station_locate.역명=='양평') & (station_locate.노선명=='05호선'), '양평(서울)').otherwise(station_locate.역명))

In [23]:
# 필요한 col만 추려냄
station_locate = station_locate.select('역명', 'lat', 'lng').distinct()

In [24]:
# 각 노선내 순서를 기록해둔 데이터를 불러옴
ordered = spark.read.format("csv").option("header", True).load("/content/drive/MyDrive/data/ordered_line(ver2).csv", encoding='utf-8')

In [25]:
ordered.show()

+------+---------+----------+----+
|노선명|sub노선명|      역명|순번|
+------+---------+----------+----+
| 1호선|    1호선|    소요산| 100|
| 1호선|    1호선|    동두천| 101|
| 1호선|    1호선|      보산| 102|
| 1호선|    1호선|동두천중앙| 103|
| 1호선|    1호선|      지행| 104|
| 1호선|    1호선|      덕정| 105|
| 1호선|    1호선|      덕계| 106|
| 1호선|    1호선|      양주| 107|
| 1호선|    1호선|      녹양| 108|
| 1호선|    1호선|      가능| 109|
| 1호선|    1호선|    의정부| 110|
| 1호선|    1호선|      회룡| 111|
| 1호선|    1호선|    망월사| 112|
| 1호선|    1호선|    도봉산| 113|
| 1호선|    1호선|      도봉| 114|
| 1호선|    1호선|      방학| 115|
| 1호선|    1호선|      창동| 116|
| 1호선|    1호선|      녹천| 117|
| 1호선|    1호선|      월계| 118|
| 1호선|    1호선|    광운대| 119|
+------+---------+----------+----+
only showing top 20 rows



In [26]:
# 순서 df(그래프에 필요한 역들)에 위치 df를 left outer join으로 병합
test = ordered.join(station_locate, on='역명', how='left')

In [27]:
# ordered에는 있고 station_locate에는 없는 역을 확인
test.select('역명', '노선명').where(F.col('lat').isNull()==True).distinct().orderBy('노선명').show()

+----+------+
|역명|노선명|
+----+------+
+----+------+



In [28]:
# 이 df를 이용자수 df에 합치면 병기역명 때문에 문제가 생긴다. 이를 처리해주자.
# 변경해야하는 병기역명이 많이므로 딕셔너리를 활용하려한다.(수작업)
duel_name = {'충정로': '충정로(경기대입구)', '안암': '안암(고대병원앞)', '천호': '천호(풍납토성)',
             '신창': '신창(순천향대)', '봉화산': '봉화산(서울의료원)', '광화문': '광화문(세종문화회관)',
             '마곡나루': '마곡나루(서울식물원)', '어린이대공원': '어린이대공원(세종대)', '경복궁': '경복궁(정부서울청사)',
             '상봉': '상봉(시외버스터미널)', '왕십리': '왕십리(성동구청)', '교대': '교대(법원.검찰청)',
             '남한산성입구': '남한산성입구(성남법원.검찰청)', '수유': '수유(강북구청)', '월드컵경기장': '월드컵경기장(성산)',
             '쌍용': '쌍용(나사렛대)', '청량리': '청량리(서울시립대입구)', '숭실대입구': '숭실대입구(살피재)',
             '총신대입구': '총신대입구(이수)', '동작': '동작(현충원)', '새절': '새절(신사)', '강변': '강변(동서울터미널)',
             '몽촌토성': '몽촌토성(평화의문)', '아차산': '아차산(어린이대공원후문)', '고려대': '고려대(종암)', '증산': '증산(명지대앞)',
             '광흥창': '광흥창(서강)', '서울대입구': '서울대입구(관악구청)', '군자': '군자(능동)', '오목교': '오목교(목동운동장앞)',
             '신정': '신정(은행정)', '잠실': '잠실(송파구청)', '미아': '미아(서울사이버대학)', '삼성': '삼성(무역센터)',
             '대림': '대림(구로구청)', '회현': '회현(남대문시장)', '구의': '구의(광진구청)', '용마산': '용마산(용마폭포공원)',
             '남부터미널': '남부터미널(예술의전당)', '화랑대': '화랑대(서울여대입구)', '녹사평': '녹사평(용산구청)',
             '용두': '용두(동대문구청)', '대흥': '대흥(서강대앞)', '낙성대': '낙성대(강감찬)', '양재': '양재(서초구청)',
             '상월곡': '상월곡(한국과학기술연구원)', '숙대입구': '숙대입구(갈월)', '광나루': '광나루(장신대)',
             '온수': '온수(성공회대입구)', '올림픽공원': '올림픽공원(한국체대)', '한성대입구': '한성대입구(삼선교)',
             '이촌': '이촌(국립중앙박물관)', '동대문역사문화공원': '동대문역사문화공원(DDP)', '공릉': '공릉(서울과학기술대)',
             '성신여대입구': '성신여대입구(돈암)', '굽은다리': '굽은다리(강동구민회관앞)', '월곡': '월곡(동덕여대)', '흑석': '흑석(중앙대입구)'}

In [29]:
duel_name_list = []
for key in duel_name:
    duel_name_list.append(key)

In [30]:
# test의 역명을 바꿔줌
mapping_expr = F.create_map([F.lit(x) for x in chain(*duel_name.items())])
test = test.withColumn("역명", F.when(test.역명.isin(duel_name_list), mapping_expr[F.col("역명")]).otherwise(test.역명))

In [31]:
# 이용자수 df를 left outer join으로 합쳐줌
test2 = test.join(df_joined, on='역명', how='left')

In [32]:
# null값 존재하는지 확인
test2.where(F.col('lat').isNull()==True).orderBy('역명').show()

+----+------+---------+----+---+---+------------+
|역명|노선명|sub노선명|순번|lat|lng|이용자변화율|
+----+------+---------+----+---+---+------------+
+----+------+---------+----+---+---+------------+



In [33]:
# 반대 방향의 join을 실시하여 손실되는 data확인
df_joined.join(test, on='역명', how='left').where(F.col('lat').isNull()==True).show()

+----+------------+------+---------+----+---+---+
|역명|이용자변화율|노선명|sub노선명|순번|lat|lng|
+----+------------+------+---------+----+---+---+
+----+------------+------+---------+----+---+---+



In [34]:
result = test2.orderBy('sub노선명')

In [35]:
result.show()

+----------------------+------+---------+----+---------+----------+--------------------+
|                  역명|노선명|sub노선명|순번|      lat|       lng|        이용자변화율|
+----------------------+------+---------+----+---------+----------+--------------------+
|                  용산| 1호선|    1호선| 135|37.529849|126.964561|-0.35582950083197395|
|            동두천중앙| 1호선|    1호선| 103|37.901885|127.056482|-0.31707269927925985|
|               종로3가| 1호선|    1호선| 130|37.571607|126.991806| -0.3702077446113671|
|                신이문| 1호선|    1호선| 121|37.601854|127.067325|-0.24867124031403898|
|청량리(서울시립대입구)| 1호선|    1호선| 124|37.580178|127.046835|-0.30873663723207145|
|                  송내| 1호선|    1호선| 150|  37.4876|126.753664| -0.3069108566247968|
|                광운대| 1호선|    1호선| 119|37.623632|127.061835| -0.3007307666340202|
|                  구로| 1호선|    1호선| 141|37.503039|126.881966|-0.26640820730679904|
|                동대문| 1호선|    1호선| 128| 37.57142|127.009745| -0.3471682452334497|
|                

In [None]:
# 문제가 없으니 저장
result.write.parquet('/content/drive/MyDrive/data/contrast_metro_user/ordered_line_difference_ratio.parquet')