In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import isnan, when, count, col
import pandas as pd

## Explanation of `pyspark.sql.functions`

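- `isnan`: Returns a boolean column that is `true` where the value is **NaN** (Not a Number). NaN is a floating-point value and is not the same as SQL `NULL`; for a `NULL` input `isnan` returns `false`.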
- `when`: Builds a conditional expression, similar to an **"if-else"** statement: you give a condition and the value to return where the condition holds. Rows that do not match get `NULL` unless an `.otherwise()` value is supplied.
- `count`: Counts the **non-null** values of a column. Wrapped around `when(...)`, it therefore counts only the rows where the condition is true.
- `col`: This function is used to refer to a **column** in the DataFrame.

In [2]:
# getOrCreate() reuses an already-running SparkContext, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
sqlc = SQLContext(sc)   # entry point used below for sqlc.read

24/10/24 10:41:13 WARN Utils: Your hostname, htetaunglynn-XPS-13-9310 resolves to a loopback address: 127.0.1.1; using 192.168.1.108 instead (on interface wlp0s20f3)
24/10/24 10:41:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/24 10:41:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/24 10:41:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Read iris.csv using its header row for column names; inferSchema lets Spark type the numeric columns.
df = sqlc.read.csv('iris.csv', header=True, inferSchema=True)
df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [4]:
df.schema.names   # column names, in schema order

['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

In [5]:
df.select('sepal_length').show(5)   # project a single column and preview 5 rows

+------------+
|sepal_length|
+------------+
|         5.1|
|         4.9|
|         4.7|
|         4.6|
|         5.0|
+------------+
only showing top 5 rows



In [6]:
# Preview isnan() for the first 5 rows of 'species' only -- this is not a full-column check.
# NOTE: 'species' holds strings, so a floating-point NaN is not expected there.
df.select(isnan(col('species'))).show(5)

+--------------+
|isnan(species)|
+--------------+
|         false|
|         false|
|         false|
|         false|
|         false|
+--------------+
only showing top 5 rows



In [7]:
# Preview whether 'species' is NULL for the first 5 rows (not a full-column check).
df.select(df['species'].isNull()).show(5)

+-----------------+
|(species IS NULL)|
+-----------------+
|            false|
|            false|
|            false|
|            false|
|            false|
+-----------------+
only showing top 5 rows



In [8]:
df.select(col('species').alias('hello')).show(5)   # rename the output column to 'hello'

+------+
| hello|
+------+
|setosa|
|setosa|
|setosa|
|setosa|
|setosa|
+------+
only showing top 5 rows



In [9]:
# Yields 'species' for rows whose value is NaN or NULL; every other row gets NULL,
# because no .otherwise() clause is given.
species_missing = isnan('species') | col('species').isNull()
when(species_missing, 'species')

Column<'CASE WHEN (isnan(species) OR (species IS NULL)) THEN species END'>

In [10]:
# count() skips NULLs, so it only counts rows where the NaN-or-NULL condition held.
missing_flag = when(isnan('species') | col('species').isNull(), 'species')
count(missing_flag).alias('null_count')

Column<'count(CASE WHEN (isnan(species) OR (species IS NULL)) THEN species END) AS null_count'>

In [11]:
# Aggregate the whole column: number of NaN-or-NULL values in 'species'.
species_null_count = count(when(isnan('species') | col('species').isNull(), 'species')).alias('null_count')
df.select(species_null_count).show()

+----------+
|null_count|
+----------+
|         0|
+----------+



In [12]:
# Build one aggregate expression per column that counts its NaN or NULL values.
# `when(cond, c)` is non-NULL only where `cond` holds, and `count` ignores NULLs,
# so each count equals the number of missing rows in that column.
# isnan() only applies to floating-point values, so it is used only for 'double'/'float'
# columns. String columns (e.g. 'species') are checked for NULL only, rather than
# relying on an implicit string-to-double cast.
col_types = dict(df.dtypes)                     # column name -> Spark type string
null_count = []
for c in df.columns:
    is_missing = col(c).isNull()
    if col_types[c] in ('double', 'float'):
        is_missing = is_missing | isnan(c)
    null_count.append(count(when(is_missing, c)).alias(c))  # result named after the column
    # .show() prints the table itself and returns None, so it is not wrapped in print()
    df.select(null_count).show()

title = "FINAL RESULT AS A PYSPARK DATAFRAME"
print(title + "\n" + "*" * len(title))
df.select(null_count).show()                    # one count column per input column


+------------+
|sepal_length|
+------------+
|           0|
+------------+

None
+------------+-----------+
|sepal_length|sepal_width|
+------------+-----------+
|           0|          5|
+------------+-----------+

None
+------------+-----------+------------+
|sepal_length|sepal_width|petal_length|
+------------+-----------+------------+
|           0|          5|           3|
+------------+-----------+------------+

None
+------------+-----------+------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|
+------------+-----------+------------+-----------+
|           0|          5|           3|          0|
+------------+-----------+------------+-----------+

None
+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|           0|          5|           3|          0|      0|
+------------+-----------+------------+-----------+-------+

None


In [13]:
# Same per-column NaN/NULL counts as the loop above, built with a list comprehension.
missing_exprs = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(missing_exprs).show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|           0|          5|           3|          0|      0|
+------------+-----------+------------+-----------+-------+



In [14]:
# Column names resolve case-insensitively by default (spark.sql.caseSensitive=false),
# so 'Sepal_Length' matches the 'sepal_length' column.
df.where(col('Sepal_Length').isNull()).count()

0

In [15]:
# Recompute the per-column NaN/NULL counts (identical to the previous comprehension).
df.select([
    count(when(isnan(field) | col(field).isNull(), field)).alias(field)
    for field in df.columns
]).show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|           0|          5|           3|          0|      0|
+------------+-----------+------------+-----------+-------+



In [16]:
# Summary statistics per column; the 'count' row excludes NULLs (e.g. 145 for sepal_width).
df.describe().show(n=20)

24/10/24 10:41:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+------------------+------------------+---------+
|summary|      sepal_length|       sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+------------------+------------------+------------------+---------+
|  count|               150|               145|               147|               150|      150|
|   mean| 5.843333333333335| 3.055862068965518|3.7605442176870776|1.1986666666666672|     NULL|
| stddev|0.8280661279778637|0.4409390015241722| 1.781124695592811|0.7631607417008414|     NULL|
|    min|               4.3|               2.0|               1.0|               0.1|   setosa|
|    max|               7.9|               4.4|               6.9|               2.5|virginica|
+-------+------------------+------------------+------------------+------------------+---------+



In [17]:
df.describe().show(n=1)   # only the first summary row ('count')

+-------+------------+-----------+------------+-----------+-------+
|summary|sepal_length|sepal_width|petal_length|petal_width|species|
+-------+------------+-----------+------------+-----------+-------+
|  count|         150|        145|         147|        150|    150|
+-------+------------+-----------+------------+-----------+-------+
only showing top 1 row



In [18]:
# Rows where 'sepal_width' is NULL. Use the column's actual (lower-case) name instead of
# relying on case-insensitive resolution, which breaks if spark.sql.caseSensitive=true.
df.where(col('sepal_width').isNull()).show()

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         4.8|       NULL|         1.4|        0.1|    setosa|
|         4.3|       NULL|         1.1|        0.1|    setosa|
|         4.8|       NULL|         1.4|        0.3|    setosa|
|         5.9|       NULL|         4.2|        1.5|versicolor|
|         6.7|       NULL|         5.0|        1.7|versicolor|
+------------+-----------+------------+-----------+----------+



In [19]:
# filter() is an alias of where(), so this returns the same rows as the previous cell.
df.filter(df['sepal_width'].isNull()).show()

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         4.8|       NULL|         1.4|        0.1|    setosa|
|         4.3|       NULL|         1.1|        0.1|    setosa|
|         4.8|       NULL|         1.4|        0.3|    setosa|
|         5.9|       NULL|         4.2|        1.5|versicolor|
|         6.7|       NULL|         5.0|        1.7|versicolor|
+------------+-----------+------------+-----------+----------+



In [20]:
# For rows whose sepal_width is NULL, show only sepal_length instead of the whole DataFrame.
# Uses the real lower-case column name rather than depending on case-insensitive lookup.
df.where(col('sepal_width').isNull()).select('sepal_length').show()

+------------+
|sepal_length|
+------------+
|         4.8|
|         4.3|
|         4.8|
|         5.9|
|         6.7|
+------------+



In [21]:
# Number of rows in the original DataFrame (rows containing NULLs included).
total_rows = df.count()
total_rows

150

In [22]:
# Rows remaining after dropping those whose sepal_width is NULL.
# NULLs in other columns (e.g. petal_length) are deliberately not considered here.
df.dropna(subset=['sepal_width']).count()

145

In [23]:
df.where(df['sepal_width'].isNull()).show()   # rows with a NULL sepal_width

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         4.8|       NULL|         1.4|        0.1|    setosa|
|         4.3|       NULL|         1.1|        0.1|    setosa|
|         4.8|       NULL|         1.4|        0.3|    setosa|
|         5.9|       NULL|         4.2|        1.5|versicolor|
|         6.7|       NULL|         5.0|        1.7|versicolor|
+------------+-----------+------------+-----------+----------+



In [24]:
# Fill NULLs in two numeric columns with the constant 1, then confirm nothing is missing.
# NOTE(review): 1 lies below the observed sepal_width minimum (2.0), so this fill biases
# that column's statistics. Consider mean/median imputation if the values are analysed.
y = df.select('sepal_width', 'petal_length').fillna(1)
remaining = [count(when(isnan(name) | col(name).isNull(), name)).alias(name)
             for name in y.columns]
y.select(remaining).show()

+-----------+------------+
|sepal_width|petal_length|
+-----------+------------+
|          0|           0|
+-----------+------------+



In [25]:
# Replace NULLs with 1 in every numeric column; a numeric fill value leaves the string
# column 'species' untouched. The correlation matrix below is computed on this filled copy.
z = df.na.fill(1)

In [26]:
# Verify the filled DataFrame has no NaN/NULL left in any column.
z.select([
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in z.columns
]).show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|           0|          0|           0|          0|      0|
+------------+-----------+------------+-----------+-------+



In [27]:
# Pearson correlation on the raw df, which still has NULLs in sepal_width. The later
# matrix uses the NULL-filled copy, which is why its sepal_length/sepal_width value differs.
df.stat.corr('sepal_length', 'sepal_width')

0.026596550288657865

In [28]:
# Keep only the numeric feature columns (everything except 'species').
a = z.select([name for name in z.columns if name != 'species'])
a.show(5)

+------------+-----------+------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|
+------------+-----------+------------+-----------+
|         5.1|        3.5|         1.4|        0.2|
|         4.9|        3.0|         1.4|        0.2|
|         4.7|        3.2|         1.3|        0.2|
|         4.6|        3.1|         1.5|        0.2|
|         5.0|        3.6|         1.4|        0.2|
+------------+-----------+------------+-----------+
only showing top 5 rows



In [29]:
# Convert each Row into a plain tuple of its numeric values. Statistics.corr works on an
# RDD (Resilient Distributed Dataset) of numeric vectors.
features = a.rdd.map(tuple)
features

PythonRDD[177] at RDD at PythonRDD.scala:53

In [30]:
# Pairwise Pearson correlations across the feature columns, returned as a NumPy array.
corr_matrix = Statistics.corr(x=features, method='pearson')
corr_matrix

                                                                                

array([[ 1.        , -0.0059379 ,  0.85314895,  0.81795363],
       [-0.0059379 ,  1.        , -0.21291876, -0.20042214],
       [ 0.85314895, -0.21291876,  1.        ,  0.94337041],
       [ 0.81795363, -0.20042214,  0.94337041,  1.        ]])

In [31]:
# Wrap the NumPy matrix in a pandas DataFrame (labels default to 0..n-1 for now).
corr_df = pd.DataFrame(data=corr_matrix)
corr_df

Unnamed: 0,0,1,2,3
0,1.0,-0.005938,0.853149,0.817954
1,-0.005938,1.0,-0.212919,-0.200422
2,0.853149,-0.212919,1.0,0.94337
3,0.817954,-0.200422,0.94337,1.0


In [35]:
corr_df.axes[0]   # row labels (the index)

RangeIndex(start=0, stop=4, step=1)

In [36]:
# Label the columns with the feature names (rows keep their 0..n-1 labels); echo the names.
feature_names = a.columns
corr_df.columns = feature_names
feature_names

['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

In [37]:
corr_df.loc[:, :]   # display the column-labelled correlation matrix

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width
0,1.0,-0.005938,0.853149,0.817954
1,-0.005938,1.0,-0.212919,-0.200422
2,0.853149,-0.212919,1.0,0.94337
3,0.817954,-0.200422,0.94337,1.0


In [38]:
# Use the feature names for both row and column labels so the matrix reads symmetrically.
corr_df.index = a.columns
corr_df.columns = a.columns
corr_df

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width
sepal_length,1.0,-0.005938,0.853149,0.817954
sepal_width,-0.005938,1.0,-0.212919,-0.200422
petal_length,0.853149,-0.212919,1.0,0.94337
petal_width,0.817954,-0.200422,0.94337,1.0
