<a href="https://colab.research.google.com/github/Narges1989/PySpark-Tutorial/blob/main/PySparkLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Tutorial

In [None]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=f986beda4fc8dbc36c283dcf6d23567193f5cfc9274a03598864236944675ba2
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


## 1. Create a PySpark Session

In [None]:
# Top-level package import; the visible cells below import from its submodules instead.
import pyspark

In [None]:
# SparkSession is the entry point used below to read data and build DataFrames.
from pyspark.sql import SparkSession

In [None]:
# Reuse the active session if there is one, otherwise start a new one named 'heart'.
spark = (
    SparkSession.builder
    .appName('heart')
    .getOrCreate()
)

In [None]:
# Bare expression: the notebook displays the session object's repr.
spark

## 2. Read and Write Data

In [None]:
# Load the heart dataset: column names come from the first row and Spark infers the column types.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('heart.csv')
)

In [None]:
# Preview the first 5 rows.
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+-

In [None]:
# Write a CSV copy of the data into the 'heart2' directory.
# - header=True keeps the column names (the default CSV write omits them).
# - mode('overwrite') lets the cell be re-run; the default 'errorifexists' mode fails once 'heart2' exists.
df.write.format('csv').option('header', True).mode('overwrite').save('heart2')

In [None]:
# headers: column names in schema order
df.columns

['Age',
 'Sex',
 'ChestPainType',
 'RestingBP',
 'Cholesterol',
 'FastingBS',
 'RestingECG',
 'MaxHR',
 'ExerciseAngina',
 'Oldpeak',
 'ST_Slope',
 'HeartDisease']

In [None]:
# number of columns (the features plus the HeartDisease target)
len(df.columns)

12

In [None]:
# number of data rows; count() is an action, so it runs a Spark job
df.count()

918

In [None]:
# (column name, type) pairs, showing the types chosen by inferSchema=True
df.dtypes

[('Age', 'int'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'int'),
 ('Cholesterol', 'int'),
 ('FastingBS', 'int'),
 ('RestingECG', 'string'),
 ('MaxHR', 'int'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'double'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'int')]

## 3. Show, Add, Drop and Rename Columns

In [None]:
# Project a single column and print its first 10 values.
df.select(df.Age).show(n=10)

+---+
|Age|
+---+
| 40|
| 49|
| 37|
| 48|
| 54|
| 39|
| 45|
| 54|
| 37|
| 48|
+---+
only showing top 10 rows



