집계와 조인
이 교시를 마치면 다음을 할 수 있습니다:

groupBy()와 agg()로 그룹별 집계를 수행할 수 있다
count(), sum(), avg(), min(), max() 집계 함수를 활용할 수 있다
다양한 조인 타입(inner, left, right, outer)을 이해하고 적용할 수 있다
pivot()으로 데이터를 피벗 테이블 형태로 변환할 수 있다
Spark SQL을 DataFrame과 함께 활용할 수 있다

In [7]:
# -----------------------------------------------------------------------------
# 환경 설정: SparkSession 및 테스트 데이터 준비
# -----------------------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, when, count, sum, avg, min, max,
    countDistinct, first, last, collect_list, collect_set,
    round as spark_round, expr
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd
import numpy as np
import os

# SparkSession 생성
spark = SparkSession.builder \
    .appName("PySpark-Aggregation-Join") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", 10) \
    .getOrCreate()

# 테스트 데이터 디렉토리
os.makedirs("/tmp/spark_tutorial", exist_ok=True)

np.random.seed(42)

# -----------------------------------------------------------------------------
# 테스트 데이터 1: 직원 정보
# -----------------------------------------------------------------------------
employees = pd.DataFrame({
    "emp_id": [f"E{i:03d}" for i in range(1, 51)],
    "name": [f"Employee_{i}" for i in range(1, 51)],
    "department": np.random.choice(
        ["Engineering", "Sales", "Marketing", "HR", "Finance"], 50
    ),
    "salary": np.random.randint(40000, 120000, 50),
    "age": np.random.randint(25, 55, 50),
    "hire_year": np.random.choice([2020, 2021, 2022, 2023, 2024], 50),
})
employees.to_csv("/tmp/spark_tutorial/employees.csv", index=False)

# -----------------------------------------------------------------------------
# 테스트 데이터 2: 부서 정보 (조인용)
# -----------------------------------------------------------------------------
departments = pd.DataFrame({
    "dept_name": ["Engineering", "Sales", "Marketing", "HR", "Finance", "Legal"],
    "dept_head": ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank"],
    "budget": [500000, 300000, 200000, 150000, 400000, 100000],
    "location": ["Seoul", "Busan", "Seoul", "Daegu", "Seoul", "Incheon"],
})
departments.to_csv("/tmp/spark_tutorial/departments.csv", index=False)

# -----------------------------------------------------------------------------
# 테스트 데이터 3: 매출 데이터 (시계열)
# -----------------------------------------------------------------------------
sales = pd.DataFrame({
    "date": pd.date_range("2026-01-01", periods=100, freq="D").strftime("%Y-%m-%d"),
    "product": np.random.choice(["A", "B", "C"], 100),
    "region": np.random.choice(["Seoul", "Busan", "Daegu"], 100),
    "amount": np.random.randint(100, 1000, 100),
    "quantity": np.random.randint(1, 20, 100),
})
sales.to_csv("/tmp/spark_tutorial/sales.csv", index=False)

# DataFrame 로드
df_emp = spark.read.csv("/tmp/spark_tutorial/employees.csv", header=True, inferSchema=True)
df_dept = spark.read.csv("/tmp/spark_tutorial/departments.csv", header=True, inferSchema=True)
df_sales = spark.read.csv("/tmp/spark_tutorial/sales.csv", header=True, inferSchema=True)

print("데이터 로드 완료!")
print(f"직원: {df_emp.count()}명, 부서: {df_dept.count()}개, 매출: {df_sales.count()}건")

데이터 로드 완료!
직원: 50명, 부서: 6개, 매출: 100건


##View(뷰)

In [None]:
# -----------------------------------------------------------------------------
# createOrReplaceTempView(): SQL 테이블 등록
# -----------------------------------------------------------------------------
#
# createOrReplaceTempView(이름): 임시 뷰로 등록
# - 해당 SparkSession 내에서만 유효
# - 세션 종료 시 자동 삭제
# - 같은 이름의 뷰가 있으면 덮어쓰기 (Replace)

# DataFrame을 SQL 테이블(뷰)로 등록
df_emp.createOrReplaceTempView("employees")
df_dept.createOrReplaceTempView("departments")

print("SQL 테이블 등록 완료: employees, departments")

# -----------------------------------------------------------------------------
# 등록 후 사용 예시
# -----------------------------------------------------------------------------
# 이제 SQL 문법으로 DataFrame을 조회할 수 있습니다!
# spark.sql("SELECT * FROM employees WHERE salary > 5000")
# spark.sql("SELECT dept_id, AVG(salary) FROM employees GROUP BY dept_id")

SQL 테이블 등록 완료: employees, departments


26/01/20 06:34:34 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
# -----------------------------------------------------------------------------
# spark.sql(): SQL 쿼리 실행
# -----------------------------------------------------------------------------

# spark.sql(쿼리문): SQL 실행 후 DataFrame 반환
# 복잡한 로직을 SQL로 작성 가능

# SQL로 부서별 통계
sql_result = spark.sql("""
    SELECT
        department,
        COUNT(*) as `인원수`,
        ROUND(AVG(salary), 2) as `평균급여`,
        MAX(salary) as `최고급여`
    FROM employees
    GROUP BY department
    ORDER BY `인원수` DESC
""")

print("=== SQL 쿼리 결과 ===")
sql_result.show()

=== SQL 쿼리 결과 ===
+-----------+------+--------+--------+
| department|인원수|평균급여|최고급여|
+-----------+------+--------+--------+
|         HR|    13|85255.23|  112409|
|  Marketing|    10| 76457.5|  117189|
|      Sales|    10| 70509.7|  111211|
|    Finance|    10| 82034.8|  118953|
|Engineering|     7|79437.14|  107563|
+-----------+------+--------+--------+



