In [1]:
# [+] Start PySpark: a local Spark context ('local' = a single worker thread).
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('assgn-tlc-trip-arc-layer')
# getOrCreate reuses an already-running context, so re-running this cell (or
# Restart-free "Run All") does not fail with "Cannot run multiple SparkContexts".
sc = SparkContext.getOrCreate(conf=conf)

In [2]:
# Input data: the directory holding the CSVs, followed by the zone-coordinate
# lookup file name and the trip-record file name inside that directory.
path = "./data/"
coord_file, trip_file = (
    "taxi_zone_lookup_coordinates_v2.csv",
    "fhvhv_tripdata_2020-03_short.csv",
)

In [3]:
# [+] Read the coordinates data file into an RDD (one element per text line).
# Build the location from the `path` variable rather than repeating './data/',
# so changing the data directory only needs one edit.
coord_lines = sc.textFile(path + coord_file)

# [+] Show the first 5 RDD elements (the CSV header line comes first).
coord_lines.take(5)

['LocationID,Borough,Zone,service_zone,latitude,longitude',
 '1,EWR,Newark Airport,EWR,40.69287997,-74.18544993',
 '2,Queens,Jamaica Bay,Boro Zone,40.6057,-73.8713',
 '3,Bronx,Allerton/Pelham Gardens,Boro Zone,40.86521003,-73.8435548',
 '4,Manhattan,Alphabet City,Yellow Zone,40.72599,-73.98057']

In [4]:
# [+] Read the trip data file into an RDD (one element per text line).
# Build the location from the `path` variable rather than repeating './data/',
# so changing the data directory only needs one edit.
trip_lines = sc.textFile(path + trip_file)

# [+] Show the first 5 RDD elements (the CSV header line comes first).
trip_lines.take(5)

['hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag',
 'HV0005,B02510,2020-03-01 00:03:40,2020-03-01 00:23:39,81,159,',
 'HV0005,B02510,2020-03-01 00:28:05,2020-03-01 00:38:57,168,119,',
 'HV0003,B02764,2020-03-01 00:03:07,2020-03-01 00:15:04,137,209,1',
 'HV0003,B02764,2020-03-01 00:18:42,2020-03-01 00:38:42,209,80,']

In [5]:
# [+] Strip the CSV header from coord_lines: read the first line, then keep
# only the lines that are not equal to it.
coord_header = coord_lines.first()
coord_filtered_lines = coord_lines.filter(lambda line: not line == coord_header)

In [6]:
# Check the first remaining line: after filtering it should be data, not the header.
coord_filtered_lines.first()

'1,EWR,Newark Airport,EWR,40.69287997,-74.18544993'

In [7]:
# [+] Strip the CSV header from trip_lines: read the first line, then keep
# only the lines that are not equal to it.
trip_header = trip_lines.first()
trip_filtered_lines = trip_lines.filter(lambda line: not line == trip_header)

In [8]:
# Check the first remaining line: after filtering it should be data, not the header.
trip_filtered_lines.first()

'HV0005,B02510,2020-03-01 00:03:40,2020-03-01 00:23:39,81,159,'

In [9]:
# [+] Show 5 values of coord_filtered_lines (header removed)
coord_filtered_lines.take(5)

['1,EWR,Newark Airport,EWR,40.69287997,-74.18544993',
 '2,Queens,Jamaica Bay,Boro Zone,40.6057,-73.8713',
 '3,Bronx,Allerton/Pelham Gardens,Boro Zone,40.86521003,-73.8435548',
 '4,Manhattan,Alphabet City,Yellow Zone,40.72599,-73.98057',
 '5,Staten Island,Arden Heights,Boro Zone,40.5564,-74.1735']

In [10]:
# [+] Show 5 values of trip_filtered_lines (header removed)
trip_filtered_lines.take(5)

['HV0005,B02510,2020-03-01 00:03:40,2020-03-01 00:23:39,81,159,',
 'HV0005,B02510,2020-03-01 00:28:05,2020-03-01 00:38:57,168,119,',
 'HV0003,B02764,2020-03-01 00:03:07,2020-03-01 00:15:04,137,209,1',
 'HV0003,B02764,2020-03-01 00:18:42,2020-03-01 00:38:42,209,80,',
 'HV0003,B02764,2020-03-01 00:44:24,2020-03-01 00:58:44,256,226,']

