In [80]:
import os
import sys

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from scipy.stats import norm

# Point both the driver and the workers at the interpreter that is running
# this notebook. This keeps their Python versions identical and removes the
# dependency on a user-specific absolute path.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Build (or reuse) a local-mode session named "myApp" with a default SparkConf.
myConf = pyspark.SparkConf()
spark = (
    pyspark.sql.SparkSession.builder
    .master("local")
    .appName("myApp")
    .config(conf=myConf)
    .getOrCreate()
)

# 1-1 성적데이터로 DataFrame을 생성.

In [81]:
# Raw score records: one comma-separated "name,subject,mark" string per row.
# The single quotes and the space before each mark are part of the text.
data = [
    "'김하나','English', 100",
    "'김하나','Math', 80",
    "'임하나','English', 70",
    "'임하나','Math', 100",
    "'김갑돌','English', 82.3",
    "'김갑돌','Math', 98.5",
]

# Distribute the raw strings as an RDD.
marks = spark.sparkContext.parallelize(data)

In [82]:
# Split every record on commas and label the three resulting fields.
# All fields remain strings exactly as written in `data`: names and subjects
# keep their single quotes, and marks keep their leading space.
cols = ['name', 'subject', 'mark']
marksDf = spark.createDataFrame(
    marks.map(lambda record: record.split(',')),
    schema=cols,
)

marksDf.show()

+--------+---------+-----+
|    name|  subject| mark|
+--------+---------+-----+
|'김하나'|'English'|  100|
|'김하나'|   'Math'|   80|
|'임하나'|'English'|   70|
|'임하나'|   'Math'|  100|
|'김갑돌'|'English'| 82.3|
|'김갑돌'|   'Math'| 98.5|
+--------+---------+-----+



# 1-2 zscore 컬럼을 생성.

In [83]:
# Mean and standard deviation of the "mark" column over all rows
# (every student and subject pooled together). Each aggregation yields a
# single-row result whose first field is the scalar we want.
marksMean = marksDf.agg({"mark": "mean"}).first()[0]
marksStd = marksDf.agg({"mark": "stddev"}).first()[0]

In [84]:
# "mark" is a string column, so parse it to a float first.
floatUdf = udf(lambda raw: float(raw), FloatType())
# Standardise a mark against the global mean / stddev computed earlier.
zscoreUdf = udf(lambda value: (value - marksMean) / marksStd, FloatType())

# "markFloat" is only a temporary helper column and is dropped once the
# z-score has been derived from it.
marksDf = (
    marksDf
    .withColumn("markFloat", floatUdf(marksDf["mark"]))
    .withColumn("zscore", zscoreUdf("markFloat"))
    .drop("markFloat")
)
marksDf.show()

+--------+---------+-----+-----------+
|    name|  subject| mark|     zscore|
+--------+---------+-----+-----------+
|'김하나'|'English'|  100| 0.90201485|
|'김하나'|   'Math'|   80|-0.66217273|
|'임하나'|'English'|   70| -1.4442666|
|'임하나'|   'Math'|  100| 0.90201485|
|'김갑돌'|'English'| 82.3|-0.48229116|
|'김갑돌'|   'Math'| 98.5| 0.78470075|
+--------+---------+-----+-----------+



# 1-3 cdf 컬럼을 생성.

In [85]:
# Map each z-score to its cumulative probability under the standard normal
# distribution.
# The return type is declared explicitly: without one, udf() defaults to
# StringType and the "cdf" column would hold text instead of numbers.
# "double" keeps the full precision of the Python float.
cdfUdf = udf(lambda z: float(norm.cdf(z)), "double")
marksDf = marksDf.withColumn("cdf", cdfUdf(marksDf["zscore"]))
marksDf.show()

+--------+---------+-----+-----------+-------------------+
|    name|  subject| mark|     zscore|                cdf|
+--------+---------+-----+-----------+-------------------+
|'김하나'|'English'|  100| 0.90201485| 0.8164755032306112|
|'김하나'|   'Math'|   80|-0.66217273|0.25393026395894597|
|'임하나'|'English'|   70| -1.4442666|0.07433201139097229|
|'임하나'|   'Math'|  100| 0.90201485| 0.8164755032306112|
|'김갑돌'|'English'| 82.3|-0.48229116|0.31479956212721427|
|'김갑돌'|   'Math'| 98.5| 0.78470075| 0.7836854804484268|
+--------+---------+-----+-----------+-------------------+