In [4]:
# -----------------------------------------------------------------------------
# spark.sql(): SQL 쿼리 실행
# -----------------------------------------------------------------------------

# spark.sql(쿼리문): SQL 실행 후 DataFrame 반환
# 복잡한 로직을 SQL로 작성 가능

# SQL로 부서별 통계
sql_result = spark.sql("""
    SELECT
        department,
        COUNT(*) as `인원수`,
        ROUND(AVG(salary), 2) as `평균급여`,
        MAX(salary) as `최고급여`
    FROM employees
    GROUP BY department
    ORDER BY `인원수` DESC
""")

print("=== SQL 쿼리 결과 ===")
sql_result.show()

=== SQL 쿼리 결과 ===
+-----------+------+--------+--------+
| department|인원수|평균급여|최고급여|
+-----------+------+--------+--------+
|         HR|    13|85255.23|  112409|
|  Marketing|    10| 76457.5|  117189|
|      Sales|    10| 70509.7|  111211|
|    Finance|    10| 82034.8|  118953|
|Engineering|     7|79437.14|  107563|
+-----------+------+--------+--------+



In [5]:
# -----------------------------------------------------------------------------
# SQL: 조인
# -----------------------------------------------------------------------------

# SQL로 조인 쿼리
sql_join = spark.sql("""
    SELECT
        e.name,
        e.department,
        e.salary,
        d.dept_head,
        d.location
    FROM employees e
    JOIN departments d ON e.department = d.dept_name
    WHERE e.salary >= 70000
    ORDER BY e.salary DESC
""")

print("=== SQL 조인 결과 ===")
sql_join.show(10)

=== SQL 조인 결과 ===
+-----------+-----------+------+---------+--------+
|       name| department|salary|dept_head|location|
+-----------+-----------+------+---------+--------+
|Employee_10|    Finance|118953|      Eve|   Seoul|
| Employee_9|  Marketing|117189|  Charlie|   Seoul|
|Employee_26|  Marketing|114065|  Charlie|   Seoul|
|Employee_15|         HR|112409|    Diana|   Daegu|
|Employee_16|      Sales|111211|      Bob|   Busan|
| Employee_6|      Sales|109479|      Bob|   Busan|
|Employee_39|Engineering|107563|    Alice|   Seoul|
| Employee_5|    Finance|107121|      Eve|   Seoul|
| Employee_8|  Marketing|106557|  Charlie|   Seoul|
|Employee_17|         HR|105697|    Diana|   Daegu|
+-----------+-----------+------+---------+--------+
only showing top 10 rows



In [None]:
# -----------------------------------------------------------------------------
# expr(): SQL 표현식을 DataFrame에서 사용
# -----------------------------------------------------------------------------

# expr(): SQL 표현식을 Column으로 변환
# select(), withColumn() 등에서 SQL 문법 사용 가능

# SQL 표현식으로 새 컬럼 추가
df_with_expr = df_emp.withColumn(
    "salary_grade",
    expr("CASE WHEN salary >= 80000 THEN 'High' ELSE 'Normal' END")
).withColumn(
    "bonus",
    expr("salary * 0.1")  # 급여의 10%
)

print("=== expr() 사용 예시 ===")
df_with_expr.select("name", "salary", "salary_grade", "bonus").show(5)

### 실습 6-1: 임시 뷰 등록
#### df_emp를 "emp"라는 이름의 임시 뷰로 등록하세요.

In [6]:
df_emp.createOrReplaceTempView("emp")

#### 실습 6-2: SQL 쿼리 실행
##### SQL로 employees 테이블에서 salary가 70000 이상인 직원의 name과 salary를 조회하세요

In [None]:
sql_result = spark.sql('''
            SELECT
                name,
                salary
            FROM employees
            WHERE salary >= 70000
'''
)
sql_result.show(5)

+----------+------+
|      name|salary|
+----------+------+
|Employee_2| 88555|
|Employee_4| 75920|
|Employee_5|107121|
|Employee_6|109479|
|Employee_8|106557|
+----------+------+
only showing top 5 rows



#### 실습 6-3: SQL 집계
##### SQL로 부서별 평균 급여를 조회하세요.

In [13]:
sql_avg_result = spark.sql('''
            SELECT
                department,
                ROUND(AVG(salary), 2) as `평균급여`
            FROM employees 
            GROUP BY department
''')
sql_avg_result.show(5)

+-----------+--------+
| department|평균급여|
+-----------+--------+
|         HR|85255.23|
|  Marketing| 76457.5|
|      Sales| 70509.7|
|Engineering|79437.14|
|    Finance| 82034.8|
+-----------+--------+



#### 실습 6-4: expr 사용
##### expr()을 사용하여 salary의 5%를 "bonus" 컬럼으로 추가하세요.

In [15]:
df_emp.withColumn("bonus", expr("salary * 0.05")).select("name", "salary", "bonus").show(5)

+----------+------+-------+
|      name|salary|  bonus|
+----------+------+-------+
|Employee_1| 63483|3174.15|
|Employee_2| 88555|4427.75|
|Employee_3| 57159|2857.95|
|Employee_4| 75920|3796.00|
|Employee_5|107121|5356.05|
+----------+------+-------+
only showing top 5 rows



### 채워야함

