In [66]:
import pandas as pd
# Sample employee data used by every null-handling demo below.
# The last four rows deliberately contain missing values (None) so that
# na.drop / na.fill / Imputer have something to act on; without them a fresh
# "Restart & Run All" would overwrite the CSV with fully populated rows and
# none of the outputs shown further down could be reproduced.
# pandas stores the numeric columns as floats once they contain a missing
# value, and to_csv writes each missing value as an empty field.
df = pd.DataFrame({
    'Name': ['John', 'Jane', 'Doe', 'Taj', 'Ravi', 'Anu', None, None, 'Sam', None],
    'Age': [28, 34, 45, 22, 30, 25, None, 18, 40, None],
    'Experience': [5, 10, 15, 2, 8, 3, 5, 1, None, None],
    'Salary': [50000, 60000, 70000, 40000, 55000, 45000, 30000, 12000, 80000, None],
})
# index=False keeps the pandas row index out of the file so it holds only the four data columns.
df.to_csv('dataframe2.csv', index=False)

In [67]:
from pyspark.sql import SparkSession

# Configure the application name first, then obtain the session
# (an already-running session is reused instead of starting a second one).
session_builder = SparkSession.builder.appName("Practise_Dataframe")
spark = session_builder.getOrCreate()

In [68]:
# Read the CSV written earlier: the first line supplies the column names and
# Spark infers each column's data type from the values.
csv_reader = spark.read.options(header=True, inferSchema=True)
df_pyspark = csv_reader.csv('dataframe2.csv')
df_pyspark.show()

+----+----+----------+-------+
|Name| Age|Experience| Salary|
+----+----+----------+-------+
|John|28.0|       5.0|50000.0|
|Jane|34.0|      10.0|60000.0|
| Doe|45.0|      15.0|70000.0|
| Taj|22.0|       2.0|40000.0|
|Ravi|30.0|       8.0|55000.0|
| Anu|25.0|       3.0|45000.0|
|NULL|NULL|       5.0|30000.0|
|NULL|18.0|       1.0|12000.0|
| Sam|40.0|      NULL|80000.0|
|NULL|NULL|      NULL|   NULL|
+----+----+----------+-------+



In [69]:
# With no arguments the default how='any' applies:
# a row is removed as soon as one of its columns is null.
rows_without_nulls = df_pyspark.dropna()
rows_without_nulls.show()

+----+----+----------+-------+
|Name| Age|Experience| Salary|
+----+----+----------+-------+
|John|28.0|       5.0|50000.0|
|Jane|34.0|      10.0|60000.0|
| Doe|45.0|      15.0|70000.0|
| Taj|22.0|       2.0|40000.0|
|Ravi|30.0|       8.0|55000.0|
| Anu|25.0|       3.0|45000.0|
+----+----+----------+-------+



In [70]:
# how='all' removes a row only when every one of its columns is null.
rows_with_some_data = df_pyspark.dropna(how='all')
rows_with_some_data.show()

+----+----+----------+-------+
|Name| Age|Experience| Salary|
+----+----+----------+-------+
|John|28.0|       5.0|50000.0|
|Jane|34.0|      10.0|60000.0|
| Doe|45.0|      15.0|70000.0|
| Taj|22.0|       2.0|40000.0|
|Ravi|30.0|       8.0|55000.0|
| Anu|25.0|       3.0|45000.0|
|NULL|NULL|       5.0|30000.0|
|NULL|18.0|       1.0|12000.0|
| Sam|40.0|      NULL|80000.0|
+----+----+----------+-------+



`thresh` means that a row is kept only if it contains at least `thresh` non-null values; when `thresh` is given, it takes precedence over `how`.

In [71]:
# Keep only the rows that have at least 2 non-null values.
# `how` is intentionally not passed: when `thresh` is specified it overrides
# `how`, so supplying how='any' here would have no effect and only mislead.
df_pyspark.na.drop(thresh=2).show()

+----+----+----------+-------+
|Name| Age|Experience| Salary|
+----+----+----------+-------+
|John|28.0|       5.0|50000.0|
|Jane|34.0|      10.0|60000.0|
| Doe|45.0|      15.0|70000.0|
| Taj|22.0|       2.0|40000.0|
|Ravi|30.0|       8.0|55000.0|
| Anu|25.0|       3.0|45000.0|
|NULL|NULL|       5.0|30000.0|
|NULL|18.0|       1.0|12000.0|
| Sam|40.0|      NULL|80000.0|
+----+----+----------+-------+



