# 라이브러리 호출

In [1]:
from pyspark.ml.feature import (
    StringIndexer,        # 범주형 → 수치형 인코딩
    OneHotEncoder,        # 원-핫 인코딩
    VectorAssembler,      # 여러 컬럼 → 하나의 feature 벡터
    StandardScaler,       # 표준 정규화
    MinMaxScaler,         # 최소/최대 스케일링
    Bucketizer,           # 연속형 변수 → 구간화
    QuantileDiscretizer,  # 분위수 기반 구간화
    PCA,                  # 주성분 분석
    PolynomialExpansion,  # 다항 특성 생성
    ChiSqSelector         # 카이제곱 기반 피처 선택
)
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier,
    NaiveBayes,
    MultilayerPerceptronClassifier
)

from pyspark.ml.regression import (
    LinearRegression,
    DecisionTreeRegressor,
    RandomForestRegressor,
    GBTRegressor
)
from pyspark.ml.clustering import (
    KMeans,
    GaussianMixture,
    BisectingKMeans,
    LDA  # Latent Dirichlet Allocation (토픽 모델링)
)
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator,
    ClusteringEvaluator
)
from pyspark.ml import Pipeline  # 전체 파이프라인 구성

from pyspark.ml.tuning import (   # 모델 튜닝
    ParamGridBuilder,
    CrossValidator,
    TrainValidationSplit
)
from pyspark.ml.linalg import Vectors, DenseVector, SparseVector  # 벡터 수동 생성
from pyspark.ml.stat import Correlation, ChiSquareTest            # 통계 테스트

# 스파크 세션

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the Spark session for the first
# classification example.
session_builder = SparkSession.builder.appName('classificationExample1')
spark = session_builder.getOrCreate()

# 타이타닉 데이터를 이용한 생존여부 예측모델

## 로지스틱 회귀 모델

In [3]:
# Load the Titanic CSV: the first row is the header and column types are
# inferred from the data.
titanic_csv_path = "learning_spark_data/titanic.csv"
data = spark.read.csv(titanic_csv_path, inferSchema=True, header=True)
data.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|Gender| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [4]:
from pyspark.sql.functions import col, isnan, sum as spark_sum, when

# Missing-value audit: count the missing entries in every column of `data`.
#
# - Spark's `sum` is imported under the alias `spark_sum` so that Python's
#   builtin `sum` is not shadowed for the rest of the notebook.
# - isnan() is only meaningful for floating-point columns, so the NaN check is
#   applied to float/double columns only; every column is still checked for
#   NULL. This avoids relying on an implicit string -> double cast.
floating_columns = {
    name for name, dtype in data.dtypes if dtype in ('float', 'double')
}


def _missing_condition(column_name):
    """Return a boolean Column that is true where `column_name` is missing."""
    is_missing = col(column_name).isNull()
    if column_name in floating_columns:
        is_missing = is_missing | isnan(col(column_name))
    return is_missing


null_counts = data.select(
    [
        spark_sum(when(_missing_condition(c), 1).otherwise(0)).alias(c)
        for c in data.columns
    ]
)
null_counts.show()

+-----------+--------+------+----+------+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Gender|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+------+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|     0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+------+---+-----+-----+------+----+-----+--------+



In [5]:
# Feature selection: keep the label (Survived) and the candidate predictors.
selected_columns = ['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare']
data_1 = data.select(*selected_columns)
data_1.show(3)

+--------+------+------+----+-----+-----+-------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|
+--------+------+------+----+-----+-----+-------+
|       0|     3|  male|22.0|    1|    0|   7.25|
|       1|     1|female|38.0|    1|    0|71.2833|
|       1|     3|female|26.0|    0|    0|  7.925|
+--------+------+------+----+-----+-----+-------+
only showing top 3 rows



In [6]:
# Handle missing Age values by mean imputation: first compute the column mean.
age_mean_row = data_1.select('Age').agg({"Age": "mean"}).collect()[0]
mean_age = age_mean_row[0]
mean_age

29.69911764705882

In [7]:
# Replace the nulls in Age (and only Age) with the precomputed mean.
data_1 = data_1.fillna(mean_age, subset=["Age"])
data_1.show(10)

