In [1]:
import os
os.environ["PYSPARK_PYTHON"]="C:\\Anaconda3\\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"]="C:\\Anaconda3\\python.exe"

In [2]:
import pyspark

myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
    .builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

### 1-1 성적데이터로 DataFrame을 생성

In [3]:
marks=[
    "김하나, English, 100",
    "김하나, Math, 80",
    "임하나, English, 70",
    "임하나, Math, 100",
    "김갑돌, English, 82.3",
    "김갑돌, Math, 98.5"
]

score = spark.sparkContext.parallelize(marks)
scores = score.map(lambda line: line.split(","))

scores = scores.toDF()

scores = scores.withColumn("name", scores._1.cast("string"))\
                .withColumn("subject", scores['_2'].cast("string"))\
                .withColumn("mark", scores['_3'].cast("double"))


scores = scores.drop('_1').drop('_2').drop('_3')

In [4]:
scores.printSchema()
scores.show()

root
 |-- name: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- mark: double (nullable = true)

+------+--------+-----+
|  name| subject| mark|
+------+--------+-----+
|김하나| English|100.0|
|김하나|    Math| 80.0|
|임하나| English| 70.0|
|임하나|    Math|100.0|
|김갑돌| English| 82.3|
|김갑돌|    Math| 98.5|
+------+--------+-----+



### 1-2 zscore 컬럼을 생성

In [5]:
from pyspark.sql import functions as F

mean_std = scores.select(F.mean('mark'), F.stddev('mark')).collect()

mean_scores = mean_std[0][0]
std_scores = mean_std[0][1]

In [6]:
from pyspark.sql.types import FloatType

zscore = F.udf(lambda x: (x-mean_scores)/std_scores, FloatType())

In [7]:
scores = scores.withColumn("zscore", zscore(scores['mark']))

In [8]:
scores.show()

+------+--------+-----+-----------+
|  name| subject| mark|     zscore|
+------+--------+-----+-----------+
|김하나| English|100.0| 0.90201485|
|김하나|    Math| 80.0|-0.66217273|
|임하나| English| 70.0| -1.4442666|
|임하나|    Math|100.0| 0.90201485|
|김갑돌| English| 82.3|-0.48229116|
|김갑돌|    Math| 98.5| 0.78470075|
+------+--------+-----+-----------+



### 1-3 cdf 컬럼을 생성

In [9]:
from scipy.stats import norm

cdf_cal = F.udf(lambda x: float(norm.cdf(x)))
scores = scores.withColumn("cdf", cdf_cal(scores['zscore']))

In [10]:
scores.show()

+------+--------+-----+-----------+-------------------+
|  name| subject| mark|     zscore|                cdf|
+------+--------+-----+-----------+-------------------+
|김하나| English|100.0| 0.90201485| 0.8164755032306112|
|김하나|    Math| 80.0|-0.66217273|0.25393026395894597|
|임하나| English| 70.0| -1.4442666|0.07433201139097229|
|임하나|    Math|100.0| 0.90201485| 0.8164755032306112|
|김갑돌| English| 82.3|-0.48229116|0.31479956212721427|
|김갑돌|    Math| 98.5| 0.78470075| 0.7836854804484268|
+------+--------+-----+-----------+-------------------+

