In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd

# Loading Data / Dataset

Create a Spark session

In [9]:
# getOrCreate() hands back the already-running SparkSession if there is one;
# otherwise it starts a new session with default settings.
session = (
    SparkSession.builder
    .getOrCreate()
)

In [29]:
# Load the CSV, treating its first line as the column names.
# No schema is supplied or inferred here, so every column arrives as a string
# (see the printSchema() output further down).
header_df = (
    session.read
    .option("header", "true")
    .csv("Kidney.csv")
)

In [31]:
# List the column names that were picked up from the CSV header row
header_df.columns

['age',
 'bp',
 'sg',
 'al',
 'su',
 'rbc',
 'pc',
 'pcc',
 'ba',
 'bgr',
 'bu',
 'sc',
 'sod',
 'pot',
 'hemo',
 'pcv',
 'wbcc',
 'rbcc',
 'htn',
 'dm',
 'cad',
 'appet',
 'pe',
 'ane',
 'class']

In [33]:
# Same file, but with no "header" option set: the resulting columns get
# positional names (_c0, _c1, ...) instead of the names stored in the file.
without_header_df2 = (
    session.read
    .csv("Kidney.csv")
)

In [34]:
# List the auto-generated positional column names (_c0 ... _c24)
without_header_df2.columns

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13',
 '_c14',
 '_c15',
 '_c16',
 '_c17',
 '_c18',
 '_c19',
 '_c20',
 '_c21',
 '_c22',
 '_c23',
 '_c24']

In [35]:
# Continue with the header-aware DataFrame. `df` is just a second name bound
# to the same DataFrame object as `header_df`; no copy is made.
df = header_df

In [36]:
# Print the dataset schema: column names, data types and nullability
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- bp: string (nullable = true)
 |-- sg: string (nullable = true)
 |-- al: string (nullable = true)
 |-- su: string (nullable = true)
 |-- rbc: string (nullable = true)
 |-- pc: string (nullable = true)
 |-- pcc: string (nullable = true)
 |-- ba: string (nullable = true)
 |-- bgr: string (nullable = true)
 |-- bu: string (nullable = true)
 |-- sc: string (nullable = true)
 |-- sod: string (nullable = true)
 |-- pot: string (nullable = true)
 |-- hemo: string (nullable = true)
 |-- pcv: string (nullable = true)
 |-- wbcc: string (nullable = true)
 |-- rbcc: string (nullable = true)
 |-- htn: string (nullable = true)
 |-- dm: string (nullable = true)
 |-- cad: string (nullable = true)
 |-- appet: string (nullable = true)
 |-- pe: string (nullable = true)
 |-- ane: string (nullable = true)
 |-- class: string (nullable = true)



In [38]:
# Dataset summary: describe() builds a DataFrame of summary statistics
# (one "summary" row per statistic) and show() prints it as a table
df.describe().show()

23/04/20 20:18:30 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 17:>                                                         (0 + 1) / 1]

+-------+-----------------+-----------------+--------------------+------------------+-------------------+------+------+-------+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+----+----+----+-----+----+----+------+
|summary|              age|               bp|                  sg|                al|                 su|   rbc|    pc|    pcc|     ba|              bgr|                bu|                sc|               sod|              pot|              hemo|              pcv|              wbcc|              rbcc| htn|  dm| cad|appet|  pe| ane| class|
+-------+-----------------+-----------------+--------------------+------------------+-------------------+------+------+-------+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+----+----+---

                                                                                

# View/Display/Show/Print Dataset

In [40]:
# Print the dataset as a text table (show() without arguments prints only
# the top 20 rows)
df.show()

