In [1]:
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=a525b041f89781a07d9e43cb565dd40738f0212eb71c11c7cfda5e3bb456f029
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove

In [2]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf


sc=pyspark.SparkContext()
# spark = SparkSession.builder.getOrCreate()

spark = SparkSession\
    .builder\
    .config("spark.driver.memory", "16g")\
    .config("spark.executor.memory", "4g")\
    .getOrCreate()


import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import random

In [3]:
from google.colab import drive
drive.mount('/content/drive')

train_data = spark.read.csv("/content/drive/MyDrive/빅데이터 처리 및 응용/팀플/train_data.csv", header=True, inferSchema=True)
test_data = spark.read.csv("/content/drive/MyDrive/빅데이터 처리 및 응용/팀플/test_data.csv", header=True, inferSchema=True)


Mounted at /content/drive


#SMOTE 알고리즘 구현

In [4]:
import random
import numpy as np
from pyspark.sql import Row
from sklearn import neighbors
from pyspark.ml.feature import VectorAssembler

In [5]:
dataInput = train_data

In [6]:
new_use_variable=dataInput.columns[1:]

new_use_variable

['Smoking_Index',
 'AlcoholDrinking_Index',
 'Stroke_Index',
 'DiffWalking_Index',
 'Sex_Index',
 'PhysicalActivity_Index',
 'Asthma_Index',
 'KidneyDisease_Index',
 'SkinCancer_Index',
 'AgeCategory_Index',
 'GenHealth_Index',
 'Diabetic_Index',
 'BMI_Log',
 'PhysicalHealth_Log',
 'MentalHealth_Log',
 'SleepTime_Log']

In [7]:
dataInput.show()

+------------------+-------------+---------------------+------------+-----------------+---------+----------------------+------------+-------------------+----------------+-----------------+---------------+--------------+------------------+------------------+------------------+------------------+
|HeartDisease_Index|Smoking_Index|AlcoholDrinking_Index|Stroke_Index|DiffWalking_Index|Sex_Index|PhysicalActivity_Index|Asthma_Index|KidneyDisease_Index|SkinCancer_Index|AgeCategory_Index|GenHealth_Index|Diabetic_Index|           BMI_Log|PhysicalHealth_Log|  MentalHealth_Log|     SleepTime_Log|
+------------------+-------------+---------------------+------------+-----------------+---------+----------------------+------------+-------------------+----------------+-----------------+---------------+--------------+------------------+------------------+------------------+------------------+
|               0.0|          0.0|                  0.0|         0.0|              0.0|      0.0|               

In [7]:
from sklearn import neighbors
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import lit


def vectorizerFunction(dataInput, TargetFieldName):
    if(dataInput.select(TargetFieldName).distinct().count() != 2):
        raise ValueError("Target field must have only 2 distinct classes")
    columnNames = list(dataInput.columns)
    columnNames.remove(TargetFieldName)
    dataInput = dataInput.select((','.join(columnNames)+','+TargetFieldName).split(','))
    assembler=VectorAssembler(inputCols = columnNames, outputCol = 'features')
    pos_vectorized = assembler.transform(dataInput)
    vectorized = pos_vectorized.select('features',TargetFieldName).withColumn('label',pos_vectorized[TargetFieldName]).drop(TargetFieldName)
    return vectorized


# sparse vector 뺼셈 가능하게 하는 함수
def subtract_vector_udf(a, b):
    a=Vectors.dense(a)
    b=Vectors.dense(b)
    a = a.toArray()
    b = b.toArray()
    array_ = a - b
    return [float(a) for a in array_]


# sparse vector 덧셈 가능하게 하는 함수
def add_vector_udf(a, b):
    a=Vectors.dense(a)
    b=Vectors.dense(b)
    a = a.toArray()
    b = b.toArray()
    b=random.random()*b
    array_ = a + b
    return [float(a) for a in array_]


