In [1]:
sc

In [2]:
#집계 함수
#키나 그룹을 지정하고 하나 이상의 컬럼을 변환, 여러 입력값이 주어지면 그룹별로 결과를 생성
#평균값, 수치형 데이터 요약, 합산, 곱셈, 카운팅 등

In [3]:
#리파티셔닝, 캐싱 (파티션 수를 줄이는 이유: 적은 양의 데이터를 가진 수많은 파일이 존재하기 때문)
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("file:///home/ubuntu/Spark-The-Definitive-Guide/data/retail-data/all/*.csv")\
  .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

In [4]:
df.count()
#count메서드는 트랜스포메이션이 아닌 액션이므로 결과를 즉시 반환

541909

In [5]:
#집계함수
#count
#두 가지 방식 -> 특정 컬럼 지정, 또는 count(*)나 count(1) 사용
from pyspark.sql.functions import count
df.select(count("StockCode")).show() # 541909

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [6]:
#countDistinct
#전체 레코드 수가 아닌 고유 레코드 수를 구해야 할 때 사용, 개별 컬럼을 처리하는 데 더 적합
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show() # 4070

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [7]:
#approx_count_distinct
#어느 정도 수준의 정확도를 가지는 근사치만으로도 유의미할 때 사용하여 근사치 계산
#최대 추정 오류율이라는 파라미터 사용
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() # 3364

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [8]:
#first, last
#로우를 기반으로 동작
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [9]:
#min, max
from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [10]:
#sum
#특정 컬럼의 모든 값 합산
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show() # 5176450

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [11]:
#sumDistinct
#고윳값 합산
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show() # 29310

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [12]:
#avg
from pyspark.sql.functions import sum, count, avg, expr

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



In [13]:
#분산과 표준편차
#variance, stddev -> 표본표준분산, 표본표준편차
#var_pop, stddev_pop -> 모표준분산, 모표준편차
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660923| 47559.39140929892|  218.08095663447835|   218.08115785023455|
+-----------------+------------------+--------------------+---------------------+



In [14]:
#비대칭도와 첨도
#데이터의 변곡점을 측정하는 방법
#비대칭도: 데이터 평균의 비대칭 정도 측정
#첨도: 데이터 끝 부분을 측정
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.26407557610528376|119768.05495530753|
+--------------------+------------------+



In [15]:
#공분산과 상관관계
#cov: 공분산
#corr: 상관관계
#covar_pop -> 모공분산
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity")).show()


+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085640497E-4|             1052.7280543915997|            1052.7260778754955|
+-------------------------+-------------------------------+------------------------------+



In [16]:
#복합 데이터 타입의 집계
from pyspark.sql.functions import collect_set, collect_list
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



In [18]:
#그룹화
#카테고리형 데이터 사용
#하나 이상의 컬럼을 그룹화하고(-> RelationalGroupedDataset 반환), 집계 연산을 수행(DataFrame 반환)
df.groupBy("invoiceNo", "CustomerId").count().show()

+---------+----------+-----+
|invoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
|   544578|     12365|    1|
|   545165|     16339|   20|
|   545289|     14732|   30|
+---------+----------+-----+
only showing top 20 rows



In [19]:
#표현식을 이용한 그룹화
#agg-> 여러 집계 처리를 한 번에 지정, 집계에 표현식을 사용할 수 있음
from pyspark.sql.functions import count

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



In [20]:
#맵을 이용한 그룹화
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
  .show()


+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

In [21]:
#윈도우 함수
#데이터의 특정 '윈도우'를 대상으로 고유의 집계 연산을 수행
#'윈도우': 현재 데어터에 대한 참조를 사용해 정의
#윈도우 명세는 함수에 전달될 로우를 결정
#groupby와의 차이점: 프레임(로우 그룹 기반의 테이블에 입력되는 모든 로우에 대한 결괏값 계산
#스파크는 랭크함수, 분석함수, 집계함수 3가지 종류 윈도우 함수 지원

In [22]:
#(윈도우 예제)
#주문일자 컬럼을 변환해 'date'컬럼 만들기
from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

In [23]:
#(윈도우 예제)
#윈도우 명세 만들기
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
  .partitionBy("CustomerId", "date")\
  .orderBy(desc("Quantity"))\
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [24]:
#(윈도우 예제)
#집계 함수를 사용
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [25]:
#(윈도우 예제)
#구매량 순위 만들기
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [26]:
#(윈도우 예제)
#select 메서드를 사용해 계산된 윈도우값 확인
from pyspark.sql.functions import col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

In [27]:
#그룹화 셋
#여러 집계를 결합하는 저수준 기능
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [28]:
#롤업
#group-by 스타일의 다양한 연산을 수행할 수 잇는 다차원 집계 기능
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
  .orderBy("Date")
rolledUpDF.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|          EIRE|           243|
|2010-12-01|   Netherlands|            97|
|2010-12-01|       Germany|           117|
|2010-12-01|     Australia|           107|
|2010-12-01|        France|           449|
|2010-12-01|        Norway|          1852|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|          null|         26814|
|2010-12-02|          EIRE|             4|
|2010-12-02|          null|         21023|
|2010-12-02|       Germany|           146|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|   Switzerland|           110|
|2010-12-03|        France|           239|
|2010-12-03|       Germany|           170|
|2010-12-03|       Belgium|           528|
|2010-12-03|         Spain|           400|
|2010-12-03|        Poland|           140|
|2010-12-03|          null|         14830|
+----------

In [29]:
#큐브
#롤업을 고차원적으로 사용할 수 있게 해줌
#모든 차원에 대한 동일한 작업 수행
from pyspark.sql.functions import sum

dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|            Portugal|        16180|
|null|           Australia|        83653|
|null|             Finland|        10666|
|null|                 RSA|          352|
|null|               Japan|        25218|
|null|United Arab Emirates|          982|
|null|           Hong Kong|         4769|
|null|             Germany|       117448|
|null|             Lebanon|          386|
|null|                null|      5176450|
|null|      Czech Republic|          592|
|null|              Norway|        19247|
|null|  European Community|          497|
|null|              Cyprus|         6317|
|null|           Singapore|         5234|
|null|     Channel Islands|         9479|
|null|         Unspecified|         3300|
|null|             Denmark|         8188|
|null|               Spain|        26824|
|null|                 USA|         1034|
+----+--------------------+-------

In [30]:
#그룹화 메타데이터
#grouping_id: 결과 데이터셋의 집계 수준을 명시하는 컬럼 제공
#그룹화 ID -> p.221

In [31]:
#피벗
#로우를 컬럼으로 변환
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [32]:
#사용자 정의 집계 함수
#직접 제작한 함수나 비즈니스 규칙에 기반을 둔 자체 집계 함수를 정의하는 방법
#UDAF를 사용해서 입력 데이터 그룹에 직접 개발한 연산을 수행할 수 있음
#입력 데이터의 모든 구룹의 중간 결과를 단일 AggregationBuffer에 저장해 관리
#p.223-224 참고