+--------+------+------+-----------------+-----+-----+-------+
|Survived|Pclass|Gender|              Age|SibSp|Parch|   Fare|
+--------+------+------+-----------------+-----+-----+-------+
|       0|     3|  male|             22.0|    1|    0|   7.25|
|       1|     1|female|             38.0|    1|    0|71.2833|
|       1|     3|female|             26.0|    0|    0|  7.925|
|       1|     1|female|             35.0|    1|    0|   53.1|
|       0|     3|  male|             35.0|    0|    0|   8.05|
|       0|     3|  male|29.69911764705882|    0|    0| 8.4583|
|       0|     1|  male|             54.0|    0|    0|51.8625|
|       0|     3|  male|              2.0|    3|    1| 21.075|
|       1|     3|female|             27.0|    0|    2|11.1333|
|       1|     2|female|             14.0|    1|    0|30.0708|
+--------+------+------+-----------------+-----+-----+-------+
only showing top 10 rows



In [8]:
# Encode the categorical Gender column as a numeric index (SexIndexer)
# using StringIndexer.
indexer = StringIndexer(outputCol='SexIndexer', inputCol='Gender')
indexer_model = indexer.fit(data_1)
data_1 = indexer_model.transform(data_1)
data_1.show(5)

+--------+------+------+----+-----+-----+-------+----------+
|Survived|Pclass|Gender| Age|SibSp|Parch|   Fare|SexIndexer|
+--------+------+------+----+-----+-----+-------+----------+
|       0|     3|  male|22.0|    1|    0|   7.25|       0.0|
|       1|     1|female|38.0|    1|    0|71.2833|       1.0|
|       1|     3|female|26.0|    0|    0|  7.925|       1.0|
|       1|     1|female|35.0|    1|    0|   53.1|       1.0|
|       0|     3|  male|35.0|    0|    0|   8.05|       0.0|
+--------+------+------+----+-----+-----+-------+----------+
only showing top 5 rows



In [9]:
# Assemble the numeric predictor columns into a single `features` vector.
feature_columns = ['Pclass', 'SexIndexer', 'Age', 'SibSp', 'Parch', 'Fare']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data_1 = assembler.transform(data_1)
data_1.select('features', 'Survived').show(5)