def SmoteSampling(vectorized, k = 5, minorityClass = 1.0, majorityClass = 0.0, percentageOver = 200, percentageUnder = 100):
    if(percentageUnder > 100|percentageUnder < 10):
        raise ValueError("Percentage Under must be in range 10 - 100");
    if(percentageOver < 100):
        raise ValueError("Percentage Over must be in at least 100");

    dataInput_min = vectorized[vectorized['label'] == minorityClass]
    dataInput_maj = vectorized[vectorized['label'] == majorityClass]

    feature = dataInput_min.select('features')
    feature = feature.rdd
    feature = feature.map(lambda x: x[0])
    feature = feature.collect()
    feature = np.asarray(feature)
    print(feature)

    nbrs = neighbors.NearestNeighbors(n_neighbors=k, algorithm='auto').fit(feature)
    neighbours =  nbrs.kneighbors(feature)
    gap = neighbours[0]
    neighbours = neighbours[1]
    min_rdd = dataInput_min.drop('label').rdd
    pos_rddArray = min_rdd.map(lambda x : list(x))
    pos_ListArray = pos_rddArray.collect()
    min_Array = list(pos_ListArray)
    newRows = []
    nt = len(min_Array)
    nexs = int(percentageOver/100)

    for i in range(nt):
        for j in range(nexs):
            neigh = random.randint(1,k)
            difs = subtract_vector_udf(min_Array[neigh][0], min_Array[i][0])
            newRec = add_vector_udf(min_Array[i][0], difs)
            newRows.insert(0,tuple(newRec)) # 튜플로 수정한 상태 리스트 안달아주면 넘파이 어레이 넘어가는데 추후 데이터프레임 생성에 문제 있는 거 같아서 list로 만들어줌
    
    newData_rdd = sc.parallelize(newRows)

    newRow_schema=dataInput.select(new_use_variable).schema
    newRow = spark.createDataFrame(newData_rdd, schema = newRow_schema)
    newRow=newRow.withColumn("HeartDisease_Index", lit(1.0)).select(dataInput.columns)

    balanced_data = dataInput.union(newRow)
    return balanced_data

def Borderline_SmoteSampling(vectorized, k = 5, minorityClass = 1.0, majorityClass = 0.0, percentageOver = 200, percentageUnder = 100):
    if(percentageUnder > 100|percentageUnder < 10):
        raise ValueError("Percentage Under must be in range 10 - 100");
    if(percentageOver < 100):
        raise ValueError("Percentage Over must be in at least 100");

    dataInput_min = vectorized[vectorized['label'] == minorityClass]
    dataInput_maj = vectorized[vectorized['label'] == majorityClass]
    

    feature = dataInput_min.select('features')
    feature = feature.rdd
    feature = feature.map(lambda x: x[0])
    feature = feature.collect()
    feature = np.asarray(feature)

    danger_ratio = []

    all_data = vectorized.select('features')
    all_data = all_data.rdd
    all_data = all_data.map(lambda x : x[0])
    all_data = all_data.collect()
    all_data = np.asarray(all_data)

    nbrs = neighbors.NearestNeighbors(n_neighbors=k, algorithm='auto').fit(all_data)
    neighbours =  nbrs.kneighbors(feature)
    gap = neighbours[0]
    neighbours = neighbours[1]

    for data in neighbours:
        danger = 0
        for d in data:
            if d >= 21797:
                danger += 1
            print(float(danger / (k)))
            danger_ratio.append(float(danger / (k)))

    min_rdd = dataInput_min.drop('label').rdd
    pos_rddArray = min_rdd.map(lambda x : list(x))
    pos_ListArray = pos_rddArray.collect()
    min_Array = list(pos_ListArray)
    newRows = []
    nt = len(min_Array)
    nexs = int(percentageOver/100)

    for i in range(nt):
        if danger_ratio[i] >= 0.4:
            for j in range(nexs):
                neigh = random.randint(1,k)
                difs = subtract_vector_udf(min_Array[neigh][0], min_Array[i][0])
                newRec = add_vector_udf(min_Array[i][0], difs)
                newRows.insert(0,tuple(newRec)) # 튜플로 수정한 상태 리스트 안달아주면 넘파이 어레이 넘어가는데 추후 데이터프레임 생성에 문제 있는 거 같아서 list로 만들어줌
    
    newData_rdd = sc.parallelize(newRows)

    newRow_schema=dataInput.select(new_use_variable).schema
    newRow = spark.createDataFrame(newData_rdd, schema = newRow_schema)
    newRow=newRow.withColumn("HeartDisease_Index", lit(1.0)).select(dataInput.columns)

    balanced_data = dataInput.union(newRow)
    return balanced_data