In [None]:
# add new column 'Age 2023' = Age + 2, assigned back to df
# NOTE(review): the +2 offset implies the ages were recorded in 2021 — confirm against the data source
df = df.withColumn('Age 2023', df.Age + 2)
df.show(n=5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Age 2023|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|      42|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|      51|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|      39|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|      50|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|       

In [None]:
# remove specific column: drop 'Age 2023' again, restoring the original 12 columns
df = df.drop('Age 2023')
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+-

In [None]:
# change column name (header) — display only: the result is not assigned, so df keeps 'HeartDisease'
# NOTE(review): 'stroke' is a different condition from heart disease; a name like 'heart_disease' would be less misleading
df.withColumnRenamed('HeartDisease','stroke').show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|stroke|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|     0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|     1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|     0|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------+
only showing top 3 rows



## 4. Querying Data in PySpark

In [None]:
# Rows with Age > 65 (indexing a DataFrame with a boolean Column is shorthand for filter).
df.filter(df['Age'] > 65).show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 66|  M|          ASY|      140|        139|        0|    Normal|   94|             Y|    1.0|    Flat|           1|
| 74|  M|          ATA|      145|          0|        1|        ST|  123|             N|    1.3|      Up|           1|
| 68|  M|          ASY|      145|          0|        1|    Normal|  136|             N|    1.8|      Up|           1|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [None]:
from pyspark.sql.functions import col

# Men older than 65: combine Column conditions with & and wrap each one in parentheses.
older_male_condition = (col('Age') > 65) & (col('Sex') == 'M')
filtered_df = df.filter(older_male_condition)
filtered_df.show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 66|  M|          ASY|      140|        139|        0|    Normal|   94|             Y|    1.0|    Flat|           1|
| 74|  M|          ATA|      145|          0|        1|        ST|  123|             N|    1.3|      Up|           1|
| 68|  M|          ASY|      145|          0|        1|    Normal|  136|             N|    1.8|      Up|           1|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [None]:
# A condition may also be given as a SQL expression string; where() is an alias of filter().
df.where('Age > 60').show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 65|  M|          ASY|      140|        306|        1|    Normal|   87|             Y|    1.5|    Flat|           1|
| 63|  M|          ASY|      150|        223|        0|    Normal|  115|             N|    0.0|    Flat|           1|
| 66|  M|          ASY|      140|        139|        0|    Normal|   94|             Y|    1.0|    Flat|           1|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [None]:
# Same two-condition filter, written as a single SQL expression string.
df.filter("Age > 60 AND Sex = 'M'").show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 65|  M|          ASY|      140|        306|        1|    Normal|   87|             Y|    1.5|    Flat|           1|
| 63|  M|          ASY|      150|        223|        0|    Normal|  115|             N|    0.0|    Flat|           1|
| 66|  M|          ASY|      140|        139|        0|    Normal|   94|             Y|    1.0|    Flat|           1|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



## 5. Statistical Analysis

In [None]:
# Summary statistics (count, mean, stddev, min, max) per column; mean is NULL for string columns.
df.describe().show()

+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|summary|               Age| Sex|ChestPainType|         RestingBP|       Cholesterol|          FastingBS|RestingECG|             MaxHR|ExerciseAngina|           Oldpeak|ST_Slope|       HeartDisease|
+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|  count|               918| 918|          918|               918|               918|                918|       918|               918|           918|               918|     918|                918|
|   mean|53.510893246187365|NULL|         NULL|132.39651416122004| 198.7995642701525|0.23311546840958605|      NULL|136.80936819172112|          NULL|0.8873638344226581|    NULL| 0.5533769063180828|
| std

In [None]:
# change data type: Age goes from int to float, and df is reassigned with the cast column
from pyspark.sql.types import FloatType, IntegerType

df = df.withColumn('Age', df.Age.cast(FloatType()))
df.printSchema()

root
 |-- Age: float (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



## 6. Convert a Spark DataFrame to a Pandas DataFrame and Vice Versa

In [None]:
# spark to pandas
# NOTE: toPandas() collects every row to the driver; fine for this 918-row dataset, not for large data.
pd_df = df.toPandas()


In [None]:
# Confirm the conversion produced a pandas object.
type(pd_df)

In [None]:
# pandas to spark: build a Spark DataFrame back from the pandas frame
sp_df = spark.createDataFrame(pd_df)

In [None]:
# Confirm the round trip produced a Spark DataFrame again.
type(sp_df)

## 7. Missing Value Handling

In [None]:
# Toy rows with deliberate gaps (None becomes NULL) for the missing-value examples below.
columns = ['name', 'sex', 'marriage', 'state']
data = [
    ('James', 'M',  1,    None),
    ('Anna',  'F',  None, None),
    ('Ali',   'M',  1,    'NY'),
    ('Sara',  'F',  None, 'LS'),
    (None,    'M',  1,    None),
    (None,    None, None, None),
]

df_temp = spark.createDataFrame(data, schema=columns)
df_temp.show()

+-----+----+--------+-----+
| name| sex|marriage|state|
+-----+----+--------+-----+
|James|   M|       1| NULL|
| Anna|   F|    NULL| NULL|
|  Ali|   M|       1|   NY|
| Sara|   F|    NULL|   LS|
| NULL|   M|       1| NULL|
| NULL|NULL|    NULL| NULL|
+-----+----+--------+-----+



In [None]:
# drop only the rows in which every column is null (dropna is an alias of na.drop)
df_temp.dropna(how='all').show()

+-----+---+--------+-----+
| name|sex|marriage|state|
+-----+---+--------+-----+
|James|  M|       1| NULL|
| Anna|  F|    NULL| NULL|
|  Ali|  M|       1|   NY|
| Sara|  F|    NULL|   LS|
| NULL|  M|       1| NULL|
+-----+---+--------+-----+



In [None]:
# keep a row only if it has at least `thresh` non-null values (thresh=1 drops just the all-null row)
df_temp.dropna(thresh=1).show()

+-----+---+--------+-----+
| name|sex|marriage|state|
+-----+---+--------+-----+
|James|  M|       1| NULL|
| Anna|  F|    NULL| NULL|
|  Ali|  M|       1|   NY|
| Sara|  F|    NULL|   LS|
| NULL|  M|       1| NULL|
+-----+---+--------+-----+



In [None]:
# fill missing values in the string columns 'name' and 'sex' with 'x'
# The column is spelled 'sex' in df_temp; the previous 'Sex' only resolved because Spark matches
# column names case-insensitively by default (spark.sql.caseSensitive=false).
df_temp.na.fill(value='x', subset=['name', 'sex']).show()

+-----+---+--------+-----+
| name|sex|marriage|state|
+-----+---+--------+-----+
|James|  M|       1| NULL|
| Anna|  F|    NULL| NULL|
|  Ali|  M|       1|   NY|
| Sara|  F|    NULL|   LS|
|    x|  M|       1| NULL|
|    x|  x|    NULL| NULL|
+-----+---+--------+-----+



In [None]:
# fill missing values by imputation: nulls in 'marriage' are replaced with the column's most
# frequent value; outputCol == inputCol, so the imputed values overwrite the original column
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCol='marriage', outputCol='marriage', strategy='mode')
imputer_model = imputer.fit(df_temp)
imputer_model.transform(df_temp).show()

+-----+----+--------+-----+
| name| sex|marriage|state|
+-----+----+--------+-----+
|James|   M|       1| NULL|
| Anna|   F|       1| NULL|
|  Ali|   M|       1|   NY|
| Sara|   F|       1|   LS|
| NULL|   M|       1| NULL|
| NULL|NULL|       1| NULL|
+-----+----+--------+-----+



## 8. GroupBy and OrderBy

In [None]:
# Age now shows as 40.0 etc. because of the float cast earlier.
df.show(5)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
|48.0|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
|54.0|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+----+---+-------------+---------+-----------+--

In [None]:
# Number of rows for each Sex value.
df.groupBy('Sex').count().show()

+---+-----+
|Sex|count|
+---+-----+
|  F|  193|
|  M|  725|
+---+-----+



In [None]:
# Number of rows per Age; groupBy output is not sorted (see the orderBy example below).
df.groupBy('Age').count().show(5)

+----+-----+
| Age|count|
+----+-----+
|64.0|   22|
|47.0|   19|
|58.0|   42|
|39.0|   15|
|30.0|    1|
+----+-----+
only showing top 5 rows



In [None]:
# Average Cholesterol per Age (GroupedData.avg is an alias of mean; both name the column avg(Cholesterol)).
df.groupBy('Age').avg('Cholesterol').show(5)

+----+------------------+
| Age|  avg(Cholesterol)|
+----+------------------+
|64.0| 205.3181818181818|
|47.0|193.42105263157896|
|58.0|217.28571428571428|
|39.0|             239.2|
|30.0|             237.0|
+----+------------------+
only showing top 5 rows



In [None]:
# ordering groupby output: row counts per Age, oldest ages first
from pyspark.sql.functions import asc, desc

age_counts = df.groupBy('Age').count()
age_counts.orderBy(desc('Age')).show(5)

+----+-----+
| Age|count|
+----+-----+
|77.0|    2|
|76.0|    2|
|75.0|    3|
|74.0|    7|
|73.0|    1|
+----+-----+
only showing top 5 rows



In [None]:
# Group by one column and aggregate several: mean Cholesterol and mean RestingBP per HeartDisease class.
# Spark's min/max are imported under aliases so they do not shadow Python's builtin min()/max()
# for the rest of the notebook session.
from pyspark.sql.functions import min as spark_min, max as spark_max, avg

df.groupBy('HeartDisease').agg(avg('Cholesterol'), avg('RestingBP')).show()

+------------+------------------+------------------+
|HeartDisease|  avg(Cholesterol)|    avg(RestingBP)|
+------------+------------------+------------------+
|           1|175.94094488188978|134.18503937007873|
|           0| 227.1219512195122|130.18048780487806|
+------------+------------------+------------------+



In [None]:
# pivot: reshape data based on column **values** — one count column per listed Sex value, in this order
sex_values = ['M', 'F']
df.groupBy('Age').pivot('Sex', sex_values).count().show(5)

+----+----+---+
| Age|   M|  F|
+----+----+---+
|64.0|  16|  6|
|47.0|  15|  4|
|58.0|  35|  7|
|39.0|  12|  3|
|30.0|NULL|  1|
+----+----+---+
only showing top 5 rows



## 9. Machine Learning Algorithms in PySpark

In [None]:
# Data entering the machine-learning section.
df.show(5)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
|48.0|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
|54.0|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+----+---+-------------+---------+-----------+--

### Linear Regression

In [None]:
# defining features and target values: predict Age from four numeric columns
X_col = ['Cholesterol','RestingBP','MaxHR','FastingBS']
# NOTE(review): target_col is not referenced in the visible cells; the LinearRegression cell hard-codes labelCol='Age'
target_col = ['Age']

In [None]:
# convert features to a single wrapped vector column ('feature_vec3'), the input format pyspark.ml estimators take
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=X_col, outputCol='feature_vec3')
# Assemble straight into `data` instead of reassigning df: VectorAssembler refuses to overwrite an
# existing output column, so mutating df made this cell fail on re-run.
data = va.transform(df).select(['Age','RestingBP','MaxHR','FastingBS','feature_vec3','Cholesterol'])