+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+
|age| bp|   sg| al| su|     rbc|      pc|       pcc|        ba|bgr| bu| sc|sod|pot|hemo|pcv| wbcc|rbcc|htn| dm|cad|appet| pe|ane|class|
+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+
| 48| 80|1.020|  1|  0|       ?|  normal|notpresent|notpresent|121| 36|1.2|  ?|  ?|15.4| 44| 7800| 5.2|yes|yes| no| good| no| no|  ckd|
|  7| 50|1.020|  4|  0|       ?|  normal|notpresent|notpresent|  ?| 18|0.8|  ?|  ?|11.3| 38| 6000|   ?| no| no| no| good| no| no|  ckd|
| 62| 80|1.010|  2|  3|  normal|  normal|notpresent|notpresent|423| 53|1.8|  ?|  ?| 9.6| 31| 7500|   ?| no|yes| no| poor| no|yes|  ckd|
| 48| 70|1.005|  4|  0|  normal|abnormal|   present|notpresent|117| 56|3.8|111|2.5|11.2| 32| 6700| 3.9|yes| no| no| poor|yes|yes|  ckd|
| 51| 80|1.010|  2|  0|  normal|  normal|notpres

In [41]:
# Print only the first 5 rows of the dataset
df.show(5)

+---+---+-----+---+---+------+--------+----------+----------+---+---+---+---+---+----+---+----+----+---+---+---+-----+---+---+-----+
|age| bp|   sg| al| su|   rbc|      pc|       pcc|        ba|bgr| bu| sc|sod|pot|hemo|pcv|wbcc|rbcc|htn| dm|cad|appet| pe|ane|class|
+---+---+-----+---+---+------+--------+----------+----------+---+---+---+---+---+----+---+----+----+---+---+---+-----+---+---+-----+
| 48| 80|1.020|  1|  0|     ?|  normal|notpresent|notpresent|121| 36|1.2|  ?|  ?|15.4| 44|7800| 5.2|yes|yes| no| good| no| no|  ckd|
|  7| 50|1.020|  4|  0|     ?|  normal|notpresent|notpresent|  ?| 18|0.8|  ?|  ?|11.3| 38|6000|   ?| no| no| no| good| no| no|  ckd|
| 62| 80|1.010|  2|  3|normal|  normal|notpresent|notpresent|423| 53|1.8|  ?|  ?| 9.6| 31|7500|   ?| no|yes| no| poor| no|yes|  ckd|
| 48| 70|1.005|  4|  0|normal|abnormal|   present|notpresent|117| 56|3.8|111|2.5|11.2| 32|6700| 3.9|yes| no| no| poor|yes|yes|  ckd|
| 51| 80|1.010|  2|  0|normal|  normal|notpresent|notpresent|106| 26|

In [44]:
# Turn on eager evaluation for the REPL/notebook so that a bare DataFrame
# expression at the end of a cell renders its rows.
session.conf.set(
    "spark.sql.repl.eagerEval.enabled",
    True,
)

In [45]:
# With eager evaluation enabled above, evaluating the DataFrame by itself
# displays its rows
df

age,bp,sg,al,su,rbc,pc,pcc,ba,bgr,bu,sc,sod,pot,hemo,pcv,wbcc,rbcc,htn,dm,cad,appet,pe,ane,class
48,80,1.020,1,0,?,normal,notpresent,notpresent,121,36,1.2,?,?,15.4,44,7800,5.2,yes,yes,no,good,no,no,ckd
7,50,1.020,4,0,?,normal,notpresent,notpresent,?,18,0.8,?,?,11.3,38,6000,?,no,no,no,good,no,no,ckd
62,80,1.010,2,3,normal,normal,notpresent,notpresent,423,53,1.8,?,?,9.6,31,7500,?,no,yes,no,poor,no,yes,ckd
48,70,1.005,4,0,normal,abnormal,present,notpresent,117,56,3.8,111,2.5,11.2,32,6700,3.9,yes,no,no,poor,yes,yes,ckd
51,80,1.010,2,0,normal,normal,notpresent,notpresent,106,26,1.4,?,?,11.6,35,7300,4.6,no,no,no,good,no,no,ckd
60,90,1.015,3,0,?,?,notpresent,notpresent,74,25,1.1,142,3.2,12.2,39,7800,4.4,yes,yes,no,good,yes,no,ckd
68,70,1.010,0,0,?,normal,notpresent,notpresent,100,54,24.0,104,4,12.4,36,?,?,no,no,no,good,no,no,ckd
24,?,1.015,2,4,normal,abnormal,notpresent,notpresent,410,31,1.1,?,?,12.4,44,6900,5,no,yes,no,good,yes,no,ckd
52,100,1.015,3,0,normal,abnormal,present,notpresent,138,60,1.9,?,?,10.8,33,9600,4,yes,yes,no,good,no,yes,ckd
53,90,1.020,2,0,abnormal,abnormal,present,notpresent,70,107,7.2,114,3.7,9.5,29,12100,3.7,yes,yes,no,poor,no,yes,ckd


