#PySpark Practice Notebook
Experimenting further with Spark Dataframes

In [None]:
#Create CSV files for the following Datasets

patients
patientId,firstName,lastName,age
101,Alice,Smith,30
102,Bob,Johnson,45
103,Charlie,Williams,50
104,John,Smith,78

visits
visitId,patientId,visitDuration
1,101,15
2,101,30
3,102,45
4,102,30
5,104,20
6,103,60
7,103,50
8,104,45
9,,45
10,101,30
11,103,40
12,,30
13,104,25
15,102,15

In [3]:
#Import necessary spark libraries
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=ff07217e93392f1c5e164a11870ac377c3073370ed78da6dc3060489468ad4e0
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [18]:
#Create a SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit,concat
lti_spark=SparkSession.builder.appName('sushmitha').getOrCreate()

In [5]:
#Create dataframes from CSV files using enforced Schema
#Schems Directive [Names as string, everything else as integers]
%%writefile patients.csv
patientId,firstName,lastName,age
101,Alice,Smith,30
102,Bob,Johnson,45
103,Charlie,Williams,50
104,John,Smith,78






Writing patients.csv


In [6]:
%%writefile visits.csv
visitId,patientId,visitDuration
1,101,15
2,101,30
3,102,45
4,102,30
5,104,20
6,103,60
7,103,50
8,104,45
9,,45
10,101,30
11,103,40
12,,30
13,104,25
15,102,15

Writing visits.csv


In [23]:
df=lti_spark.read.option('header','True').csv('patients.csv')
df.show()

+---------+---------+--------+---+
|patientId|firstName|lastName|age|
+---------+---------+--------+---+
|      101|    Alice|   Smith| 30|
|      102|      Bob| Johnson| 45|
|      103|  Charlie|Williams| 50|
|      104|     John|   Smith| 78|
+---------+---------+--------+---+



In [8]:
df1=lti_spark.read.option('header','True').csv('visits.csv')
df1.show()

+-------+---------+-------------+
|visitId|patientId|visitDuration|
+-------+---------+-------------+
|      1|      101|           15|
|      2|      101|           30|
|      3|      102|           45|
|      4|      102|           30|
|      5|      104|           20|
|      6|      103|           60|
|      7|      103|           50|
|      8|      104|           45|
|      9|     NULL|           45|
|     10|      101|           30|
|     11|      103|           40|
|     12|     NULL|           30|
|     13|      104|           25|
|     15|      102|           15|
+-------+---------+-------------+



In [8]:
#Display all the valid visits
df1.filter(col('patientId').isNotNull()).show()


#Fetch the total number of valid visits
df1.filter(col('patientId').isNotNull()).count()


+-------+---------+-------------+
|visitId|patientId|visitDuration|
+-------+---------+-------------+
|      1|      101|           15|
|      2|      101|           30|
|      3|      102|           45|
|      4|      102|           30|
|      5|      104|           20|
|      6|      103|           60|
|      7|      103|           50|
|      8|      104|           45|
|     10|      101|           30|
|     11|      103|           40|
|     13|      104|           25|
|     15|      102|           15|
+-------+---------+-------------+



12

In [64]:
#Get the total hours of patient visit
from pyspark.sql.functions import sum

df1.groupBy('patientId').agg(sum('visitDuration')).show()


+---------+------------------+
|patientId|sum(visitDuration)|
+---------+------------------+
|      101|              75.0|
|     NULL|              75.0|
|      104|              90.0|
|      102|              90.0|
|      103|             150.0|
+---------+------------------+



In [59]:
#Fetch the top two most visited patients on the basis of occurances
from pyspark.sql.functions import desc
df1.groupBy('patientId').count().orderBy(desc('count')).show()



+---------+-----+
|patientId|count|
+---------+-----+
|      101|    3|
|      104|    3|
|      102|    3|
|      103|    3|
|     NULL|    2|
+---------+-----+



In [28]:
#Add a new column patientFullName & display complete dataframe

from pyspark.sql.functions import col,lit
df2=df.withColumn('patientFullName',concat(df.firstName,df.lastName))
df2.show()

+---------+---------+--------+---+---------------+
|patientId|firstName|lastName|age|patientFullName|
+---------+---------+--------+---+---------------+
|      101|    Alice|   Smith| 30|     AliceSmith|
|      102|      Bob| Johnson| 45|     BobJohnson|
|      103|  Charlie|Williams| 50|CharlieWilliams|
|      104|     John|   Smith| 78|      JohnSmith|
+---------+---------+--------+---+---------------+



In [29]:
df2.show()

+---------+---------+--------+---+---------------+
|patientId|firstName|lastName|age|patientFullName|
+---------+---------+--------+---+---------------+
|      101|    Alice|   Smith| 30|     AliceSmith|
|      102|      Bob| Johnson| 45|     BobJohnson|
|      103|  Charlie|Williams| 50|CharlieWilliams|
|      104|     John|   Smith| 78|      JohnSmith|
+---------+---------+--------+---+---------------+



In [37]:
#Display the name of patients with maximum visiting hours
from pyspark.sql.functions import sum,desc
df3=df2.join(df1,df2.patientId==df1.patientId)
df3.groupBy('patientFullName').agg(sum('visitDuration')).orderBy(desc(sum('visitDuration'))).limit(1).show()



+---------------+------------------+
|patientFullName|sum(visitDuration)|
+---------------+------------------+
|CharlieWilliams|             150.0|
+---------------+------------------+



In [41]:
#Find the patient with maximum visiting hours in a single visit
df3.orderBy(desc('visitDuration')).limit(1).show()


+---------+---------+--------+---+---------------+-------+---------+-------------+
|patientId|firstName|lastName|age|patientFullName|visitId|patientId|visitDuration|
+---------+---------+--------+---+---------------+-------+---------+-------------+
|      103|  Charlie|Williams| 50|CharlieWilliams|      6|      103|           60|
+---------+---------+--------+---+---------------+-------+---------+-------------+



In [42]:
#Show the patient info according to age-seniority
df.orderBy(desc('age')).show()

+---------+---------+--------+---+
|patientId|firstName|lastName|age|
+---------+---------+--------+---+
|      104|     John|   Smith| 78|
|      103|  Charlie|Williams| 50|
|      102|      Bob| Johnson| 45|
|      101|    Alice|   Smith| 30|
+---------+---------+--------+---+