In [52]:
# -----------------------------------------------------------------------------
# 환경 설정
# -----------------------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, when,
    # NULL 처리
    coalesce, isnan,
    # 문자열 함수
    concat, concat_ws, substring, length, trim, ltrim, rtrim,
    upper, lower, initcap, regexp_replace, regexp_extract, split,
    lpad, rpad,
    # 날짜/시간 함수
    current_date, current_timestamp, to_date, to_timestamp, date_format,
    year, month, dayofmonth, dayofweek, hour, minute,
    date_add, date_sub, datediff, months_between, trunc,
    # 집계/윈도우
    count, sum, avg, min, max, first, last,
    row_number, rank, dense_rank, lag, lead,
    # 기타
    round as spark_round, expr,
)
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
import pandas as pd
import numpy as np
import os

# SparkSession 생성
spark = SparkSession.builder \
    .appName("PySpark-Advanced-Patterns") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", 10) \
    .getOrCreate()

os.makedirs("/tmp/spark_tutorial", exist_ok=True)
np.random.seed(42)

# -----------------------------------------------------------------------------
# 테스트 데이터: 다양한 상황을 포함한 직원 데이터
# -----------------------------------------------------------------------------
employees = pd.DataFrame({
    "emp_id": [f"E{i:03d}" for i in range(1, 31)],
    "name": [f"  Employee {i}  " for i in range(1, 31)],  # 앞뒤 공백
    "email": [f"emp{i}@company.com" for i in range(1, 31)],
    "department": np.random.choice(
        ["Engineering", "Sales", "Marketing", None], 30
    ),
    "salary": [
        50000, None, 70000, 80000, None,  # NULL 포함
        60000, 90000, 55000, None, 75000,
        65000, 85000, None, 72000, 68000,
        None, 95000, 62000, 78000, None,
        58000, 82000, 67000, None, 73000,
        69000, 88000, None, 76000, 71000
    ],
    "join_date": pd.date_range("2020-01-15", periods=30, freq="45D").strftime("%Y-%m-%d"),
})
employees.to_csv("/tmp/spark_tutorial/employees_advanced.csv", index=False)

# 매출 시계열 데이터
sales = pd.DataFrame({
    "date": pd.date_range("2026-01-01", periods=90, freq="D").strftime("%Y-%m-%d"),
    "product": np.random.choice(["A", "B", "C"], 90),
    "region": np.random.choice(["Seoul", "Busan", "Daegu"], 90),
    "amount": np.random.randint(100, 1000, 90),
})
sales.to_csv("/tmp/spark_tutorial/sales_ts.csv", index=False)

# DataFrame 로드
df = spark.read.csv("/tmp/spark_tutorial/employees_advanced.csv", header=True, inferSchema=True)
df_sales = spark.read.csv("/tmp/spark_tutorial/sales_ts.csv", header=True, inferSchema=True)

print("데이터 로드 완료!")
print(f"직원: {df.count()}명, 매출: {df_sales.count()}건")
df.show(10)

데이터 로드 완료!
직원: 30명, 매출: 90건
+------+---------------+-----------------+-----------+-------+----------+
|emp_id|           name|            email| department| salary| join_date|
+------+---------------+-----------------+-----------+-------+----------+
|  E001|   Employee 1  | emp1@company.com|  Marketing|50000.0|2020-01-15|
|  E002|   Employee 2  | emp2@company.com|       NULL|   NULL|2020-02-29|
|  E003|   Employee 3  | emp3@company.com|Engineering|70000.0|2020-04-14|
|  E004|   Employee 4  | emp4@company.com|  Marketing|80000.0|2020-05-29|
|  E005|   Employee 5  | emp5@company.com|  Marketing|   NULL|2020-07-13|
|  E006|   Employee 6  | emp6@company.com|       NULL|60000.0|2020-08-27|
|  E007|   Employee 7  | emp7@company.com|Engineering|90000.0|2020-10-11|
|  E008|   Employee 8  | emp8@company.com|Engineering|55000.0|2020-11-25|
|  E009|   Employee 9  | emp9@company.com|  Marketing|   NULL|2021-01-09|
|  E010|  Employee 10  |emp10@company.com|      Sales|75000.0|2021-02-23|
+------+--

#### Window 함수

In [10]:
# -----------------------------------------------------------------------------
# Window 기본: 순위 함수
# -----------------------------------------------------------------------------

from pyspark.sql.window import Window

# Window 정의: 부서별로 그룹화, 급여 내림차순 정렬
window_rank = Window.partitionBy("department").orderBy(col("salary").desc())

# 순위 함수 적용
# row_number(): 동점이어도 순차적 번호 (1, 2, 3, 4...)
# rank(): 동점은 같은 순위, 다음 순위 건너뜀 (1, 1, 3, 4...)
# dense_rank(): 동점은 같은 순위, 순위 안 건너뜀 (1, 1, 2, 3...)
df_ranked = df.filter(col("salary").isNotNull()).withColumn(
    "row_num",row_number().over(window_rank)
).withColumn(
    "rank",rank().over(window_rank)
).withColumn(
    "dense_rank",dense_rank().over(window_rank)
)

print("=== 부서별 급여 순위 ===")
df_ranked.select(
    "department", "name", "salary", "row_num", "rank", "dense_rank"
).orderBy("department", "row_num").show(15)

