In [1]:
!pip install pyspark



### PySpark Handling Missing Values
* Dropping Columns
* Dropping Rows
* Various Parameters of the Drop Functions (`how`, `thresh`, `subset`)
* Handling Missing Values with the Mean / Median (Imputer)

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

# Entry point for the DataFrame API: build (or fetch) a session registered under
# the application name 'Practise'.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)



In [4]:
# Display the session object as the cell's result.
spark

In [5]:
# Load the CSV, treating the first line as column names and letting Spark infer the column types.
df_pyspark = spark.read.csv('water.csv', header = True, inferSchema = True)
# Plain assignment: this binds a second name to the same DataFrame object, it does not copy any data.
df_pyspark_copy = df_pyspark

In [6]:
# Preview the DataFrame; the printed output is limited to the top 20 rows.
df_pyspark.show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [7]:
## Dropping the Columns
# drop() returns a new DataFrame without 'Water'; df_pyspark itself is not modified
# (the following cells still show both columns).
df_pyspark.drop('Water').show()

+----+
|Year|
+----+
|1885|
|1886|
|1887|
|1888|
|1889|
|1890|
|1891|
|1892|
|1893|
|1894|
|1895|
|1896|
|1897|
|1898|
|1899|
|1900|
|1901|
|1902|
|1903|
|1904|
+----+
only showing top 20 rows



In [8]:
## Dropping the Rows
# With no arguments, na.drop() removes every row that has a null/NaN value in any column.
df_pyspark.na.drop().show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [9]:
### how = "all"
# A row is dropped only when every column in it is null;
# rows that still hold at least one value are kept.
df_pyspark.na.drop(how="all").show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [10]:
### thresh
# Keep a row only if it has at least 2 non-null values.
# `how` is deliberately not passed: when `thresh` is given it takes precedence and
# `how` is ignored, so supplying both only obscures which rule is applied.
df_pyspark.na.drop(thresh=2).show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [11]:
### subset
# Only the 'Water' column is inspected: rows whose 'Water' value is null are dropped.
df_pyspark.na.drop(how="any", subset=['Water']).show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [12]:
## Filling the Missing values
# A string fill value is applied to string-typed columns only; columns of other types are skipped.
# NOTE(review): with inferSchema=True both 'Year' and 'Water' are read as integer (see the
# printSchema() cell further down), so this call presumably leaves nulls in them untouched.
# Use a numeric fill value (e.g. na.fill(0)) or cast to string first if they must be replaced — confirm.
df_pyspark.na.fill('Missing Values').show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [13]:
## Filling the Missing values (restricted to one column)
# The second argument is the subset of columns to consider; here only 'Water'.
# NOTE(review): 'Water' is an integer column in this dataset, so a string fill value is
# presumably ignored for it — confirm, and use a numeric value if a replacement is required.
df_pyspark.na.fill('Missing Values', 'Water').show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [14]:
# Imputer estimators from pyspark.ml: fill missing 'Water' values with a column statistic.
from pyspark.ml.feature import Imputer

# Both imputers read 'Water' and write the completed values to a new 'Water_imputed'
# column; they differ only in the statistic used as the replacement.
imputer_mean = Imputer(
    strategy="mean",  # nulls are replaced by the mean of 'Water'
    inputCols=['Water'],
    outputCols=['Water_imputed'],
)

imputer_median = Imputer(
    strategy="median",  # nulls are replaced by the median of 'Water'
    inputCols=['Water'],
    outputCols=['Water_imputed'],
)

In [15]:
# Fit the median imputer on the data, append the imputed column and display the result.
# (imputer_mean can be substituted here to impute with the mean instead.)
(
    imputer_median
    .fit(df_pyspark)
    .transform(df_pyspark)
    .show()
)

+----+-----+-------------+
|Year|Water|Water_imputed|
+----+-----+-------------+
|1885|  356|          356|
|1886|  386|          386|
|1887|  397|          397|
|1888|  397|          397|
|1889|  413|          413|
|1890|  458|          458|
|1891|  485|          485|
|1892|  344|          344|
|1893|  390|          390|
|1894|  360|          360|
|1895|  420|          420|
|1896|  435|          435|
|1897|  439|          439|
|1898|  454|          454|
|1899|  462|          462|
|1900|  454|          454|
|1901|  469|          469|
|1902|  500|          500|
|1903|  492|          492|
|1904|  473|          473|
+----+-----+-------------+
only showing top 20 rows



## PySpark DataFrames
* Filter Operations
* `&`, `|`, `==`
* `~`

#### Filter Operations

In [16]:
## Finding Water Quantity less than or equal to 380.
# The condition is passed to filter() as a SQL expression string.
df_pyspark.filter("Water<=380").show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1892|  344|
|1894|  360|
+----+-----+



