# 성적데이터는 n이 적지만, 정규분포를 이룬다고 가정하자.

marks=[

    "김하나, English, 100",

    "김하나, Math, 80",

    "임하나, English, 70",

    "임하나, Math, 100",

    "김갑돌, English, 82.3",

    "김갑돌, Math, 98.5"

]



* 제출: ipynb 파일 1개 (zip하지 마세요).

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql import functions as F

# Build (or reuse) a local SparkSession for this notebook.
# SparkConf is imported here because the visible imports only bind names from
# `pyspark.sql`; the bare `pyspark` module name is never imported, so
# `pyspark.SparkConf()` / `pyspark.sql.SparkSession` would raise NameError.
from pyspark import SparkConf

sparkConfg = SparkConf()
spark = (
    SparkSession.builder
    .master('local')
    .appName('myApp')
    .config(conf=sparkConfg)
    .getOrCreate()
)

# * 1-1 성적데이터로 DataFrame을 생성.

In [30]:
# Raw score records, one "name, subject, mark" string per entry.
marks = [
    "김하나, English, 100",
    "김하나, Math, 80",
    "임하나, English, 70",
    "임하나, Math, 100",
    "김갑돌, English, 82.3",
    "김갑돌, Math, 98.5",
]

# Distribute the records and split each one on ", " into its fields.
_marksRdd = (
    spark.sparkContext
    .parallelize(marks)
    .map(lambda record: record.split(", "))
)

In [31]:
# Turn the split records into a DataFrame with named columns.
_marksDf = spark.createDataFrame(
    _marksRdd,
    ["name", "subject", "mark"],
)

In [32]:
# Print the column names and types of the DataFrame.
_marksDf.printSchema()

root
 |-- name: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- mark: string (nullable = true)



In [12]:
# Display the rows of the DataFrame as a text table.
_marksDf.show()

+------+-------+----+
|  name|subject|mark|
+------+-------+----+
|김하나|English| 100|
|김하나|   Math|  80|
|임하나|English|  70|
|임하나|   Math| 100|
|김갑돌|English|82.3|
|김갑돌|   Math|98.5|
+------+-------+----+



# * 1-2 zscore 컬럼을 생성.

zscore를 계산하려면, 평균과 표준편차를 알아야 한다.

컬럼 계산식(withColumn)에 F.mean, F.stddev 같은 집계 함수를 직접 사용하면 오류가 발생한다. 집계 함수는 행 단위 계산식과 섞어 쓸 수 없으므로, 평균과 표준편차를 먼저 따로 구한 뒤 그 값을 계산식에서 사용해야 한다.

In [36]:
# Add `markF`, a float-typed copy of the `mark` column, for numeric work.
_marksDf = _marksDf.withColumn(
    'markF',
    F.col('mark').cast(FloatType()),
)

In [37]:
from pyspark.sql import functions as F

# Aggregate the whole DataFrame into one row holding the mean and the
# sample standard deviation of `markF`, then bring it to the driver.
_markStats = _marksDf.agg(
    F.avg('markF').alias('mean'),
    F.stddev_samp('markF').alias('std'),
).collect()

In [38]:
# Echo the first collected Row (fields `mean` and `std`) as the cell output.
_markStats[0]

Row(mean=88.46666717529297, std=12.786190172956093)

In [39]:
# Pull the two statistics out of the collected Row as plain Python values.
meanMark, stdMark = (_markStats[0][field] for field in ('mean', 'std'))

In [40]:
# z-score: z = (x - mean) / std, using the driver-side meanMark and stdMark.
def _zscore(x):
    """Return the z-score of x, or None when x is null."""
    # A null markF would otherwise raise TypeError inside the Python worker
    # and fail the whole job; propagate the null instead.
    if x is None:
        return None
    return (x - meanMark) / stdMark


zscoreUdf = F.udf(_zscore, FloatType())

# Add the `zscore` column computed from the float marks.
_marksDf = _marksDf.withColumn("zscore", zscoreUdf(_marksDf['markF']))

In [41]:
# Display the DataFrame, now including the `markF` and `zscore` columns.
_marksDf.show()

+------+-------+----+-----+-----------+
|  name|subject|mark|markF|     zscore|
+------+-------+----+-----+-----------+
|김하나|English| 100|100.0|  0.9020148|
|김하나|   Math|  80| 80.0| -0.6621728|
|임하나|English|  70| 70.0| -1.4442666|
|임하나|   Math| 100|100.0|  0.9020148|
|김갑돌|English|82.3| 82.3|-0.48229098|
|김갑돌|   Math|98.5| 98.5| 0.78470075|
+------+-------+----+-----+-----------+



# * 1-3 cdf 컬럼을 생성.

In [42]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from scipy.stats import norm

# Cumulative probability under the standard normal distribution.
def _normCdf(z):
    """Return the standard-normal CDF at z, or None when z is null."""
    # Propagate nulls instead of letting norm.cdf(None) raise in the worker.
    if z is None:
        return None
    # norm.cdf returns a NumPy float64; convert it to a built-in float,
    # which is the value type a Spark UDF result should carry.
    return float(norm.cdf(z))


# Declare the return type explicitly: F.udf defaults to StringType, which
# would make `cdf` a string column rather than a numeric one.
normCdf = F.udf(_normCdf, 'double')

In [43]:
# Add the `cdf` column by applying the normal-CDF UDF to each z-score.
_marksDf = _marksDf.withColumn(
    "cdf",
    normCdf(F.col('zscore')),
)

In [44]:
# Display the final DataFrame, including the `cdf` column.
_marksDf.show()

+------+-------+----+-----+-----------+-------------------+
|  name|subject|mark|markF|     zscore|                cdf|
+------+-------+----+-----+-----------+-------------------+
|김하나|English| 100|100.0|  0.9020148| 0.8164754981807292|
|김하나|   Math|  80| 80.0| -0.6621728| 0.2539302463290559|
|임하나|English|  70| 70.0| -1.4442666| 0.0743320011235712|
|임하나|   Math| 100|100.0|  0.9020148| 0.8164754981807292|
|김갑돌|English|82.3| 82.3|-0.48229098|0.31479962882028223|
|김갑돌|   Math|98.5| 98.5| 0.78470075| 0.7836854740814176|
+------+-------+----+-----+-----------+-------------------+

