In [49]:
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

## 문제: 서울시 지하철 호선별 역별 유/무임 승하차 인원 분석



* (1-1) '서울시 지하철 호선별 역별 유_무임 승하차 인원 정보.csv'를 데이터프레임으로 읽고, 스키마를 출력

In [50]:
import os

myDf = spark\
            .read.option("charset", "euc-kr")\
            .option("header", "true")\
            .option("inferschema","true")\
            .csv(os.path.join("data","서울시 지하철 호선별 역별 유_무임 승하차 인원 정보.csv"))


In [51]:
myDf.printSchema()

root
 |-- 사용월: integer (nullable = true)
 |-- 호선명: string (nullable = true)
 |-- 지하철역: string (nullable = true)
 |-- 유임승차인원: integer (nullable = true)
 |-- 무임승차인원: integer (nullable = true)
 |-- 유임하차인원: integer (nullable = true)
 |-- 무임하차인원: integer (nullable = true)
 |-- 작업일자: integer (nullable = true)



* (1-2) 건수를 출력

In [52]:
myDf.count()

55171

* (1-3) '호선명'별 '유임승차인원'의 순위를 매기고, 그 결과를 30줄 출력 (마지막 'rank' 컬럼)

In [53]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

myDf.withColumn("rank", F.rank().over(Window().partitionBy(['호선명']).orderBy(F.col("유임승차인원").desc()))).show(30)

+------+------+--------+------------+------------+------------+------------+--------+----+
|사용월|호선명|지하철역|유임승차인원|무임승차인원|유임하차인원|무임하차인원|작업일자|rank|
+------+------+--------+------------+------------+------------+------------+--------+----+
|201605|일산선|    화정|      515925|      139163|      535460|      142410|20160608|   1|
|201512|일산선|    화정|      508044|      136734|      529106|      139931|20160108|   2|
|201603|일산선|    화정|      502144|      140596|      526030|      144088|20160408|   3|
|201703|일산선|    화정|      499911|      141327|      529944|      145502|20170403|   4|
|201510|일산선|    화정|      499388|      520595|      140545|      144148|20151108|   5|
|201905|일산선|    화정|      499218|      149790|      520966|      152915|20190603|   6|
|201612|일산선|    화정|      497514|      132807|      521734|      136086|20170108|   7|
|201805|일산선|    화정|      494994|      139239|      515748|      142997|20180603|   8|
|201610|일산선|    화정|      492146|      137462|      513849|      140303|201611

* (1-4) '호선명'별 '유임승차인원'이 순위가 가장 높은 경우만 30건 출력 (마지막 'rank' 컬럼이 모두 1)

In [54]:
myDf.withColumn("rank", F.rank().over(Window().partitionBy(['호선명']).orderBy(F.col("유임승차인원").desc())))\
    .filter(F.col("rank")==1)\
    .show(30)

+------+--------------+--------------------+------------+------------+------------+------------+--------+----+
|사용월|        호선명|            지하철역|유임승차인원|무임승차인원|유임하차인원|무임하차인원|작업일자|rank|
+------+--------------+--------------------+------------+------------+------------+------------+--------+----+
|201605|        일산선|                화정|      515925|      139163|      535460|      142410|20160608|   1|
|201905|        장항선|                아산|      109243|       12901|       93737|       11808|20190603|   1|
|201512|        경부선|              영등포|     1439049|      280623|     1545631|      280969|20160108|   1|
|201905|    우이신설선|        북한산보국문|      161867|       45041|      141742|       45814|20190603|   1|
|201512|        분당선|                야탑|      769442|      157846|      809197|      156034|20160108|   1|
|201512|         7호선|      가산디지털단지|     1245731|       82588|     1243464|       78424|20160108|   1|
|201905|        수인선|              인하대|      200474|       28246|      189885|   

* (1-5) '호선명'별 '유임승차인원'의 zscore를 계산하고 출력. 윈도우 함수를 사용하여야 한다.

