In [3]:
import os
import sys

# Point both the Spark workers (PYSPARK_PYTHON) and the driver
# (PYSPARK_DRIVER_PYTHON) at the interpreter that is running this kernel.
# A bare "python" is looked up on PATH, so it can be missing or belong to a
# different environment than the notebook's; it is kept only as a fallback
# for the case where the running interpreter's path is unavailable.
_kernel_python = sys.executable or "python"
os.environ["PYSPARK_PYTHON"] = _kernel_python
os.environ["PYSPARK_DRIVER_PYTHON"] = _kernel_python

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [8]:
# Start (or reuse) a Spark session with the local[*] master.
spark = (
    SparkSession.builder
    .appName("PySpark Example")
    .master("local[*]")
    .getOrCreate()
)

# Load the score sheet (first row = column names, column types inferred) and
# append each student's total as the sum of the four subject scores.
df = (
    spark.read
    .csv("data/Subjects.csv", header=True, inferSchema=True)
    .withColumn("total", col("kor") + col("eng") + col("math") + col("science"))
)
df.show()

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| bbb| 45| 45|  56|     98|  244|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| eee| 45| 65|  78|     98|  286|
|    1| fff| 78| 76|  98|     89|  341|
|    2| ggg| 87| 67|  65|     56|  275|
|    2| hhh| 89| 98|  78|     78|  343|
|    2| iii|100| 78|  56|     65|  299|
|    2| jjj| 99| 89|  87|     87|  362|
|    2| kkk| 98| 45|  56|     54|  253|
|    2| lll| 65| 89|  87|     78|  319|
+-----+----+---+---+----+-------+-----+



In [9]:
# Add the total-score column (sum of the four subject scores).
df = df.withColumn(
    "total",
    col("kor") + col("eng") + col("math") + col("science"),
)
df.show()

# Average of the total score over all rows (no grouping keys).
df.groupBy().avg("total").show()

# Conditional filter: a total of 300 or more counts as a pass.
df.where(col("total") >= 300).show()

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| bbb| 45| 45|  56|     98|  244|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| eee| 45| 65|  78|     98|  286|
|    1| fff| 78| 76|  98|     89|  341|
|    2| ggg| 87| 67|  65|     56|  275|
|    2| hhh| 89| 98|  78|     78|  343|
|    2| iii|100| 78|  56|     65|  299|
|    2| jjj| 99| 89|  87|     87|  362|
|    2| kkk| 98| 45|  56|     54|  253|
|    2| lll| 65| 89|  87|     78|  319|
+-----+----+---+---+----+-------+-----+

+-----------------+
|       avg(total)|
+-----------------+
|312.3333333333333|
+-----------------+

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
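|    1| fff| 78| 76|  98|     89|  341|
|    2| hhh| 89| 98|  78|     78|  343|
|    2| jjj| 99| 89|  87|     87|  362|
|    2| lll| 65| 89|  87|     78|  319|
+-----+----+---+---+----+-------+-----+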

In [10]:
# Register the DataFrame as the temporary view "students" so it can be
# queried with SQL.
df.createOrReplaceTempView("students")

# SQL query: names and totals of the students whose total is 300 or more.
high_score = spark.sql(
    "SELECT name, total FROM students WHERE total >= 300"
)
high_score.show()

+----+-----+
|name|total|
+----+-----+
| aaa|  342|
| ccc|  338|
| ddd|  346|
| fff|  341|
| hhh|  343|
| jjj|  362|
| lll|  319|
+----+-----+



In [11]:
# Stop the current SparkSession, then print a confirmation message.
spark.stop()
print("SparkSession 종료")

SparkSession 종료


In [14]:
# ============================================================
# PySpark 데이터 분석 + 머신러닝(MLlib) 실습 예제
# ============================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

In [16]:
## Create the SparkSession
# - SparkSession is the main entry point of PySpark
# - master("local[*]") --> use all CPU cores

spark = (
    SparkSession.builder
    .appName("PySpark_ML_Example")
    .master("local[*]")
    .getOrCreate()
)

print("SparkSession 생성 완료")

SparkSession 생성 완료


## CSV 파일 로드

In [17]:
# Read the score sheet into a DataFrame.
df = spark.read.csv(
    "data/Subjects.csv",
    header=True,        # use the first row as the column names
    inferSchema=True,   # infer each column's data type automatically
)

print("CSV 파일 로드 완료")
df.show()
                    

CSV 파일 로드 완료
+-----+----+---+---+----+-------+
|class|name|kor|eng|math|science|
+-----+----+---+---+----+-------+
|    1| aaa| 67| 87|  90|     98|
|    1| bbb| 45| 45|  56|     98|
|    1| ccc| 95| 59|  96|     88|
|    1| ddd| 65| 94|  89|     98|
|    1| eee| 45| 65|  78|     98|
|    1| fff| 78| 76|  98|     89|
|    2| ggg| 87| 67|  65|     56|
|    2| hhh| 89| 98|  78|     78|
|    2| iii|100| 78|  56|     65|
|    2| jjj| 99| 89|  87|     87|
|    2| kkk| 98| 45|  56|     54|
|    2| lll| 65| 89|  87|     78|
+-----+----+---+---+----+-------+



In [18]:
# ------------------------------------------------------------
# Data preprocessing and derived columns
# ------------------------------------------------------------
# - Missing subject scores are replaced with 0
# - Adds the total score (total) and the pass flag (pass) columns

df = (
    df
    .fillna({"kor": 0, "eng": 0, "math": 0, "science": 0})
    .withColumn("total", col("kor") + col("eng") + col("math") + col("science"))
    # pass = 1 when the total is at least 300, otherwise 0
    .withColumn("pass", when(col("total") >= 300, 1).otherwise(0))
)

print("데이터 처리 완료")
df.show()
                   


데이터 처리 완료
+-----+----+---+---+----+-------+-----+----+
|class|name|kor|eng|math|science|total|pass|
+-----+----+---+---+----+-------+-----+----+
|    1| aaa| 67| 87|  90|     98|  342|   1|
|    1| bbb| 45| 45|  56|     98|  244|   0|
|    1| ccc| 95| 59|  96|     88|  338|   1|
|    1| ddd| 65| 94|  89|     98|  346|   1|
|    1| eee| 45| 65|  78|     98|  286|   0|
|    1| fff| 78| 76|  98|     89|  341|   1|
|    2| ggg| 87| 67|  65|     56|  275|   0|
|    2| hhh| 89| 98|  78|     78|  343|   1|
|    2| iii|100| 78|  56|     65|  299|   0|
|    2| jjj| 99| 89|  87|     87|  362|   1|
|    2| kkk| 98| 45|  56|     54|  253|   0|
|    2| lll| 65| 89|  87|     78|  319|   1|
+-----+----+---+---+----+-------+-----+----+



In [21]:
# ------------------------------------------------------------
# Regression model (Linear Regression)
# ------------------------------------------------------------
# - Input features : kor, eng, math, science
# - Target         : total
# - Goal           : a regression model that predicts the total score

# Spark ML needs the features as a single vector column, so the separate
# score columns are merged into one column named "features".
assembler = VectorAssembler(inputCols=["kor", "eng", "math", "science"], outputCol="features")

# Keep only the feature vector (x) and the label (y).
train_df = (
    assembler
    .transform(df)
    .select("features", "total")
)

# Build and train the regression model.
#   featuresCol : vector column holding the independent variables
#   labelCol    : dependent variable to predict
#   .fit(df)    : learns the regression coefficients (weights) and intercept
lr = LinearRegression(featuresCol="features", labelCol="total")
lr_model = lr.fit(train_df)

# Run the predictions: .transform(df) predicts the label with the trained model.
# NOTE: the model is applied to the same rows it was trained on, and "total"
# is defined as kor + eng + math + science, i.e. an exact linear function of
# the features, so this is a demonstration rather than an evaluation.
lr_predictions = lr_model.transform(train_df)

print("Linear Regression 모델 학습 완료")
lr_predictions.select("features", "total", "prediction").show(5)




Linear Regression 모델 학습 완료
+--------------------+-----+------------------+
|            features|total|        prediction|
+--------------------+-----+------------------+
|[67.0,87.0,90.0,9...|  342| 342.0000000000004|
|[45.0,45.0,56.0,9...|  244|243.99999999999918|
|[95.0,59.0,96.0,8...|  338|338.00000000000153|
|[65.0,94.0,89.0,9...|  346| 346.0000000000002|
|[45.0,65.0,78.0,9...|  286|285.99999999999903|
+--------------------+-----+------------------+
only showing top 5 rows