In [None]:
# The label (Age), the raw features, and the assembled 'feature_vec3' vector.
data.show(5)

+----+---------+-----+---------+--------------------+-----------+
| Age|RestingBP|MaxHR|FastingBS|        feature_vec3|Cholesterol|
+----+---------+-----+---------+--------------------+-----------+
|40.0|      140|  172|        0|[289.0,140.0,172....|        289|
|49.0|      160|  156|        0|[180.0,160.0,156....|        180|
|37.0|      130|   98|        0|[283.0,130.0,98.0...|        283|
|48.0|      138|  108|        0|[214.0,138.0,108....|        214|
|54.0|      150|  122|        0|[195.0,150.0,122....|        195|
+----+---------+-----+---------+--------------------+-----------+
only showing top 5 rows



In [None]:
# train test split (75% / 25%); a fixed seed makes the split, and therefore the metrics below, reproducible
trainset,testset = data.randomSplit([0.75,0.25], seed=42)

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='feature_vec3', labelCol='Age')

In [None]:
lr = lr.fit(trainset)

In [None]:
# evaluate() returns a summary object for the test set; its metric attributes are printed in the next cell
# (per-row predictions come from lr.transform further below)
pred = lr.evaluate(testset)

In [None]:
# Access and print various metrics from the test-set summary, one "<name>: <value>" line each
metric_rows = [
    ("Root Mean Squared Error (RMSE)", pred.rootMeanSquaredError),
    ("Mean Absolute Error (MAE)", pred.meanAbsoluteError),
    ("R^2 (Coefficient of Determination)", pred.r2),
    ("Explained Variance", pred.explainedVariance),
    ("Mean Squared Error (MSE)", pred.meanSquaredError),
]
for metric_name, metric_value in metric_rows:
    print(f"{metric_name}: {metric_value}")


Root Mean Squared Error (RMSE): 8.716355896022712
Mean Absolute Error (MAE): 7.102182762889659
R^2 (Coefficient of Determination): 0.15344419218348937
Explained Variance: 19.930124769213556
Mean Squared Error (MSE): 75.9748601061299


In [None]:
# Generate predictions on the test set; transform() appends a 'prediction' column
predictions = lr.transform(testset)

# Show the predictions along with the actual labels
predictions.select('feature_vec3', 'Age', 'prediction').show(3)

+--------------------+----+-----------------+
|        feature_vec3| Age|       prediction|
+--------------------+----+-----------------+
|[219.0,100.0,150....|31.0|47.49244600703679|
|[225.0,110.0,184....|32.0|44.16280757608736|
|[246.0,100.0,150....|33.0|47.49065785761302|
+--------------------+----+-----------------+
only showing top 3 rows



### Compare LogisticRegression and RandomForest

In [None]:
# Remove assembled-vector columns before building the classification features. Dropping a column that
# does not exist is a no-op; keeping 'feature_vec' in the list lets the assembler cell below be re-run.
# NOTE(review): 'feature_vec1' / 'feature_vec2' are not created in any visible cell — likely leftovers.
df = df.drop('feature_vec','feature_vec1','feature_vec2','feature_vec3')

In [None]:
# Numeric predictors for the HeartDisease classifiers
x_col = ['Age','Cholesterol','RestingBP','MaxHR','FastingBS']
# NOTE(review): y_col is not referenced in the visible cells; the classifiers hard-code labelCol='HeartDisease'
y_col = ['HeartDisease']

In [None]:
from pyspark.ml.feature import VectorAssembler

# Assemble the five numeric predictors into 'feature_vec' and keep it on df (later cells show it).
# Any existing 'feature_vec' is dropped first because VectorAssembler will not overwrite an existing
# output column; without this the cell fails when re-run.
va = VectorAssembler(inputCols=x_col, outputCol='feature_vec')
df = va.transform(df.drop('feature_vec'))
data = df.select(['feature_vec','HeartDisease'])

In [None]:
# 75% / 25% split with a fixed seed so the AUC comparison below is reproducible
trainset, testset = data.randomSplit([0.75,0.25], seed=42)

In [None]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression

rfc = RandomForestClassifier(featuresCol='feature_vec', labelCol='HeartDisease')
lrc = LogisticRegression(featuresCol='feature_vec', labelCol='HeartDisease')

In [None]:
rfc = rfc.fit(trainset)
lrc = lrc.fit(trainset)

In [None]:
# Score the test split with each fitted model; the evaluator below reads their 'rawPrediction' column
pred_rfc = rfc.transform(testset)
pred_lrc = lrc.transform(testset)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize the evaluator: compares the 'rawPrediction' column against the true HeartDisease labels
evaluator = BinaryClassificationEvaluator(labelCol="HeartDisease", rawPredictionCol="rawPrediction")


def report_auc(scored, model_name):
    """Print and return (areaUnderROC, areaUnderPR) for a scored test DataFrame.

    scored: output of a fitted classifier's transform() (must contain 'rawPrediction' and 'HeartDisease').
    model_name: prefix for each printed line, so metrics from different models can be told apart.
    """
    roc = evaluator.evaluate(scored, {evaluator.metricName: "areaUnderROC"})
    pr = evaluator.evaluate(scored, {evaluator.metricName: "areaUnderPR"})
    print(f"{model_name} Area Under ROC: {roc}")
    print(f"{model_name} Area Under PR: {pr}")
    return roc, pr


# Random forest results (auc_roc / auc_pr stay available as before)
auc_roc, auc_pr = report_auc(pred_rfc, "RandomForest")


Area Under ROC: 0.7588149763722283
Area Under PR: 0.8056855512770454


In [None]:
# Logistic-regression metrics; the model name is printed so this output can't be confused
# with the random-forest metrics of the previous cell.

# Evaluate the model's performance using the Area Under ROC metric
auc_roc = evaluator.evaluate(pred_lrc, {evaluator.metricName: "areaUnderROC"})
print(f"LogisticRegression Area Under ROC: {auc_roc}")

# Evaluate the model's performance using the Area Under PR metric
auc_pr = evaluator.evaluate(pred_lrc, {evaluator.metricName: "areaUnderPR"})
print(f"LogisticRegression Area Under PR: {auc_pr}")


Area Under ROC: 0.7707015630679751
Area Under PR: 0.8176908248800365


## 10. Convert Categorical Data to Numerical

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the Sex strings as numeric indices in a new 'Sex_num' column (M -> 0.0, F -> 1.0 in the output)
sex_ind = StringIndexer(inputCol='Sex', outputCol='Sex_num')
sex_model = sex_ind.fit(df)
df_new = sex_model.transform(df)
df_new.show(5)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+-------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|         feature_vec|Sex_num|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+-------+
|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|[40.0,289.0,140.0...|    0.0|
|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|[49.0,180.0,160.0...|    1.0|
|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|[37.0,283.0,130.0...|    0.0|
|48.0|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|       

In [None]:
# Encode ChestPainType as numeric indices in 'ChestPainType_num'.
# Any previous ChestPainType_num is dropped first: StringIndexer will not overwrite an existing
# output column, so without this the cell fails when re-run.
cpt_ind = StringIndexer(inputCol='ChestPainType', outputCol='ChestPainType_num')
df_base = df_new.drop('ChestPainType_num')
df_new = cpt_ind.fit(df_base).transform(df_base)


In [None]:
# Original strings next to their numeric indices
df_new.select('Sex','Sex_num','ChestPainType','ChestPainType_num').show(5)

+---+-------+-------------+-----------------+
|Sex|Sex_num|ChestPainType|ChestPainType_num|
+---+-------+-------------+-----------------+
|  M|    0.0|          ATA|              2.0|
|  F|    1.0|          NAP|              1.0|
|  M|    0.0|          ATA|              2.0|
|  F|    1.0|          ASY|              0.0|
|  M|    0.0|          NAP|              1.0|
+---+-------+-------------+-----------------+
only showing top 5 rows



## 11. Joins in PySpark

In [None]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# monotonically_increasing_id() is unique and increasing, but consecutive only within one partition,
# so it is not a safe 0..n-1 join key once the data spans several partitions.
# row_number() over that ordering gives consecutive 0-based ids in the original row order.
# (A Window without partitionBy moves all rows into one partition — acceptable for 918 rows.)
row_order = Window.orderBy(monotonically_increasing_id())
df = df.withColumn('index', row_number().over(row_order) - 1)
df.show(5)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+-----+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|         feature_vec|index|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+-----+
|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|[40.0,289.0,140.0...|    0|
|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|[49.0,180.0,160.0...|    1|
|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|[37.0,283.0,130.0...|    2|
|48.0|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|[48.0,

Change the order of columns

In [None]:
cols = df.columns

# Move 'index' to the first position by name rather than by position ("last column"),
# so re-running this cell does not rotate a different column to the front.
df = df.select(['index'] + [c for c in cols if c != 'index'])

In [None]:
# 'index' is now the first column.
df.show(5)

+-----+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+
|index| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|         feature_vec|
+-----+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+
|    0|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|[40.0,289.0,140.0...|
|    1|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|[49.0,180.0,160.0...|
|    2|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|[37.0,283.0,130.0...|
|    3|48.0|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|

In [None]:
# Patient names to join against the heart data. inferSchema=True reads 'id' as a number instead of a
# string, so the join key below compares numbers with numbers rather than relying on an implicit cast.
# NOTE(review): absolute Colab path, unlike the relative 'heart.csv' above — adjust when running elsewhere.
df_p = spark.read.csv('/content/patients.csv', header=True, inferSchema=True)
df_p.show(5)

+---+---------+---------+
| id|FirstName| LastName|
+---+---------+---------+
|  4|     Lily| Hamilton|
|  5|   Gianna|Armstrong|
|  6|  Arianna|    Riley|
|  7| Caroline|   Turner|
|  8|   Wilson|   Murray|
+---+---------+---------+
only showing top 5 rows



In [None]:
# 999 patient rows vs 918 heart rows, so not every patient can match an index.
df_p.count()

999

In [None]:
# Inner join: keep only rows whose heart-data index matches a patient id.
join_condition = df['index'] == df_p['id']
df_join = df.join(df_p, on=join_condition, how='inner')
df_join.show(5)

+-----+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+---+---------+---------+
|index| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|         feature_vec| id|FirstName| LastName|
+-----+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+---+---------+---------+
|    4|54.0|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|[54.0,195.0,150.0...|  4|     Lily| Hamilton|
|    5|39.0|  M|          NAP|      120|        339|        0|    Normal|  170|             N|    0.0|      Up|           0|[39.0,339.0,120.0...|  5|   Gianna|Armstrong|
|    6|45.0|  F|          ATA|      130|        237|        0|    Normal|  170|             N|    0.0|      Up|           0|[45.0,237.0,130.0...|  6| 

In [None]:
# Full outer join: rows from both sides are kept, with NULLs on whichever side has no match.
df_outer = df.join(df_p, df.index==df_p.id, how='outer')
df_outer.count()

1003

In [None]:
# Left join: every df row is kept (count equals the 918 heart rows); patient columns are NULL where no id matches.
df_left = df.join(df_p, df.index==df_p.id, how='left')
df_left.count()

918

In [None]:
# Right join: every df_p row is kept (count equals the 999 patient rows).
df_right = df.join(df_p, df.index==df_p.id, how='right')
df_right.count()

999

## 12. Built-in Functions

In [None]:
from pyspark.sql.functions import upper, lower, substring

# Upper-case, lower-case, and a 2-character substring starting at position 2 (1-based) of ChestPainType.
chest_col = 'ChestPainType'
df.select(upper(chest_col), lower(chest_col), substring(chest_col, 2, 2)).show(5)

+--------------------+--------------------+------------------------------+
|upper(ChestPainType)|lower(ChestPainType)|substring(ChestPainType, 2, 2)|
+--------------------+--------------------+------------------------------+
|                 ATA|                 ata|                            TA|
|                 NAP|                 nap|                            AP|
|                 ATA|                 ata|                            TA|
|                 ASY|                 asy|                            SY|
|                 NAP|                 nap|                            AP|
+--------------------+--------------------+------------------------------+
only showing top 5 rows



### DateTime

In [None]:
from pyspark.sql.functions import expr

# Add a synthetic 'date' column: today's date plus a random offset of 0–364 days per row.
# rand(42) is seeded so the offsets repeat across runs (for the same partitioning); the dates still
# move with the current day. current_date() is used because date_add works on dates, not timestamps.
df = df.withColumn('date', expr('date_add(current_date(), cast(rand(42)*365 as int))'))

In [None]:
# The new 'date' column appears last.
df.show(5)

+-----+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+----------+
|index| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|         feature_vec|      date|
+-----+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+----------+
|    0|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|[40.0,289.0,140.0...|2025-03-14|
|    1|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|[49.0,180.0,160.0...|2024-12-04|
|    2|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|[37.0,283.0,130.0...|2024-10-16|
|    3|48.0|  F|          ASY|      138|        214|      