+--------------------+--------+
|            features|Survived|
+--------------------+--------+
|[3.0,0.0,22.0,1.0...|       0|
|[1.0,1.0,38.0,1.0...|       1|
|[3.0,1.0,26.0,0.0...|       1|
|[1.0,1.0,35.0,1.0...|       1|
|[3.0,0.0,35.0,0.0...|       0|
+--------------------+--------+
only showing top 5 rows



In [10]:
# Split the dataset 80/20 into train and test sets; the fixed seed keeps the
# split reproducible between runs.
train_data, test_data = data_1.randomSplit([0.8, 0.2], seed=42)

# show() prints the rows and returns None, so each call gets its own statement.
# Writing them as a tuple expression made the cell echo a spurious (None, None).
train_data.show(5)
test_data.show(5)

+--------+------+------+----+-----+-----+------+----------+--------------------+
|Survived|Pclass|Gender| Age|SibSp|Parch|  Fare|SexIndexer|            features|
+--------+------+------+----+-----+-----+------+----------+--------------------+
|       0|     1|female| 2.0|    1|    2|151.55|       1.0|[1.0,1.0,2.0,1.0,...|
|       0|     1|female|25.0|    1|    2|151.55|       1.0|[1.0,1.0,25.0,1.0...|
|       0|     1|  male|18.0|    1|    0| 108.9|       0.0|[1.0,0.0,18.0,1.0...|
|       0|     1|  male|19.0|    1|    0|  53.1|       0.0|[1.0,0.0,19.0,1.0...|
|       0|     1|  male|19.0|    3|    2| 263.0|       0.0|[1.0,0.0,19.0,3.0...|
+--------+------+------+----+-----+-----+------+----------+--------------------+
only showing top 5 rows

+--------+------+------+-----------------+-----+-----+-------+----------+--------------------+
|Survived|Pclass|Gender|              Age|SibSp|Parch|   Fare|SexIndexer|            features|
+--------+------+------+-----------------+-----+-----+--

(None, None)

In [11]:
# Model: fit a logistic regression on the training split, then score the
# test split and preview the predictions next to the true label.
lr = LogisticRegression(labelCol='Survived', featuresCol='features')
lr_model = lr.fit(train_data)
predic = lr_model.transform(test_data)
preview_columns = ['features', 'Survived', 'prediction']
predic.select(*preview_columns).show(5)

+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|[1.0,1.0,50.0,0.0...|       0|       1.0|
|[1.0,0.0,21.0,0.0...|       0|       1.0|
|[1.0,0.0,24.0,0.0...|       0|       1.0|
|[1.0,0.0,29.0,0.0...|       0|       1.0|
|[1.0,0.0,29.69911...|       0|       1.0|
+--------------------+--------+----------+
only showing top 5 rows



In [14]:
from pyspark.sql.functions import expr

# Flag each test row with 1 when the prediction matches the label, else 0,
# then count the misclassified rows.
correct_flag = expr('case when Survived = prediction then 1 else 0 end')
comp = predic.withColumn('correct', correct_flag)
comp.filter('correct=0').count()

28

In [15]:
# Show only the rows where the predicted class differs from the true label.
is_misclassified = col('Survived') != col('prediction')
predic.where(is_misclassified).show()

+--------+------+------+-----------------+-----+-----+--------+----------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass|Gender|              Age|SibSp|Parch|    Fare|SexIndexer|            features|       rawPrediction|         probability|prediction|
+--------+------+------+-----------------+-----+-----+--------+----------+--------------------+--------------------+--------------------+----------+
|       0|     1|female|             50.0|    0|    0| 28.7125|       1.0|[1.0,1.0,50.0,0.0...|[-1.9520233772457...|[0.12433289705143...|       1.0|
|       0|     1|  male|             21.0|    0|    1| 77.2875|       0.0|[1.0,0.0,21.0,0.0...|[-0.5063625084573...|[0.37604662481402...|       1.0|
|       0|     1|  male|             24.0|    0|    0|    79.2|       0.0|[1.0,0.0,24.0,0.0...|[-0.5000095386390...|[0.37753842718518...|       1.0|
|       0|     1|  male|             29.0|    0|    0|    30.0|       0.0|[1.0,0.0,29.0,0.0...|[-0.1615540

In [16]:
# Accuracy: the mean of the 0/1 `correct` flag over the test predictions.
accuracy_row = comp.selectExpr('avg(correct) as accuracy').collect()[0]
accuracy_row['accuracy']

0.8068965517241379

In [17]:
# Area under the ROC curve for the Titanic test predictions.
# The evaluator is bound to `evaluator` rather than `eval` so that Python's
# builtin eval() is not shadowed.
evaluator = BinaryClassificationEvaluator(
    labelCol='Survived',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
auc = evaluator.evaluate(predic)
auc

0.8664129586260734

AUROC: x축이 FPR(거짓 양성률), y축이 TPR(참 양성률)인 ROC 곡선의 아래 면적. 1에 가까울수록 좋은 모델이다.

In [18]:
# Stop the Spark session of the Titanic example; the next section creates a new one.
spark.stop()

# libsvm 형식의 파일처리
- 텍스트 파일 형식으로, 0이 아닌 값만 기록하는 희소(sparse) 데이터용 형식이다. 메모리 사용량과 처리 속도를 개선할 수 있어 머신러닝에서 자주 활용된다.
- 한 줄이 한 샘플이며, `레이블 인덱스:값 인덱스:값 ...` 형식으로 기록한다 (인덱스는 피처 번호).

In [19]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for the libsvm example and echo it.
session_builder = SparkSession.builder.appName('classificationExample2')
spark = session_builder.getOrCreate()
spark

In [20]:
# Load the sample dataset with the libsvm reader and report its row count.
libsvm_path = "learning_spark_data/sample_libsvm_data.txt"
libsvm_reader = spark.read.format('libsvm')
data2 = libsvm_reader.load(libsvm_path)
data2.count()

100

In [22]:
# Preview the first five rows of the loaded libsvm DataFrame.
data2.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
+-----+--------------------+
only showing top 5 rows



In [24]:
# Split the libsvm data 70/30 into train and test sets with a fixed seed.
train_data, test_data = data2.randomSplit([0.7, 0.3], seed=12)

# show() prints the rows and returns None, so each call gets its own statement.
# Writing them as a tuple expression made the cell echo a spurious (None, None).
train_data.show(3)
test_data.show(3)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[95,96,97,12...|
|  0.0|(692,[121,122,123...|
|  0.0|(692,[122,123,124...|
+-----+--------------------+
only showing top 3 rows

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[98,99,100,1...|
|  0.0|(692,[100,101,102...|
|  0.0|(692,[123,124,125...|
+-----+--------------------+
only showing top 3 rows



(None, None)

In [25]:
# Model: logistic regression with explicit iteration and regularisation
# settings, fitted on the training split and applied to the test split.
lr_params = {'maxIter': 10, 'regParam': 0.3, 'elasticNetParam': 0.8}
lr = LogisticRegression(**lr_params)
lrModel = lr.fit(train_data)
pred = lrModel.transform(test_data)
pred.show(5)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[98,99,100,1...|[0.95414441393043...|[0.72194788886091...|       0.0|
|  0.0|(692,[100,101,102...|[0.48568919283978...|[0.61909039185191...|       0.0|
|  0.0|(692,[123,124,125...|[1.00961478127974...|[0.73294475454830...|       0.0|
|  0.0|(692,[126,127,128...|[0.90293696106823...|[0.71155267489794...|       0.0|
|  0.0|(692,[126,127,128...|[0.75830339388632...|[0.68098526895593...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [26]:
# Area under the ROC curve for the libsvm test predictions.
# The evaluator is bound to `evaluator` rather than `eval` so that Python's
# builtin eval() is not shadowed.
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
auc = evaluator.evaluate(pred)
auc

1.0

In [28]:
# Stop the Spark session of the libsvm example; the next cell creates a new one.
spark.stop()

In [29]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for the taxi-fare example and echo it.
session_builder = SparkSession.builder.appName('taxi-fare-prediction')
spark = session_builder.getOrCreate()
spark

In [30]:
import os

# Glob pattern matching every trip CSV under
# <current working directory>/learning_spark_data/trips.
cwd = os.getcwd()
path_parts = ('learning_spark_data', 'trips', '*.csv')
trip_data_path = os.path.join(cwd, *path_parts)
trip_data_path

'/home/jovyan/work/learning_spark_data/trips/*.csv'

In [31]:
# Turn the local glob path into a file:// URI for Spark.
# Separators are normalised to '/', then leading slashes are stripped before
# prefixing "file:///". An absolute POSIX path therefore becomes
# file:///home/... instead of the malformed file:////home/...; a Windows
# drive path (C:/...) has no leading slash and is unaffected.
normalized_path = trip_data_path.replace(os.sep, '/')
file_path = f"file:///{normalized_path.lstrip('/')}"

In [33]:
# Read every trip CSV matched by the glob, using the header row for column
# names and inferring column types, then print the resulting schema.
csv_options = {'header': True, 'inferSchema': True}
trip_df = spark.read.csv(file_path, **csv_options)
trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [None]:
# The SQL below selects FROM `trips`, so the trip DataFrame must be registered
# as a temporary view under that name before the query can be executed.
trip_df.createOrReplaceTempView("trips")

# Select distance and total fare, filtering out implausible rows
# (non-positive or extreme amounts/distances), trips with 4 or more
# passengers, and pickups outside 2021-01-01 .. 2021-07-31.
query = """
SELECT
    trip_distance,
    total_amount
FROM trips

WHERE total_amount < 5000
  AND total_amount > 0
  AND trip_distance > 0
  AND trip_distance < 500
  AND passenger_count < 4
  AND TO_DATE(tpep_pickup_datetime) >= "2021-01-01"
  AND TO_DATE(tpep_pickup_datetime) < "2021-08-01"
"""

In [34]:
# Stop the Spark session of the taxi-fare example.
spark.stop()