In [72]:
# Only the 'Experience' column is inspected: rows where it is null are dropped,
# nulls in the other columns are left alone.
df_pyspark.dropna(how='any', subset=['Experience']).show()

+----+----+----------+-------+
|Name| Age|Experience| Salary|
+----+----+----------+-------+
|John|28.0|       5.0|50000.0|
|Jane|34.0|      10.0|60000.0|
| Doe|45.0|      15.0|70000.0|
| Taj|22.0|       2.0|40000.0|
|Ravi|30.0|       8.0|55000.0|
| Anu|25.0|       3.0|45000.0|
|NULL|NULL|       5.0|30000.0|
|NULL|18.0|       1.0|12000.0|
+----+----+----------+-------+



In [73]:
# A string replacement value is applied to string columns only, which is why
# the numeric columns still show NULL in the output.
# An optional column list (for example ['Name', 'Age', 'Salary']) can be passed
# as a second argument to restrict the fill to those columns.
df_pyspark.fillna('-').show()

+----+----+----------+-------+
|Name| Age|Experience| Salary|
+----+----+----------+-------+
|John|28.0|       5.0|50000.0|
|Jane|34.0|      10.0|60000.0|
| Doe|45.0|      15.0|70000.0|
| Taj|22.0|       2.0|40000.0|
|Ravi|30.0|       8.0|55000.0|
| Anu|25.0|       3.0|45000.0|
|   -|NULL|       5.0|30000.0|
|   -|18.0|       1.0|12000.0|
| Sam|40.0|      NULL|80000.0|
|   -|NULL|      NULL|   NULL|
+----+----+----------+-------+



In [74]:
# Per-column replacements: each key is a column name and its value is what
# nulls in that column are replaced with.
null_replacements = {
    'Name': 'Unknown',
    'Age': 0,
    'Experience': 0,
    'Salary': 0
}
df_pyspark.fillna(null_replacements).show()

+-------+----+----------+-------+
|   Name| Age|Experience| Salary|
+-------+----+----------+-------+
|   John|28.0|       5.0|50000.0|
|   Jane|34.0|      10.0|60000.0|
|    Doe|45.0|      15.0|70000.0|
|    Taj|22.0|       2.0|40000.0|
|   Ravi|30.0|       8.0|55000.0|
|    Anu|25.0|       3.0|45000.0|
|Unknown| 0.0|       5.0|30000.0|
|Unknown|18.0|       1.0|12000.0|
|    Sam|40.0|       0.0|80000.0|
|Unknown| 0.0|       0.0|    0.0|
+-------+----+----------+-------+



In [75]:
from pyspark.ml.feature import Imputer

# Numeric columns whose nulls should be imputed; each result is written to a
# new column named "<column>_imputed" so the original values stay visible.
columns_to_impute = ['Age', 'Experience', 'Salary']
imputed_column_names = ["{}_imputed".format(name) for name in columns_to_impute]

imputer = Imputer(inputCols=columns_to_impute, outputCols=imputed_column_names)
# Replace missing values with the mean of the column.
imputer = imputer.setStrategy('mean')

In [76]:
# Fit the imputer on the data, then apply the fitted model to the same frame;
# the result carries the original columns plus the added output columns.
imputer_model = imputer.fit(df_pyspark)
imputer_model.transform(df_pyspark).show()

+----+----+----------+-------+-----------+------------------+-----------------+
|Name| Age|Experience| Salary|Age_imputed|Experience_imputed|   Salary_imputed|
+----+----+----------+-------+-----------+------------------+-----------------+
|John|28.0|       5.0|50000.0|       28.0|               5.0|          50000.0|
|Jane|34.0|      10.0|60000.0|       34.0|              10.0|          60000.0|
| Doe|45.0|      15.0|70000.0|       45.0|              15.0|          70000.0|
| Taj|22.0|       2.0|40000.0|       22.0|               2.0|          40000.0|
|Ravi|30.0|       8.0|55000.0|       30.0|               8.0|          55000.0|
| Anu|25.0|       3.0|45000.0|       25.0|               3.0|          45000.0|
|NULL|NULL|       5.0|30000.0|      30.25|               5.0|          30000.0|
|NULL|18.0|       1.0|12000.0|       18.0|               1.0|          12000.0|
| Sam|40.0|      NULL|80000.0|       40.0|             6.125|          80000.0|
|NULL|NULL|      NULL|   NULL|      30.2

25/06/27 04:01:42 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 902157 ms exceeds timeout 120000 ms
25/06/27 04:01:42 WARN SparkContext: Killing executors is not supported by current scheduler.
25/06/27 04:01:50 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:81)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:669)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1296)
	at o