def ADASYN_Sampling(vectorized, k = 5, G = 10, minorityClass = 1.0, majorityClass = 0.0, percentageOver = 200, percentageUnder = 100):
    if(percentageUnder > 100|percentageUnder < 10):
        raise ValueError("Percentage Under must be in range 10 - 100");
    if(percentageOver < 100):
        raise ValueError("Percentage Over must be in at least 100");
    G = 100
    dataInput_min = vectorized[vectorized['label'] == minorityClass]
    dataInput_maj = vectorized[vectorized['label'] == majorityClass]
    

    feature = dataInput_min.select('features')
    feature = feature.rdd
    feature = feature.map(lambda x: x[0])
    feature = feature.collect()
    feature = np.asarray(feature)

    adasyn_ratio = []

    all_data = vectorized.select('features')
    all_data = all_data.rdd
    all_data = all_data.map(lambda x : x[0])
    all_data = all_data.collect()
    all_data = np.asarray(all_data)

    nbrs = neighbors.NearestNeighbors(n_neighbors=k, algorithm='auto').fit(all_data)
    neighbours =  nbrs.kneighbors(feature)
    gap = neighbours[0]
    neighbours = neighbours[1]

    for data in neighbours:
        danger = 0
        for d in data:
            if d >= 21797:
                danger += 1
            print(float(danger / (k)))
            adasyn_ratio.append(float(danger / (k)))

    adasyn_normalize_ratio = [ratio / sum(adasyn_ratio) for ratio in adasyn_ratio]
    min_rdd = dataInput_min.drop('label').rdd
    pos_rddArray = min_rdd.map(lambda x : list(x))
    pos_ListArray = pos_rddArray.collect()
    min_Array = list(pos_ListArray)
    newRows = []
    nt = len(min_Array)
    nexs = int(percentageOver/100)

    for i in range(nt):
        iteration = int(G * adasyn_ratio[i]) + 1
        for j in range(iteration):
            neigh = random.randint(1,k)
            difs = subtract_vector_udf(min_Array[neigh][0], min_Array[i][0])
            newRec = add_vector_udf(min_Array[i][0], difs)
            newRows.insert(0,tuple(newRec)) # 튜플로 수정한 상태 리스트 안달아주면 넘파이 어레이 넘어가는데 추후 데이터프레임 생성에 문제 있는 거 같아서 list로 만들어줌
    
    newData_rdd = sc.parallelize(newRows)

    newRow_schema=dataInput.select(new_use_variable).schema
    newRow = spark.createDataFrame(newData_rdd, schema = newRow_schema)
    newRow=newRow.withColumn("HeartDisease_Index", lit(1.0)).select(dataInput.columns)

    balanced_data = dataInput.union(newRow)
    return balanced_data

In [None]:
bordeline_smote_train_data = Borderline_SmoteSampling(vectorizerFunction(dataInput, 'HeartDisease_Index'), k = 5, percentageOver = 910)

In [51]:
bordeline_smote_train_data.toPandas().to_csv('/content/drive/MyDrive/빅데이터 처리 및 응용/팀플/train_data_borderline.csv', index = False)

In [18]:
from sklearn import neighbors
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import lit


def vectorizerFunction(dataInput, TargetFieldName):
    if(dataInput.select(TargetFieldName).distinct().count() != 2):
        raise ValueError("Target field must have only 2 distinct classes")
    columnNames = list(dataInput.columns)
    columnNames.remove(TargetFieldName)
    dataInput = dataInput.select((','.join(columnNames)+','+TargetFieldName).split(','))
    assembler=VectorAssembler(inputCols = columnNames, outputCol = 'features')
    pos_vectorized = assembler.transform(dataInput)
    vectorized = pos_vectorized.select('features',TargetFieldName).withColumn('label',pos_vectorized[TargetFieldName]).drop(TargetFieldName)
    return vectorized


# sparse vector 뺼셈 가능하게 하는 함수
def subtract_vector_udf(a, b):
    a=Vectors.dense(a)
    b=Vectors.dense(b)
    a = a.toArray()
    b = b.toArray()
    array_ = a - b
    return [float(a) for a in array_]


