In [1]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Connecting to archive.ubuntu.com (91.189.91.82)] [1 InRelease 2,588 B/110 kB 2%] [Connected to c0% [Connecting to archive.ubuntu.com (91.189.91.82)] [Connected to cloud.r-project.org (18.239.18.39                                                                                                    Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
                                                                                                    0% [Waiting for headers] [Waiting for headers] [Waiting for headers]                                                                    Hit:3 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers]                                                                    Get:4 https://developer.download.nvidia.com/comp

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
# Create a SparkContext
conf = SparkConf().setMaster("local").setAppName("KMeansPatientCharacteristics") # treat every core of your desktop as an executor
SpContext = SparkContext(conf = conf)

In [4]:
SpSession = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").getOrCreate()

In [5]:
#Load the CSV file into a RDD
patientData = SpContext.textFile("/content/drive/MyDrive/hca_datasets/11-patient-characteristics-cluster.csv",2)
patientData.cache()
patientData.collect()

['Excluded: Patients whose use of alcohol or drugs is sufficient to impair compliance with protocol requirements.',
 'Current substance or alcohol use disorder as determined by the SCID or by positive drug toxicology results',
 'Subjects who consume >14 alcoholic drinks per week.',
 'Excessive consumption of xanthine-based beverages',
 'History of drug abuse or use of illegal drugs: use of soft drugs (marijuana  pot) within 3 months of the screening visit or hard drugs (cocaine  PCP  crack)within 1 year of the screening visit',
 'Non-smokers.',
 'Smokers may participate  but they are limited to 10 cigarettes per day while at the clinic and must follow clinic smoking rules',
 'Regular use of alcohol within six months prior to the screening visit (more than fourteen units of alcohol per week [ 1 Unit = 150mL of wine  360 mL of beer  or 45mL of 40% alcohol])',
 'Use of soft drugs ( such as marijuana) within 3 months prior to the screening visit or hard drugs (such as cocaine  phencyclidin

In [6]:
def TransformToVector(inputStr):
    attList=inputStr.split(",")
    return [attList[0]] # spam label, list of sentence as attributes

patientXformed=patientData.map(TransformToVector)

patientDf= SpSession.createDataFrame(patientXformed, ["patient_characteristics"])
patientDf.cache()
patientDf.select("patient_characteristics").show()

+-----------------------+
|patient_characteristics|
+-----------------------+
|   Excluded: Patient...|
|   Current substance...|
|   Subjects who cons...|
|   Excessive consump...|
|   History of drug a...|
|           Non-smokers.|
|   Smokers may parti...|
|   Regular use of al...|
|   Use of soft drugs...|
|   Subject has been ...|
|   subjects who have...|
|   Current use of to...|
|   Exclusion Criteri...|
|   alcoholism  drug ...|
|   Be current smoker...|
|   History of signif...|
|   History of signif...|
|             Inebriated|
|          or abstinence|
|   Patients who are ...|
+-----------------------+
only showing top 20 rows



In [7]:
(trainingData, testData) = patientDf.randomSplit([0.9, 0.1])
trainingData.count()
testData.count()
testData.collect()

[Row(patient_characteristics='(Note: Prior history of deep vein thrombosis will not exclude subjects from participating in this study.)'),
 Row(patient_characteristics="(The lipid profiles at Visit 3 (baseline) and all subsequent visits were kept 'blinded' until data analysis)"),
 Row(patient_characteristics='(Was written - Ganciclovir or foscarnet for non-CMV herpes infections within 6 months prior to study entry.)'),
 Row(patient_characteristics='18 years of age and older'),
 Row(patient_characteristics='2 positive Hepatitis B surface antigen results 6 months apart'),
 Row(patient_characteristics='20 000/mL'),
 Row(patient_characteristics='50 copies/ml for the last 6 months on HAART therapy.'),
 Row(patient_characteristics='8.0 g/dL'),
 Row(patient_characteristics='A CD4+ count between 200 and 500 cells/mm3.'),
 Row(patient_characteristics='A current diagnosis of severe and unstable cardiovascular disease.'),
 Row(patient_characteristics='A disability that may prevent the subject fro

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.feature import IDF
from pyspark.ml.clustering import KMeans

In [9]:
#break sentences into words
tokenizer = Tokenizer(inputCol="patient_characteristics", outputCol="words")

In [10]:
#calculate term frequency (TF)
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),  outputCol="tempfeatures")

In [11]:
#calculate TF-IDF
tfIdf=IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")

In [12]:
kmeans=KMeans(k=10,seed=1)

In [13]:
#build a pipleline
pipeline = Pipeline(stages=[tokenizer, hashingTF, tfIdf, kmeans])

In [14]:
#Build a model with a pipeline
kMeansPipeLineModel=pipeline.fit(trainingData) #training phase

In [15]:
#Predict on test data
prediction=kMeansPipeLineModel.transform(testData) # testing phase

In [16]:
prediction.show()

+-----------------------+--------------------+--------------------+--------------------+----------+
|patient_characteristics|               words|        tempfeatures|            features|prediction|
+-----------------------+--------------------+--------------------+--------------------+----------+
|   (Note: Prior hist...|[(note:, prior, h...|(262144,[73409,87...|(262144,[73409,87...|         0|
|   (The lipid profil...|[(the, lipid, pro...|(262144,[3748,385...|(262144,[3748,385...|         0|
|   (Was written - Ga...|[(was, written, -...|(262144,[1546,275...|(262144,[1546,275...|         0|
|   18 years of age a...|[18, years, of, a...|(262144,[66776,76...|(262144,[66776,76...|         0|
|   2 positive Hepati...|[2, positive, hep...|(262144,[1546,366...|(262144,[1546,366...|         0|
|              20 000/mL|        [20, 000/ml]|(262144,[90804,11...|(262144,[90804,11...|         0|
|   50 copies/ml for ...|[50, copies/ml, f...|(262144,[1546,305...|(262144,[1546,305...|         0|