In [17]:
## Same filter (Water <= 380), but display only the 'Year' column of the matching rows.
(
    df_pyspark
    .filter("Water<=380")
    .select(['Year'])
    .show()
)

+----+
|Year|
+----+
|1885|
|1892|
|1894|
+----+



In [18]:
# Equivalent filter written as a column expression instead of a SQL string.
df_pyspark.filter(df_pyspark['Water'] <= 380).show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1892|  344|
|1894|  360|
+----+-----+



In [22]:
# Combine conditions with & (and) / | (or). Each comparison must be wrapped in its own
# parentheses because & and | bind tighter than <= and >= in Python.

# AND: rows with 350 <= Water <= 380.
df_pyspark.filter((df_pyspark['Water'] <= 380) & (df_pyspark['Water'] >= 350)).show()

# OR: the two ranges overlap, so every non-null Water value satisfies at least one side
# and no row with a value is filtered out.
df_pyspark.filter((df_pyspark['Water'] <= 380) | (df_pyspark['Water'] >= 350)).show()

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1894|  360|
+----+-----+

+----+-----+
|Year|Water|
+----+-----+
|1885|  356|
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1892|  344|
|1893|  390|
|1894|  360|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
+----+-----+
only showing top 20 rows



In [25]:
# Negate a condition with ~
# This shows the rows whose Water value is greater than 380.
df_pyspark.filter(~(df_pyspark['Water'] <= 380)).show()

+----+-----+
|Year|Water|
+----+-----+
|1886|  386|
|1887|  397|
|1888|  397|
|1889|  413|
|1890|  458|
|1891|  485|
|1893|  390|
|1895|  420|
|1896|  435|
|1897|  439|
|1898|  454|
|1899|  462|
|1900|  454|
|1901|  469|
|1902|  500|
|1903|  492|
|1904|  473|
|1905|  458|
|1906|  469|
|1907|  481|
+----+-----+
only showing top 20 rows



## PySpark GroupBy and Aggregate Functions


In [26]:
# Print the column names, their types and nullability.
df_pyspark.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Water: integer (nullable = true)



In [31]:
## GroupBy
# groupBy() only defines the groups; an aggregate function (sum, count, ...) must follow
# to produce a result.
# sum() is called without column names here, and the output contains a sum for every
# column of this DataFrame: sum(Year) and sum(Water) (the grouping column is summed as well).
df_pyspark.groupBy('Water').sum().show()

+-----+---------+----------+
|Water|sum(Year)|sum(Water)|
+-----+---------+----------+
|  458|     5724|      1374|
|  481|     1907|       481|
|  613|     3911|      1226|
|  530|     1912|       530|
|  587|     9734|      2935|
|  606|     1945|       606|
|  602|     1958|       602|
|  625|     1962|       625|
|  360|     1894|       360|
|  386|     1886|       386|
|  435|     1896|       435|
|  473|     3815|       946|
|  500|     3839|      1000|
|  432|     3855|       864|
|  397|     3775|       794|
|  579|     1956|       579|
|  598|     1949|       598|
|  462|     5739|      1386|
|  564|     1951|       564|
|  609|     1944|       609|
+-----+---------+----------+
only showing top 20 rows



In [33]:
## GroupBy + count
# Number of rows for each distinct 'Water' value ...
df_pyspark.groupBy('Water').count().show()
# ... and for each distinct 'Year'.
df_pyspark.groupBy('Year').count().show()

+-----+-----+
|Water|count|
+-----+-----+
|  458|    3|
|  481|    1|
|  613|    2|
|  530|    1|
|  587|    5|
|  606|    1|
|  602|    1|
|  625|    1|
|  360|    1|
|  386|    1|
|  435|    1|
|  473|    2|
|  500|    2|
|  432|    2|
|  397|    2|
|  579|    1|
|  598|    1|
|  462|    3|
|  564|    1|
|  609|    1|
+-----+-----+
only showing top 20 rows

+----+-----+
|Year|count|
+----+-----+
|1959|    1|
|1896|    1|
|1903|    1|
|1888|    1|
|1924|    1|
|1892|    1|
|1889|    1|
|1927|    1|
|1955|    1|
|1890|    1|
|1908|    1|
|1925|    1|
|1886|    1|
|1961|    1|
|1942|    1|
|1939|    1|
|1944|    1|
|1899|    1|
|1902|    1|
|1922|    1|
+----+-----+
only showing top 20 rows



In [34]:
# Aggregate over the whole DataFrame (no grouping).
# The dict maps a column name to the aggregate function applied to it; the result is a single row.
df_pyspark.agg({'Year':'sum'}).show()

+---------+
|sum(Year)|
+---------+
|   151996|
+---------+