# sparse vector 덧셈 가능하게 하는 함수
def add_vector_udf(a, b):
    a=Vectors.dense(a)
    b=Vectors.dense(b)
    a = a.toArray()
    b = b.toArray()
    b=random.random()*b
    array_ = a + b
    return [float(a) for a in array_]


def ADASYN_Sampling(vectorized, k = 5, G = 10, minorityClass = 1.0, majorityClass = 0.0, percentageOver = 200, percentageUnder = 100):
    if(percentageUnder > 100|percentageUnder < 10):
        raise ValueError("Percentage Under must be in range 10 - 100");
    if(percentageOver < 100):
        raise ValueError("Percentage Over must be in at least 100");
    G = 100
    dataInput_min = vectorized[vectorized['label'] == minorityClass]
    dataInput_maj = vectorized[vectorized['label'] == majorityClass]
    

    feature = dataInput_min.select('features')
    feature = feature.rdd
    feature = feature.map(lambda x: x[0])
    feature = feature.collect()
    feature = np.asarray(feature)

    adasyn_ratio = []

    all_data = vectorized.select('features')
    all_data = all_data.rdd
    all_data = all_data.map(lambda x : x[0])
    all_data = all_data.collect()
    all_data = np.asarray(all_data)

    nbrs = neighbors.NearestNeighbors(n_neighbors=k, algorithm='auto').fit(all_data)
    neighbours =  nbrs.kneighbors(feature)
    gap = neighbours[0]
    neighbours = neighbours[1]

    for data in neighbours:
        danger = 0
        for d in data:
            if d >= 21797:
                danger += 1
            print(float(danger / (k)))
            adasyn_ratio.append(float(danger / (k)))

    adasyn_normalize_ratio = [ratio / sum(adasyn_ratio) for ratio in adasyn_ratio]
    min_rdd = dataInput_min.drop('label').rdd
    pos_rddArray = min_rdd.map(lambda x : list(x))
    pos_ListArray = pos_rddArray.collect()
    min_Array = list(pos_ListArray)
    newRows = []
    nt = len(min_Array)
    nexs = int(percentageOver/100)

    for i in range(nt):
        iteration = int(10 * adasyn_ratio[i])
        print(iteration)
        for j in range(iteration):
            neigh = random.randint(1,k)
            difs = subtract_vector_udf(min_Array[neigh][0], min_Array[i][0])
            newRec = add_vector_udf(min_Array[i][0], difs)
            newRows.insert(0,tuple(newRec)) # 튜플로 수정한 상태 리스트 안달아주면 넘파이 어레이 넘어가는데 추후 데이터프레임 생성에 문제 있는 거 같아서 list로 만들어줌
    
    newData_rdd = sc.parallelize(newRows)

    newRow_schema=dataInput.select(new_use_variable).schema
    newRow = spark.createDataFrame(newData_rdd, schema = newRow_schema)
    newRow=newRow.withColumn("HeartDisease_Index", lit(1.0)).select(dataInput.columns)

    balanced_data = dataInput.union(newRow)
    return balanced_data

In [19]:
adasyn_train_data = ADASYN_Sampling(vectorizerFunction(dataInput, 'HeartDisease_Index'), k = 5, percentageOver = 910)

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8
10
2
4
6
8


In [20]:
adasyn_train_data.count()

365831

In [21]:
adasyn_train_data.toPandas().to_csv('/content/drive/MyDrive/빅데이터 처리 및 응용/팀플/adasyn_train_data.csv', index = False)

### 반올림 적용 고민해보기

In [None]:
from pyspark.sql.functions import round, col, bround
 
#BMI~부턴 반올림 필요없고 나이는 1의자리 나머진 소수 첫번째자리에서 반올림

smote_train_data2=smote_train_data

smote_train_data2=smote_train_data2.withColumn("Smoking_Index", round(col('Smoking_Index')))

smote_train_data2=smote_train_data2.withColumn('AlcoholDrinking_Index', round(col('AlcoholDrinking_Index')))

smote_train_data2=smote_train_data2.withColumn('Stroke_Index', round(col( 'Stroke_Index')))

smote_train_data2=smote_train_data2.withColumn( 'DiffWalking_Index', round(col( 'DiffWalking_Index')))

smote_train_data2=smote_train_data2.withColumn('Sex_Index', round(col('Sex_Index')))