In [11]:
# [+] Create a key-value RDD: LocationID -> (latitude, longitude), still as strings.
# Column order per the header: LocationID,Borough,Zone,service_zone,latitude,longitude.
# Each line is split once; the original called split(",") three times per row.
coord_kv = (
    coord_filtered_lines
    .map(lambda line: line.split(","))
    .map(lambda fields: (fields[0], (fields[4], fields[5])))
)

In [12]:
# [+] Show 5 values of coord_kv: (LocationID, (latitude, longitude)) as strings
coord_kv.take(5)

[('1', ('40.69287997', '-74.18544993')),
 ('2', ('40.6057', '-73.8713')),
 ('3', ('40.86521003', '-73.8435548')),
 ('4', ('40.72599', '-73.98057')),
 ('5', ('40.5564', '-74.1735'))]

In [13]:
# Cast each (latitude, longitude) string pair to floats; values become [lat, lon] lists.
coord_kv = coord_kv.mapValues(lambda lat_lon: list(map(float, lat_lon)))

In [14]:
# [+] Show 5 values of coord_kv: (LocationID, [latitude, longitude]) as floats
coord_kv.take(5)

[('1', [40.69287997, -74.18544993]),
 ('2', [40.6057, -73.8713]),
 ('3', [40.86521003, -73.8435548]),
 ('4', [40.72599, -73.98057]),
 ('5', [40.5564, -74.1735])]

In [15]:
# [+] Create a key-value RDD: PULocationID -> DOLocationID (both strings).
# Columns 4 and 5 are PULocationID and DOLocationID per the trip CSV header.
# Each line is split once; the original called split(",") twice per row.
trip_kv = (
    trip_filtered_lines
    .map(lambda line: line.split(","))
    .map(lambda fields: (fields[4], fields[5]))
)

In [16]:
# [+] Show 5 values of trip_kv: (PULocationID, DOLocationID) string pairs
trip_kv.take(5)

[('81', '159'), ('168', '119'), ('137', '209'), ('209', '80'), ('256', '226')]

In [17]:
# [+] Join trip_kv with coord_kv on PULocationID.
# RDD.join is an inner join: trips whose PULocationID has no coordinates are dropped.
# Result elements: (PULocationID, (DOLocationID, [pu_latitude, pu_longitude])).
pu_joined = trip_kv.join(coord_kv)

In [18]:
# [+] Show 5 values of pu_joined
pu_joined.take(5)

[('209', ('80', [40.7072, -74.0027])),
 ('209', ('37', [40.7072, -74.0027])),
 ('209', ('13', [40.7072, -74.0027])),
 ('209', ('127', [40.7072, -74.0027])),
 ('209', ('39', [40.7072, -74.0027]))]

In [19]:
# Drop the key (PULocationID), leaving (DOLocationID, [pu_latitude, pu_longitude]).
# Rebuilt from trip_kv/coord_kv instead of `pu_joined = pu_joined.values()`:
# reassigning the name to its own .values() made this cell non-idempotent, so a
# second run stripped another layer and corrupted the pairs used by the
# DOLocationID join that follows.
pu_joined = trip_kv.join(coord_kv).values()
pu_joined.take(5)

[('80', [40.7072, -74.0027]),
 ('37', [40.7072, -74.0027]),
 ('13', [40.7072, -74.0027]),
 ('127', [40.7072, -74.0027]),
 ('39', [40.7072, -74.0027])]

In [20]:
# [+] Join pu_joined (now keyed by DOLocationID) with coord_kv.
# Inner join: trips whose DOLocationID has no coordinates are dropped.
# Result elements: (DOLocationID, ([pu_lat, pu_lon], [do_lat, do_lon])).
pudo_joined = pu_joined.join(coord_kv)

In [21]:
# [+] Show 5 values of pudo_joined
pudo_joined.take(5)

[('40', ([40.7072, -74.0027], [40.6802, -74.00163])),
 ('40', ([40.7072, -74.0027], [40.6802, -74.00163])),
 ('40', ([40.7072, -74.0027], [40.6802, -74.00163])),
 ('40', ([40.7072, -74.0027], [40.6802, -74.00163])),
 ('40', ([40.7072, -74.0027], [40.6802, -74.00163]))]