In [None]:
_marksDf = _marksDf.withColumn("zscore2", (F.col('markF')-F.avg('markF').over(byAll))/F.stddev('markF').over(byAll))

## 문제2: 회귀분석

* sklearn의 make_regression함수를 다음과 같이 사용하여 데이터를 생성하고, 문제를 푸세요.

In [56]:
from sklearn.datasets import make_regression

X, y, coef = make_regression(n_samples = 200,

                            n_features = 4,

                            n_informative = 3,

                            n_targets = 1,

                            noise = 0.0,

                            coef = True)

* (2-1) gradient 방법으로 회귀식을 계산하고, 계수를 출력.
또한 [식-1]의 'coef'를 출력하고, gradient방법에서 도출된 계수와 일치하는지 비교하세요.



In [80]:
import random

def computeMSE(a,b,c,d,e,x,y):
    totalError = 0
    for i in range(0, len(x)):
        totalError += (y[i] - (a + b* x[i][0])+ c* x[i][1]+ d* x[i][2]+ e* x[i][3]) ** 2
    return totalError / float(len(x))

#x: attribute, 1d float array or list
#y: class, 1d int array
#alpha: learning rate
def gradientDescentL(x,y,alpha,iter):
    a=random.random()
    b=random.random()
    c=random.random()
    d=random.random()
    e=random.random()
    alpha=0.001
    n=len(x)
    for j in range(iter):
        aGradient = 0
        bGradient = 0
        cGradient = 0
        dGradient = 0
        eGradient = 0
        for i in range(n):
            aGradient += (2./n) * (((a + b* x[i][0]+ c* x[i][1]+ d* x[i][2]+ e* x[i][3])) - y[i])*(x[i][3])
            bGradient += (2./n) * (((a + b* x[i][0]+ c* x[i][1]+ d* x[i][2]+ e* x[i][3])) - y[i])*(x[i][2])
            cGradient += (2./n) * (((a + b* x[i][0]+ c* x[i][1]+ d* x[i][2]+ e* x[i][3])) - y[i])*(x[i][1])
            dGradient += (2./n) * (((a + b* x[i][0]+ c* x[i][1]+ d* x[i][2]+ e* x[i][3])) - y[i])*(x[i][0])
            eGradient += (2./n) * (((a + b* x[i][0]+ c* x[i][1]+ d* x[i][2]+ e* x[i][3])) - y[i])*(1)
            
            #aGradient += (2./n) * (y[i] - ((a + b * x[i])))*(-1)
            #bGradient += (2./n) * (y[i] - ((a + b * x[i])))*(-x[i])
        
        a = a - (alpha * aGradient)
        b = b - (alpha * bGradient)
        c = c - (alpha * cGradient)
        d = d - (alpha * dGradient)
        e = e - (alpha * eGradient)
        
        if (j%100==0):
            print ("iter:{0} AvgError={1:.3f}".format(j,computeMSE(a,b,c,d,e,x,y)))
    return a,b,c,d,e

In [81]:
a,b,c,d,e = gradientDescentL(X, y, alpha, 1000)

iter:0 AvgError=10703.018
iter:100 AvgError=13153.058
iter:200 AvgError=15812.046
iter:300 AvgError=18486.782
iter:400 AvgError=21078.030
iter:500 AvgError=23550.161
iter:600 AvgError=25912.617
iter:700 AvgError=28210.152
iter:800 AvgError=30520.178
iter:900 AvgError=32956.747


In [79]:
print("a={0}, b={1}, c={2}, d={3}, e={4}\n".format(a,b,c,d,e))
print("coef: ",coef)

a=15.623259187794936, b=17.8365854809513, c=67.79566685521836, d=62.13947140029701, e=-17.904549714526702

coef:  [55.3360286  83.78717654 39.08701593  0.        ]


* (2-2) 스파크 데이터프레임을 생성 (이후 계속 스파크로 푸세요)