=== 부서별 급여 순위 ===
+-----------+---------------+-------+-------+----+----------+
| department|           name| salary|row_num|rank|dense_rank|
+-----------+---------------+-------+-------+----+----------+
|       NULL|  Employee 17  |95000.0|      1|   1|         1|
|       NULL|  Employee 19  |78000.0|      2|   2|         2|
|       NULL|  Employee 29  |76000.0|      3|   3|         3|
|       NULL|  Employee 25  |73000.0|      4|   4|         4|
|       NULL|  Employee 30  |71000.0|      5|   5|         5|
|       NULL|  Employee 15  |68000.0|      6|   6|         6|
|       NULL|  Employee 18  |62000.0|      7|   7|         7|
|       NULL|   Employee 6  |60000.0|      8|   8|         8|
|Engineering|   Employee 7  |90000.0|      1|   1|         1|
|Engineering|  Employee 22  |82000.0|      2|   2|         2|
|Engineering|   Employee 3  |70000.0|      3|   3|         3|
|Engineering|   Employee 8  |55000.0|      4|   4|         4|
|  Marketing|  Employee 12  |85000.0|      1|   1|  

#### lag(), lead(): 이전/다음 행 참조

In [11]:
# -----------------------------------------------------------------------------
# lag(), lead(): 이전/다음 행 참조
# -----------------------------------------------------------------------------

# lag(컬럼, n): n행 이전 값
# lead(컬럼, n): n행 이후 값
# 시계열 분석, 전일 대비 비교 등에 유용

# 날짜순 정렬된 매출 데이터로 실습
df_sales_parsed = df_sales.withColumn("date_parsed", to_date(col("date")))

# Window: 제품별, 날짜순
window_sales = Window.partitionBy("product").orderBy("date_parsed")

df_with_prev = df_sales_parsed.withColumn(
    "prev_amount", lag("amount", 1).over(window_sales)      # 전일 매출
).withColumn(
    "next_amount", lead("amount", 1).over(window_sales)     # 익일 매출
).withColumn(
    # 전일 대비 증감
    "change", col("amount") - col("prev_amount")
)

print("=== lag/lead: 전일 대비 비교 ===")
df_with_prev.filter(col("product") == "A").select(
    "date_parsed", "product", "amount", "prev_amount", "change"
).show(10)

=== lag/lead: 전일 대비 비교 ===
+-----------+-------+------+-----------+------+
|date_parsed|product|amount|prev_amount|change|
+-----------+-------+------+-----------+------+
| 2026-01-01|      A|   791|       NULL|  NULL|
| 2026-01-02|      A|   212|        791|  -579|
| 2026-01-05|      A|   541|        212|   329|
| 2026-01-06|      A|   663|        541|   122|
| 2026-01-07|      A|   367|        663|  -296|
| 2026-01-19|      A|   741|        367|   374|
| 2026-01-21|      A|   665|        741|   -76|
| 2026-01-24|      A|   324|        665|  -341|
| 2026-01-25|      A|   484|        324|   160|
| 2026-01-28|      A|   229|        484|  -255|
+-----------+-------+------+-----------+------+
only showing top 10 rows



#### 누적 합계 / 이동 평균

In [12]:
# -----------------------------------------------------------------------------
# 누적 합계 / 이동 평균
# -----------------------------------------------------------------------------

# 범위 지정: rowsBetween(시작, 끝)
# unboundedPreceding: 파티션 시작부터
# currentRow: 현재 행까지
# unboundedFollowing: 파티션 끝까지
# 숫자: 현재 행 기준 상대 위치 (-3, 0, 2 등)

# 누적 합계: 시작부터 현재까지
window_cumsum = Window.partitionBy("product").orderBy("date_parsed") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 7일 이동 평균: 최근 7일
window_ma7 = Window.partitionBy("product").orderBy("date_parsed") \
    .rowsBetween(-6, Window.currentRow)  # 현재 포함 7일

df_cumsum = df_sales_parsed.withColumn(
    "cumsum", sum("amount").over(window_cumsum)
).withColumn(
    "ma7", spark_round(avg("amount").over(window_ma7), 2)
)

print("=== 누적합 / 7일 이동평균 ===")
df_cumsum.filter(col("product") == "A").select(
    "date_parsed", "product", "amount", "cumsum", "ma7"
).show(15)

=== 누적합 / 7일 이동평균 ===
+-----------+-------+------+------+------+
|date_parsed|product|amount|cumsum|   ma7|
+-----------+-------+------+------+------+
| 2026-01-01|      A|   791|   791| 791.0|
| 2026-01-02|      A|   212|  1003| 501.5|
| 2026-01-05|      A|   541|  1544|514.67|
| 2026-01-06|      A|   663|  2207|551.75|
| 2026-01-07|      A|   367|  2574| 514.8|
| 2026-01-19|      A|   741|  3315| 552.5|
| 2026-01-21|      A|   665|  3980|568.57|
| 2026-01-24|      A|   324|  4304|501.86|
| 2026-01-25|      A|   484|  4788|540.71|
| 2026-01-28|      A|   229|  5017|496.14|
| 2026-02-01|      A|   771|  5788|511.57|
| 2026-02-03|      A|   515|  6303|532.71|
| 2026-02-07|      A|   302|  6605| 470.0|
| 2026-02-11|      A|   866|  7471|498.71|
| 2026-02-19|      A|   851|  8322| 574.0|
+-----------+-------+------+------+------+
only showing top 15 rows



#### 실무 패턴: 그룹별 Top N

In [13]:
# -----------------------------------------------------------------------------
# 실무 패턴: 그룹별 Top N
# -----------------------------------------------------------------------------

# 부서별 급여 Top 3 뽑기
# 1. row_number()로 순위 매기기
# 2. filter로 순위 <= 3 필터링

window_top = Window.partitionBy("department").orderBy(col("salary").desc())

df_top3 = df.filter(col("salary").isNotNull()).withColumn(
    "rank", row_number().over(window_top)
).filter(
    col("rank") <= 3
)

print("=== 부서별 급여 Top 3 ===")
df_top3.select("department", "name", "salary", "rank") \
    .orderBy("department", "rank").show()