In [22]:
# Drop the key (DOLocationID), leaving ([pu_lat, pu_lon], [do_lat, do_lon]) pairs.
# Rebuilt from pu_joined/coord_kv instead of `pudo_joined = pudo_joined.values()`,
# so re-running this cell yields the same RDD instead of stripping another layer.
pudo_joined = pu_joined.join(coord_kv).values()
pudo_joined.take(5)

[([40.7072, -74.0027], [40.6802, -74.00163]),
 ([40.7072, -74.0027], [40.6802, -74.00163]),
 ([40.7072, -74.0027], [40.6802, -74.00163]),
 ([40.7072, -74.0027], [40.6802, -74.00163]),
 ([40.7072, -74.0027], [40.6802, -74.00163])]

In [23]:
# [+] Collect pudo_joined to the driver as a Python list of (pickup, dropoff) pairs.
# NOTE(review): this pulls every joined trip into driver memory (the DataFrame
# built from it later has ~4.7M rows); consider sampling in Spark first if memory is tight.
coord_lst = pudo_joined.collect()

In [24]:
# Show one sample element: ([pu_lat, pu_lon], [do_lat, do_lon])
coord_lst[0]

([40.7072, -74.0027], [40.6802, -74.00163])

In [25]:
# Combine the pickup and dropoff coordinate lists into one flat row per trip:
# [latitude_pu, longitude_pu, latitude_do, longitude_do].
# A comprehension with tuple unpacking replaces the index-based append loop.
res_lst = [list(pickup + dropoff) for pickup, dropoff in coord_lst]

In [26]:
# Show one flattened row: [latitude_pu, longitude_pu, latitude_do, longitude_do]
res_lst[0]

[40.7072, -74.0027, 40.6802, -74.00163]

In [27]:
# Store the flattened rows in a pandas DataFrame, one column per coordinate
# (pickup latitude/longitude, then dropoff latitude/longitude).
import pandas as pd

df = pd.DataFrame(
    data=res_lst,
    columns=["latitude_pu", "longitude_pu", "latitude_do", "longitude_do"],
)

In [28]:
# Display the DataFrame (pandas truncates the rendered rows to head and tail)
df

Unnamed: 0,latitude_pu,longitude_pu,latitude_do,longitude_do
0,40.707200,-74.002700,40.6802,-74.00163
1,40.707200,-74.002700,40.6802,-74.00163
2,40.707200,-74.002700,40.6802,-74.00163
3,40.707200,-74.002700,40.6802,-74.00163
4,40.707200,-74.002700,40.6802,-74.00163
...,...,...,...,...
4712821,40.862964,-73.858008,40.8468,-73.78750
4712822,40.862964,-73.858008,40.8468,-73.78750
4712823,40.754932,-73.984016,40.8468,-73.78750
4712824,40.689500,-73.764400,40.8468,-73.78750


In [29]:
# pydeck 설치 명령어
!pip install pydeck



In [30]:
# Import pydeck (Python bindings for deck.gl), used for the ArcLayer map
import pydeck as pdk

In [31]:
# Visualization options: arc colours as [R, G, B, A] with a low alpha (40/255)
# so overlapping arcs stay readable.
RED_RGB = [240, 100, 0, 40]   # used for the pickup (source) end of each arc
GREEN_RGB = [0, 255, 0, 40]   # used for the dropoff (target) end of each arc

In [None]:
%%time

# ArcLayer 시각화 설정
arc_layer = pdk.Layer(
    "ArcLayer",
    data=df.sample(100000),
    get_width="S000 * 2",
    get_source_position=["longitude_pu", "latitude_pu"],
    get_target_position=["longitude_do", "latitude_do"],
    get_tilt=15,
    get_source_color=RED_RGB,
    get_target_color=GREEN_RGB,
    pickable=True,
    auto_highlight=True,
)

# 초기화면 설정
view_state = pdk.ViewState(
    latitude=40.6928,
    longitude=-74.1854,
    bearing=45,
    pitch=50,
    zoom=8,
)


# 렌더링 옵션
TOOLTIP_TEXT = {"html": "{S000} trips <br /> Pickup Locations in red; Dropoff Locations in green"}
r = pdk.Deck(arc_layer, initial_view_state=view_state, tooltip=TOOLTIP_TEXT)
r.to_html("arc_layer.html")