In [2]:
!pip install pyspark 



- appName() : set application name 
- getOrCreate() : returns a Spark Session 
- master() : cluster 위에서 실행시키는 경우 master 이름을 사용할 수 있다. 
- local[x] 라는 이름은 Standalone mode 에서 사용가능하다 
- x 값은 숫자값이 되어야 하고 0보다 커야 한다 
- 이는 RDD, DataFrame, Dataset 을 생성할 때 파티션의 개수를 뜻한다 

In [3]:
import pyspark 
from pyspark.sql import SparkSession 
# SparkSession : entry point to programming Spark with the Dataset and DataFrame API 

spark = SparkSession.builder.appName('SparkExample.com').getOrCreate() 


### Spark RDD의 동작 2가지 
1. Transformation : map(), filter()
2. Action : collect(), count()

In [4]:
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

# Transformation 입니다. 
df = spark.read.load("C://Users\MZC01-MINJIWOO\Downloads\customer_dataset\marketing_campaign.csv",
                    format="csv",
                    sep="\t",
                    inferSchema="true",
                    header="true")

In [5]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: string (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- AcceptedCmp3: integer (nullable = true)
 |-- AcceptedCmp4: integer (nullable = true)
 |-- AcceptedC

In [6]:
df.count()

2240

In [7]:
df.toPandas()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,...,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,5524,1957,Graduation,Single,58138.0,0,0,04-09-2012,58,635,...,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344.0,1,1,08-03-2014,38,11,...,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613.0,0,0,21-08-2013,26,426,...,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646.0,1,0,10-02-2014,26,11,...,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293.0,1,0,19-01-2014,94,173,...,5,0,0,0,0,0,0,3,11,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2235,10870,1967,Graduation,Married,61223.0,0,1,13-06-2013,46,709,...,5,0,0,0,0,0,0,3,11,0
2236,4001,1946,PhD,Together,64014.0,2,1,10-06-2014,56,406,...,7,0,0,0,1,0,0,3,11,0
2237,7270,1981,Graduation,Divorced,56981.0,0,0,25-01-2014,91,908,...,6,0,1,0,0,0,0,3,11,0
2238,8235,1956,Master,Together,69245.0,0,1,24-01-2014,8,428,...,3,0,0,0,0,0,0,3,11,0


In [8]:
# Transformation - 아직 실행되지 않는다. 
# 사용자가 데이터를 나중에 가공하겠다는 요청을 해놓는 것을 Transformation 이라고 함.
dfSelected = df.select(
    col("ID").alias("id"),
    col("Year_Birth").alias("year_birth"),
    col("Education").alias("education"),
    col("Kidhome").alias("count_kid"),
    col("Teenhome").alias("count_teen"),
    col("Dt_Customer").alias("date_customer"),
    col("Recency").alias("days_last_login")
)
dfSelected

DataFrame[id: int, year_birth: int, education: string, count_kid: int, count_teen: int, date_customer: string, days_last_login: int]

In [9]:
dfSelected.explain("formatted")

== Physical Plan ==
* Project (2)
+- Scan csv  (1)


(1) Scan csv 
Output [7]: [ID#17, Year_Birth#18, Education#19, Kidhome#22, Teenhome#23, Dt_Customer#24, Recency#25]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/MZC01-MINJIWOO/Downloads/customer_dataset/marketing_campaign.csv]
ReadSchema: struct<ID:int,Year_Birth:int,Education:string,Kidhome:int,Teenhome:int,Dt_Customer:string,Recency:int>

(2) Project [codegen id : 1]
Output [7]: [ID#17 AS id#138, Year_Birth#18 AS year_birth#139, Education#19 AS education#140, Kidhome#22 AS count_kid#141, Teenhome#23 AS count_teen#142, Dt_Customer#24 AS date_customer#143, Recency#25 AS days_last_login#144]
Input [7]: [ID#17, Year_Birth#18, Education#19, Kidhome#22, Teenhome#23, Dt_Customer#24, Recency#25]




In [10]:
# Transformation 
dfConverted = df.withColumn("date_joined",
                           add_months(to_date(col("dt_customer"), 
                                              "d-M-yyyy"), 1))
dfConverted

DataFrame[ID: int, Year_Birth: int, Education: string, Marital_Status: string, Income: int, Kidhome: int, Teenhome: int, Dt_Customer: string, Recency: int, MntWines: int, MntFruits: int, MntMeatProducts: int, MntFishProducts: int, MntSweetProducts: int, MntGoldProds: int, NumDealsPurchases: int, NumWebPurchases: int, NumCatalogPurchases: int, NumStorePurchases: int, NumWebVisitsMonth: int, AcceptedCmp3: int, AcceptedCmp4: int, AcceptedCmp5: int, AcceptedCmp1: int, AcceptedCmp2: int, Complain: int, Z_CostContact: int, Z_Revenue: int, Response: int, date_joined: date]

In [11]:
dfConverted.explain("formatted")

== Physical Plan ==
* Project (2)
+- Scan csv  (1)


(1) Scan csv 
Output [29]: [ID#17, Year_Birth#18, Education#19, Marital_Status#20, Income#21, Kidhome#22, Teenhome#23, Dt_Customer#24, Recency#25, MntWines#26, MntFruits#27, MntMeatProducts#28, MntFishProducts#29, MntSweetProducts#30, MntGoldProds#31, NumDealsPurchases#32, NumWebPurchases#33, NumCatalogPurchases#34, NumStorePurchases#35, NumWebVisitsMonth#36, AcceptedCmp3#37, AcceptedCmp4#38, AcceptedCmp5#39, AcceptedCmp1#40, AcceptedCmp2#41, Complain#42, Z_CostContact#43, Z_Revenue#44, Response#45]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/MZC01-MINJIWOO/Downloads/customer_dataset/marketing_campaign.csv]
ReadSchema: struct<ID:int,Year_Birth:int,Education:string,Marital_Status:string,Income:int,Kidhome:int,Teenhome:int,Dt_Customer:string,Recency:int,MntWines:int,MntFruits:int,MntMeatProducts:int,MntFishProducts:int,MntSweetProducts:int,MntGoldProds:int,NumDealsPurchases:int,NumWebPurchases:int,NumCatalogPurchases:int

In [12]:
dfConverted.explain("cost")

== Optimized Logical Plan ==
Project [ID#17, Year_Birth#18, Education#19, Marital_Status#20, Income#21, Kidhome#22, Teenhome#23, Dt_Customer#24, Recency#25, MntWines#26, MntFruits#27, MntMeatProducts#28, MntFishProducts#29, MntSweetProducts#30, MntGoldProds#31, NumDealsPurchases#32, NumWebPurchases#33, NumCatalogPurchases#34, NumStorePurchases#35, NumWebVisitsMonth#36, AcceptedCmp3#37, AcceptedCmp4#38, AcceptedCmp5#39, AcceptedCmp1#40, ... 6 more fields], Statistics(sizeInBytes=220.0 KiB)
+- Relation [ID#17,Year_Birth#18,Education#19,Marital_Status#20,Income#21,Kidhome#22,Teenhome#23,Dt_Customer#24,Recency#25,MntWines#26,MntFruits#27,MntMeatProducts#28,MntFishProducts#29,MntSweetProducts#30,MntGoldProds#31,NumDealsPurchases#32,NumWebPurchases#33,NumCatalogPurchases#34,NumStorePurchases#35,NumWebVisitsMonth#36,AcceptedCmp3#37,AcceptedCmp4#38,AcceptedCmp5#39,AcceptedCmp1#40,... 5 more fields] csv, Statistics(sizeInBytes=215.0 KiB)

== Physical Plan ==
*(1) Project [ID#17, Year_Birth#18, 

- Transformation 요청을 Action 시점으로 미루는 것을 Spark 에서는 Lazy Evaluation 이라 부릅니다.
- Trnasformation 에서는 Spark 가 연산을 DAG에 추가한다. 
- 큰 데이터를 다룰 때 map, filter 함수가 즉시 연산이 된다면 문제 발생 
- One advantage of this is that Spark can make many optimization decisions after it had a chance to look at the DAG in entirety. 따라서 최적화 할 수 있는 연산의 경우 최적화를 하여 빅데이터 처리에 유리하도록 연산한다. 
- This helps avoid too much of computation and makes way for optimization.

In [13]:
dfSelected.rdd.id

<bound method RDD.id of MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:0>

In [14]:
dfConverted.rdd.id

<bound method RDD.id of MapPartitionsRDD[31] at javaToPython at NativeMethodAccessorImpl.java:0>

- 물리적인 데이터는 여전히 csv 파일로 존재하며 RDD 액션을 수행할 때 데이터를 메모로 읽어와서 처리하게 된다. 
- Dataframe 은 논리적인 테이블이다. 

In [15]:
dfConverted.explain("extended")

== Parsed Logical Plan ==
'Project [ID#17, Year_Birth#18, Education#19, Marital_Status#20, Income#21, Kidhome#22, Teenhome#23, Dt_Customer#24, Recency#25, MntWines#26, MntFruits#27, MntMeatProducts#28, MntFishProducts#29, MntSweetProducts#30, MntGoldProds#31, NumDealsPurchases#32, NumWebPurchases#33, NumCatalogPurchases#34, NumStorePurchases#35, NumWebVisitsMonth#36, AcceptedCmp3#37, AcceptedCmp4#38, AcceptedCmp5#39, AcceptedCmp1#40, ... 6 more fields]
+- Relation [ID#17,Year_Birth#18,Education#19,Marital_Status#20,Income#21,Kidhome#22,Teenhome#23,Dt_Customer#24,Recency#25,MntWines#26,MntFruits#27,MntMeatProducts#28,MntFishProducts#29,MntSweetProducts#30,MntGoldProds#31,NumDealsPurchases#32,NumWebPurchases#33,NumCatalogPurchases#34,NumStorePurchases#35,NumWebVisitsMonth#36,AcceptedCmp3#37,AcceptedCmp4#38,AcceptedCmp5#39,AcceptedCmp1#40,... 5 more fields] csv

== Analyzed Logical Plan ==
ID: int, Year_Birth: int, Education: string, Marital_Status: string, Income: int, Kidhome: int, Teen

- Spark 는 실행 계획을 생성 하고 그것을 기반으로 분석하고, 최적화 한 뒤 데이터를 읽거나 가공하는 등 물리적으로 실행한다 
- RDD (Resilient Distributed Dataset) 는 DataFrame 과 유사하게 Spark 에서 분산 데이터를 다룰 수 있도록 제공하는 Low-level API
- RDD는 DataFrame이 column 기반으로 Table 형태로 데이터를 쉽게 다를 수 있도록 추상화 된 반면에 레코드 기반으로 데이터를 가공할 수 있는 low level API 를 제공

In [16]:
dfConverted.select("education").limit(5).explain('extended')

== Parsed Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- Project [education#19]
      +- Project [ID#17, Year_Birth#18, Education#19, Marital_Status#20, Income#21, Kidhome#22, Teenhome#23, Dt_Customer#24, Recency#25, MntWines#26, MntFruits#27, MntMeatProducts#28, MntFishProducts#29, MntSweetProducts#30, MntGoldProds#31, NumDealsPurchases#32, NumWebPurchases#33, NumCatalogPurchases#34, NumStorePurchases#35, NumWebVisitsMonth#36, AcceptedCmp3#37, AcceptedCmp4#38, AcceptedCmp5#39, AcceptedCmp1#40, ... 6 more fields]
         +- Relation [ID#17,Year_Birth#18,Education#19,Marital_Status#20,Income#21,Kidhome#22,Teenhome#23,Dt_Customer#24,Recency#25,MntWines#26,MntFruits#27,MntMeatProducts#28,MntFishProducts#29,MntSweetProducts#30,MntGoldProds#31,NumDealsPurchases#32,NumWebPurchases#33,NumCatalogPurchases#34,NumStorePurchases#35,NumWebVisitsMonth#36,AcceptedCmp3#37,AcceptedCmp4#38,AcceptedCmp5#39,AcceptedCmp1#40,... 5 more fields] csv

== Analyzed Logical Plan ==
education: string
Global

- 데이터를 읽는 시점에 저장소 (또는 Parquet 와 같은 일부 파일 포맷) 단위에서 필터링이 가능하다면 더 적은양의 데이터만 읽어오면 되므로 네트워크 비용 절감 등 성능상의 이점이 있다. 

In [17]:
dfConverted.explain("codegen")

Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:1947; maxConstantPoolSize:145(0.22% used); numInnerClasses:0) ==
*(1) Project [ID#17, Year_Birth#18, Education#19, Marital_Status#20, Income#21, Kidhome#22, Teenhome#23, Dt_Customer#24, Recency#25, MntWines#26, MntFruits#27, MntMeatProducts#28, MntFishProducts#29, MntSweetProducts#30, MntGoldProds#31, NumDealsPurchases#32, NumWebPurchases#33, NumCatalogPurchases#34, NumStorePurchases#35, NumWebVisitsMonth#36, AcceptedCmp3#37, AcceptedCmp4#38, AcceptedCmp5#39, AcceptedCmp1#40, ... 6 more fields]
+- FileScan csv [ID#17,Year_Birth#18,Education#19,Marital_Status#20,Income#21,Kidhome#22,Teenhome#23,Dt_Customer#24,Recency#25,MntWines#26,MntFruits#27,MntMeatProducts#28,MntFishProducts#29,MntSweetProducts#30,MntGoldProds#31,NumDealsPurchases#32,NumWebPurchases#33,NumCatalogPurchases#34,NumStorePurchases#35,NumWebVisitsMonth#36,AcceptedCmp3#37,AcceptedCmp4#38,AcceptedCmp5#39,AcceptedCmp1#40,... 5 more fields] Batched: false

### Action 
- Transformation 이 누적되면서 데이터를 어떻게 가공할지가 DataFrame 의 실행 계획으로 기록된다. 
- 사용자는 최종 시점에 여태까지 작업했던 데이터를 보거나 다른 곳으로 저장하는 행동을 취하게 된다. 

In [18]:
print(f"Partition Count of Dataframe df:\t\t{df.rdd.getNumPartitions()}")
print(f"Partition Count of Dataframe dfSelected:\t\t{dfSelected.rdd.getNumPartitions()}")
print(f"Partition Count of Dataframe dfConverted:\t\t{dfConverted.rdd.getNumPartitions()}")

Partition Count of Dataframe df:		1
Partition Count of Dataframe dfSelected:		1
Partition Count of Dataframe dfConverted:		1


In [19]:
# partition 숫자 늘리기 
dfPartitioned = dfConverted.repartition(5)
print(f"Partition Count of Dataframe dfPartitioned:\t\t{dfPartitioned.rdd.getNumPartitions()}")

Partition Count of Dataframe dfPartitioned:		5


In [20]:
dfConverted.repartition(col("id"))

DataFrame[ID: int, Year_Birth: int, Education: string, Marital_Status: string, Income: int, Kidhome: int, Teenhome: int, Dt_Customer: string, Recency: int, MntWines: int, MntFruits: int, MntMeatProducts: int, MntFishProducts: int, MntSweetProducts: int, MntGoldProds: int, NumDealsPurchases: int, NumWebPurchases: int, NumCatalogPurchases: int, NumStorePurchases: int, NumWebVisitsMonth: int, AcceptedCmp3: int, AcceptedCmp4: int, AcceptedCmp5: int, AcceptedCmp1: int, AcceptedCmp2: int, Complain: int, Z_CostContact: int, Z_Revenue: int, Response: int, date_joined: date]

### repartition() vs coalesce() 
- repartition : Partition 을 늘리고 줄일 수 있음. 데이터 재배치가 발생함. 네트워크 비용이 비싸다. 
- coalesce : Partition 을 현재 숫자 이하로만 줄일 수 있음. 대신 줄이는 과정에서 옮길 필요가 없는 데이터가 있다면 옮기지 않으므로 비싼 네트워크 연산을 피할 수 있음. 

In [21]:
dfSelected.describe().show()

+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
|summary|                id|        year_birth|education|          count_kid|        count_teen|date_customer|  days_last_login|
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
|  count|              2240|              2240|     2240|               2240|              2240|         2240|             2240|
|   mean| 5592.159821428571|1968.8058035714287|     null|0.44419642857142855|           0.50625|         null|        49.109375|
| stddev|3246.6621975643416|11.984069456885827|     null| 0.5383980977345935|0.5445382307698761|         null|28.96245280837821|
|    min|                 0|              1893| 2n Cycle|                  0|                 0|   01-01-2013|                0|
|    max|             11191|              1996|      PhD|                  2|                 2| 

## spark 로 데이터 가공하기 

In [22]:
dfSelected.printSchema()

root
 |-- id: integer (nullable = true)
 |-- year_birth: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- count_kid: integer (nullable = true)
 |-- count_teen: integer (nullable = true)
 |-- date_customer: string (nullable = true)
 |-- days_last_login: integer (nullable = true)



In [23]:
dfSelected.describe().show()

+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
|summary|                id|        year_birth|education|          count_kid|        count_teen|date_customer|  days_last_login|
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
|  count|              2240|              2240|     2240|               2240|              2240|         2240|             2240|
|   mean| 5592.159821428571|1968.8058035714287|     null|0.44419642857142855|           0.50625|         null|        49.109375|
| stddev|3246.6621975643416|11.984069456885827|     null| 0.5383980977345935|0.5445382307698761|         null|28.96245280837821|
|    min|                 0|              1893| 2n Cycle|                  0|                 0|   01-01-2013|                0|
|    max|             11191|              1996|      PhD|                  2|                 2| 

In [24]:
dfConverted1 = dfSelected.withColumn("count_children", coalesce("count_kid", lit(0)) + coalesce("count_teen", lit(0))) 

- coalesce() 함수에서 NULL 값 대신 default 값을 0 으로 설정한다. 

In [25]:
dfConverted1\
    .select(col("id"), col("count_kid"), col("count_teen"), col("count_children"))\
    .limit(5)\
    .show()

+----+---------+----------+--------------+
|  id|count_kid|count_teen|count_children|
+----+---------+----------+--------------+
|5524|        0|         0|             0|
|2174|        1|         1|             2|
|4141|        0|         0|             0|
|6182|        1|         0|             1|
|5324|        1|         0|             1|
+----+---------+----------+--------------+



In [26]:
# education column 의 값 종류 
dfConverted1\
    .select("education")\
    .distinct()\
    .show()

+----------+
| education|
+----------+
|  2n Cycle|
|       PhD|
|    Master|
|Graduation|
|     Basic|
+----------+



In [27]:
educationInvalid = '2n Cycle'
educationDefault = 'NONE'
# withColumn 으로 새로운 컬럼 추가 
dfConverted2 = dfConverted1.withColumn(
    "education",
    when(col("education") == educationInvalid, educationDefault).otherwise(col("education"))
)

In [28]:
dfConverted2.select("education").distinct().show()

+----------+
| education|
+----------+
|       PhD|
|    Master|
|Graduation|
|     Basic|
|      NONE|
+----------+



In [29]:
dfConverted2.show()

+----+----------+----------+---------+----------+-------------+---------------+--------------+
|  id|year_birth| education|count_kid|count_teen|date_customer|days_last_login|count_children|
+----+----------+----------+---------+----------+-------------+---------------+--------------+
|5524|      1957|Graduation|        0|         0|   04-09-2012|             58|             0|
|2174|      1954|Graduation|        1|         1|   08-03-2014|             38|             2|
|4141|      1965|Graduation|        0|         0|   21-08-2013|             26|             0|
|6182|      1984|Graduation|        1|         0|   10-02-2014|             26|             1|
|5324|      1981|       PhD|        1|         0|   19-01-2014|             94|             1|
|7446|      1967|    Master|        0|         1|   09-09-2013|             16|             1|
| 965|      1971|Graduation|        0|         1|   13-11-2012|             34|             1|
|6177|      1985|       PhD|        1|         0| 

- selectExpr() 로 SQL 문법 바로 사용하기 

In [30]:
dfConverted3 = dfConverted1.selectExpr("*", 
                f"CASE WHEN education == '{educationInvalid}' THEN '{educationDefault}' ELSE education END as education"
    )

In [31]:
dfWithJoined = dfConverted2.withColumn("date_joined", add_months(to_date(col("date_customer"), "d-M-yyyy"), 72))

In [32]:
# 기존 값에 6년을 더함 
dfWithJoined.select("date_customer", "date_joined").limit(5).show()

+-------------+-----------+
|date_customer|date_joined|
+-------------+-----------+
|   04-09-2012| 2018-09-04|
|   08-03-2014| 2020-03-08|
|   21-08-2013| 2019-08-21|
|   10-02-2014| 2020-02-10|
|   19-01-2014| 2020-01-19|
+-------------+-----------+



In [33]:
dfWithJoined.printSchema()

root
 |-- id: integer (nullable = true)
 |-- year_birth: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- count_kid: integer (nullable = true)
 |-- count_teen: integer (nullable = true)
 |-- date_customer: string (nullable = true)
 |-- days_last_login: integer (nullable = true)
 |-- count_children: integer (nullable = false)
 |-- date_joined: date (nullable = true)



# DataFrame, Dataset and SQL

- DataFrame은 Dataset[Row] 이다. Row라는 타입의 Dataset이 DataFrame이다. 


- Dataset 은 Java  / Scala 언어로만 사용가능함. RDD와 같이 map, flatmap, filter 등 함수형 Collection API 의 일부를 사용할 수 있으며 DataFrame과 같이 SQL Optimizer 에 의해 최적화 됨 
- Streaming 이나 Batch Application 만들 때 주로 사용됨. 데이터 탐색이나 프로토타이핑시에는 생산성을 이유로 pyspark 가 자주 사용됨

## Reference 
- spark : https://1ambda.gitbook.io/practical-data-pipeline/02-processing/2.2-batch/2.1.2-spark-tutorial