In this tutorial we will cover:

* PySpark DataFrames
* Reading a dataset
* Checking the data types of the columns
* Selecting columns and indexing
* Summary statistics with `describe()`, similar to pandas
* Adding columns
* Dropping columns
* Renaming columns

In [1]:
import pyspark


In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build the SparkSession for this tutorial under the application name 'Dataframe'.
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)

In [4]:
# Evaluate the session object as the cell's last expression so the notebook displays it.
spark

In [8]:
# Read the CSV: use the first line as column names and let Spark infer the column types.
df_spark = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('Book1.csv')
)

In [9]:
# Preview the loaded rows.
df_spark.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
|  Abhi| 26|        10|
|  Neha| 27|         8|
|Milind| 27|         4|
+------+---+----------+



In [10]:
# Check the schema: with inferSchema enabled, Age and Experience come back as integer columns.
df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [11]:
# Equivalent read: pass the CSV options as keyword arguments instead of chaining .option().
df_spark = spark.read.csv(
    'Book1.csv',
    header=True,
    inferSchema=True,
)

In [12]:
# Same three rows as before: both ways of reading the file produce the same contents.
df_spark.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
|  Abhi| 26|        10|
|  Neha| 27|         8|
|Milind| 27|         4|
+------+---+----------+



In [13]:
# A DataFrame is PySpark's table-like data structure; check the type of df_spark next.

In [14]:
# Confirm that the reader returned a PySpark DataFrame.
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [15]:
# Column names as a plain Python list.
df_spark.columns

['Name', 'Age', 'Experience']

In [16]:
# head(n) returns the first n rows as a list of Row objects, not as a DataFrame.
df_spark.head(2)

[Row(Name='Abhi', Age=26, Experience=10),
 Row(Name='Neha', Age=27, Experience=8)]

In [17]:
# show() prints the rows as a formatted table, unlike the Row list returned by head().
df_spark.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
|  Abhi| 26|        10|
|  Neha| 27|         8|
|Milind| 27|         4|
+------+---+----------+



In [21]:
# Select the "Name" column; select() returns a new single-column DataFrame.
Name_col = df_spark.select('Name')
Name_col.show()

+------+
|  Name|
+------+
|  Abhi|
|  Neha|
|Milind|
+------+



In [23]:
# Select several columns at once by passing a list of column names.
NameExe_col = df_spark.select(['Name','Experience'])
NameExe_col.show()

+------+----------+
|  Name|Experience|
+------+----------+
|  Abhi|        10|
|  Neha|         8|
|Milind|         4|
+------+----------+



In [24]:
# (column name, type name) pairs for every column.
df_spark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [28]:
# Summary statistics per column. For the string column Name, mean and stddev are null,
# while count, min and max are still reported.
df_spark.describe().show()

+-------+----+------------------+-----------------+
|summary|Name|               Age|       Experience|
+-------+----+------------------+-----------------+
|  count|   3|                 3|                3|
|   mean|null|26.666666666666668|7.333333333333333|
| stddev|null|0.5773502691896258|3.055050463303893|
|    min|Abhi|                26|                4|
|    max|Neha|                27|               10|
+-------+----+------------------+-----------------+



In [31]:
# Add a column: withColumn(name, expression) returns a new DataFrame, so the result is
# assigned back to df_spark. The new column holds Experience + 2 for every row.
df_spark = df_spark.withColumn('Experience After 2 yrs',df_spark["Experience"]+2)

In [32]:
# Confirm the new column is present.
df_spark.show()

+------+---+----------+----------------------+
|  Name|Age|Experience|Experience After 2 yrs|
+------+---+----------+----------------------+
|  Abhi| 26|        10|                    12|
|  Neha| 27|         8|                    10|
|Milind| 27|         4|                     6|
+------+---+----------+----------------------+



In [33]:
# Drop a column by name; drop() returns a new DataFrame, so reassign to keep the change.
df_spark = df_spark.drop('Experience After 2 yrs')

In [34]:
# Confirm the column is gone.
df_spark.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
|  Abhi| 26|        10|
|  Neha| 27|         8|
|Milind| 27|         4|
+------+---+----------+



In [35]:
# Rename a column: withColumnRenamed(existing_name, new_name).
df_spark = df_spark.withColumnRenamed('Name','Nav')