In [115]:
import pandas as pd
column_names = ['c1','c2','c3','c4']
pDf1 = pd.DataFrame(X, columns = column_names)
pDf2 = pd.DataFrame(y, columns = ['label'])
pDf = pd.concat([pDf1,pDf2],axis = 1)
myDf = spark.createDataFrame(pDf)

In [116]:
myDf.show()

+--------------------+-------------------+--------------------+--------------------+-------------------+
|                  c1|                 c2|                  c3|                  c4|              label|
+--------------------+-------------------+--------------------+--------------------+-------------------+
| -0.5512403857978609|-0.5965031205554089|-0.03364710408861073|-0.29776578515002167| -81.79793091627283|
|  0.1480724023308886| 1.3696862884074434|-0.06062070640317007|  0.6074273918565379| 120.58640302306809|
|0.013323315921294604|-0.0932709415354416|   1.085331164129407|  0.9385017786465617|  35.34470705162554|
| -0.5902230222526916| 1.8676691134617172|  2.8052006040792103| -0.6826307783894384|  233.4730443928036|
|  0.7265051462610764| -1.669587779785849| -1.2191792912549058|  1.4481460981561571|-147.34221688232523|
|  0.9183160600969698|-1.0090764786305508|   1.509863487999698|-0.22673783247861345| 25.284352922658954|
|-0.13428669127646833|-0.5835265175464377| -0.486948573

* (2-3) 훈련, 테스트 데이터를 6:4 비율로 분리하고,

In [117]:
(trainDf, testDf) = myDf.randomSplit([0.6,0.4])
print("trainDf: {}, testDf: {}".format(trainDf.count(), testDf.count()))

trainDf: 116, testDf: 84


In [118]:
trainDf

DataFrame[c1: double, c2: double, c3: double, c4: double, label: double]

* (2-4) 회귀모델링하고 계수와 절편 출력. 

In [119]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols = ["c1","c2","c3","c4"],
                                   outputCol = 'features')

In [120]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [121]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[va, lr])

In [122]:
model = pipeline.fit(trainDf)
modelTrainDf = model.transform(trainDf)


In [123]:
print("Coefficients: {}".format(model.stages[-1].coefficients))
print("Intercept: {:.3f}".format(model.stages[-1].intercept))

Coefficients: [55.00133172771247,83.49451353486926,38.853993265201375,0.0]
Intercept: 0.024


In [124]:
modelTrainDf.select('label','prediction').show(10)

+-------------------+-------------------+
|              label|         prediction|
+-------------------+-------------------+
| -241.6841454869239| -240.3167474512355|
|-217.01780831752205| -215.8120855419073|
| -7.822469937755862| -7.627551789559692|
| -48.39870580667795|-47.761582061913764|
|-144.16434072579023|-143.42221578964248|
|  90.17808404484983|  89.95528082401484|
| -164.0553757050139| -163.2867671673788|
| -1.195234751855196|-1.0431309036502148|
| -54.69345936658635|-54.297835869381494|
| -96.28753259921422| -95.84630430737563|
+-------------------+-------------------+
only showing top 10 rows



* (2-5) 테스트 데이터에 대해 R2를 구해서 출력하세요.

In [125]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator=RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

In [126]:
f"r2: {evaluator.evaluate(modelTestDf):.3f}"

'r2: 1.000'

## 문제3: 텍스트 분류

* (3-1) 데이터프레임을 생성하세요.

In [133]:
_tRdd=spark.sparkContext\
    .textFile(os.path.join('data','review2.txt'))

trdd = _tRdd.map(lambda x: x.split('|')).map(lambda x:[float(x[0]),x[1], x[2]])
myDf2 = spark.createDataFrame(trdd, ['cls','pos','neg'])

myDf2.show()

