In [2]:
# 로컬이니까 파티션 수를 줄인다.
spark.conf.set("spark.sql.shuffle.partitions", "5")

# 예제 파일 가져오자!
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/Users/talysa/Documents/Study/Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")

# 가져온 데이터 테이블로 저장하자. 스키마도 뽑아보자.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema=staticDataFrame.schema
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [3]:
# 윈도우 함수 쓸거다. 임포트하자.
from pyspark.sql.functions import window, col

# 고객 아이디, 총 지른 금액, 지른 시간 정보 뽑아온다.
# 고객 아이디 기준으로 정렬하는데, 지른 시간 기준으로 합산? 일단 돌려보자.
staticDataFrame.selectExpr(
    "CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
.groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(10)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   13417.0|[2011-12-04 09:00...|            404.83|
|   15358.0|[2011-12-05 09:00...| 830.0600000000003|
|   15392.0|[2011-12-05 09:00...|304.40999999999997|
|   15290.0|[2011-12-05 09:00...|263.02000000000004|
|   16811.0|[2011-12-05 09:00...|             232.3|
|   12748.0|[2011-12-05 09:00...| 363.7899999999999|
|   16500.0|[2011-12-05 09:00...| 52.74000000000001|
|   16873.0|[2011-12-05 09:00...|1854.8300000000002|
|   14060.0|[2011-12-05 09:00...|297.47999999999996|
|   14649.0|[2011-12-05 09:00...| 513.9899999999998|
+----------+--------------------+------------------+
only showing top 10 rows



In [4]:
# 스트리밍을 해보자. read 대신 readStream을 쓴다.
# 아까 빼논 스키마 쌔벼쓰자!
# streamingDataFrame=spark.readStream\
#    .schema(staticSchema)\
#    .option("maxFilesPerTrigger", 1)\
#    .format("csv")\
#    .option("header", "true")\
#    .load("/Users/talysa/Documents/Study/Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")

# 스트리밍 유형인지 확인. 맞으면 true 반환.
# streamingDataFrame.isStreaming

# 이것도 맥북을 이륙시켜서 주석 처리.

In [15]:
# 스트리밍으로 아까 뽑은 데이터 다시 뽑아보자. 
# show는 못한다. 했더니 에러 난다! 지연 연산이라서 액션을 호출하기 전엔 스파크가 안 돌려서 그런다.
# purchaseByCustomerPerHour = streamingDataFrame\
# .selectExpr(
#     "CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
# .groupBy(
#     col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
# .sum("total_cost")

In [24]:
# 위에 코드는 이렇게 돌리라고 시켜야 돌린다. 메모리 어딘가에 쓰라고 시켰다. 한 번 실행하고 나서 두 번째 실행하면 에러난다! 이미 있다고!
# 콘솔에 작업한 내용 보여지게 하고 싶으면  format의 값을 "console"로 변경한다. 
# 근데 이렇게 하면 맥북 이륙함. 그래서 주석처리 해 놓았다.
# purchaseByCustomerPerHour.writeStream\
#    .format("memory")\
#    .queryName("customer_purchases")\
#    .outputMode("complete")\
#    .start()

In [25]:
# 아무튼 한 번 돌려서 메모리에 스트리밍한 거 올라갔다. 뽑아보자.
# spark.sql("""
#    SELECT *
#    FROM customer_purchases
#    ORDER BY 'sum total_cost' DESC
# """)\
# .show(10)

In [5]:
# 머!신!러!닝!을 해보자.
# 임포트하자.
from pyspark.sql.functions import date_format, col

# 파일에서 불러온 데이터셋을 날짜 가지고 뭔가 한다. 이게 뭐지??? 찾아보니 날짜 컬럼 가지고 합치나보다.
preppedDataFrame = staticDataFrame\
    .na.fill(0)\
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
    .coalesce(5)
preppedDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|     Monday|
+---------+---------+-------------------

In [7]:
# 데이터를 학습용과 테스트용으로 나눈다.
trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

#각각 몇 개인지 보자!
print("Train : ", trainDataFrame.count())
print("Test : ", testDataFrame.count())

Train :  245903
Test :  296006


In [8]:
# 트랜스포메이션
from pyspark.ml.feature import StringIndexer

# 요일을 수치형으로 반환, 월요일은 1, 토요일은 6
indexer = StringIndexer()\
    .setInputCol("day_of_week")\
    .setOutputCol("day_of_week_index")

In [10]:
# 어떤 요일인지 여부를 알려주는 식으로 하려면 이렇게 하면 된다.
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder()\
    .setInputCol("day_of_week_index")\
    .setOutputCol("day_of_week_encoded")

In [15]:
# 입력 데이터를 벡터로 구성하자! 품목 가격, 수량, 요일을 넣었다. 결과물은 features라는 이름의 벡터 하나로 뱉는다.
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
    .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
    .setOutputCol("features")

In [17]:
# 형 변환을 시키는 파이프라인 세팅
# 요일은 숫자로, 그 다음 요일 확인하고, 벡터에 집어 넣는 식인가보다.
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
    .setStages([indexer, encoder, vectorAssembler])

In [19]:
# 학습 데이터 형태를 잡아줄 파이프라인을 만든다. 아까 만든 파이프라인을 집어 넣음.
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [46]:
# 이제야 다 다듬어진 학습 데이터를 준비!
transformedTraining = fittedPipeline.transform(trainDataFrame)

# 데이터 캐싱은 이렇게 한다는데 이번 챕터에서는 빠졌다. 아무튼 실행해보면 각 컬럼의 자료형을 알 수 있다. 
# transformedTraining.cache()

In [47]:
# 머신러닝을 실행할 인스턴스를 만든다. 오오 인스턴스 오오
# KMeans라는 알고리즘을 써서 KMeansModel을 만들거다!
from pyspark.ml.clustering import KMeans

# .setK에서 에러나! 그래서 좀 고침.
kmeans = KMeans().setK(2).setSeed(1)
kmModel = kmeans.fit(transformedTraining)

In [48]:
# 학습 후 비용을 계산해보자. 군집 비용이라는 거라는데 먹는 건가...
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)

8921808206.576424

In [50]:
# 이번에는 DataFrame 말고 RDD를 가지고 놀자.
# 병렬로 RDD를 맹그러서 DF로 변환까지 시킨다.
from pyspark.sql import Row

spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

DataFrame[_1: bigint]

In [None]:
# SparkR은 패스. 아직 R 책 다 안 봤다.
# 아무튼 스트리밍은 계속 막 퍼오고 난리도 아니라서 맥북이 이륙한다. 자세한 사용법을 알고 써야겠다.