In [1]:
"""
스파크를 사용하면 빅데이터 프로그램을 쉽게 개발할 수 있음.
spark-submit 명령을 사용해 대화형 셸에서 개발한 프로그램을 운영용 애플리케이션으로 쉽게 전환할 수 있음.
사용자는 스파크가 지원하는 프로그래밍 언어로 애플리케이션을 개발한 다음 실행 할 수 있음.
가장 간단한 방법은 로컬 머신에서 애플리케이션을 실행하는것으로, 아래는 스파크와 함께 제공되는 스칼라 애플리케이션 예제를 실행하는 예제이다.

파이값을 특정 자릿수까지 계산하는 애플리케이션으로 매직커맨드로 실행
"""
%run C:/Users/Namsik/Anaconda3/Lib/site-packages/pyspark/examples/src/main/python/pi.py

Pi is roughly 3.144300


```
# %load C:/Users/Namsik/Anaconda3/Lib/site-packages/pyspark/examples/src/main/python/pi.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
```

In [2]:
import os
import pyspark
import findspark
from pyspark.sql import SparkSession

In [4]:
try:
    findspark.init()
except:
    try:
        print(os.environ['SPARK_HOME'])
    except:
        os.environ['SPARK_HOME'] = 'C:\spark-3.0.0-preview2-bin-hadoop2.7\spark-3.0.0-preview2-bin-hadoop2.7'
        print(os.environ['SPARK_HOME'])
    try:
        print(os.environ['JAVA_HOME'])
    except KeyError:
        os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jdk1.8.0_231'
        print(os.environ['JAVA_HOME'])
    print(os.environ['PATH'])

C:\spark-3.0.0-preview2-bin-hadoop2.7\spark-3.0.0-preview2-bin-hadoop2.7
C:\Program Files\Java\jdk1.8.0_231
C:\Users\Namsik\Anaconda3;C:\Users\Namsik\Anaconda3\Library\mingw-w64\bin;C:\Users\Namsik\Anaconda3\Library\usr\bin;C:\Users\Namsik\Anaconda3\Library\bin;C:\Users\Namsik\Anaconda3\Scripts;C:\Users\Namsik\Anaconda3\bin;C:\Users\Namsik\Anaconda3\condabin;C:\Program Files\Rockwell Software\RSCommon;C:\Program Files (x86)\Common Files\Oracle\Java\javapath;C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v10.1\bin;C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v10.1\libnvvp;C:\windows\system32;C:\windows;C:\windows\System32\Wbem;C:\windows\System32\WindowsPowerShell\v1.0;C:\windows\System32\OpenSSH;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;C:\Program Files\NVIDIA Corporation\NVIDIA NvDLISR;C:\Program Files\NVIDIA Corporation\Nsight Compute 2019.4.0;";C:\Users\Namsik\AppData\Local\Microsoft\WindowsApps\";C:\Program Files\Git\cmd;C:\Program Files\Intel\WiFi\bin;C:\Pro

In [5]:
if 'spark_data' in os.listdir():pass
else:
    import wget
    import zipfile
    wget.download('https://www.dropbox.com/sh/5c9daz43f5lstzd/AAAPN_iMkKzgqJ1k3tml7FNHa?dl=1')
    with zipfile.ZipFile('spark_data.zip', 'r') as zip_ref:
        zip_ref.extractall('spark_data')

In [6]:
spark = SparkSession.builder.master("local[*]")\
.appName('exmaple_app')\
.getOrCreate()

In [8]:
"""
정적 데이터셋의 데이터를 분석해 DataFrame을 생성함.
이때 정적 데이터셋의 스키마도 함께 생성하며 스트림 처리과정에서 스키마를 추론하는 방법은 5장에서 자세히 알아봄.

아래의 데이터는 시계열 데이터이기에 데이터를 그룹화하고 집계하는 방법을 알아볼 필요가 있음.
이를 위해 특정 고객이 대량으로 구매하는 영업시간을 살펴보고자함.
예를 들어 총 구매비용 컬럼을 추가하고 고객이 가장 많이 소비한 날을 찾음.
"""

staticDataFrame = spark.read.format('csv')\
.option('header','true')\
.option('inferSchema','true')\
.load('spark_data/retail-data/by-day/*.csv')

staticDataFrame.createOrReplaceTempView('retail_data')
staticSchema = staticDataFrame.schema

In [9]:
staticDataFrame.show(10)
print(staticDataFrame.count())

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [10]:
"""
윈도우 함수는 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성함.
윈도우는 간격을 통해 처리 요건을 명시할 수 이기 때문에 날짜와 타임스탬프 처리에 유용함.
스파크는 관련 날짜의 데이터를 그룹화함
"""

from pyspark.sql.functions import window, col

staticDataFrame.selectExpr('CustomerID','(UnitPrice * Quantity) as total_cost','InvoiceDate')\
.groupby(col('CustomerID'),window(col('InvoiceDate'),'1 day'))\
.sum('total_cost').show(5)