In [49]:
# Display in vertical mode: each record is printed as a block of
# "column | value" lines instead of one wide table row
df.show(vertical=True)

-RECORD 0-----------
 age   | 48         
 bp    | 80         
 sg    | 1.020      
 al    | 1          
 su    | 0          
 rbc   | ?          
 pc    | normal     
 pcc   | notpresent 
 ba    | notpresent 
 bgr   | 121        
 bu    | 36         
 sc    | 1.2        
 sod   | ?          
 pot   | ?          
 hemo  | 15.4       
 pcv   | 44         
 wbcc  | 7800       
 rbcc  | 5.2        
 htn   | yes        
 dm    | yes        
 cad   | no         
 appet | good       
 pe    | no         
 ane   | no         
 class | ckd        
-RECORD 1-----------
 age   | 7          
 bp    | 50         
 sg    | 1.020      
 al    | 4          
 su    | 0          
 rbc   | ?          
 pc    | normal     
 pcc   | notpresent 
 ba    | notpresent 
 bgr   | ?          
 bu    | 18         
 sc    | 0.8        
 sod   | ?          
 pot   | ?          
 hemo  | 11.3       
 pcv   | 38         
 wbcc  | 6000       
 rbcc  | ?          
 htn   | no         
 dm    | no         
 cad   | no  

In [50]:
# collect() returns the rows as a Python list of Row objects.
# NOTE(review): this pulls every row of the DataFrame to the driver; fine for
# a small file like this one, but prefer take()/tail() on large datasets.
df.collect()