=== 부서별 급여 Top 3 ===
+-----------+---------------+-------+----+
| department|           name| salary|rank|
+-----------+---------------+-------+----+
|       NULL|  Employee 17  |95000.0|   1|
|       NULL|  Employee 19  |78000.0|   2|
|       NULL|  Employee 29  |76000.0|   3|
|Engineering|   Employee 7  |90000.0|   1|
|Engineering|  Employee 22  |82000.0|   2|
|Engineering|   Employee 3  |70000.0|   3|
|  Marketing|  Employee 12  |85000.0|   1|
|  Marketing|   Employee 4  |80000.0|   2|
|  Marketing|  Employee 14  |72000.0|   3|
|      Sales|  Employee 27  |88000.0|   1|
|      Sales|  Employee 10  |75000.0|   2|
|      Sales|  Employee 26  |69000.0|   3|
+-----------+---------------+-------+----+



#### 실습 4-1: 순위 함수
##### 부서별로 급여 내림차순 순위(rank)를 매긴 "salary_rank" 컬럼을 추가하세요.

In [17]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
df.filter(col("salary").isNotNull()).withColumn(
    "salary_rank", row_number().over(window_spec)
).select("department", "name", "salary", "salary_rank").show()

+-----------+---------------+-------+-----------+
| department|           name| salary|salary_rank|
+-----------+---------------+-------+-----------+
|       NULL|  Employee 17  |95000.0|          1|
|       NULL|  Employee 19  |78000.0|          2|
|       NULL|  Employee 29  |76000.0|          3|
|       NULL|  Employee 25  |73000.0|          4|
|       NULL|  Employee 30  |71000.0|          5|
|       NULL|  Employee 15  |68000.0|          6|
|       NULL|  Employee 18  |62000.0|          7|
|       NULL|   Employee 6  |60000.0|          8|
|Engineering|   Employee 7  |90000.0|          1|
|Engineering|  Employee 22  |82000.0|          2|
|Engineering|   Employee 3  |70000.0|          3|
|Engineering|   Employee 8  |55000.0|          4|
|  Marketing|  Employee 12  |85000.0|          1|
|  Marketing|   Employee 4  |80000.0|          2|
|  Marketing|  Employee 14  |72000.0|          3|
|  Marketing|  Employee 11  |65000.0|          4|
|  Marketing|   Employee 1  |50000.0|          5|


#### 실습 4-2: lag 함수
##### 매출 데이터에서 제품별 전일 매출(prev_amount)을 추가하세요.

In [18]:
df_sales_parsed = df_sales.withColumn("date_parsed", to_date(col("date")))
window_spec = Window.partitionBy("product").orderBy("date_parsed")

df_sales_parsed.withColumn(
    "prev_amount", lag("amount", 1).over(window_spec)
).filter(col("product") == "A").show(10)

+----------+-------+------+------+-----------+-----------+
|      date|product|region|amount|date_parsed|prev_amount|
+----------+-------+------+------+-----------+-----------+
|2026-01-01|      A| Daegu|   791| 2026-01-01|       NULL|
|2026-01-02|      A| Busan|   212| 2026-01-02|        791|
|2026-01-05|      A| Seoul|   541| 2026-01-05|        212|
|2026-01-06|      A| Busan|   663| 2026-01-06|        541|
|2026-01-07|      A| Seoul|   367| 2026-01-07|        663|
|2026-01-19|      A| Daegu|   741| 2026-01-19|        367|
|2026-01-21|      A| Daegu|   665| 2026-01-21|        741|
|2026-01-24|      A| Busan|   324| 2026-01-24|        665|
|2026-01-25|      A| Daegu|   484| 2026-01-25|        324|
|2026-01-28|      A| Daegu|   229| 2026-01-28|        484|
+----------+-------+------+------+-----------+-----------+
only showing top 10 rows



#### 실습 4-3: 누적 합계
##### 제품별 날짜순 누적 매출(cumsum)을 계산하세요.

In [19]:
df_sales_parsed = df_sales.withColumn("date_parsed", to_date(col("date")))
window_cumsum = Window.partitionBy("product").orderBy("date_parsed") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_sales_parsed.withColumn(
    "cumsum", sum("amount").over(window_cumsum)
).filter(col("product") == "A").select("date_parsed", "amount", "cumsum").show(10)

+-----------+------+------+
|date_parsed|amount|cumsum|
+-----------+------+------+
| 2026-01-01|   791|   791|
| 2026-01-02|   212|  1003|
| 2026-01-05|   541|  1544|
| 2026-01-06|   663|  2207|
| 2026-01-07|   367|  2574|
| 2026-01-19|   741|  3315|
| 2026-01-21|   665|  3980|
| 2026-01-24|   324|  4304|
| 2026-01-25|   484|  4788|
| 2026-01-28|   229|  5017|
+-----------+------+------+
only showing top 10 rows



#### 실습 4-4: 그룹별 Top N
##### 부서별 급여 상위 2명만 필터링하세요.

In [20]:
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

df.filter(col("salary").isNotNull()).withColumn(
    "rank", row_number().over(window_spec)
).filter(col("rank") <= 2) \
    .select("department", "name", "salary", "rank").orderBy("department", "rank").show()

+-----------+---------------+-------+----+
| department|           name| salary|rank|
+-----------+---------------+-------+----+
|       NULL|  Employee 17  |95000.0|   1|
|       NULL|  Employee 19  |78000.0|   2|
|Engineering|   Employee 7  |90000.0|   1|
|Engineering|  Employee 22  |82000.0|   2|
|  Marketing|  Employee 12  |85000.0|   1|
|  Marketing|   Employee 4  |80000.0|   2|
|      Sales|  Employee 27  |88000.0|   1|
|      Sales|  Employee 10  |75000.0|   2|
+-----------+---------------+-------+----+



