### Ref
- https://dslyh01.tistory.com/ 
- https://assaeunji.github.io/python/2022-03-26-pyspark/



### Installing Spark

- 1. Install Java : https://www.oracle.com/java/technologies/downloads/#java11
- 2. Install Spark 3.0 : https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
- 3. Install Hadoop (winutils) : https://github.com/cdarlint/winutils/blob/master/hadoop-2.7.7/bin/winutils.exe
- 4. Set up the paths used by the environment variables
  - 1) Extract the Spark archive into C:\Spark
  - 2) Move winutils (the Hadoop file) into C:\Hadoop\bin
  - 3) Add HADOOP_HOME and SPARK_HOME to the user variables
  - 4) User variable Path - Edit - add %SPARK_HOME%\bin and %HADOOP_HOME%\bin
  - 5) Add JAVA_HOME to the system variables (already added during the KoNLPy installation)
- 5. pip install pyspark
- 6. Run pyspark to confirm that it starts
- 7. pip install findspark
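
Before launching pyspark, it can help to confirm that the variables from step 4 are visible to Python. The following is a minimal sketch using only the standard library; the helper names `missing_vars` and `bin_dir_exists` are made up for illustration and are not part of Spark or findspark.

```python
import os

# Variables that the installation steps above define.
REQUIRED_VARS = ("JAVA_HOME", "SPARK_HOME", "HADOOP_HOME")


def missing_vars(env=None):
    """Return the required variables that are unset or empty in env."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


def bin_dir_exists(home):
    """Return True if <home>/bin exists (the folder that was added to Path)."""
    return os.path.isdir(os.path.join(home, "bin"))


# Report the state of each variable in the current process.
for name in REQUIRED_VARS:
    home = os.environ.get(name)
    if not home:
        print(f"{name}: NOT SET")
    else:
        status = "ok" if bin_dir_exists(home) else "bin folder not found"
        print(f"{name}: {home} ({status})")
```

If a variable shows up as NOT SET right after editing it, reopening the terminal or notebook server is usually needed so that the new process picks up the change.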

#### Notes
- Running import findspark ; findspark.init() resolves pyspark import errors
- import findspark ; findspark.init() must always run before import pyspark



### Spark Workflow
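- 1. User : starts Spark (runs the PySpark code)
- 2. Driver Node : the driver program works with the cluster manager to allocate resources
- 3. Cluster Manager : once resources are allocated, tasks are sent to the worker nodes for execution
- 4. Worker Node : processes the data locally and sends the results back to the driver node
- 5. Driver Node : returns the results to the user once the job is complete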
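
The flow above can be mimicked on a single machine as a rough analogy. The sketch below is plain Python, not Spark: a `driver` function splits the data into chunks, a thread pool stands in for the worker nodes, and the driver combines the partial results. The names `driver`, `worker_task`, and `split_into_chunks` are hypothetical and exist only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(data, num_chunks):
    """Split a list into at most num_chunks contiguous chunks."""
    size = max(1, -(-len(data) // num_chunks))  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def worker_task(chunk):
    """Work done 'locally' on one chunk: here, a partial sum."""
    return sum(chunk)


def driver(data, num_workers=3):
    """Hand chunks to the workers, then combine their partial results."""
    chunks = split_into_chunks(data, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(worker_task, chunks))
    return sum(partial_results)


total = driver(list(range(1, 101)))
print(total)  # 5050
```

In real Spark the cluster manager decides where each task runs and the workers are separate processes or machines; the thread pool here only imitates the split, process, and combine shape of the workflow.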



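### Spark Features
- In-memory computing : intermediate data is kept in memory (RAM), which reduces I/O time
- Optimized execution plans : the Catalyst optimizer plans how the work is executed and minimizes redundant work
- Lazy evaluation : transformations are not executed immediately. Spark builds a logical execution plan and runs it only when an action (for example count or saveAsTextFile) is called, which makes it possible to optimize the whole workflow
- Parallel processing : data is split into chunks, and the chunks are processed concurrently across multiple nodes
- Reduced data shuffling : shuffling (redistributing data across partitions) is expensive, so Spark improves performance by minimizing unnecessary shuffles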

In [1]:
import findspark
findspark.init() 

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session
spark = SparkSession.builder.master('local').appName('SparkSQL').getOrCreate()

# Set the log level
spark.sparkContext.setLogLevel('ERROR')

In [4]:
# Example
data = [('001','Smith','M',40,'DA',4000),
        ('002','Rose','M',35,'DA',3000),
        ('003','Williams','M',30,'DE',2500),
        ('004','Anne','F',30,'DE',3000),
        ('005','Mary','F',35,'BE',4000),
        ('006','James','M',30,'FE',3500)]

columns = ["cd","name","gender","age","div","salary"]

# Create a Spark DataFrame
df = spark.createDataFrame(data = data, schema = columns)

In [5]:
# Print the schema
df.printSchema()

root
 |-- cd: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: long (nullable = true)
 |-- div: string (nullable = true)
 |-- salary: long (nullable = true)



In [6]:
# Show the DataFrame
df.show()

+---+--------+------+---+---+------+
| cd|    name|gender|age|div|salary|
+---+--------+------+---+---+------+
|001|   Smith|     M| 40| DA|  4000|
|002|    Rose|     M| 35| DA|  3000|
|003|Williams|     M| 30| DE|  2500|
|004|    Anne|     F| 30| DE|  3000|
|005|    Mary|     F| 35| BE|  4000|
|006|   James|     M| 30| FE|  3500|
+---+--------+------+---+---+------+



In [7]:
df.createOrReplaceTempView("EMP_INFO") # Register the DataFrame as a temp view with this table name

In [8]:
# Query using SQL syntax
spark.sql("select cd, name, gender, div, salary from emp_info").show()

+---+--------+------+---+------+
| cd|    name|gender|div|salary|
+---+--------+------+---+------+
|001|   Smith|     M| DA|  4000|
|002|    Rose|     M| DA|  3000|
|003|Williams|     M| DE|  2500|
|004|    Anne|     F| DE|  3000|
|005|    Mary|     F| BE|  4000|
|006|   James|     M| FE|  3500|
+---+--------+------+---+------+



In [9]:
# Query using PySpark functions
df.filter("gender == 'F'").show()

+---+----+------+---+---+------+
| cd|name|gender|age|div|salary|
+---+----+------+---+---+------+
|004|Anne|     F| 30| DE|  3000|
|005|Mary|     F| 35| BE|  4000|
+---+----+------+---+---+------+



In [10]:
from pyspark.sql import functions as F

# Multiple conditions can be combined
df.filter((F.col('div') == 'DA') & (F.col('salary') > 3500)).show()

+---+-----+------+---+---+------+
| cd| name|gender|age|div|salary|
+---+-----+------+---+---+------+
|001|Smith|     M| 40| DA|  4000|
+---+-----+------+---+---+------+



In [11]:
# Count
df.filter((F.col('div') == 'DA') & (F.col('salary') > 3500)).count()

1

In [12]:
# Group By
df.groupby('div').count().show()

+---+-----+
|div|count|
+---+-----+
| DE|    2|
| DA|    2|
| FE|    1|
| BE|    1|
+---+-----+



In [13]:
# Order By = Sort
df.groupby('div').count().sort('count', ascending=True).show()

+---+-----+
|div|count|
+---+-----+
| FE|    1|
| BE|    1|
| DE|    2|
| DA|    2|
+---+-----+



In [14]:
# To Pandas : the result can be converted to a Pandas DataFrame
df.groupby('div').count().sort('count', ascending=True).toPandas()

  div  count
0  FE      1
1  BE      1
2  DE      2
3  DA      2


In [15]:
df.describe().show(truncate=False)

+-------+------------------+--------+------+------------------+----+------------------+
|summary|cd                |name    |gender|age               |div |salary            |
+-------+------------------+--------+------+------------------+----+------------------+
|count  |6                 |6       |6     |6                 |6   |6                 |
|mean   |3.5               |null    |null  |33.333333333333336|null|3333.3333333333335|
|stddev |1.8708286933869707|null    |null  |4.0824829046386295|null|605.5300708194983 |
|min    |001               |Anne    |F     |30                |BE  |2500              |
|max    |006               |Williams|M     |40                |FE  |4000              |
+-------+------------------+--------+------+------------------+----+------------------+



In [16]:
# select : show only specific columns
df.select("name").show()

+--------+
|    name|
+--------+
|   Smith|
|    Rose|
|Williams|
|    Anne|
|    Mary|
|   James|
+--------+



In [17]:
# drop : show everything except specific columns
df.drop("name").show()

+---+------+---+---+------+
| cd|gender|age|div|salary|
+---+------+---+---+------+
|001|     M| 40| DA|  4000|
|002|     M| 35| DA|  3000|
|003|     M| 30| DE|  2500|
|004|     F| 30| DE|  3000|
|005|     F| 35| BE|  4000|
|006|     M| 30| FE|  3500|
+---+------+---+---+------+



In [18]:
# count : total number of values
# countDistinct : number of values after removing duplicates (like Pandas nunique())
df.select(F.count("gender"), F.countDistinct("gender")).show()

+-------------+----------------------+
|count(gender)|count(DISTINCT gender)|
+-------------+----------------------+
|            6|                     2|
+-------------+----------------------+



In [19]:
# withColumn : modify an existing column or add a new one
change_type = df.withColumn("age", df.age.cast("String"))
change_type.show()

+---+--------+------+---+---+------+
| cd|    name|gender|age|div|salary|
+---+--------+------+---+---+------+
|001|   Smith|     M| 40| DA|  4000|
|002|    Rose|     M| 35| DA|  3000|
|003|Williams|     M| 30| DE|  2500|
|004|    Anne|     F| 30| DE|  3000|
|005|    Mary|     F| 35| BE|  4000|
|006|   James|     M| 30| FE|  3500|
+---+--------+------+---+---+------+



In [20]:
change_type.printSchema()

root
 |-- cd: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- div: string (nullable = true)
 |-- salary: long (nullable = true)



In [21]:
# Add a new column
# F.lit = fill the column with a literal (constant) value
df.withColumn("country", F.lit("USA")).show() 

+---+--------+------+---+---+------+-------+
| cd|    name|gender|age|div|salary|country|
+---+--------+------+---+---+------+-------+
|001|   Smith|     M| 40| DA|  4000|    USA|
|002|    Rose|     M| 35| DA|  3000|    USA|
|003|Williams|     M| 30| DE|  2500|    USA|
|004|    Anne|     F| 30| DE|  3000|    USA|
|005|    Mary|     F| 35| BE|  4000|    USA|
|006|   James|     M| 30| FE|  3500|    USA|
+---+--------+------+---+---+------+-------+



In [22]:
# Write a CASE WHEN expression with withColumn

df.withColumn("gender", F.when(df.gender == "F", "Female") # when gender is F : Female
              .when(df.gender=="M", "Male")                # when M : Male
              .when(df.gender.isNull(), "")                # when NULL : empty string
              .otherwise(df.gender)                        # otherwise : keep the original gender value
              ).show()

+---+--------+------+---+---+------+
| cd|    name|gender|age|div|salary|
+---+--------+------+---+---+------+
|001|   Smith|  Male| 40| DA|  4000|
|002|    Rose|  Male| 35| DA|  3000|
|003|Williams|  Male| 30| DE|  2500|
|004|    Anne|Female| 30| DE|  3000|
|005|    Mary|Female| 35| BE|  4000|
|006|   James|  Male| 30| FE|  3500|
+---+--------+------+---+---+------+



In [23]:
# Define the schema explicitly

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]
        
schema = StructType([
     StructField('name', StructType([                          # A StructType nested inside a StructField gives a finer-grained schema
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True), # The element type of an ArrayType can be specified
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])

df = spark.createDataFrame(data = data, schema = schema)
df.show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|[James, , Smith]      |[Java, Scala, C++]|OH   |M     |
|[Anna, Rose, ]        |[Spark, Java, C++]|NY   |F     |
|[Julia, , Williams]   |[CSharp, VB]      |OH   |F     |
|[Maria, Anne, Jones]  |[CSharp, VB]      |NY   |M     |
|[Jen, Mary, Brown]    |[CSharp, VB]      |NY   |M     |
|[Mike, Mary, Williams]|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [24]:
# Nested fields can be accessed as column_name.field_name
df.select("name.firstname").show()

+---------+
|firstname|
+---------+
|    James|
|     Anna|
|    Julia|
|    Maria|
|      Jen|
|     Mike|
+---------+



In [25]:
df.select("name.middlename").show()

+----------+
|middlename|
+----------+
|          |
|      Rose|
|          |
|      Anne|
|      Mary|
|      Mary|
+----------+



In [26]:
df.filter(df.gender == "M").show()  # Pandas-style (Column expression)

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [27]:
df.filter("gender = 'M'").show()    # SQL-style (string expression)

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [28]:
df.where(df.gender == "M").show()   # where gives the same result

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [29]:
# Multiple conditions can likewise be written in either style

df.filter((df.state == "OH") & (df.gender == "M")).show() # Pandas-style (Column expression)

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [30]:
df.filter("state = 'OH' and gender = 'M'").show()          # SQL-style (string expression)

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [41]:
# IN condition

df.filter(df.state.isin("OH", "CA", "DE")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [42]:
df[df.state.isin("OH", "CA", "DE")].show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [43]:
# NOT IN condition
df[ ~ df.state.isin("OH", "CA", "DE")].show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [44]:
# LIKE-style conditions are available through the following methods

# startswith
# endswith
# contains
# like

In [45]:
# Starts with N
df.filter(df.state.startswith("N")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [46]:
df.filter(df.state.like("N%")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F|
|[Maria, Anne, Jones]|      [CSharp, VB]|   NY|     M|
|  [Jen, Mary, Brown]|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [47]:
# Ends with H
df.filter(df.state.endswith("H")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [48]:
df.filter(df.state.like("%H")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [50]:
# Contains H
df.filter(df.state.contains("H")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [51]:
df.filter(df.state.like("%H%")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    [James, , Smith]|[Java, Scala, C++]|   OH|     M|
| [Julia, , Williams]|      [CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [52]:
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)



In [53]:
# To filter on a struct type, access the nested field as column.field
df.filter(df.name.lastname =="Williams").show()

+--------------------+------------+-----+------+
|                name|   languages|state|gender|
+--------------------+------------+-----+------+
| [Julia, , Williams]|[CSharp, VB]|   OH|     F|
|[Mike, Mary, Will...|[Python, VB]|   OH|     M|
+--------------------+------------+-----+------+



In [55]:
# To filter on an array type, use the array_contains() function
df.filter(F.array_contains(df.languages,'Java')).show()

+----------------+------------------+-----+------+
|            name|         languages|state|gender|
+----------------+------------------+-----+------+
|[James, , Smith]|[Java, Scala, C++]|   OH|     M|
|  [Anna, Rose, ]|[Spark, Java, C++]|   NY|     F|
+----------------+------------------+-----+------+



In [56]:
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [57]:
# groupBy

df.groupBy("department").count().show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|   Finance|    4|
| Marketing|    2|
+----------+-----+



In [58]:
df.groupBy("department").sum("salary").show()

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|     257000|
|   Finance|     351000|
| Marketing|     171000|
+----------+-----------+



In [59]:
df.groupBy("department").min("salary").show() 

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|     Sales|      81000|
|   Finance|      79000|
| Marketing|      80000|
+----------+-----------+



In [60]:
df.groupBy("department").max("salary").show() 

+----------+-----------+
|department|max(salary)|
+----------+-----------+
|     Sales|      90000|
|   Finance|      99000|
| Marketing|      91000|
+----------+-----------+



In [61]:
df.groupBy("department").avg("salary").show() 

+----------+-----------------+
|department|      avg(salary)|
+----------+-----------------+
|     Sales|85666.66666666667|
|   Finance|          87750.0|
| Marketing|          85500.0|
+----------+-----------------+



In [65]:
# .agg can be used, as in Pandas

df.groupBy("department").agg(
    F.sum("salary").alias("sum_salary")
    , F.avg("salary").alias("avg_salary")
    , F.sum("bonus").alias("sum_bonus")
    , F.max("bonus").alias("max_bonus")
).show()

+----------+----------+-----------------+---------+---------+
|department|sum_salary|       avg_salary|sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|     Sales|    257000|85666.66666666667|    53000|    23000|
|   Finance|    351000|          87750.0|    81000|    24000|
| Marketing|    171000|          85500.0|    39000|    21000|
+----------+----------+-----------------+---------+---------+



In [66]:
# Multiple columns can be grouped and aggregated at once

df.groupBy("department","state").sum("salary","bonus").show()

+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|   Finance|   NY|     162000|     34000|
| Marketing|   NY|      91000|     21000|
|     Sales|   CA|      81000|     23000|
| Marketing|   CA|      80000|     18000|
|   Finance|   CA|     189000|     47000|
|     Sales|   NY|     176000|     30000|
+----------+-----+-----------+----------+



In [69]:
# orderBy
df.orderBy(F.asc("state"), F.desc("salary")).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
+-------------+----------+-----+------+---+-----+



In [70]:
# JOIN

emp = [(1,"Smith",-1,"2018","10","M",3000),
       (2,"Rose",1,"2010","20","M",4000),
       (3,"Williams",1,"2010","10","M",1000),
       (4,"Jones",2,"2005","10","F",2000),
       (5,"Brown",2,"2010","40","",-1),
       (6,"Brown",2,"2010","50","",-1)]

empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.show(truncate=False)

dept = [("Finance",10),
        ("Marketing",20),
        ("Sales",30),
        ("IT",40)]

deptColumns = ["dept_name", "dept_id"]

deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [71]:
# INNER JOIN
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id).show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [72]:
# FULL JOIN
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "full").show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     6|   Brown|              2|       2010|         50|      |    -1|     null|   null|
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|  null|    null|           null|       null|       null|  null|  null|    Sales|     30|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [73]:
# LEFT JOIN
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "left").show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     6|   Brown|              2|       2010|         50|      |    -1|     null|   null|
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [74]:
# RIGHT JOIN
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "right").show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|  null|    null|           null|       null|       null|  null|  null|    Sales|     30|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [75]:
# Rename emp_dept_id with withColumnRenamed, then JOIN using on
empDF.withColumnRenamed("emp_dept_id", "dept_id").join(deptDF, on = 'dept_id').show(truncate=False)

+-------+------+--------+---------------+-----------+------+------+---------+
|dept_id|emp_id|name    |superior_emp_id|year_joined|gender|salary|dept_name|
+-------+------+--------+---------------+-----------+------+------+---------+
|10     |1     |Smith   |-1             |2018       |M     |3000  |Finance  |
|10     |3     |Williams|1              |2010       |M     |1000  |Finance  |
|10     |4     |Jones   |2              |2005       |F     |2000  |Finance  |
|20     |2     |Rose    |1              |2010       |M     |4000  |Marketing|
|40     |5     |Brown   |2              |2010       |      |-1    |IT       |
+-------+------+--------+---------------+-----------+------+------+---------+



In [76]:
# left_anti : returns the left-table rows with no match in the right table (rows where the right dept_id would be NULL)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "left_anti").show()

# Equivalent SQL (left-table columns only):
# SELECT A.*
# FROM empDF A 
# LEFT JOIN deptDF B ON A.emp_dept_id = B.dept_id 
# WHERE B.dept_id IS NULL

+------+-----+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|     6|Brown|              2|       2010|         50|      |    -1|
+------+-----+---------------+-----------+-----------+------+------+



In [77]:
data = [("James","Sales",34),
        ("Michael","Sales",56),
        ("Robert","Sales",30),
        ("Maria","Finance",24) ]

columns= ["name", "dept", "age"]

df1 = spark.createDataFrame(data = data, schema = columns)
df1.show()


data2=[("James","Sales","NY",9000),
       ("Maria","Finance","CA",9000),
       ("Jen","Finance","NY",7900),
       ("Jeff","Marketing","CA",8000)]

columns2= ["name", "dept", "state", "salary"]

df2 = spark.createDataFrame(data = data2, schema = columns2)
df2.show()

+-------+-------+---+
|   name|   dept|age|
+-------+-------+---+
|  James|  Sales| 34|
|Michael|  Sales| 56|
| Robert|  Sales| 30|
|  Maria|Finance| 24|
+-------+-------+---+

+-----+---------+-----+------+
| name|     dept|state|salary|
+-----+---------+-----+------+
|James|    Sales|   NY|  9000|
|Maria|  Finance|   CA|  9000|
|  Jen|  Finance|   NY|  7900|
| Jeff|Marketing|   CA|  8000|
+-----+---------+-----+------+



In [79]:
# UNION
# The number and order of the columns must match
# Unlike SQL's UNION, Spark's union does not keep only DISTINCT rows; duplicate rows are kept (= UNION ALL)

# Add the columns needed for the UNION
for column in [column for column in df2.columns if column not in df1.columns]:
    df1 = df1.withColumn(column, F.lit(None))

for column in [column for column in df1.columns if column not in df2.columns]:
    df2 = df2.withColumn(column, F.lit(None))
    
df1.show()
df2.show()

+-------+-------+---+-----+------+
|   name|   dept|age|state|salary|
+-------+-------+---+-----+------+
|  James|  Sales| 34| null|  null|
|Michael|  Sales| 56| null|  null|
| Robert|  Sales| 30| null|  null|
|  Maria|Finance| 24| null|  null|
+-------+-------+---+-----+------+

+-----+---------+-----+------+----+
| name|     dept|state|salary| age|
+-----+---------+-----+------+----+
|James|    Sales|   NY|  9000|null|
|Maria|  Finance|   CA|  9000|null|
|  Jen|  Finance|   NY|  7900|null|
| Jeff|Marketing|   CA|  8000|null|
+-----+---------+-----+------+----+



In [80]:
# Without aligning the column order (values end up in the wrong columns)
df1.union(df2).show()

+-------+---------+---+-----+------+
|   name|     dept|age|state|salary|
+-------+---------+---+-----+------+
|  James|    Sales| 34| null|  null|
|Michael|    Sales| 56| null|  null|
| Robert|    Sales| 30| null|  null|
|  Maria|  Finance| 24| null|  null|
|  James|    Sales| NY| 9000|  null|
|  Maria|  Finance| CA| 9000|  null|
|    Jen|  Finance| NY| 7900|  null|
|   Jeff|Marketing| CA| 8000|  null|
+-------+---------+---+-----+------+



In [81]:
# With the column order aligned
df1.select("name","dept","state","salary","age").union(df2).show()

+-------+---------+-----+------+----+
|   name|     dept|state|salary| age|
+-------+---------+-----+------+----+
|  James|    Sales| null|  null|  34|
|Michael|    Sales| null|  null|  56|
| Robert|    Sales| null|  null|  30|
|  Maria|  Finance| null|  null|  24|
|  James|    Sales|   NY|  9000|null|
|  Maria|  Finance|   CA|  9000|null|
|    Jen|  Finance|   NY|  7900|null|
|   Jeff|Marketing|   CA|  8000|null|
+-------+---------+-----+------+----+



In [82]:
# Align columns by name instead of position with unionByName
df1.unionByName(df2).show()

+-------+---------+----+-----+------+
|   name|     dept| age|state|salary|
+-------+---------+----+-----+------+
|  James|    Sales|  34| null|  null|
|Michael|    Sales|  56| null|  null|
| Robert|    Sales|  30| null|  null|
|  Maria|  Finance|  24| null|  null|
|  James|    Sales|null|   NY|  9000|
|  Maria|  Finance|null|   CA|  9000|
|    Jen|  Finance|null|   NY|  7900|
|   Jeff|Marketing|null|   CA|  8000|
+-------+---------+----+-----+------+



In [83]:
data = [("Banana",1000,"USA"),
        ("Carrots",1500,"USA"),
        ("Beans",1600,"USA"),
        ("Orange",2000,"USA"),
        ("Orange",2000,"USA"),
        ("Banana",400,"China"),
        ("Carrots",1200,"China"),
        ("Beans",1500,"China"),
        ("Orange",4000,"China"),
        ("Banana",2000,"Canada"),
        ("Carrots",2000,"Canada"),
        ("Beans",2000,"Mexico")]

columns= ["Product", "Amount", "Country"]

df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



In [84]:
# PIVOT

df.groupBy("Product").pivot("Country").sum("Amount").show(truncate=False)

# Sum of Amount per Product and per Country

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+



In [93]:
import time

start_time = time.time()

df.groupBy("Product").pivot("Country").sum("Amount").show(truncate=False)

end_time = time.time()

print(f'Processing Time : {end_time - start_time}')

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+

Processing Time : 5.07438588142395


In [94]:
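# Cheaper computation: list the pivot values explicitly,
# so Spark can skip the extra pass that collects the distinct Country values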
countries = ["USA","China","Canada","Mexico"]

start_time = time.time()

df.groupBy("Product").pivot("Country", countries).sum("Amount").show(truncate=False)

end_time = time.time()

print(f'Processing Time : {end_time - start_time}')

+-------+----+-----+------+------+
|Product|USA |China|Canada|Mexico|
+-------+----+-----+------+------+
|Orange |4000|4000 |null  |null  |
|Beans  |1600|1500 |null  |2000  |
|Banana |1000|400  |2000  |null  |
|Carrots|1500|1200 |2000  |null  |
+-------+----+-----+------+------+

Processing Time : 3.3013641834259033


In [95]:
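# UNPIVOT: reshape the pivoted table back to long form with a stack() expression in expr

# pivotDF is the pivoted result from the PIVOT cell above (defined here so this cell runs on its own)
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")

unpivotExpr = "stack(4,'USA',USA,'Canada',Canada,'China',China,'Mexico',Mexico) as (Country, Total)"

unPivotDF = pivotDF.select("Product", F.expr(unpivotExpr)).filter("Total is not null").orderBy("Product")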
unPivotDF.show()

+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
| Banana|    USA| 1000|
| Banana| Canada| 2000|
| Banana|  China|  400|
|  Beans|    USA| 1600|
|  Beans|  China| 1500|
|  Beans| Mexico| 2000|
|Carrots|    USA| 1500|
|Carrots| Canada| 2000|
|Carrots|  China| 1200|
| Orange|    USA| 4000|
| Orange|  China| 4000|
+-------+-------+-----+



In [97]:
# row_number(): number the rows within each Product, highest Amount first

from pyspark.sql.window import Window 

df.withColumn('row_num', F.row_number().over(Window.partitionBy('Product').orderBy(F.desc('Amount')))).show()

+-------+------+-------+-------+
|Product|Amount|Country|row_num|
+-------+------+-------+-------+
| Orange|  4000|  China|      1|
| Orange|  2000|    USA|      2|
| Orange|  2000|    USA|      3|
|  Beans|  2000| Mexico|      1|
|  Beans|  1600|    USA|      2|
|  Beans|  1500|  China|      3|
| Banana|  2000| Canada|      1|
| Banana|  1000|    USA|      2|
| Banana|   400|  China|      3|
|Carrots|  2000| Canada|      1|
|Carrots|  1500|    USA|      2|
|Carrots|  1200|  China|      3|
+-------+------+-------+-------+