In [36]:
# Confirm the first column is now called Nav.
df_spark.show()

+------+---+----------+
|   Nav|Age|Experience|
+------+---+----------+
|  Abhi| 26|        10|
|  Neha| 27|         8|
|Milind| 27|         4|
+------+---+----------+



PySpark: Handling Missing Values

* Dropping columns
* Dropping rows
* Parameters of the drop functions (`how`, `thresh`, `subset`)
* Imputing missing values with the mean, median, or mode

In [39]:
# Load a second dataset that contains missing values (displayed as null).
df2_spark = spark.read.csv('Book2.csv', header=True,inferSchema=True)
df2_spark.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|   Abhi|  26|        10| 30000|
|   Neha|  27|         8| 40000|
| Milind|  27|         4| 30000|
|   Minu|  25|         6| 50000|
|   Paul|  23|         8| 30000|
| Harsha|  25|         5| 60000|
|Shubham|  25|         4| 20000|
| Mahesh|null|      null|100000|
|   null|  34|        10| 38000|
|   null|  36|      null|  null|
+-------+----+----------+------+



In [40]:
# Drop a column. The result is only displayed, not assigned, so df2_spark itself is unchanged.
df2_spark.drop('Name').show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  26|        10| 30000|
|  27|         8| 40000|
|  27|         4| 30000|
|  25|         6| 50000|
|  23|         8| 30000|
|  25|         5| 60000|
|  25|         4| 20000|
|null|      null|100000|
|  34|        10| 38000|
|  36|      null|  null|
+----+----------+------+



In [41]:
# df2_spark still has the Name column because the earlier drop was not assigned back.
df2_spark.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|   Abhi|  26|        10| 30000|
|   Neha|  27|         8| 40000|
| Milind|  27|         4| 30000|
|   Minu|  25|         6| 50000|
|   Paul|  23|         8| 30000|
| Harsha|  25|         5| 60000|
|Shubham|  25|         4| 20000|
| Mahesh|null|      null|100000|
|   null|  34|        10| 38000|
|   null|  36|      null|  null|
+-------+----+----------+------+



In [43]:
df2_spark.na.drop().show() # default how='any': drop every row that contains at least one null

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   Abhi| 26|        10| 30000|
|   Neha| 27|         8| 40000|
| Milind| 27|         4| 30000|
|   Minu| 25|         6| 50000|
|   Paul| 23|         8| 30000|
| Harsha| 25|         5| 60000|
|Shubham| 25|         4| 20000|
+-------+---+----------+------+



In [46]:
# how='all' drops a row only when every column is null; no row here is entirely null,
# so all ten rows are kept.
df2_spark.na.drop(how="all").show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|   Abhi|  26|        10| 30000|
|   Neha|  27|         8| 40000|
| Milind|  27|         4| 30000|
|   Minu|  25|         6| 50000|
|   Paul|  23|         8| 30000|
| Harsha|  25|         5| 60000|
|Shubham|  25|         4| 20000|
| Mahesh|null|      null|100000|
|   null|  34|        10| 38000|
|   null|  36|      null|  null|
+-------+----+----------+------+



In [49]:
df2_spark.na.drop(how="any", thresh=2).show()  # keep rows with at least 2 non-null values; thresh takes precedence over how

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|   Abhi|  26|        10| 30000|
|   Neha|  27|         8| 40000|
| Milind|  27|         4| 30000|
|   Minu|  25|         6| 50000|
|   Paul|  23|         8| 30000|
| Harsha|  25|         5| 60000|
|Shubham|  25|         4| 20000|
| Mahesh|null|      null|100000|
|   null|  34|        10| 38000|
+-------+----+----------+------+



In [50]:
df2_spark.na.drop(how="any", thresh=3).show()  # keep rows with at least 3 non-null values; thresh takes precedence over how

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   Abhi| 26|        10| 30000|
|   Neha| 27|         8| 40000|
| Milind| 27|         4| 30000|
|   Minu| 25|         6| 50000|
|   Paul| 23|         8| 30000|
| Harsha| 25|         5| 60000|
|Shubham| 25|         4| 20000|
|   null| 34|        10| 38000|
+-------+---+----------+------+