#### 자주 쓰는 실무 패턴
##### 패턴 1: 조건부 집계
##### groupBy + when 조합으로 조건별 카운트/합계

In [24]:
conditional_agg = df.groupBy("department").agg(
    count("*").alias("total"),
    # 조건별 카운트
    count(when(col("salary") >= 80000, 1)).alias("high_salary_cnt"),
    count(when(col("salary") < 50000, 1)).alias("low_salary_cnt"),
    # 조건별 합계
    sum(when(col("salary") >= 80000, col("salary"))).alias("high_salary_sum"),
)

print("=== 패턴 1: 조건부 집계 ===")
conditional_agg.show()

=== 패턴 1: 조건부 집계 ===
+-----------+-----+---------------+--------------+---------------+
| department|total|high_salary_cnt|low_salary_cnt|high_salary_sum|
+-----------+-----+---------------+--------------+---------------+
|  Marketing|    9|              2|             0|       165000.0|
|Engineering|    5|              2|             0|       172000.0|
|      Sales|    6|              1|             0|        88000.0|
|       NULL|   10|              1|             0|        95000.0|
+-----------+-----+---------------+--------------+---------------+



#### 패턴 2: 전체/그룹 대비 비율
#####  Window 함수로 그룹 전체 합계를 각 행에 추가

In [25]:
window_dept = Window.partitionBy("department")

df_ratio = df.filter(col("salary").isNotNull()).withColumn(
    "dept_total", sum("salary").over(window_dept)
).withColumn(
    "pct_of_dept", spark_round(col("salary") / col("dept_total") * 100, 2)  # 부서 내 비율
)

print("=== 패턴 2: 부서 내 급여 비율 ===")
df_ratio.select("department", "name", "salary", "dept_total", "pct_of_dept") \
    .orderBy("department", col("salary").desc()).show(10)

=== 패턴 2: 부서 내 급여 비율 ===
+-----------+---------------+-------+----------+-----------+
| department|           name| salary|dept_total|pct_of_dept|
+-----------+---------------+-------+----------+-----------+
|       NULL|  Employee 17  |95000.0|  583000.0|       16.3|
|       NULL|  Employee 19  |78000.0|  583000.0|      13.38|
|       NULL|  Employee 29  |76000.0|  583000.0|      13.04|
|       NULL|  Employee 25  |73000.0|  583000.0|      12.52|
|       NULL|  Employee 30  |71000.0|  583000.0|      12.18|
|       NULL|  Employee 15  |68000.0|  583000.0|      11.66|
|       NULL|  Employee 18  |62000.0|  583000.0|      10.63|
|       NULL|   Employee 6  |60000.0|  583000.0|      10.29|
|Engineering|   Employee 7  |90000.0|  297000.0|       30.3|
|Engineering|  Employee 22  |82000.0|  297000.0|      27.61|
+-----------+---------------+-------+----------+-----------+
only showing top 10 rows



#### 패턴 3: 데이터 품질 체크
##### 한 번에 여러 품질 지표 확인

In [26]:
quality_check = df.agg(
    count("*").alias("total_rows"),
    count("emp_id").alias("emp_id_non_null"),
    countDistinct("emp_id").alias("emp_id_unique"),
    count("salary").alias("salary_non_null"),
    sum(when(col("salary") < 0, 1).otherwise(0)).alias("negative_salary"),
    sum(when(col("department").isNull(), 1).otherwise(0)).alias("dept_null"),
)

print("=== 패턴 3: 데이터 품질 체크 ===")
quality_check.show()

=== 패턴 3: 데이터 품질 체크 ===
+----------+---------------+-------------+---------------+---------------+---------+
|total_rows|emp_id_non_null|emp_id_unique|salary_non_null|negative_salary|dept_null|
+----------+---------------+-------------+---------------+---------------+---------+
|        30|             30|           30|             22|              0|       10|
+----------+---------------+-------------+---------------+---------------+---------+



#### 패턴 4: 컬럼명 일괄 변경 (소문자, 공백→언더스코어)
#####  toDF()로 모든 컬럼명 한번에 변경
##### 리스트 컴프리헨션으로 변환 규칙 적용

In [27]:
# -----------------------------------------------------------------------------
# 패턴 4: 컬럼명 일괄 변경 (소문자, 공백→언더스코어)
# -----------------------------------------------------------------------------

# toDF()로 모든 컬럼명 한번에 변경
# 리스트 컴프리헨션으로 변환 규칙 적용

# 예시 DataFrame
df_messy_cols = spark.createDataFrame([
    (1, "A", 100),
    (2, "B", 200),
], ["User ID", "Product Name", "Total Amount"])

print("변경 전:", df_messy_cols.columns)

# 소문자 + 공백을 _로 변환
new_cols = [c.lower().replace(" ", "_") for c in df_messy_cols.columns]
df_clean_cols = df_messy_cols.toDF(*new_cols)

print("변경 후:", df_clean_cols.columns)

변경 전: ['User ID', 'Product Name', 'Total Amount']
변경 후: ['user_id', 'product_name', 'total_amount']


#### 패턴 5: Forward Fill (이전 값으로 NULL 채우기)

In [30]:
# -----------------------------------------------------------------------------
# 패턴 5: Forward Fill (이전 값으로 NULL 채우기)
# -----------------------------------------------------------------------------

# last(ignorenulls=True)를 Window와 함께 사용
# NULL을 직전 non-null 값으로 채움

