# Dataset used:
---
The dataset comes from the 2017 SEER program and contains data on female breast cancer patients. It has 4024 records and 16 columns with information such as age, race, tumor size and number of months survived.

Patients with an unknown tumor size or a survival time of less than one month were excluded from this dataset.

https://www.kaggle.com/datasets/reihanenamdari/breast-cancer


# Objectives:

---
1. Given all of a patient's data, we want to predict her status (Alive or Dead).

2. Consider a patient who has gone through the stages of a breast cancer investigation and has had several tests done. Starting from the resulting data, we want to predict the tumor size, so that both the doctor and the patient have an approximate diagnosis until a more thorough investigation can be carried out.

In [978]:
!pip install pyspark



Importing the required libraries and functions

In [979]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, monotonically_increasing_id, when

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (OneHotEncoder, RobustScaler, StringIndexer,
                                VectorAssembler)
from pyspark.ml.regression import LinearRegression

Creating a Spark session

In [980]:
spark = SparkSession.builder.appName('proiect_bigdata').getOrCreate()

Reading the data from the CSV file and printing the schema

In [981]:
# The path points to Google Drive mounted in Colab under /content/drive
data_folder = "/content/drive/MyDrive/Proiecte FMI/BIG Data/"
data = spark.read.csv(data_folder + 'Breast_Cancer.csv', inferSchema=True, header=True)

data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Race: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- T Stage : timestamp (nullable = true)
 |-- N Stage: string (nullable = true)
 |-- 6th Stage: string (nullable = true)
 |-- differentiate: string (nullable = true)
 |-- Grade: string (nullable = true)
 |-- A Stage: string (nullable = true)
 |-- Tumor Size: integer (nullable = true)
 |-- Estrogen Status: string (nullable = true)
 |-- Progesterone Status: string (nullable = true)
 |-- Regional Node Examined: integer (nullable = true)
 |-- Reginol Node Positive: integer (nullable = true)
 |-- Survival Months: integer (nullable = true)
 |-- Status: string (nullable = true)



Note that because of the file's format, the values in the "T Stage " column were converted to timestamps when the file was read, so we convert them back to their original format before exploring the data.

In [982]:
# Take the hour from the timestamp: it is actually the T stage digit.
# Also rename the column to drop the trailing space in "T Stage ".
data = (data.withColumn("T Stage ", date_format("T Stage ", "hh"))
            .withColumnRenamed("T Stage ", "TStage"))

# Map the extracted hour back to the original values from the CSV file
data = data.withColumn("TStage",
                       when(data.TStage == "01", "T1")
                       .when(data.TStage == "02", "T2")
                       .when(data.TStage == "03", "T3")
                       .when(data.TStage == "04", "T4"))
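A minimal plain-Python sketch of the same conversion, using `datetime` instead of Spark so it runs on its own. It assumes, as the comments above state, that a value such as `T3` was parsed into a timestamp whose hour is 3; the calendar date below is arbitrary and the function name is hypothetical.

```python
from datetime import datetime

# Mirrors the "hh" + when(...) chain above, without Spark.
HOUR_TO_STAGE = {"01": "T1", "02": "T2", "03": "T3", "04": "T4"}

def stage_from_timestamp(ts):
    """Return the T stage encoded in the hour of a mis-parsed timestamp, or None."""
    hour = ts.strftime("%I")  # zero-padded 12-hour clock, like Spark's "hh"
    return HOUR_TO_STAGE.get(hour)  # when(...) without otherwise(...) yields null

# Only the hour carries information; the date is arbitrary.
print(stage_from_timestamp(datetime(2023, 1, 1, 3, 0)))  # T3
```

Since `hh` is a 12-hour format (an hour of 0 prints as `12`), `HH` would be the more general choice; for hours 1 to 4 both give the same strings.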

Now we can explore the data. We display a few rows of the DataFrame and summary statistics for the dataset.

In [983]:
data.show()

data.describe().show()

+---+-----+--------------+------+-------+---------+--------------------+-----+--------+----------+---------------+-------------------+----------------------+---------------------+---------------+------+
|Age| Race|Marital Status|TStage|N Stage|6th Stage|       differentiate|Grade| A Stage|Tumor Size|Estrogen Status|Progesterone Status|Regional Node Examined|Reginol Node Positive|Survival Months|Status|
+---+-----+--------------+------+-------+---------+--------------------+-----+--------+----------+---------------+-------------------+----------------------+---------------------+---------------+------+
| 68|White|       Married|    T1|     N1|      IIA|Poorly differenti...|    3|Regional|         4|       Positive|           Positive|                    24|                    1|             60| Alive|
| 50|White|       Married|    T2|     N2|     IIIA|Moderately differ...|    2|Regional|        35|       Positive|           Positive|                    14|                    5|
...

From here on, the data preparation differs between the two problems, so we create a copy of the DataFrame for each one and process it as needed.

In [984]:
data_task1 = data.select("*")

data_task2 = data.select("*")

# **Problem 1**



Given all of a patient's data, we want to predict the value of her "Status" column. This is a classification problem, and we use a RandomForest classifier.

Since the target is the "Status" column, we first look at how the rows are distributed across its values.

In [985]:
data_task1.groupBy("Status").count().show()

+------+-----+
|Status|count|
+------+-----+
|  Dead|  616|
| Alive| 3408|
+------+-----+



The dataset contains far more patients with status "Alive" than "Dead" (3408 vs. 616), so we drop part of the "Alive" rows to obtain a more balanced distribution.

To do this, we sort the data by the Status column and add an "id" column, which we use to drop rows of patients with status "Alive".

In [986]:
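from pyspark.sql import Window
from pyspark.sql.functions import row_number

# row_number() yields consecutive ids (1, 2, ...); monotonically_increasing_id() is only
# unique and increasing, so a fixed threshold on it breaks with several partitions.
# After sorting by Status, ids 1..3408 are "Alive" and 3409..4024 are "Dead".
data_task1 = data_task1.withColumn("id", row_number().over(Window.orderBy("Status")))

# Keep the last 615 "Alive" rows and all 616 "Dead" rows, then drop the helper column
data_task1 = data_task1.filter(col("id") > 2793).drop("id")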
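The threshold can be checked with a small plain-Python model of this step (no Spark involved). It assumes consecutive 0-based ids after sorting, which is what `monotonically_increasing_id()` produces within a single partition, with the threshold `id > 2792`; for 1-based ids from `row_number()` the equivalent threshold is 2793. The counts come from the `groupBy("Status")` output above.

```python
# Plain-Python model of the sort-then-threshold step; counts from groupBy("Status").
statuses = ["Alive"] * 3408 + ["Dead"] * 616

# Sorting puts "Alive" before "Dead" (alphabetical order).
rows = sorted(statuses)

# Consecutive 0-based ids, as the threshold id > 2792 assumes.
kept = [status for row_id, status in enumerate(rows) if row_id > 2792]

alive_kept = kept.count("Alive")
dead_kept = kept.count("Dead")
print(alive_kept, dead_kept)  # 615 616
```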

In [987]:
# Select all columns and drop rows that contain null values, if any
task1_final_data = data_task1.select(['*']).na.drop()

task1_final_data.columns

['Age',
 'Race',
 'Marital Status',
 'TStage',
 'N Stage',
 '6th Stage',
 'differentiate',
 'Grade',
 'A Stage',
 'Tumor Size',
 'Estrogen Status',
 'Progesterone Status',
 'Regional Node Examined',
 'Reginol Node Positive',
 'Survival Months',
 'Status']

To handle the String columns, we use StringIndexer and OneHotEncoder, which turn them into a numeric categorical representation we can work with.

In [988]:
race_indexer = StringIndexer(inputCol='Race',outputCol='RaceIndex')
race_encoder = OneHotEncoder(inputCol='RaceIndex',outputCol='RaceVec')

mstatus_indexer = StringIndexer(inputCol='Marital Status',outputCol='MStatusIndex')
mstatus_encoder = OneHotEncoder(inputCol='MStatusIndex',outputCol='MStatusVec')

tstage_indexer = StringIndexer(inputCol='TStage',outputCol='TStageIndex')
tstage_encoder = OneHotEncoder(inputCol='TStageIndex',outputCol='TStageVec')

nstage_indexer = StringIndexer(inputCol='N Stage',outputCol='NStageIndex')
nstage_encoder = OneHotEncoder(inputCol='NStageIndex',outputCol='NStageVec')

thstage_indexer = StringIndexer(inputCol='6th Stage',outputCol='6thStageIndex')
thstage_encoder = OneHotEncoder(inputCol='6thStageIndex',outputCol='6thStageVec')

differentiate_indexer = StringIndexer(inputCol='differentiate',outputCol='differentiateIndex')
differentiate_encoder = OneHotEncoder(inputCol='differentiateIndex',outputCol='differentiateVec')

grade_indexer = StringIndexer(inputCol='Grade',outputCol='GradeIndex')
grade_encoder = OneHotEncoder(inputCol='GradeIndex',outputCol='GradeVec')

astage_indexer = StringIndexer(inputCol='A Stage',outputCol='AStageIndex')
astage_encoder = OneHotEncoder(inputCol='AStageIndex',outputCol='AStageVec')

estrogen_indexer = StringIndexer(inputCol='Estrogen Status',outputCol='EstrogenIndex')
estrogen_encoder = OneHotEncoder(inputCol='EstrogenIndex',outputCol='EstrogenVec')

progesterone_indexer = StringIndexer(inputCol='Progesterone Status',outputCol='ProgesteroneIndex')
progesterone_encoder = OneHotEncoder(inputCol='ProgesteroneIndex',outputCol='ProgesteroneVec')

status_indexer = StringIndexer(inputCol='Status',outputCol='StatusIndex')
status_encoder = OneHotEncoder(inputCol='StatusIndex',outputCol='StatusVec')
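To make the encoding concrete, here is a plain-Python sketch of what the two stages do to one column. It assumes Spark's defaults: `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`, with ties sorted alphabetically in recent Spark versions) and `OneHotEncoder` uses `dropLast=True`, so the last index becomes an all-zero vector. Spark produces sparse vectors while the sketch uses dense lists; the function names and sample values are hypothetical.

```python
from collections import Counter

def string_index(values):
    """Map labels to 0.0, 1.0, ... by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    labels = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(labels)}
    return [mapping[v] for v in values], labels

def one_hot(index, num_labels, drop_last=True):
    """Encode an index as a dense 0/1 list; with drop_last the last label is all zeros."""
    size = num_labels - 1 if drop_last else num_labels
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector

values = ["Married", "Single", "Married", "Divorced", "Married", "Single"]
indices, labels = string_index(values)
print(labels)                     # ['Married', 'Single', 'Divorced']
print(indices)                    # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
print(one_hot(2.0, len(labels)))  # [0.0, 0.0]
```

One consequence for Problem 1: after balancing, "Alive" and "Dead" have almost the same count, so which status receives `StatusIndex` 0.0 depends on which one is more frequent in the training split.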

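To combine the columns into a single vector column, we use VectorAssembler. Below are the assemblers for both problems. They differ only in that `assembler_linear` leaves out `TStageVec` and `Tumor Size`, since tumor size is the label in Problem 2 and the T stage depends on it.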

In [989]:
# Combine the columns into the "features" vector
assembler_rf = VectorAssembler(inputCols=['Age', 'MStatusVec', 'RaceVec', 'TStageVec',  'NStageVec',  '6thStageVec',
                                                'differentiateVec',  'GradeVec',  'AStageVec',  'Tumor Size', 'EstrogenVec',
                                                'ProgesteroneVec', 'Regional Node Examined', 'Reginol Node Positive', 'Survival Months'], outputCol = "features")

assembler_linear = VectorAssembler(inputCols=['Age', 'MStatusVec', 'RaceVec', 'NStageVec',  '6thStageVec',
                                              'differentiateVec',  'GradeVec',  'AStageVec', 'EstrogenVec','ProgesteroneVec',
                                              'Regional Node Examined', 'Reginol Node Positive', 'Survival Months'], outputCol = "features")
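`VectorAssembler` concatenates its input columns, in the order given, into one vector: a numeric column contributes one entry and a vector column contributes all of its entries. A plain-Python sketch of that behavior; the function name is hypothetical, and the row is loosely based on the first row shown earlier, with an illustrative `RaceVec` encoding.

```python
def assemble(row, input_cols):
    """Concatenate numeric values and lists (vectors) into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: all entries
        else:
            features.append(float(value))             # numeric column: one entry
    return features

# Hypothetical row: one-hot vectors as lists, numeric columns as plain numbers.
row = {"Age": 68, "RaceVec": [1.0, 0.0], "Tumor Size": 4}
print(assemble(row, ["Age", "RaceVec", "Tumor Size"]))  # [68.0, 1.0, 0.0, 4.0]
```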

# **RandomForest classifier**

We can now chain the stages in a pipeline and train the model.

In [990]:
random_forest = RandomForestClassifier(labelCol="StatusIndex", featuresCol="features", numTrees=500)

In [991]:
# Chain the stages in a Pipeline: encode the categorical columns, index the label, assemble the features, train the forest
pipeline = Pipeline(stages = [race_indexer,race_encoder,mstatus_indexer,mstatus_encoder,tstage_indexer,tstage_encoder,nstage_indexer,nstage_encoder,thstage_indexer,thstage_encoder,differentiate_indexer,differentiate_encoder, grade_indexer,grade_encoder,astage_indexer,astage_encoder,estrogen_indexer,estrogen_encoder,progesterone_indexer,progesterone_encoder,status_indexer,status_encoder,assembler_rf,random_forest])

In [992]:
# Split the data into training and test sets
train_bc_data, test_bc_data = task1_final_data.randomSplit([0.7,.3])

In [993]:
# Train the pipeline
fit_model = pipeline.fit(train_bc_data)

In [994]:
# Apply the model to the test data
results = fit_model.transform(test_bc_data)

We display the predictions and their probabilities.

In [995]:
results.select('StatusIndex','prediction', 'probability').show(20,truncate = False)

+-----------+----------+----------------------------------------+
|StatusIndex|prediction|probability                             |
+-----------+----------+----------------------------------------+
|1.0        |1.0       |[0.11838214809893402,0.881617851901066] |
|1.0        |0.0       |[0.7384019559627187,0.26159804403728126]|
|0.0        |0.0       |[0.6203148223811359,0.37968517761886417]|
|1.0        |1.0       |[0.18460140602302313,0.8153985939769769]|
|1.0        |1.0       |[0.12866571895507775,0.8713342810449223]|
|1.0        |1.0       |[0.1421930035104205,0.8578069964895795] |
|1.0        |1.0       |[0.06849299663387048,0.9315070033661296]|
|1.0        |0.0       |[0.5888784604945302,0.41112153950546987]|
|0.0        |0.0       |[0.6591439097475112,0.34085609025248875]|
|1.0        |0.0       |[0.6164489626172137,0.38355103738278623]|
|0.0        |1.0       |[0.3772467077103851,0.622753292289615]  |
|1.0        |0.0       |[0.674480467916954,0.3255195320830459]  |
...
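Each `prediction` is the class with the larger value in `probability`, assuming the classifier's `thresholds` parameter is left unset (the default). A plain-Python sketch of that rule, applied to the first two rows above; the function name is hypothetical.

```python
def predict_from_probability(probability):
    """Return the index of the largest class probability as a float label."""
    best = max(range(len(probability)), key=lambda i: probability[i])
    return float(best)

# First two rows of the output above.
print(predict_from_probability([0.11838214809893402, 0.881617851901066]))   # 1.0
print(predict_from_probability([0.7384019559627187, 0.26159804403728126]))  # 0.0
```

The second row is a misclassification: the true `StatusIndex` is 1.0, but the model gives class 0.0 a probability of about 0.74.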

We evaluate the model below:

In [996]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="StatusIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(results)
print("Accuracy = %g" % (accuracy))

Accuracy = 0.78125
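Accuracy is the fraction of rows whose prediction equals the label. The sketch below recomputes it in plain Python for the 12 fully visible rows of the previous table only, so its value is not expected to match the 0.78125 obtained on the whole test set.

```python
# (StatusIndex, prediction) pairs for the 12 fully visible rows of the table above.
pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0),
         (1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (1.0, 0.0), (0.0, 1.0), (1.0, 0.0)]

def accuracy(pairs):
    """Fraction of (label, prediction) pairs where the two agree."""
    correct = sum(1 for label, pred in pairs if label == pred)
    return correct / len(pairs)

print(accuracy(pairs))  # 7 of 12 correct, about 0.583
```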


# **Problem 2**

We want to build a regression model that predicts tumor size from the patient information obtained from various tests. Since tumor size can vary widely for patients in the last T stage (T4), we restrict predictions to patients in the first three stages (T1-T3).

Since the predictions we are after concern only living patients, we keep only the rows with status "Alive" so that the model fits this group better.

In [997]:
data_task2 = data_task2.filter(data_task2.Status == "Alive")

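For this problem we use all columns except "T Stage", because it depends on tumor size and we want to make future predictions without needing that information. The column is not dropped from the DataFrame; it is simply left out of `assembler_linear`, together with the `Tumor Size` label. Note that "6th Stage" is also derived from the T stage (combined with the N stage), so it still carries indirect information about tumor size.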

In [998]:
# Select all columns and drop rows that contain null values, if any
task2_final_data = data_task2.select(['*']).na.drop()

task2_final_data.columns

['Age',
 'Race',
 'Marital Status',
 'TStage',
 'N Stage',
 '6th Stage',
 'differentiate',
 'Grade',
 'A Stage',
 'Tumor Size',
 'Estrogen Status',
 'Progesterone Status',
 'Regional Node Examined',
 'Reginol Node Positive',
 'Survival Months',
 'Status']

We keep only the patients in the first three stages and display how many there are in each.

In [999]:
# Keep stages T1-T3 (isin avoids relying on the string ordering of the labels)
task2_final_data = task2_final_data.filter(col("TStage").isin("T1", "T2", "T3"))

task2_final_data.groupBy("TStage").count().orderBy("TStage").show()

+------+-----+
|TStage|count|
+------+-----+
|    T1| 1446|
|    T2| 1483|
|    T3|  417|
+------+-----+



We chain the StringIndexer and OneHotEncoder stages in a pipeline

In [1000]:
pipeline_linear = Pipeline(stages = [race_indexer,race_encoder,mstatus_indexer,mstatus_encoder,nstage_indexer,nstage_encoder,
                                     thstage_indexer,thstage_encoder,differentiate_indexer,differentiate_encoder, grade_indexer,
                                     grade_encoder,astage_indexer,astage_encoder,estrogen_indexer,estrogen_encoder,progesterone_indexer,
                                     progesterone_encoder])

We fit it on our data

In [1001]:
output = pipeline_linear.fit(task2_final_data)

We scale the data using RobustScaler

In [1020]:
data_transformed = output.transform(task2_final_data)
data_transformed = assembler_linear.transform(data_transformed)

scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures")

scalerModel = scaler.fit(data_transformed)

data_transformed = scalerModel.transform(data_transformed)

final_data = data_transformed.select("scaledFeatures", "Tumor Size")
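As far as we can tell from the Spark documentation, `RobustScaler` defaults to `lower=0.25`, `upper=0.75`, `withCentering=False` and `withScaling=True`. Each feature is divided by its interquartile range but not centered, and a feature whose quantile range is zero is mapped to 0.0. The plain-Python sketch below mirrors that behavior under those assumptions, using an exact nearest-rank quantile instead of Spark's approximate one; the function names are hypothetical, and the details are worth checking against the Spark version in use.

```python
import math

def quantile(sorted_values, q):
    """Nearest-rank quantile of an already sorted list (Spark uses an approximate one)."""
    rank = max(1, math.ceil(q * len(sorted_values)))
    return sorted_values[rank - 1]

def robust_scale(column, lower=0.25, upper=0.75, with_centering=False, with_scaling=True):
    """Scale one feature by its quantile range, mirroring RobustScaler's documented defaults."""
    ordered = sorted(column)
    median = quantile(ordered, 0.5)
    q_range = quantile(ordered, upper) - quantile(ordered, lower)
    scaled = []
    for x in column:
        value = x - median if with_centering else x
        if with_scaling:
            # Assumption: a zero quantile range yields 0.0, as the Spark docs describe
            value = value / q_range if q_range != 0 else 0.0
        scaled.append(value)
    return scaled

print(robust_scale([1, 2, 3, 4, 5, 6, 7, 8]))  # [0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0]
print(robust_scale([0, 0, 0, 0, 0, 0, 0, 1]))  # [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
```

A one-hot column in which fewer than roughly a quarter of the rows are 1 has 0 at both quartiles, so under these assumptions it is scaled to all zeros. That would be consistent with the many coefficients equal to 0.0 in the linear model's output further down.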

We split the data into training and test sets

In [1021]:
train_bc_data2, test_bc_data2 = final_data.randomSplit([0.7,.3])

# Show summary statistics of the training set
train_bc_data2.describe().show()

+-------+------------------+
|summary|        Tumor Size|
+-------+------------------+
|  count|              2404|
|   mean|28.880199667221298|
| stddev|19.798690330084543|
|    min|                 1|
|    max|               140|
+-------+------------------+



We can now train the model

In [1022]:
lr = LinearRegression(featuresCol='scaledFeatures', labelCol='Tumor Size')

lrModel = lr.fit(train_bc_data2)

We display the model's coefficients and intercept

In [1023]:
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [-1.127513634826283,-0.33407437265621537,0.0,0.0,0.0,0.0,0.0,32.30706102910916,0.0,-59.8654226773688,-43.331796098970166,-5.875507010570251,0.09981864793812427,0.0,0.0,0.09981864793812427,0.0,0.0,0.0,0.0,0.0,-0.24556125466916928,-0.3860420217465051,0.13044694793098682] Intercept: 46.335964521951446
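The model predicts `intercept + coefficients · features`. The scaled features are stored as sparse vectors, printed as `(size, [indices], [values])`, so only the listed indices contribute to the dot product. A plain-Python sketch with a hypothetical 4-feature model; the function name is also hypothetical.

```python
def predict(size, indices, values, coefficients, intercept):
    """Linear prediction for a sparse vector given as (size, indices, values)."""
    if len(coefficients) != size:
        raise ValueError("coefficient count must match the vector size")
    # Entries not listed in `indices` are zero and contribute nothing
    return intercept + sum(coefficients[i] * v for i, v in zip(indices, values))

# Hypothetical model and input, for illustration only.
coefficients = [2.0, 0.0, -1.0, 0.5]
print(predict(4, [0, 2], [1.5, 2.0], coefficients, intercept=10.0))  # 11.0
```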


We apply the model to the test data and display the residuals

In [1024]:
test_results = lrModel.evaluate(test_bc_data2)

test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
| -0.7917346438351238|
| -0.3812551605871164|
|  -2.164201221448401|
|  -7.462938232918518|
|  -8.655430068675194|
|-0.41983236645405597|
|  0.7955235142509096|
|  -6.008027482013755|
|-0.26999954642352364|
|   2.837332558475545|
|   4.067062358507364|
| 0.34974569620896645|
|-0.17620522974931063|
|  2.7887476999387246|
|-0.07484736492801503|
|   -8.19267041467213|
|     4.9339514993984|
|  1.8741840251097166|
| -3.9697545512592214|
|  -2.932883266310416|
+--------------------+
only showing top 20 rows



We apply the model to the unlabeled test data

In [1025]:
unlabeled_data=test_bc_data2.select("scaledFeatures")

predictions = lrModel.transform(unlabeled_data)

predictions.show()

+--------------------+------------------+
|      scaledFeatures|        prediction|
+--------------------+------------------+
|(24,[0,1,5,7,9,12...|15.791734643835124|
|(24,[0,1,5,7,9,12...|15.381255160587116|
|(24,[0,1,5,7,9,12...|15.164201221448401|
|(24,[0,1,5,7,9,12...|15.462938232918518|
|(24,[0,1,5,7,9,12...|15.655430068675194|
|(24,[0,1,5,7,9,12...|15.419832366454056|
|(24,[0,1,5,7,9,12...| 15.20447648574909|
|(24,[0,1,5,7,9,12...|15.008027482013755|
|(24,[0,1,5,7,9,12...|15.269999546423524|
|(24,[0,1,5,7,9,12...|15.162667441524455|
|(24,[0,1,5,7,9,12...|14.932937641492636|
|(24,[0,1,5,7,9,12...|14.650254303791034|
|(24,[0,1,5,7,9,12...| 15.17620522974931|
|(24,[0,1,5,7,9,12...|15.211252300061275|
|(24,[0,1,5,7,9,12...|15.074847364928015|
|(24,[0,1,5,7,9,12...| 15.19267041467213|
|(24,[0,1,5,7,9,12...|  15.0660485006016|
|(24,[0,1,5,7,9,12...|15.125815974890283|
|(24,[0,1,5,7,9,12...|14.969754551259221|
|(24,[0,1,5,7,9,12...|14.932883266310416|
+--------------------+------------------+
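Spark's regression summary defines a residual as label minus prediction. Assuming the residual and prediction tables list the test rows in the same order, adding each residual to its prediction recovers the original `Tumor Size` label. The plain-Python check below does this for the first three rows.

```python
def residual(label, prediction):
    """Residual as reported by Spark's regression summary: label minus prediction."""
    return label - prediction

# First three rows of the prediction and residual tables above.
predictions = [15.791734643835124, 15.381255160587116, 15.164201221448401]
residuals = [-0.7917346438351238, -0.3812551605871164, -2.164201221448401]

# label = prediction + residual; rounding removes floating-point noise
labels = [round(p + r, 6) for p, r in zip(predictions, residuals)]
print(labels)  # [15.0, 15.0, 13.0]
```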

Finally, we display the model's metrics:

In [1026]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))
print("MAE: {}".format(test_results.meanAbsoluteError))
print("R2: {}".format(test_results.r2))

RMSE: 12.7580866265093
MSE: 162.76877436951546
MAE: 8.39105653093462
R2: 0.572427732318617
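A plain-Python sketch of how these four metrics relate, on toy values that are not taken from the dataset. RMSE is the square root of MSE (12.758² ≈ 162.77, matching the output above), MAE averages absolute errors, and R² compares the squared error with the variance of the labels. The function name is hypothetical.

```python
import math

def regression_metrics(labels, predictions):
    """Return MSE, RMSE, MAE and R^2 computed from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)  # total variation of the labels
    r2 = 1.0 - (mse * n) / ss_tot                    # 1 - SS_res / SS_tot
    return {"MSE": mse, "RMSE": math.sqrt(mse), "MAE": mae, "R2": r2}

# Toy values, for illustration only.
metrics = regression_metrics([10.0, 20.0, 30.0], [12.0, 18.0, 33.0])
print(metrics)
```

Read this way, R² ≈ 0.57 means the model explains a little over half of the variance in tumor size on the test set. For scale, the standard deviation of `Tumor Size` in the training set is about 19.8, so an RMSE of about 12.8 is clearly better than always predicting the mean, though still sizable compared with the mean of about 28.9.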
