### 일반 모델링

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Logit').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/13 11:58:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.ml.classification import LogisticRegression

In [4]:
df = spark.read.format('libsvm').load('sample_libsvm_data.txt')

22/06/13 11:58:15 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
                                                                                

In [5]:
logit = LogisticRegression()

In [6]:
logit_model = logit.fit(df)

22/06/13 11:58:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/06/13 11:58:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/06/13 11:58:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/06/13 11:58:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/06/13 11:58:27 WARN BlockManager: Asked to remove block broadcast_25_piece0, which does not exist


In [7]:
lg = logit_model.summary

In [8]:
lg.predictions.show()



+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[20.3777627514872...|[0.99999999858729...|       0.0|
|  1.0|(692,[158,159,160...|[-21.114014198868...|[6.76550380000472...|       1.0|
|  1.0|(692,[124,125,126...|[-23.743613234676...|[4.87842678716177...|       1.0|
|  1.0|(692,[152,153,154...|[-19.192574012720...|[4.62137287298144...|       1.0|
|  1.0|(692,[151,152,153...|[-20.125398874699...|[1.81823629113068...|       1.0|
|  0.0|(692,[129,130,131...|[20.4890549504196...|[0.99999999873608...|       0.0|
|  1.0|(692,[158,159,160...|[-21.082940212814...|[6.97903542823766...|       1.0|
|  1.0|(692,[99,100,101,...|[-19.622713503550...|[3.00582577446132...|       1.0|
|  0.0|(692,[154,155,156...|[21.1594863606582...|[0.99999999935352...|       0.0|
|  0.0|(692,[127

In [9]:
train, test = df.randomSplit([0.7,0.3])

In [10]:
model = logit.fit(train)

In [11]:
r = model.evaluate(test)

In [12]:
r.accuracy

0.9743589743589743

In [13]:
r.precisionByLabel

[0.9411764705882353, 1.0]

In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [15]:
bi_eval = BinaryClassificationEvaluator()

In [20]:
r_pred = r.predictions

In [22]:
bi_roc = bi_eval.evaluate(r.predictions)



In [23]:
bi_roc

0.9972826086956521

### 타이타닉 예제


In [24]:
# 데이터 불러오기
df = spark.read.csv('titanic.csv', inferSchema = True, header = True)

In [25]:
# 스키마 확인
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [26]:
# 컬럼 확인
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [28]:
cols = df.select(['Survived',
 'Pclass',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Fare',
 'Embarked'])

In [45]:
from pyspark.sql.functions import isnan, when, count, col

# null값 확인
for i, j in zip(cols.columns, cols.select([count(when(col(c).isNull(), c)) for c in cols.columns]).collect()[0]):
    print(i, j)

Survived 0
Pclass 0
Sex 0
Age 177
SibSp 0
Parch 0
Fare 0
Embarked 2


In [46]:
# 데이터 개수 파악
cols.count()

891

In [62]:
# Age를 파악할 수 있을지 Feature 별 상관계수 파악
for i in cols.columns:
    if str(cols.schema[i].dataType) != 'StringType' :
        print(i, cols.corr('Age', i))

Survived 0.010539215871285685
Pclass -0.36135321538780957
Age 1.0
SibSp -0.18466352835224448
Parch -0.04878608272014969
Fare 0.135515853527051


상관계수가 특출나게 높은 feature가 없으므로, mean 또는 삭제해서 진행해야함


In [64]:
# 결측치 삭제처리
cols = cols.na.drop()

In [65]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

In [70]:
# 성별을 StringIndexer로 OrdinalEncoding하기
gender_idxer = StringIndexer(inputCol = 'Sex', outputCol = 'SexIndex')

# 성별 number를 OneHotEncoding 하기
gender_encoder = OneHotEncoder(inputCol = 'SexIndex',outputCol = 'SexVec')

In [71]:
# embark를 StringIndexer로 OrdinalEncoding하기
embark_idxer = StringIndexer(inputCol = 'Embarked', outputCol = 'EmbarkIndex')

# embarkIndex를 OneHotEncoding 하기
embark_encoder = OneHotEncoder(inputCol = 'EmbarkIndex',outputCol = 'EmbarkVec')

In [73]:
# 전체 벡터화 어셈블러 정의
assembler = VectorAssembler(inputCols=['Pclass','SexVec','EmbarkVec','Age','SibSp','Parch','Fare'], outputCol='features')

In [74]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [77]:
# 복잡한 구조를 한번에 만들어 줄 파이프라인 활용

logit = LogisticRegression(featuresCol='features', labelCol= 'Survived')

In [81]:
pipeline = Pipeline(stages = [gender_idxer, # 성별 수치화
                              embark_idxer, # embark 수치화
                              gender_encoder, # 성별 벡터화(원핫)
                              embark_encoder, # embark 벡터화(원핫)
                              assembler, # 전체 feature 벡터화
                              logit]) # 로지스틱 회귀모델

In [82]:
train, test = cols.randomSplit([0.8,0.2])

In [83]:
logit_model = pipeline.fit(train)

In [84]:
results = logit_model.transform(test)

In [85]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [86]:
# 평가 진행하기 위한 prediction columne을 지정한 이유는 항상 예측(transform)을 진행하면 prediction col이 생기기 때문
eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol= 'Survived')

In [89]:
# 예측값과 실제값 비교해보기
results.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [91]:
# label 분포 파악해보기
cols.groupBy('Survived').count().orderBy('count').show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  288|
|       0|  424|
+--------+-----+



In [92]:
auc = eval.evaluate(results)

In [93]:
auc

0.8171355498721228

이상 이항분류평가를 이용한 작업이며, 다항분류평가를 이용해 다양한(정밀도, 재현율, 정확도) 메트릭을 파악할 수 있음