df_with_null = spark.createDataFrame([
    (1, "2024-01-01", 100),
    (1, "2024-01-02", None),
    (1, "2024-01-03", None),
    (1, "2024-01-04", 200),
    (1, "2024-01-05", None),
], ["id", "date", "value"])

# Window: 시작~현재까지
window_ff = Window.partitionBy("id").orderBy("date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_filled = df_with_null.withColumn(
    "value_filled",
    # ignorenulls=True: NULL을 무시하고 마지막 non-null 값 반환
    last("value", ignorenulls=True).over(window_ff)
)

print("=== 패턴 5: Forward Fill ===")
df_filled.show()

=== 패턴 5: Forward Fill ===
+---+----------+-----+------------+
| id|      date|value|value_filled|
+---+----------+-----+------------+
|  1|2024-01-01|  100|         100|
|  1|2024-01-02| NULL|         100|
|  1|2024-01-03| NULL|         100|
|  1|2024-01-04|  200|         200|
|  1|2024-01-05| NULL|         200|
+---+----------+-----+------------+



In [42]:
# 샘플 데이터
sample_data = pd.DataFrame({
    "emp_id": [f"E{i:03d}" for i in range(1, 101)],
    "name": [f"Employee_{i}" for i in range(1, 101)],
    "department": np.random.choice(
        ["Engineering", "Sales", "Marketing", "HR", "Finance"], 100
    ),
    "salary": np.random.randint(40000, 120000, 100),
    "age": np.random.randint(25, 55, 100),
    "is_manager": np.random.choice([True, False], 100, p=[0.2, 0.8]),
})
# DataFrame 로드
df = spark.read.csv("/tmp/spark_tutorial/employees.csv", header=True, inferSchema=True)

print("데이터 로드 완료!")
print(f"행 수: {df.count()}, 컬럼: {df.columns}")
df.show(5)

데이터 로드 완료!
행 수: 50, 컬럼: ['emp_id', 'name', 'department', 'salary', 'age', 'hire_year']
+------+----------+----------+------+---+---------+
|emp_id|      name|department|salary|age|hire_year|
+------+----------+----------+------+---+---------+
|  E001|Employee_1|        HR| 63483| 50|     2024|
|  E002|Employee_2|   Finance| 88555| 33|     2022|
|  E003|Employee_3| Marketing| 57159| 52|     2024|
|  E004|Employee_4|   Finance| 75920| 31|     2023|
|  E005|Employee_5|   Finance|107121| 33|     2024|
+------+----------+----------+------+---+---------+
only showing top 5 rows



#### 패턴 6: 여러 컬럼에 같은 처리 일괄 적용

In [35]:
# -----------------------------------------------------------------------------
# 패턴 6: 여러 컬럼에 같은 처리 일괄 적용
# -----------------------------------------------------------------------------

# 리스트 컴프리헨션 + agg로 여러 컬럼에 같은 집계 적용
cols_to_sum = ["salary", "age"]

# 방법: 리스트 컴프리헨션으로 집계 함수 생성
agg_exprs = [sum(c).alias(f"total_{c}") for c in cols_to_sum]
agg_exprs += [avg(c).alias(f"avg_{c}") for c in cols_to_sum]

result = df.agg(*agg_exprs)

print("=== 패턴 6: 여러 컬럼 일괄 집계 ===")
result.show()

=== 패턴 6: 여러 컬럼 일괄 집계 ===
+------------+---------+----------+-------+
|total_salary|total_age|avg_salary|avg_age|
+------------+---------+----------+-------+
|     3954398|     1912|  79087.96|  38.24|
+------------+---------+----------+-------+



#### 과제: 직원 성과 분석 ETL 파이프라인

##### 시나리오
##### 데이터 엔지니어로서 직원 성과 분석 시스템을 구축해야 합니다. 원본 데이터는 품질 이슈(NULL, 공백, 형식 ##### 불일치)가 있어 정제가 필요하고, 날짜 기반 파생 변수와 Window 함수를 활용한 성과 지표 계산이 요구됩니다.

In [53]:
df_step1 = (
    df
    .withColumn("name", trim(col("name")))
    .fillna({"salary": 60000, "department": "Unassigned"})
    .withColumn("username", split(col("email"), "@")[0])
)

print("=== Step 1: 데이터 클렌징 완료 ===")
df_step1.select("name", "salary", "department").show(5)

# NULL 개수 확인
print("NULL 개수 확인:")
df_step1.select(
    count(when(col("salary").isNull(), 1)).alias("salary_null"),
    count(when(col("department").isNull(), 1)).alias("dept_null")
).show()

=== Step 1: 데이터 클렌징 완료 ===
+----------+-------+-----------+
|      name| salary| department|
+----------+-------+-----------+
|Employee 1|50000.0|  Marketing|
|Employee 2|60000.0| Unassigned|
|Employee 3|70000.0|Engineering|
|Employee 4|80000.0|  Marketing|
|Employee 5|60000.0|  Marketing|
+----------+-------+-----------+
only showing top 5 rows

NULL 개수 확인:
+-----------+---------+
|salary_null|dept_null|
+-----------+---------+
|          0|        0|
+-----------+---------+



#### Step 2: 날짜 파생 변수 생성

In [57]:

df_step2 = (
    df_step1.withColumn("join_date_dt", to_date(col("join_date"), "yyyy-MM-dd"))
    .withColumn("입사연도", year(col("join_date_dt")))
    .withColumn("입사월", month(col("join_date_dt")))
    .withColumn("근속일수", datediff(current_date(), col("join_date_dt")))
    .withColumn("근속년수", spark_round(col("근속일수") / 365, 1))
    .withColumn(
        "입사분기",
        when(col("입사월") <= 3, "Q1")
        .when(col("입사월") <= 6, "Q2")
        .when(col("입사월") <= 9, "Q3")
        .otherwise("Q4")
    )
) 