+----------+--------------------+-----------------+
|CustomerID|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 09:00...|            -37.6|
|   14126.0|[2011-11-29 09:00...|643.6300000000001|
|   13500.0|[2011-11-16 09:00...|497.9700000000001|
|   17160.0|[2011-11-08 09:00...|516.8499999999999|
|   15608.0|[2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [11]:
"""
번외. 위 코드를 뜯어보자

그리고 날짜를 바탕으로 정렬해보자
"""

from pyspark.sql.functions import desc

staticDataFrame.show(5)
print('='*100)
first_action = staticDataFrame.selectExpr('CustomerID','(UnitPrice * Quantity) as total_cost','InvoiceDate')
first_action.show(5)
## staticDataFrame에서 CustomerID와 UnitPrice * Quantity인 total_cost, InvoiceDAte 컬럼만 추출
print('='*100)
second_action = first_action.groupby(col('CustomerID'),window(col('InvoiceDate'),'1 day')).sum('total_cost')
second_action.show(5)
## 추출된 DataFrame에서 1일단위의 일자별 구매ID별 total_cost 합계를 추출
print('='*100)
third_action = second_action.sort(desc('window'))
third_action.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows

+--

In [12]:
"""
로컬모드로 이 코드를 실행하려면 로컬 모드에 적합한 셔플 파티션 수를 설정하는 것이 좋음.
셔플 파티션 수는 셔플 이후에 생성될 파티션 수를 의미함. 기본값은 200이지만 로컬모드에서는 많이 필요없으므로 5로 설정
2장에서 한번다뤘으니 참고
"""

spark.conf.set('spark.sql.shuffle.paririons','5')

In [13]:
"""
위의 코드를 스트리밍 형식으로 살펴보고자함.
코드는 거의 바뀌지않으며 read메서드 대신 readStream 메서드를 사용하는것과 maxFilesPerTrigger 옵션을 추가로 지정하는것이 차이점.
maxFilesPerTrigger를 통해 한번에 읽을 파일수를 설정할 수 있음.
이번 예제를 스트리밍답게 만드는 옵션이지만 운영 환경에 적용하는것은 추천하지 않음.
"""

streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option('maxFilesPerTrigger',1)\
.format('csv')\
.option('header','true')\
.load('spark_data/retail-data/by-day/*.csv')

In [14]:
"""
만들어진 DataFrame이 스트리밍 유형인지 확인
"""
streamingDataFrame.isStreaming

True

In [15]:
"""
기존 DataFrame처리와 동일한 비즈니스 로직을 적용.
아래의 코드는 총 판매 금액을 계산
이 작업 역시 지연 연산이므로 데이터 플로를 실행하기 위해 스트리밍 액션을 호출해야함
"""

purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr('CustomerID','(UnitPrice * Quantity) as total_cost','InvoiceDate')\
.groupby(col('CustomerID'),window(col('InvoiceDate'),'1 day'))\
.sum('total_cost')

In [20]:
"""
스트리밍 액션은 어딘가에 데이터를 채워 넣어야하므로 count 메서드와 같은 일반적인 정적 액션과는 조금 다른 특성을 가짐.
여기서 사용할 스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장함.
아래 코드의 경우 파일마다 트리거를 실행함. 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신하므로
언제나 가장 큰값을 얻을 수 있음

안되서....추후 원인을 알면 수정하겠습니다...colab에선 되는데 잘모르겠네요. 제 local 경로 문제인것같습니다.
"""

# purchaseByCustomerPerHour.writeStream\
# .format("memory")\
# .queryName("customer_purchases")\
# .outputMode("complete")\
# .start()

# spark.sql("""SELECT * FROM customer_purchases ORDER BY 'sum(total_cost)' DESC""").show(5)

'\n스트리밍 액션은 어딘가에 데이터를 채워 넣어야하므로 count 메서드와 같은 일반적인 정적 액션과는 조금 다른 특성을 가짐.\n여기서 사용할 스트리밍 액션은 트리거가 실행된 다음 데이터를 갱신하게 될 인메모리 테이블에 데이터를 저장함.\n아래 코드의 경우 파일마다 트리거를 실행함. 스파크는 이전 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신하므로\n언제나 가장 큰값을 얻을 수 있음\n\n안되서....추후 원인을 알면 수정하겠습니다...colab에선 되는데 잘모르겠네요. 제 local 경로 문제인것같습니다.\n'

In [22]:
"""
스파크는 데이터 전처리에 사용하는 다양한 메서드를 제공함. 아래 예제는 원본 데이터를 올바른 포맷으로 만드는
트랜스포메이션을 정의하고 실제로 모델을 학습한 다음 예측을 수행
"""
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [34]:
"""
결측치를 0으로 채우고, 요일 컬럼에 해당날짜의 요일값 추가
"""

from pyspark.sql.functions import date_format, col

preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn('day_of_week',date_format(col('InvoiceDate'),'EEEE'))\
.coalesce(5)