### Setting up PySpark in the Elice environment
---

Set up the environment for the PySpark exercises on Elice. <br>
**Be sure to run the cell below** before running any exercise code.

1. Use pip to install Koalas and PySpark `3.x`, then prepare a Spark session.
<br>**The lecture exercise installed version 2.4.7**, but here we install the latest version for **compatibility with Python 3.8**.

In [1]:
!pip install pyspark
!pip install Koalas

import databricks.koalas as ks
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Collecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
Installing collected packages: py4j
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.2
    Uninstalling py4j-0.10.9.2:
      Successfully uninstalled py4j-0.10.9.2
Successfully installed py4j-0.10.7






2. Check that the PySpark environment is set up correctly.

In [2]:
spark

### Data read
---

1. Load the `CSV` file using Spark's `read.format`.

In [5]:
df = spark.read.format("csv").load("data/delivery_data.csv")

2. Check the schema with `printSchema`.<br>The column names are placeholders (`_c0` to `_c5`) and every column is typed as `string`, which is not what we want.

In [6]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



3. Print the first 5 rows with `head`.<br>The columns have no meaningful names yet: they show up as `_c0` through `_c5`.

In [7]:
df.head(5)

[Row(_c0='2019-08-01', _c1='00', _c2='강원도', _c3='속초시', _c4='교동', _c5='1'),
 Row(_c0='2019-08-01', _c1='00', _c2='경기도', _c3='고양시 일산동구', _c4='마두동', _c5='4'),
 Row(_c0='2019-08-01', _c1='00', _c2='경기도', _c3='고양시 일산동구', _c4='백석동', _c5='28'),
 Row(_c0='2019-08-01', _c1='00', _c2='경기도', _c3='고양시 일산동구', _c4='식사동', _c5='3'),
 Row(_c0='2019-08-01', _c1='00', _c2='경기도', _c3='고양시 일산동구', _c4='장항동', _c5='4')]
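The first row printed above looks like data rather than column names. Spark's CSV `header` option treats the first line of the file as names and skips it, so it is worth checking the first line of the raw file before deciding whether to use it. The following is a minimal sketch with Python's `csv` module. The sample text stands in for `data/delivery_data.csv`, and `looks_like_header` is a hypothetical helper written only for illustration.

```python
import csv
import io

# Stand-in for the first lines of the CSV file (values from the output above).
sample_text = (
    "2019-08-01,00,강원도,속초시,교동,1\n"
    "2019-08-01,00,경기도,고양시 일산동구,마두동,4\n"
)


def looks_like_header(first_row):
    """Heuristic: a header row normally contains no purely numeric field."""
    return not any(field.strip().isdigit() for field in first_row)


first_row = next(csv.reader(io.StringIO(sample_text)))
print(first_row)
print("header line?", looks_like_header(first_row))
```

If the check suggests the file has no header line, reading it with `header=True` would drop the first data row.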

### Adjusting column names
---

1. Use the types in `pyspark.sql` to define a schema that supplies the column names.

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("날짜", StringType(), True),
    StructField("시간대별시간", IntegerType(), True),
    StructField("배달상점광역시도명", StringType(), True),
    StructField("배달상점시군구명", StringType(), True),
    StructField("배달상점법정동명", StringType(), True),
    StructField("주문건수", IntegerType(), True),
])

In [10]:
df = spark.read.options(delimiter=',').option("header", True).schema(schema).csv("data/delivery_data.csv")

2. Check the result of the change.

In [11]:
df.head(5)

[Row(날짜='2019-08-01', 시간대별시간=0, 배달상점광역시도명='경기도', 배달상점시군구명='고양시 일산동구', 배달상점법정동명='마두동', 주문건수=4),
 Row(날짜='2019-08-01', 시간대별시간=0, 배달상점광역시도명='경기도', 배달상점시군구명='고양시 일산동구', 배달상점법정동명='백석동', 주문건수=28),
 Row(날짜='2019-08-01', 시간대별시간=0, 배달상점광역시도명='경기도', 배달상점시군구명='고양시 일산동구', 배달상점법정동명='식사동', 주문건수=3),
 Row(날짜='2019-08-01', 시간대별시간=0, 배달상점광역시도명='경기도', 배달상점시군구명='고양시 일산동구', 배달상점법정동명='장항동', 주문건수=4),
 Row(날짜='2019-08-01', 시간대별시간=0, 배달상점광역시도명='경기도', 배달상점시군구명='고양시 일산동구', 배달상점법정동명='정발산동', 주문건수=2)]

In [12]:
df.printSchema()

root
 |-- 날짜: string (nullable = true)
 |-- 시간대별시간: integer (nullable = true)
 |-- 배달상점광역시도명: string (nullable = true)
 |-- 배달상점시군구명: string (nullable = true)
 |-- 배달상점법정동명: string (nullable = true)
 |-- 주문건수: integer (nullable = true)



### Infer Schema explained
--- 

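- Infer Schema means that Spark automatically infers and assigns the data type of each column when it reads data.
- It is convenient, but the inferred type is sometimes not the one you want, so always double-check the result.
- If the schema is wrong, later code is likely to fail!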
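To make the idea concrete, here is a minimal sketch of column type inference in plain Python. It is not Spark's actual algorithm (in Spark the behavior is switched on with the CSV reader option `inferSchema`), and `infer_column_type` is a hypothetical helper written only for illustration. It shows why inferred types deserve a second look: an hour column stored as `'00'` is inferred as an integer, so the leading zero is lost.

```python
def infer_column_type(values):
    """Return 'integer', 'double', or 'string' for a list of raw CSV strings.

    Illustrative only: a simplified stand-in for what a schema
    inference step does, not Spark's implementation.
    """
    def all_parse(cast):
        try:
            for v in values:
                cast(v)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"


# Sample values taken from the first rows of the delivery data.
hours = ["00", "00", "23"]
dates = ["2019-08-01", "2019-12-31"]
orders = ["1", "4", "28"]

print(infer_column_type(hours))   # integer ('00' is read as 0)
print(infer_column_type(dates))   # string
print(infer_column_type(orders))  # integer
```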

1. Check that the resulting schema is correct.

In [13]:
df.printSchema()

root
 |-- 날짜: string (nullable = true)
 |-- 시간대별시간: integer (nullable = true)
 |-- 배달상점광역시도명: string (nullable = true)
 |-- 배달상점시군구명: string (nullable = true)
 |-- 배달상점법정동명: string (nullable = true)
 |-- 주문건수: integer (nullable = true)



In [14]:
df.show(5)

+----------+------------+------------------+----------------+----------------+--------+
|      날짜|시간대별시간|배달상점광역시도명|배달상점시군구명|배달상점법정동명|주문건수|
+----------+------------+------------------+----------------+----------------+--------+
|2019-08-01|           0|            경기도| 고양시 일산동구|          마두동|       4|
|2019-08-01|           0|            경기도| 고양시 일산동구|          백석동|      28|
|2019-08-01|           0|            경기도| 고양시 일산동구|          식사동|       3|
|2019-08-01|           0|            경기도| 고양시 일산동구|          장항동|       4|
|2019-08-01|           0|            경기도| 고양시 일산동구|        정발산동|       2|
+----------+------------+------------------+----------------+----------------+--------+
only showing top 5 rows



### From a PySpark DataFrame to a pandas DataFrame
---

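- You can keep working with the Spark DataFrame, but here we convert it to a pandas DataFrame, which is much easier to work with.
- Use a pandas DataFrame only when the data is small or the task is simple, because `toPandas()` collects every row into the driver's memory.
- PySpark and pandas DataFrames share little more than the name, so PySpark's optimizations do not apply once the data has been converted.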
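Before converting, it can help to estimate how much memory the data will need as a pandas DataFrame. The sketch below is only a rough estimate under stated assumptions: `sample` is a hand-made frame with three rows copied from the `df.show(5)` output above, `bytes_per_row` and `estimated_mb` are names chosen for illustration, and the total row count is the one reported by `kdf.shape` later in this notebook.

```python
import pandas as pd

# Hypothetical sample: three rows copied from the df.show(5) output.
sample = pd.DataFrame({
    "날짜": ["2019-08-01"] * 3,
    "시간대별시간": [0, 0, 0],
    "배달상점광역시도명": ["경기도"] * 3,
    "배달상점시군구명": ["고양시 일산동구"] * 3,
    "배달상점법정동명": ["마두동", "백석동", "식사동"],
    "주문건수": [4, 28, 3],
})

# deep=True also counts the string contents, not just the references to them.
bytes_per_row = sample.memory_usage(deep=True, index=False).sum() / len(sample)

total_rows = 426_971  # row count reported by kdf.shape in this notebook
estimated_mb = bytes_per_row * total_rows / 1024 ** 2
print(f"about {estimated_mb:.0f} MB as a pandas DataFrame")
```

If the estimate approaches the memory available to the driver, aggregating in Spark first and converting only the small result is the safer route.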

In [15]:
pdf = df.toPandas()

In [16]:
pdf

,날짜,시간대별시간,배달상점광역시도명,배달상점시군구명,배달상점법정동명,주문건수
0,2019-08-01,0,경기도,고양시 일산동구,마두동,4
1,2019-08-01,0,경기도,고양시 일산동구,백석동,28
2,2019-08-01,0,경기도,고양시 일산동구,식사동,3
3,2019-08-01,0,경기도,고양시 일산동구,장항동,4
4,2019-08-01,0,경기도,고양시 일산동구,정발산동,2
...,...,...,...,...,...,...
426966,2019-12-31,23,충청북도,제천시,영천동,3
426967,2019-12-31,23,충청북도,제천시,장락동,6
426968,2019-12-31,23,충청북도,제천시,천남동,2
426969,2019-12-31,23,충청북도,제천시,청전동,5


### Analysis with a pandas DataFrame
---
- Extract the top 5 provinces and metropolitan cities by total order count.
- Save the result of the analysis as a parquet file.

In [17]:
pdf["배달상점광역시도명"].unique()

array(['경기도', '경상남도', '대전광역시', '서울특별시', '전라남도', '전라북도', '제주특별자치도', '충청남도',
       '충청북도', '경상북도', '대구광역시', '강원도', '부산광역시', '광주광역시', '인천광역시', '울산광역시',
       '세종특별자치시'], dtype=object)

In [18]:
pdf2 = pdf.groupby("배달상점광역시도명").sum(numeric_only=True).sort_values(by="주문건수", ascending=False).head(5)

In [19]:
pdf2

배달상점광역시도명,시간대별시간,주문건수
경기도,2143549,1805058
서울특별시,1156813,751996
경상남도,909493,339035
경상북도,607105,170346
전라북도,219177,139037


In [20]:
pdf2.sort_values(by="주문건수", ascending=False).head(5)

배달상점광역시도명,시간대별시간,주문건수
경기도,2143549,1805058
서울특별시,1156813,751996
경상남도,909493,339035
경상북도,607105,170346
전라북도,219177,139037


In [21]:
result_pdf = pdf.groupby("배달상점광역시도명").sum(numeric_only=True).sort_values(by="주문건수", ascending=False).head(5)

In [31]:
result_pdf.to_parquet(path="./city_top5.parquet")
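The aggregation above also adds up `시간대별시간`, which is an hour of the day and has no meaningful total. A minimal alternative, shown here on a small made-up frame (the values in `toy` are invented for illustration and are not taken from the dataset), selects only `주문건수` before aggregating and uses `nlargest` to pick the top entries.

```python
import pandas as pd

# Toy data, invented for illustration only.
toy = pd.DataFrame({
    "배달상점광역시도명": ["경기도", "경기도", "서울특별시", "강원도", "서울특별시"],
    "시간대별시간": [0, 1, 0, 0, 23],
    "주문건수": [4, 28, 10, 1, 5],
})

# Sum only the order-count column per province, then keep the largest totals.
top = (
    toy.groupby("배달상점광역시도명")["주문건수"]
    .sum()
    .nlargest(2)
)
print(top)
```

The result is a Series, so it would need `to_frame()` before calling `to_parquet`. pandas also requires a parquet engine such as `pyarrow` or `fastparquet` to be installed for that call.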

### Koalas

In [23]:
kdf = df.to_koalas()

In [24]:
kdf.head(10)

,날짜,시간대별시간,배달상점광역시도명,배달상점시군구명,배달상점법정동명,주문건수
0,2019-08-01,0,경기도,고양시 일산동구,마두동,4
1,2019-08-01,0,경기도,고양시 일산동구,백석동,28
2,2019-08-01,0,경기도,고양시 일산동구,식사동,3
3,2019-08-01,0,경기도,고양시 일산동구,장항동,4
4,2019-08-01,0,경기도,고양시 일산동구,정발산동,2
5,2019-08-01,0,경기도,고양시 일산서구,일산동,7
6,2019-08-01,0,경기도,고양시 일산서구,탄현동,3
7,2019-08-01,0,경기도,광명시,광명동,9
8,2019-08-01,0,경기도,광명시,철산동,13
9,2019-08-01,0,경기도,광명시,하안동,7


In [25]:
kdf.shape

(426971, 6)

In [26]:
result = kdf.groupby("배달상점광역시도명").sum().sort_values(by="주문건수", ascending=False).head(5)

In [30]:
result.to_parquet("koalas_result")

Py4JJavaError: An error occurred while calling o857.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:202)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:136)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:160)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:157)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 644, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\chahn\Documents\이어드림\Spark\koalas_result\_temporary\0\_temporary\attempt_20211029192941_0017_m_000000_644\part-00000-bead0907-8029-4d6b-8249-78eac8a72b2b-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:240)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:171)
	... 32 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\chahn\Documents\이어드림\Spark\koalas_result\_temporary\0\_temporary\attempt_20211029192941_0017_m_000000_644\part-00000-bead0907-8029-4d6b-8249-78eac8a72b2b-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:240)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
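
The trace ends with `chmod` being run through a `null` command on a `C:\` path. On Windows this message usually means that Hadoop cannot find `winutils.exe`, which it expects under `%HADOOP_HOME%\bin`. The sketch below is a hypothetical pre-flight check that can be run before writing files from a local Spark session. The function name `winutils_ready` is an assumption made for illustration and is not part of Spark or Koalas.

```python
import os
import platform


def winutils_ready(env=None, system=None):
    """Return True if a local Hadoop write is likely to work on this OS.

    On non-Windows systems winutils.exe is not needed, so this returns True.
    On Windows it checks that HADOOP_HOME is set and contains bin/winutils.exe.
    """
    env = os.environ if env is None else env
    system = platform.system() if system is None else system
    if system != "Windows":
        return True
    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        return False
    return os.path.isfile(os.path.join(hadoop_home, "bin", "winutils.exe"))


if not winutils_ready():
    print("HADOOP_HOME/bin/winutils.exe not found; Spark file writes may fail.")
```

Because the top-5 result is tiny, another option is to convert it with Koalas' `to_pandas()` and save it with pandas, as in the pandas section.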


---
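All materials provided on this page are assets of Elice Inc. protected by copyright law. Unauthorized use, appropriation, reproduction, and distribution are prohibited.

The data used in this exercise comes from [KT 통신 빅데이터 플랫폼, 시간-지역별 배달 주문건수](https://www.bigdata-telecom.kr/invoke/SOKBP2603/?goodsCode=KGUTIMEORDER) (KT Telecom Big Data Platform, delivery order counts by time and region).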

Copyright 2021 엘리스 Inc. All rights reserved.