print("=== Step 2: 날짜 파생 변수 ===")
df_step2.select(
    "name", "join_date", "입사연도", "입사월", "입사분기", "근속일수", "근속년수"
).show(10)
df_step2.show(10)

=== Step 2: 날짜 파생 변수 ===
+-----------+----------+--------+------+--------+--------+--------+
|       name| join_date|입사연도|입사월|입사분기|근속일수|근속년수|
+-----------+----------+--------+------+--------+--------+--------+
| Employee 1|2020-01-15|    2020|     1|      Q1|    2198|     6.0|
| Employee 2|2020-02-29|    2020|     2|      Q1|    2153|     5.9|
| Employee 3|2020-04-14|    2020|     4|      Q2|    2108|     5.8|
| Employee 4|2020-05-29|    2020|     5|      Q2|    2063|     5.7|
| Employee 5|2020-07-13|    2020|     7|      Q3|    2018|     5.5|
| Employee 6|2020-08-27|    2020|     8|      Q3|    1973|     5.4|
| Employee 7|2020-10-11|    2020|    10|      Q4|    1928|     5.3|
| Employee 8|2020-11-25|    2020|    11|      Q4|    1883|     5.2|
| Employee 9|2021-01-09|    2021|     1|      Q1|    1838|     5.0|
|Employee 10|2021-02-23|    2021|     2|      Q1|    1793|     4.9|
+-----------+----------+--------+------+--------+--------+--------+
only showing top 10 rows

+------+--------

#### Step 3: Window 함수로 성과 지표 계산

In [56]:
from pyspark.sql.window import Window

# Window 정의
window_dept = Window.partitionBy("department")
window_dept_rank = Window.partitionBy("department").orderBy(col("salary").desc())

df_step3 = ( 
    df_step2.withColumn(
    "부서내순위", row_number().over(window_dept_rank))
    .withColumn("부서평균급여", spark_round(avg("salary").over(window_dept), 0))
    .withColumn("급여편차", col("salary") - col("부서평균급여"))
    .withColumn(
        "부서내비율",
        spark_round(col("salary") / sum("salary").over(window_dept) * 100, 1)
    )
)

print("=== Step 3: 성과 지표 ===")
df_step3.select(
    "department", "name", "salary", "부서내순위", "부서평균급여", "급여편차", "부서내비율"
).orderBy("department", "부서내순위").show(15)

=== Step 3: 성과 지표 ===
+-----------+-----------+-------+----------+------------+--------+----------+
| department|       name| salary|부서내순위|부서평균급여|급여편차|부서내비율|
+-----------+-----------+-------+----------+------------+--------+----------+
|Engineering| Employee 7|90000.0|         1|     71400.0| 18600.0|      25.2|
|Engineering|Employee 22|82000.0|         2|     71400.0| 10600.0|      23.0|
|Engineering| Employee 3|70000.0|         3|     71400.0| -1400.0|      19.6|
|Engineering|Employee 16|60000.0|         4|     71400.0|-11400.0|      16.8|
|Engineering| Employee 8|55000.0|         5|     71400.0|-16400.0|      15.4|
|  Marketing|Employee 12|85000.0|         1|     65778.0| 19222.0|      14.4|
|  Marketing| Employee 4|80000.0|         2|     65778.0| 14222.0|      13.5|
|  Marketing|Employee 14|72000.0|         3|     65778.0|  6222.0|      12.2|
|  Marketing|Employee 11|65000.0|         4|     65778.0|  -778.0|      11.0|
|  Marketing| Employee 5|60000.0|         5|     65778.0| -577

#### Step 4: 데이터 품질 검증

In [58]:
print("=== Step 4: 데이터 품질 검증 ===")

# 1. 전체 행 수
print(f"전체 행 수: {df_step3.count()}")

# 2. NULL 개수
print("\n[NULL 개수]")
df_step3.select(
    count(when(col("salary").isNull(), 1)).alias("salary_null"),
    count(when(col("department").isNull(), 1)).alias("dept_null"),
    count(when(col("join_date_dt").isNull(), 1)).alias("date_null")
).show()

# 3. 급여 통계
print("[급여 통계]")
df_step3.agg(
    min("salary").alias("최소급여"),
    max("salary").alias("최대급여"),
    spark_round(avg("salary"), 0).alias("평균급여")
).show()

# 4. 부서별 인원 분포
print("[부서별 인원 분포]")
df_step3.groupBy("department").count().orderBy(col("count").desc()).show()

# 5. 중복 emp_id 확인
dup_count = df_step3.groupBy("emp_id").count().filter(col("count") > 1).count()
print(f"[중복 emp_id 개수]: {dup_count}")

=== Step 4: 데이터 품질 검증 ===
전체 행 수: 30

[NULL 개수]
+-----------+---------+---------+
|salary_null|dept_null|date_null|
+-----------+---------+---------+
|          0|        0|        0|
+-----------+---------+---------+

[급여 통계]
+--------+--------+--------+
|최소급여|최대급여|평균급여|
+--------+--------+--------+
| 50000.0| 95000.0| 68967.0|
+--------+--------+--------+

[부서별 인원 분포]
+-----------+-----+
| department|count|
+-----------+-----+
| Unassigned|   10|
|  Marketing|    9|
|      Sales|    6|
|Engineering|    5|
+-----------+-----+

[중복 emp_id 개수]: 0
