성적데이터는 n이 적지만, 정규분포를 이룬다고 가정하자.

marks=[

    "김하나, English, 100",

    "김하나, Math, 80",

    "임하나, English, 70",

    "임하나, Math, 100",

    "김갑돌, English, 82.3",

    "김갑돌, Math, 98.5"

]

In [1]:
import pyspark

myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

## 1-1 성적데이터로 DataFrame을 생성.

In [2]:
marks = ["김하나, English, 100",
       "김하나, Math, 80",
       "임하나, English, 70",
       "임하나, Math, 100",
       "김갑돌, English, 82.3",
       "김갑돌, Math, 98.5" ]
marksRdd = spark.sparkContext.parallelize(marks).map(lambda x:x.split(','))

In [3]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, DoubleType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("subject", StringType(), True),
    StructField("mark_str", StringType(), True)
])
marksDf = spark.createDataFrame(marksRdd, schema)
marksDf = marksDf.withColumn('mark', marksDf.mark_str.cast("float"))
marksDf = marksDf.drop(marksDf.mark_str)

In [4]:
marksDf.show()

+------+--------+-----+
|  name| subject| mark|
+------+--------+-----+
|김하나| English|100.0|
|김하나|    Math| 80.0|
|임하나| English| 70.0|
|임하나|    Math|100.0|
|김갑돌| English| 82.3|
|김갑돌|    Math| 98.5|
+------+--------+-----+



## 1-2 zscore 컬럼을 생성.

zscore를 계산하려면, 평균과 표준편차를 알아야 한다.

계산식에 F함수를 직접 사용하면 오류가 발생한다. 따로 평균과 표준편차를 구해서 계산식에서 사용해야 한다.

In [5]:
import numpy as np
marksArr = marksDf.select('mark').collect()
marksArr = np.array(marksArr).reshape(-1)
avr = np.mean(marksArr)
std = np.std(marksArr)

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

zscore = udf(lambda x: float((x-avr)/std), FloatType())
marksDf = marksDf.withColumn('zscore', zscore(marksDf['mark']))

In [6]:
marksDf.show()

+------+--------+-----+----------+
|  name| subject| mark|    zscore|
+------+--------+-----+----------+
|김하나| English|100.0| 0.9881077|
|김하나|    Math| 80.0|-0.7253739|
|임하나| English| 70.0|-1.5821148|
|임하나|    Math|100.0| 0.9881077|
|김갑돌| English| 82.3|-0.5283233|
|김갑돌|    Math| 98.5| 0.8595966|
+------+--------+-----+----------+



## 1-3 cdf 컬럼을 생성.

scipy.stats.norm.cdf() 함수는 데이터타입을 float로 맞추어 주어야 한다.

cdf는 평균=0, 표준편차=1을 기본 값으로 누적확률을 계산한다.

In [7]:
from scipy.stats import norm
_cdf = udf(lambda x: float(norm.cdf(x)), FloatType())
marksDf = marksDf.withColumn('cdf', _cdf(marksDf['zscore']))

In [8]:
marksDf.show()

+------+--------+-----+----------+-----------+
|  name| subject| mark|    zscore|        cdf|
+------+--------+-----+----------+-----------+
|김하나| English|100.0| 0.9881077|  0.8384501|
|김하나|    Math| 80.0|-0.7253739| 0.23411132|
|임하나| English| 70.0|-1.5821148|0.056811687|
|임하나|    Math|100.0| 0.9881077|  0.8384501|
|김갑돌| English| 82.3|-0.5283233| 0.29863748|
|김갑돌|    Math| 98.5| 0.8595966|  0.8049943|
+------+--------+-----+----------+-----------+



In [9]:
spark.stop()