In [52]:
# subset: only the listed columns are checked for nulls.
df2_spark.na.drop(how='any',subset=['Experience']).show() # drop rows whose Experience is null

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   Abhi| 26|        10| 30000|
|   Neha| 27|         8| 40000|
| Milind| 27|         4| 30000|
|   Minu| 25|         6| 50000|
|   Paul| 23|         8| 30000|
| Harsha| 25|         5| 60000|
|Shubham| 25|         4| 20000|
|   null| 34|        10| 38000|
+-------+---+----------+------+



In [55]:
# Fill nulls with a constant. A string fill value only affects string columns, so the
# nulls in the numeric columns Age, Experience and Salary are left as null.
df2_spark.na.fill('Missing Values').show()

+--------------+----+----------+------+
|          Name| Age|Experience|Salary|
+--------------+----+----------+------+
|          Abhi|  26|        10| 30000|
|          Neha|  27|         8| 40000|
|        Milind|  27|         4| 30000|
|          Minu|  25|         6| 50000|
|          Paul|  23|         8| 30000|
|        Harsha|  25|         5| 60000|
|       Shubham|  25|         4| 20000|
|        Mahesh|null|      null|100000|
|Missing Values|  34|        10| 38000|
|Missing Values|  36|      null|  null|
+--------------+----+----------+------+



In [56]:
# The fill result was only displayed, not assigned; df2_spark still contains its nulls.
df2_spark.show()

+-------+----+----------+------+
|   Name| Age|Experience|Salary|
+-------+----+----------+------+
|   Abhi|  26|        10| 30000|
|   Neha|  27|         8| 40000|
| Milind|  27|         4| 30000|
|   Minu|  25|         6| 50000|
|   Paul|  23|         8| 30000|
| Harsha|  25|         5| 60000|
|Shubham|  25|         4| 20000|
| Mahesh|null|      null|100000|
|   null|  34|        10| 38000|
|   null|  36|      null|  null|
+-------+----+----------+------+



In [57]:
from pyspark.ml.feature import Imputer

In [58]:
# Mean-impute the numeric columns, writing the results to new "<column>_imputed" columns
# so the original values stay visible next to the filled ones.
imputer = Imputer(
    strategy='mean',
    inputCols=['Age', 'Experience', 'Salary'],
    outputCols=["{}_imputed".format(name) for name in ['Age', 'Experience', 'Salary']],
)

In [59]:
# fit() learns the fill value for each input column from df2_spark; transform() then adds
# the *_imputed columns. Rows that had nulls show the filled value (e.g. Age 27, Experience 6).
imputer.fit(df2_spark).transform(df2_spark).show()

+-------+----+----------+------+-----------+------------------+--------------+
|   Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+-------+----+----------+------+-----------+------------------+--------------+
|   Abhi|  26|        10| 30000|         26|                10|         30000|
|   Neha|  27|         8| 40000|         27|                 8|         40000|
| Milind|  27|         4| 30000|         27|                 4|         30000|
|   Minu|  25|         6| 50000|         25|                 6|         50000|
|   Paul|  23|         8| 30000|         23|                 8|         30000|
| Harsha|  25|         5| 60000|         25|                 5|         60000|
|Shubham|  25|         4| 20000|         25|                 4|         20000|
| Mahesh|null|      null|100000|         27|                 6|        100000|
|   null|  34|        10| 38000|         34|                10|         38000|
|   null|  36|      null|  null|         36|        

In [60]:
# Same mean imputation, but outputCols equals inputCols, so the filled values replace
# the original columns instead of being added as separate "_imputed" columns.
imputer = Imputer(
    strategy='mean',
    inputCols=['Age', 'Experience', 'Salary'],
    outputCols=['Age', 'Experience', 'Salary'],
)

In [61]:
# Age, Experience and Salary no longer contain nulls. Name is not among the imputer's
# input columns, so its nulls remain.
imputer.fit(df2_spark).transform(df2_spark).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   Abhi| 26|        10| 30000|
|   Neha| 27|         8| 40000|
| Milind| 27|         4| 30000|
|   Minu| 25|         6| 50000|
|   Paul| 23|         8| 30000|
| Harsha| 25|         5| 60000|
|Shubham| 25|         4| 20000|
| Mahesh| 27|         6|100000|
|   null| 34|        10| 38000|
|   null| 36|         6| 44222|
+-------+---+----------+------+

