In [0]:
# Build a one-column DataFrame from spark.range(1000) and name the column "number".
my_range = spark.range(1000).toDF('number')

In [0]:
# Keep only the rows of my_range whose "number" is divisible by 2.
divisBy2 = my_range.where("number % 2 = 0")

In [0]:
# Count the filtered rows; the value is shown as the cell output.
divisBy2.count()

Out[4]: 500

In [0]:
# Read the 2015 flight summary CSV with the "header" and "inferSchema"
# reader options both enabled.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv")
)

In [0]:
# Fetch the first three rows as a list of Row objects for a quick look.
flightData2015.take(3)

Out[6]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [0]:
# Print the physical plan Spark would use to sort the flights by "count".
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#57 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#57 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=92]
      +- FileScan csv [DEST_COUNTRY_NAME#55,ORIGIN_COUNTRY_NAME#56,count#57] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2015-s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [0]:
# Set spark.sql.shuffle.partitions to 5, then sort by "count" and take the first two rows.
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

Out[8]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [0]:
# Same query with spark.sql.shuffle.partitions set to 10; the recorded rows are unchanged.
spark.conf.set("spark.sql.shuffle.partitions", "10")
flightData2015.sort("count").take(2)

Out[9]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [0]:
# Same query with spark.sql.shuffle.partitions set to 1; the recorded rows are unchanged.
# Note: this setting stays at "1" for the cells that follow.
spark.conf.set("spark.sql.shuffle.partitions", "1")
flightData2015.sort("count").take(2)

Out[10]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [0]:
# SQL: register the DataFrame as a temporary view so it can be queried through spark.sql.

flightData2015.createOrReplaceTempView("flight_data_2015")

In [0]:
# Express the same per-destination row count twice: once in SQL against the
# temporary view, once with the DataFrame API.
sqlWay = spark.sql("""
    select DEST_COUNTRY_NAME, count(1)
    from flight_data_2015
    group by DEST_COUNTRY_NAME
""")

dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

# Print both physical plans, separated by a divider, for side-by-side comparison.
sqlWay.explain()
print("------------------------------")
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#55], functions=[finalmerge_count(merge count#86L) AS count(1)#74L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#55, 1), ENSURE_REQUIREMENTS, [plan_id=132]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#55], functions=[partial_count(1) AS count#86L])
         +- FileScan csv [DEST_COUNTRY_NAME#55] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2015-s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


------------------------------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#55], functions=[finalmerge_count(merge count#88L) AS count(1)#81L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#55, 1), ENSURE_REQUIREMENTS, [plan_id=153]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#55], functions=[p

In [0]:
# Largest "count" value in the view, computed through SQL.
spark.sql("select max(count) from flight_data_2015").take(1)

Out[13]: [Row(max(count)=370002)]

In [0]:
# Import Spark's column-level max under an alias: a bare
# `from pyspark.sql.functions import max` would shadow Python's built-in max()
# for every cell that runs afterwards.
from pyspark.sql.functions import max as spark_max

# Largest "count" value, computed through the DataFrame API.
flightData2015.select(spark_max("count")).take(1)

Out[14]: [Row(max(count)=370002)]

In [0]:
from pyspark.sql.functions import desc

# Sum "count" per destination country, rename the aggregate column, and show
# the five largest totals.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [0]:
# Same top-five aggregation as the previous cell, kept as a DataFrame so its
# physical plan can be inspected. Relies on `desc` imported in an earlier cell.
example = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
)

example.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#141L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#55,destination_total#141L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#55], functions=[finalmerge_sum(merge sum#145L) AS sum(count#57)#137L])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#55, 1), ENSURE_REQUIREMENTS, [plan_id=282]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#55], functions=[partial_sum(count#57) AS sum#145L])
            +- FileScan csv [DEST_COUNTRY_NAME#55,count#57] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/databricks-datasets/definitive-guide/data/flight-data/csv/2015-s..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [0]:
# Load every per-day retail CSV into one static DataFrame.
# The path is absolute (leading "/"), matching the flight-data cell, so it does
# not depend on the filesystem's current working directory.
staticDataFrame = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/databricks-datasets/definitive-guide/data/retail-data/by-day/*.csv")
)

# Expose the data to SQL and keep the inferred schema for reuse by the streaming reader.
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [0]:
# Preview the first three retail rows.
staticDataFrame.show(3)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows



In [0]:
from pyspark.sql.functions import window, col

# Add a total-cost column (UnitPrice * Quantity) and find the days with the
# highest total spend, using 1-day windows over InvoiceDate.
# Per-customer grouping ("CustomerId") is disabled in this cell, so the totals
# are per day across all customers.
(
    staticDataFrame
    .selectExpr(
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .orderBy(col("sum(total_cost)").desc())
    .show(5)
)

+--------------------+------------------+
|              window|   sum(total_cost)|
+--------------------+------------------+
|{2011-11-14 00:00...|112141.10999999996|
|{2011-09-20 00:00...|109286.20999999993|
|{2011-12-08 00:00...| 81417.77999999982|
|{2011-11-23 00:00...|  78480.6999999997|
|{2011-10-05 00:00...| 75244.42999999986|
+--------------------+------------------+
only showing top 5 rows



In [0]:
# Adapt the static read for streaming: same CSV files and schema, but read
# through readStream with maxFilesPerTrigger set to 1.
# The path is absolute (leading "/"), matching the flight-data cell, so it does
# not depend on the filesystem's current working directory.

streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .format("csv")
    .option("header", "true")
    .load("/databricks-datasets/definitive-guide/data/retail-data/by-day/*.csv")
)

In [0]:
# Confirm that the DataFrame is a streaming one (recorded output: True).
streamingDataFrame.isStreaming

Out[29]: True

In [0]:
# Streaming version of the daily-spend aggregation, this time grouped per customer.
# Despite the "PerHour" in the variable name, the window duration is "1 day".
# Relies on `window` and `col` imported in an earlier cell.
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(
        col("CustomerId"),
        window(col("InvoiceDate"), "1 day"),
    )
    .sum("total_cost")
    .orderBy(col("sum(total_cost)").desc())
)

In [0]:
# Start the streaming query named "customer_purchases" in "complete" output mode.
# The chain is wrapped in parentheses because a comment placed after a
# line-continuation backslash is a SyntaxError in Python.
(
    purchaseByCustomerPerHour.writeStream
    .format("console")  # alternative: .format("memory") keeps the results in memory
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)

# Windows are built from the event time (InvoiceDate) rather than from the time
# at which Spark processes the data.
# In this way Structured Streaming makes up for a shortcoming of the older Spark Streaming.

Out[38]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc21020ab80>

In [0]:
# Print the inferred column names and types of the retail data.
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
# Prepare the data for the ML section.
from pyspark.sql.functions import date_format, col

# Fill missing values with 0, derive "day_of_week" by formatting InvoiceDate
# with the "EEEE" pattern, and coalesce to 5 partitions.
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [0]:
# Split into training and test sets by invoice date: rows before 2011-07-01
# train, rows on or after it test.
# (The author's note also mentions TrainValidationSplit and CrossValidator.)

trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

In [0]:
# Number of rows in the training split.
trainDataFrame.count()

Out[46]: 245903

In [0]:
# Number of rows in the test split.
testDataFrame.count()

Out[47]: 296006

In [0]:
# Map the "day_of_week" strings to numeric indices in "day_of_week_index".

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_index",
)

In [0]:
# One-hot encode the day-of-week index into a 0/1 vector.
# Author's note: unlike sklearn, a string column cannot be one-hot encoded
# directly; it has to be converted to numbers first, which is why the input
# column is "day_of_week_index" rather than "day_of_week".

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_index_encoded",
)

In [0]:
# Spark's machine-learning algorithms take a numeric vector column as input,
# so combine the feature columns into a single "features" vector.

from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_index_encoded"],
    outputCol="features",
)

In [0]:
# Set up the pipeline: index, then one-hot encode, then assemble the feature vector.

from pyspark.ml import Pipeline

transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

In [0]:
# Fit the transformation pipeline on the training data.
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [0]:
# Transform the training data with the fitted pipeline.
transformedTraning = fittedPipeline.transform(trainDataFrame)

In [0]:
# Caching: keep a copy of the intermediate transformed dataset in memory so it
# can be reused while trying out hyperparameter values.
transformedTraning.cache()

Out[75]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_index_encoded: vector, features: vector]

In [0]:
# Model training: configure a KMeans estimator with k=20 and a fixed seed of 1.

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=20, seed=1)

In [0]:
# Naming convention before training: Algorithm
# Naming convention after training:  AlgorithmModel

kmModel = kmeans.fit(transformedTraning)

In [0]:
# Author's note: kmModel.computeCost(transformedTraning) is no longer available
# in Spark 3.0 and later. As in sklearn, the clustering can be judged with the
# silhouette coefficient instead.

from pyspark.ml.evaluation import ClusteringEvaluator

# Evaluator that scores a clustering (default settings).
evaluator = ClusteringEvaluator()

# Run the test split through the fitted feature pipeline, then assign clusters.
transformedTest = fittedPipeline.transform(testDataFrame)
predictions = kmModel.transform(transformedTest)

# Silhouette score of the test-set predictions.
silhouette = evaluator.evaluate(predictions)

In [0]:
# Show the silhouette score computed in the previous cell.
print(silhouette)

0.5427938390491533


In [0]:
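# Judging from the tutorial alone, usage does not look very different from numpy and pandas in Python.
# It feels like streaming and distributed processing have been added on top.

# Low-level API (RDD): RDDs can be used to read or manipulate raw data, but the structured APIs are preferred.
# So far: numpy, pandas -> feel like the structured APIs
# plain Python code -> feels like the low-level API

# Keep studying to learn more.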