+---+----------------------------------+----------------------------------+
|cls|                               pos|                               neg|
+---+----------------------------------+----------------------------------+
|0.0|        진짜 꼭 보세요 최고의 영화| 고구마 먹은 영화 유치해서 못보...|
|0.0|  엔드게임 이후 마블이 한 번 더...|  노잼이네요.. 왜 배우들이 출연...|
|0.0| 정말 오랜만에 감동 울었네... 펑펑|   제발 뻘짓 좀 그만하셨으면......|
|0.0|연출에서 등장인물 하나하나의 아...| 스토리가 매우 매우 아쉽다 지루...|
|0.0|어렸을때는 그냥 액션과 화려함에...|             영화 약간 이해가 안됨|
|0.0|               꼭 보세여 넘 재밌어|좋아하는 배우들인데 최근 영화라...|
|0.0| 어릴적 영화관에서 처음으로 본 ...| 그지같네 도대체 이 따위 영화를...|
|0.0|올해 본 영화 중 최고의 영화입니다.|  스크린보다 시계를 더 많이 봄....|
|0.0|  극장에서 보는 거 적극 추천 최고!| 이때까지 본 영화중에 제일 최악...|
|0.0|    인생영화이고 평생 못잊을거같다|    나만 짜증.. 줄거리 한줄평.....|
|0.0|  그냥 진짜 재밌어 진짜 잘 만든...|로맨틱 코미디로 방향을 잡았으면...|
|0.0|  자연과 인간, 아버지와 아들 등...| 똥 싸다가 막힌 느낌 안보는거를...|
|0.0|진정한 아름다움은 완벽이 아니라...|  어우 핵노잼 영화가 왜 이렇게 ...|
|0.0|                띵작입니다 ㄹㅇ!!!| 솔직히 재미없음 감독이 뭘 말할...|
|0.0|        감동 그 자체 스파이더맨 짱| 어설픈 연출력 어설픈 결말 현실...|

* (3-2) 불용어 제거, 불필요 단어 정리

In [136]:
from pyspark.ml.feature import RegexTokenizer

regexTok = RegexTokenizer(inputCol = 'pos',outputCol = 'posRex', pattern="\\s+")
_tokDf = regexTok.transform(myDf2)

regexTok = RegexTokenizer(inputCol = 'neg',outputCol = 'negRex', pattern="\\s+")
_tokDf = regexTok.transform(_tokDf)
_tokDf.show()

+---+----------------------------------+----------------------------------+----------------------------------+--------------------------------+
|cls|                               pos|                               neg|                            posRex|                          negRex|
+---+----------------------------------+----------------------------------+----------------------------------+--------------------------------+
|0.0|        진짜 꼭 보세요 최고의 영화| 고구마 먹은 영화 유치해서 못보...|     [진짜, 꼭, 보세요, 최고의,...|  [고구마, 먹은, 영화, 유치해...|
|0.0|  엔드게임 이후 마블이 한 번 더...|  노잼이네요.. 왜 배우들이 출연...|    [엔드게임, 이후, 마블이, 한...|  [노잼이네요.., 왜, 배우들이...|
|0.0| 정말 오랜만에 감동 울었네... 펑펑|   제발 뻘짓 좀 그만하셨으면......|    [정말, 오랜만에, 감동, 울었...|  [제발, 뻘짓, 좀, 그만하셨으...|
|0.0|연출에서 등장인물 하나하나의 아...| 스토리가 매우 매우 아쉽다 지루...|  [연출에서, 등장인물, 하나하나...|  [스토리가, 매우, 매우, 아쉽...|
|0.0|어렸을때는 그냥 액션과 화려함에...|             영화 약간 이해가 안됨|    [어렸을때는, 그냥, 액션과, ...|      [영화, 약간, 이해가, 안됨]|
|0.0|               꼭 보세여 넘 재밌어|좋아하는 배우들인데 최근 영화라...|          [꼭, 보세여

In [137]:
from pyspark.sql import functions as f
import re

def trim(wordList):
    regex = re.compile('\d+')
    cleaned = list()
    for w in wordList:
        if not regex.match(w):
            cleaned.append(w.lstrip('‘').rstrip('’').rstrip('.').rstrip(',').replace("'","").replace('"','').replace("(","").replace(")",""))
    return cleaned