smote_train_data2=smote_train_data2.withColumn('PhysicalActivity_Index', round(col('PhysicalActivity_Index')))

smote_train_data2=smote_train_data2.withColumn('Asthma_Index', round(col('Asthma_Index')))

smote_train_data2=smote_train_data2.withColumn('KidneyDisease_Index', round(col('KidneyDisease_Index')))

smote_train_data2=smote_train_data2.withColumn('SkinCancer_Index', round(col('SkinCancer_Index')))

smote_train_data2=smote_train_data2.withColumn('AgeCategory_Index', round(col('AgeCategory_Index'), -1))

smote_train_data2=smote_train_data2.withColumn('AgeCategory_Index', bround(col('AgeCategory_Index')))


smote_train_data2=smote_train_data2.withColumn('GenHealth_Index', round(col('GenHealth_Index')))

smote_train_data2=smote_train_data2.withColumn('Diabetic_Index', round(col('Diabetic_Index')))



In [None]:
smote_train_data2.show()

+------------------+-------------+---------------------+------------+-----------------+---------+----------------------+------------+-------------------+----------------+-----------------+---------------+--------------+------------------+------------------+------------------+------------------+
|HeartDisease_Index|Smoking_Index|AlcoholDrinking_Index|Stroke_Index|DiffWalking_Index|Sex_Index|PhysicalActivity_Index|Asthma_Index|KidneyDisease_Index|SkinCancer_Index|AgeCategory_Index|GenHealth_Index|Diabetic_Index|           BMI_Log|PhysicalHealth_Log|  MentalHealth_Log|     SleepTime_Log|
+------------------+-------------+---------------------+------------+-----------------+---------+----------------------+------------+-------------------+----------------+-----------------+---------------+--------------+------------------+------------------+------------------+------------------+
|               0.0|          0.0|                  0.0|         0.0|              0.0|      0.0|               

In [None]:
smote_train_data2.count()

437488

In [None]:
round_smote_train_data=smote_train_data2

### 증강후 label 비율 확인

In [None]:
# 원본 데이터 비율
train_major_df = train_data.filter(train_data.HeartDisease_Index == 0.0)
train_minor_df = train_data.filter(train_data.HeartDisease_Index == 1.0)

print("원본 train 데이터 label 0인 경우 : {}".format(train_major_df.count()))
print("원본 train 데이터 label 1인 경우 : {}".format(train_minor_df.count()))

원본 train 데이터 label 0인 경우 : 219518
원본 train 데이터 label 1인 경우 : 21797


In [None]:
# smote 적용 후 데이터 비율
smote_train_major_df = smote_train_data.filter(smote_train_data.HeartDisease_Index == 0.0)
smote_train_minor_df = smote_train_data.filter(smote_train_data.HeartDisease_Index == 1.0)

print("smote 진행 후 train 데이터 label 0인 경우 : {}".format(smote_train_major_df.count()))
print("smote 진행 후 train 데이터 label 1인 경우 : {}".format(smote_train_minor_df.count()))

smote 진행 후 train 데이터 label 0인 경우 : 219518
smote 진행 후 train 데이터 label 1인 경우 : 217970


In [None]:
# 반올림 smote 적용 후 데이터 비율

round_smote_train_major_df = round_smote_train_data.filter(round_smote_train_data.HeartDisease_Index == 0.0)
round_smote_train_minor_df = round_smote_train_data.filter(round_smote_train_data.HeartDisease_Index == 1.0)

print("smote 진행 후 train 데이터 label 0인 경우 : {}".format(round_smote_train_major_df.count()))
print("smote 진행 후 train 데이터 label 1인 경우 : {}".format(round_smote_train_minor_df.count()))

smote 진행 후 train 데이터 label 0인 경우 : 219518
smote 진행 후 train 데이터 label 1인 경우 : 217970


# SMOTE로 데이터 증강한 결과 내보내기

In [None]:
smote_train_data.toPandas().to_csv('/content/drive/MyDrive/빅데이터 처리 및 응용/팀플/smote_train_data.csv', index=False, encoding='CP949')

#SMOTE 반올림 내보낸거

In [None]:
round_smote_train_data.toPandas().to_csv('/content/drive/MyDrive/빅데이터 처리 및 응용/팀플/round_smote_train_data.csv', index=False, encoding='CP949')