[Row(age='48', bp='80', sg='1.020', al='1', su='0', rbc='?', pc='normal', pcc='notpresent', ba='notpresent', bgr='121', bu='36', sc='1.2', sod='?', pot='?', hemo='15.4', pcv='44', wbcc='7800', rbcc='5.2', htn='yes', dm='yes', cad='no', appet='good', pe='no', ane='no', class='ckd'),
 Row(age='7', bp='50', sg='1.020', al='4', su='0', rbc='?', pc='normal', pcc='notpresent', ba='notpresent', bgr='?', bu='18', sc='0.8', sod='?', pot='?', hemo='11.3', pcv='38', wbcc='6000', rbcc='?', htn='no', dm='no', cad='no', appet='good', pe='no', ane='no', class='ckd'),
 Row(age='62', bp='80', sg='1.010', al='2', su='3', rbc='normal', pc='normal', pcc='notpresent', ba='notpresent', bgr='423', bu='53', sc='1.8', sod='?', pot='?', hemo='9.6', pcv='31', wbcc='7500', rbcc='?', htn='no', dm='yes', cad='no', appet='poor', pe='no', ane='yes', class='ckd'),
 Row(age='48', bp='70', sg='1.005', al='4', su='0', rbc='normal', pc='abnormal', pcc='present', ba='notpresent', bgr='117', bu='56', sc='3.8', sod='111', po

In [51]:
# Safer alternative to collect(): take(2) returns only the first 2 rows
# as a list of Row objects
df.take(2)

[Row(age='48', bp='80', sg='1.020', al='1', su='0', rbc='?', pc='normal', pcc='notpresent', ba='notpresent', bgr='121', bu='36', sc='1.2', sod='?', pot='?', hemo='15.4', pcv='44', wbcc='7800', rbcc='5.2', htn='yes', dm='yes', cad='no', appet='good', pe='no', ane='no', class='ckd'),
 Row(age='7', bp='50', sg='1.020', al='4', su='0', rbc='?', pc='normal', pcc='notpresent', ba='notpresent', bgr='?', bu='18', sc='0.8', sod='?', pot='?', hemo='11.3', pcv='38', wbcc='6000', rbcc='?', htn='no', dm='no', cad='no', appet='good', pe='no', ane='no', class='ckd')]

In [54]:
# Collect rows from the bottom/tail/end of the dataset: the last 10 rows,
# returned as a list of Row objects
df.tail(10)

[Row(age='52', bp='80', sg='1.025', al='0', su='0', rbc='normal', pc='normal', pcc='notpresent', ba='notpresent', bgr='99', bu='25', sc='0.8', sod='135', pot='3.7', hemo='15', pcv='52', wbcc='6300', rbcc='5.3', htn='no', dm='no', cad='no', appet='good', pe='no', ane='no', class='notckd'),
 Row(age='36', bp='80', sg='1.025', al='0', su='0', rbc='normal', pc='normal', pcc='notpresent', ba='notpresent', bgr='85', bu='16', sc='1.1', sod='142', pot='4.1', hemo='15.6', pcv='44', wbcc='5800', rbcc='6.3', htn='no', dm='no', cad='no', appet='good', pe='no', ane='no', class='notckd'),
 Row(age='57', bp='80', sg='1.020', al='0', su='0', rbc='normal', pc='normal', pcc='notpresent', ba='notpresent', bgr='133', bu='48', sc='1.2', sod='147', pot='4.3', hemo='14.8', pcv='46', wbcc='6600', rbcc='5.5', htn='no', dm='no', cad='no', appet='good', pe='no', ane='no', class='notckd'),
 Row(age='43', bp='60', sg='1.025', al='0', su='0', rbc='normal', pc='normal', pcc='notpresent', ba='notpresent', bgr='117', 

In [56]:
# Convert the Spark DataFrame into a pandas DataFrame
pd_df = df.toPandas()

In [58]:
# Check the type of the converted object (expected: a pandas DataFrame)
type(pd_df)

pandas.core.frame.DataFrame

# Data Selection

In [60]:
# Access a column via its name as an attribute of the DataFrame.
# The result is a Column expression, not the column's data.
df.age

Column<'age'>

In [61]:
# Access a column via its name as an index/key on the DataFrame.
# This yields the same Column expression as the attribute form.
df["age"]

Column<'age'>

In [63]:
# Select a single column: select() returns a new DataFrame holding only `age`,
# and show() prints its top 20 rows.
# NOTE(review): `Column` does not appear to be used in the visible part of the
# notebook, and `upper` is only used in a later cell; consider moving these
# imports to the import cell at the top.
from pyspark.sql import Column
from pyspark.sql.functions import upper
df.select(df.age).show()

+---+
|age|
+---+
| 48|
|  7|
| 62|
| 48|
| 51|
| 60|
| 68|
| 24|
| 52|
| 53|
| 50|
| 63|
| 68|
| 68|
| 68|
| 40|
| 47|
| 47|
| 60|
| 62|
+---+
only showing top 20 rows



In [67]:
# select() gives back a DataFrame (not a Column), as the reported type shows
type(df.select(df.age))

pyspark.sql.dataframe.DataFrame

## Adding/Insertion of a new column

In [72]:
# Append a derived column "new_rbc" holding the upper-cased text of "rbc".
# withColumn() returns a new DataFrame; `df` itself is left unchanged.
df.withColumn("new_rbc", upper(df["rbc"])).show()

+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+--------+
|age| bp|   sg| al| su|     rbc|      pc|       pcc|        ba|bgr| bu| sc|sod|pot|hemo|pcv| wbcc|rbcc|htn| dm|cad|appet| pe|ane|class| new_rbc|
+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+--------+
| 48| 80|1.020|  1|  0|       ?|  normal|notpresent|notpresent|121| 36|1.2|  ?|  ?|15.4| 44| 7800| 5.2|yes|yes| no| good| no| no|  ckd|       ?|
|  7| 50|1.020|  4|  0|       ?|  normal|notpresent|notpresent|  ?| 18|0.8|  ?|  ?|11.3| 38| 6000|   ?| no| no| no| good| no| no|  ckd|       ?|
| 62| 80|1.010|  2|  3|  normal|  normal|notpresent|notpresent|423| 53|1.8|  ?|  ?| 9.6| 31| 7500|   ?| no|yes| no| poor| no|yes|  ckd|  NORMAL|
| 48| 70|1.005|  4|  0|  normal|abnormal|   present|notpresent|117| 56|3.8|111|2.5|11.2| 32| 6700| 3.9|yes| no| no| poor|yes|yes| 

### Filtering

In [73]:
# Keep only the rows whose age is greater than 20.
# `age` is a string column (no schema was inferred when the CSV was loaded),
# so cast it to an integer explicitly instead of leaning on Spark's implicit
# string-to-number coercion. With ANSI mode off, rows whose age is the "?"
# placeholder become null after the cast and are excluded by the comparison.
df.filter(df.age.cast("int") > 20).show()

+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+
|age| bp|   sg| al| su|     rbc|      pc|       pcc|        ba|bgr| bu| sc|sod|pot|hemo|pcv| wbcc|rbcc|htn| dm|cad|appet| pe|ane|class|
+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+
| 48| 80|1.020|  1|  0|       ?|  normal|notpresent|notpresent|121| 36|1.2|  ?|  ?|15.4| 44| 7800| 5.2|yes|yes| no| good| no| no|  ckd|
| 62| 80|1.010|  2|  3|  normal|  normal|notpresent|notpresent|423| 53|1.8|  ?|  ?| 9.6| 31| 7500|   ?| no|yes| no| poor| no|yes|  ckd|
| 48| 70|1.005|  4|  0|  normal|abnormal|   present|notpresent|117| 56|3.8|111|2.5|11.2| 32| 6700| 3.9|yes| no| no| poor|yes|yes|  ckd|
| 51| 80|1.010|  2|  0|  normal|  normal|notpresent|notpresent|106| 26|1.4|  ?|  ?|11.6| 35| 7300| 4.6| no| no| no| good| no| no|  ckd|
| 60| 90|1.015|  3|  0|       ?|       ?|notpres

In [79]:
# Keep only the rows whose `su` value is greater than 2.
# `su` is a string column (no schema was inferred when the CSV was loaded),
# so cast it to an integer explicitly instead of leaning on Spark's implicit
# string-to-number coercion. With ANSI mode off, rows whose value is the "?"
# placeholder become null after the cast and are excluded by the comparison.
df.filter(df.su.cast("int") > 2).show()

+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+
|age| bp|   sg| al| su|     rbc|      pc|       pcc|        ba|bgr| bu| sc|sod|pot|hemo|pcv| wbcc|rbcc|htn| dm|cad|appet| pe|ane|class|
+---+---+-----+---+---+--------+--------+----------+----------+---+---+---+---+---+----+---+-----+----+---+---+---+-----+---+---+-----+
| 62| 80|1.010|  2|  3|  normal|  normal|notpresent|notpresent|423| 53|1.8|  ?|  ?| 9.6| 31| 7500|   ?| no|yes| no| poor| no|yes|  ckd|
| 24|  ?|1.015|  2|  4|  normal|abnormal|notpresent|notpresent|410| 31|1.1|  ?|  ?|12.4| 44| 6900|   5| no|yes| no| good|yes| no|  ckd|
| 50| 60|1.010|  2|  4|       ?|abnormal|   present|notpresent|490| 55|  4|  ?|  ?| 9.4| 28|    ?|   ?|yes|yes| no| good| no|yes|  ckd|
| 60|100|1.025|  0|  3|       ?|  normal|notpresent|notpresent|263| 27|1.3|135|4.3|12.7| 37|11400| 4.3|yes|yes|yes| good| no| no|  ckd|
| 69| 70|1.010|  3|  4|  normal|abnormal|notpres