# Removing Duplicate Data in Rows and Columns

In [17]:
from pyspark.sql import Row,SQLContext
from pyspark import SparkContext

# Reuse the SparkContext that is already running, or start one if none exists.
sc = SparkContext.getOrCreate()

# Wrap the context in a SQLContext. Later cells call .toDF() on RDDs, which
# needs an active SQL entry point (SQLContext or SparkSession).
# NOTE(review): SQLContext is documented as deprecated since Spark 3.0 in
# favour of SparkSession — confirm whether this object is still needed.
sqlContext = SQLContext(sc)



In [45]:
# SparkSession is otherwise only imported in the "Working With NAs" imports
# cell further down, so a top-to-bottom run on a fresh kernel would hit a
# NameError here. Import it in this cell so the session can be created.
from pyspark.sql import SparkSession

# Build the session, or fetch the existing one; it is used by the spark.sql(...)
# queries at the end of the notebook.
spark = SparkSession.builder.appName("Data Cleaning Using Spark").getOrCreate()

In [18]:
# Sample server metrics; the last two rows are exact duplicates and both
# server names appear twice, which the de-duplication cells below rely on.
df_dup = sc.parallelize([
    Row(server_name=name, cpu_utilization=cpu, session_count=sessions)
    for name, cpu, sessions in [
        ("101 Server", 85, 80),
        ("101 Server", 80, 90),
        ("102 Server", 85, 80),
        ("102 Server", 85, 80),
    ]
]).toDF()

In [19]:
# Print the sample rows, including the repeated "102 Server" entry.
df_dup.show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 Server|             85|           80|
| 101 Server|             80|           90|
| 102 Server|             85|           80|
| 102 Server|             85|           80|
+-----------+---------------+-------------+



In [20]:
# Drop rows that are identical across every column.
df_dup.dropDuplicates().show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 Server|             85|           80|
| 101 Server|             80|           90|
| 102 Server|             85|           80|
+-----------+---------------+-------------+



In [23]:
# Keep a single row per distinct server_name value. Only that column is
# compared, so rows that differ in the other columns are still collapsed.
# NOTE(review): which row survives for each server_name is presumably not
# guaranteed on a distributed DataFrame — confirm before relying on it.
df_dup.dropDuplicates(["server_name"]).show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 Server|             85|           80|
| 102 Server|             85|           80|
+-----------+---------------+-------------+



# Working With NAs


In [43]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

In [28]:
 # Rebuild the same four sample rows used for df_dup so this section has its
 # own starting DataFrame.
 df = sc.parallelize([
     Row(server_name=name, cpu_utilization=cpu, session_count=sessions)
     for name, cpu, sessions in [
         ("101 Server", 85, 80),
         ("101 Server", 80, 90),
         ("102 Server", 85, 80),
         ("102 Server", 85, 80),
     ]
 ]).toDF()

In [29]:
# Print the starting DataFrame for the null-handling examples.
df.show()

+-----------+---------------+-------------+
|server_name|cpu_utilization|session_count|
+-----------+---------------+-------------+
| 101 Server|             85|           80|
| 101 Server|             80|           90|
| 102 Server|             85|           80|
| 102 Server|             85|           80|
+-----------+---------------+-------------+



In [31]:
# Append a column named na_col that is NULL in every row. The literal None is
# cast to a string type so the column has a concrete data type.
df_na = df.withColumn("na_col", lit(None).cast("string"))

In [32]:
# Print df_na; the added na_col column holds only nulls at this point.
df_na.show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|  NULL|
| 101 Server|             80|           90|  NULL|
| 102 Server|             85|           80|  NULL|
| 102 Server|             85|           80|  NULL|
+-----------+---------------+-------------+------+



In [33]:
# Replace nulls with the string "A". Because the replacement is a string,
# only string-typed columns are filled; the result is displayed, not stored.
df_na.na.fill("A").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|     A|
| 101 Server|             80|           90|     A|
| 102 Server|             85|           80|     A|
| 102 Server|             85|           80|     A|
+-----------+---------------+-------------+------+



In [34]:

# Stack the filled copy on top of the untouched df_na so that df2 contains
# both "A" rows and NULL rows (union appends rows by column position).
df2 = df_na.na.fill("A").union(df_na)

In [35]:
# Print the combined DataFrame.
df2.show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|     A|
| 101 Server|             80|           90|     A|
| 102 Server|             85|           80|     A|
| 102 Server|             85|           80|     A|
| 101 Server|             85|           80|  NULL|
| 101 Server|             80|           90|  NULL|
| 102 Server|             85|           80|  NULL|
| 102 Server|             85|           80|  NULL|
+-----------+---------------+-------------+------+



In [38]:
# Discard every row that has a null in any column, then display what is left.
df2.dropna().show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|     A|
| 101 Server|             80|           90|     A|
| 102 Server|             85|           80|     A|
| 102 Server|             85|           80|     A|
+-----------+---------------+-------------+------+



# **Let's do the same thing with SQL**

In [41]:
# Register df2 as a temporary view named "na_table" (replacing any existing
# view of that name) so it can be queried with SQL.
df2.createOrReplaceTempView("na_table")

In [46]:
# Select every row from the na_table view and display it.
spark.sql("SELECT * FROM na_table").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|     A|
| 101 Server|             80|           90|     A|
| 102 Server|             85|           80|     A|
| 102 Server|             85|           80|     A|
| 101 Server|             85|           80|  NULL|
| 101 Server|             80|           90|  NULL|
| 102 Server|             85|           80|  NULL|
| 102 Server|             85|           80|  NULL|
+-----------+---------------+-------------+------+



In [47]:
# Show only the rows whose na_col is NULL.
spark.sql("SELECT *FROM na_table WHERE na_col IS NULL").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|  NULL|
| 101 Server|             80|           90|  NULL|
| 102 Server|             85|           80|  NULL|
| 102 Server|             85|           80|  NULL|
+-----------+---------------+-------------+------+



In [48]:
# Show only the rows whose na_col holds a value (i.e. is not NULL).
spark.sql("SELECT * FROM na_table WHERE na_col IS NOT NULL ").show()

+-----------+---------------+-------------+------+
|server_name|cpu_utilization|session_count|na_col|
+-----------+---------------+-------------+------+
| 101 Server|             85|           80|     A|
| 101 Server|             80|           90|     A|
| 102 Server|             85|           80|     A|
| 102 Server|             85|           80|     A|
+-----------+---------------+-------------+------+

