# Part-1

__In this tutorial, we will cover__
* PySpark dataframe
* Reading the dataset
* Checking the datatypes of the column (schema)
* Selecting columns and indexing
* Using `describe()`, similar to pandas
* Adding columns
* Dropping columns
* Renaming columns

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

24/03/16 16:07:04 WARN Utils: Your hostname, htetaunglynn resolves to a loopback address: 127.0.1.1; using 192.168.61.214 instead (on interface wlo1)
24/03/16 16:07:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/16 16:07:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/16 16:07:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
spark

In [4]:
# read dataset
df_ps = spark.read.option('header', 'true').csv('test1.csv')
df_ps.show()

+----+---+----------+
|name|age|experience|
+----+---+----------+
|Htet| 21|         5|
|Aung| 29|         4|
|Lynn| 30|         9|
+----+---+----------+



In [5]:
# check the schema
df_ps.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience: string (nullable = true)



Every column in the schema comes back as a string. To get the proper types, pass the parameter `inferSchema=True`.

In [6]:
df_ps = spark.read.option('header', 'true').csv('test1.csv', inferSchema=True)
df_ps.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)
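
To see why `inferSchema=True` matters, here is a rough pure-Python model of the idea: every CSV field arrives as text, and inference looks for a narrower type per column. The `infer_type` helper and the inline `CSV_TEXT` are hypothetical stand-ins (the contents mirror the table shown earlier); Spark's real inference handles many more types and is not implemented this way.

```python
import csv
import io

# Inline copy of the three rows shown above; stands in for test1.csv.
CSV_TEXT = """name,age,experience
Htet,21,5
Aung,29,4
Lynn,30,9
"""

def infer_type(values):
    """Return 'int' if every value parses as an integer, else 'string'."""
    try:
        for v in values:
            int(v)
        return "int"
    except ValueError:
        return "string"

rows = list(csv.DictReader(io.StringIO(CSV_TEXT)))

# Without inference, every field is a plain Python str.
raw_types = {col: type(rows[0][col]).__name__ for col in rows[0]}

# With the simplified rule, numeric-looking columns become int.
inferred = {col: infer_type([r[col] for r in rows]) for col in rows[0]}

print(raw_types)
print(inferred)
```

The second dictionary lines up with the `string`, `integer`, `integer` schema printed above.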



In [7]:
df_ps = spark.read.csv('test1.csv', header=True, inferSchema=True)
df_ps.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: integer (nullable = true)



In [8]:
type(df_ps)

pyspark.sql.dataframe.DataFrame

**A PySpark DataFrame is a data structure: a distributed table with named, typed columns.**

In [9]:
df_ps.columns

['name', 'age', 'experience']

In [10]:
df_ps.head(3)    # returns a list of Row objects

[Row(name='Htet', age=21, experience=5),
 Row(name='Aung', age=29, experience=4),
 Row(name='Lynn', age=30, experience=9)]

In [11]:
df_ps.select('name')

DataFrame[name: string]

In [12]:
df_ps.select('name').show()

+----+
|name|
+----+
|Htet|
|Aung|
|Lynn|
+----+



In [13]:
type(df_ps.select('name'))

pyspark.sql.dataframe.DataFrame

In [14]:
df_ps.select(['name','experience']).show()

+----+----------+
|name|experience|
+----+----------+
|Htet|         5|
|Aung|         4|
|Lynn|         9|
+----+----------+



In [15]:
df_ps['name']       # returns a Column, not a DataFrame, so .show() does not work here

Column<'name'>

In [16]:
df_ps.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int')]

In [17]:
df_ps.describe().show()
# describe() summarizes string columns as well (count, min and max only)

+-------+----+------------------+------------------+
|summary|name|               age|        experience|
+-------+----+------------------+------------------+
|  count|   3|                 3|                 3|
|   mean|NULL|26.666666666666668|               6.0|
| stddev|NULL| 4.932882862316247|2.6457513110645907|
|    min|Aung|                21|                 4|
|    max|Lynn|                30|                 9|
+-------+----+------------------+------------------+
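
The `stddev` row is the sample standard deviation (it divides by n - 1), and `min`/`max` on a string column compare the values lexicographically. As a quick sanity check, the sketch below reproduces the numbers above with Python's standard `statistics` module; the lists are copied by hand from the three rows of the table.

```python
import statistics

# Values copied from the DataFrame shown earlier.
name = ["Htet", "Aung", "Lynn"]
age = [21, 29, 30]
experience = [5, 4, 9]

age_mean = statistics.mean(age)
age_sample_sd = statistics.stdev(age)        # divides by n - 1
age_population_sd = statistics.pstdev(age)   # divides by n, for comparison
exp_sample_sd = statistics.stdev(experience)

print(age_mean, age_sample_sd, age_population_sd)
print(statistics.mean(experience), exp_sample_sd)
print(min(name), max(name))  # lexicographic order for strings
```

Only the sample version matches the `stddev` values in the `describe()` output; the population version gives a smaller number.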



In [18]:
# Add a column to the DataFrame
df_ps = df_ps.withColumn('experience after 2 year', df_ps['experience']+2)
df_ps.show()

+----+---+----------+-----------------------+
|name|age|experience|experience after 2 year|
+----+---+----------+-----------------------+
|Htet| 21|         5|                      7|
|Aung| 29|         4|                      6|
|Lynn| 30|         9|                     11|
+----+---+----------+-----------------------+



In [19]:
# Drop columns
df_ps = df_ps.drop('experience after 2 year')
df_ps.show()

+----+---+----------+
|name|age|experience|
+----+---+----------+
|Htet| 21|         5|
|Aung| 29|         4|
|Lynn| 30|         9|
+----+---+----------+



In [21]:
# Rename columns (returns a new DataFrame; df_ps itself is unchanged)
df_ps.withColumnRenamed('name','new_name').show()

+--------+---+----------+
|new_name|age|experience|
+--------+---+----------+
|    Htet| 21|         5|
|    Aung| 29|         4|
|    Lynn| 30|         9|
+--------+---+----------+



# Part-2

### PySpark Handling Missing Values

* Dropping columns
* Dropping rows
* Various parameters of the drop functions
* Handling missing values with the mean or median

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practise').getOrCreate()

24/03/18 12:56:04 WARN Utils: Your hostname, htetaunglynn resolves to a loopback address: 127.0.1.1; using 192.168.61.214 instead (on interface wlo1)
24/03/18 12:56:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/18 12:56:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.csv('test1.csv', header=True, inferSchema=True)
df.show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|   Htet|  21|         5| 600000|
|   Aung|  29|         4| 300000|
|   Lynn|  30|         9|1000000|
|    Zaw|  25|         5| 400000|
|     Su|  31|         8| 800000|
|  Thida|  26|         7|1200000|
|  Maung|NULL|      NULL| 200000|
|   NULL|  26|         3| 400000|
|    May|  28|         5| 600000|
|Thawdar|  32|         7|   NULL|
|   NULL|  35|      NULL|   NULL|
+-------+----+----------+-------+



In [3]:
# drop columns
df.drop('name').show()

+----+----------+-------+
| age|experience| salary|
+----+----------+-------+
|  21|         5| 600000|
|  29|         4| 300000|
|  30|         9|1000000|
|  25|         5| 400000|
|  31|         8| 800000|
|  26|         7|1200000|
|NULL|      NULL| 200000|
|  26|         3| 400000|
|  28|         5| 600000|
|  32|         7|   NULL|
|  35|      NULL|   NULL|
+----+----------+-------+



In [4]:
# drop every row that contains at least one NULL value
df.na.drop().show()

+-----+---+----------+-------+
| name|age|experience| salary|
+-----+---+----------+-------+
| Htet| 21|         5| 600000|
| Aung| 29|         4| 300000|
| Lynn| 30|         9|1000000|
|  Zaw| 25|         5| 400000|
|   Su| 31|         8| 800000|
|Thida| 26|         7|1200000|
|  May| 28|         5| 600000|
+-----+---+----------+-------+



In [5]:
# how='all': drop a row only if all of its values are NULL (the default is how='any')
df.na.drop(how='all').show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|   Htet|  21|         5| 600000|
|   Aung|  29|         4| 300000|
|   Lynn|  30|         9|1000000|
|    Zaw|  25|         5| 400000|
|     Su|  31|         8| 800000|
|  Thida|  26|         7|1200000|
|  Maung|NULL|      NULL| 200000|
|   NULL|  26|         3| 400000|
|    May|  28|         5| 600000|
|Thawdar|  32|         7|   NULL|
|   NULL|  35|      NULL|   NULL|
+-------+----+----------+-------+



In [6]:
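# thresh: keep only rows with at least `thresh` non-NULL values (overrides `how`)
df.na.drop(how='any', thresh=2).show()
# the last row is dropped because it has only 1 non-NULL value, fewer than thresh=2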

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|   Htet|  21|         5| 600000|
|   Aung|  29|         4| 300000|
|   Lynn|  30|         9|1000000|
|    Zaw|  25|         5| 400000|
|     Su|  31|         8| 800000|
|  Thida|  26|         7|1200000|
|  Maung|NULL|      NULL| 200000|
|   NULL|  26|         3| 400000|
|    May|  28|         5| 600000|
|Thawdar|  32|         7|   NULL|
+-------+----+----------+-------+



In [7]:
df.na.drop(how='any', thresh=3).show()

+-------+---+----------+-------+
|   name|age|experience| salary|
+-------+---+----------+-------+
|   Htet| 21|         5| 600000|
|   Aung| 29|         4| 300000|
|   Lynn| 30|         9|1000000|
|    Zaw| 25|         5| 400000|
|     Su| 31|         8| 800000|
|  Thida| 26|         7|1200000|
|   NULL| 26|         3| 400000|
|    May| 28|         5| 600000|
|Thawdar| 32|         7|   NULL|
+-------+---+----------+-------+



In [8]:
# subset: only check the listed columns for NULL values
df.na.drop(how='any', subset=['experience']).show()

+-------+---+----------+-------+
|   name|age|experience| salary|
+-------+---+----------+-------+
|   Htet| 21|         5| 600000|
|   Aung| 29|         4| 300000|
|   Lynn| 30|         9|1000000|
|    Zaw| 25|         5| 400000|
|     Su| 31|         8| 800000|
|  Thida| 26|         7|1200000|
|   NULL| 26|         3| 400000|
|    May| 28|         5| 600000|
|Thawdar| 32|         7|   NULL|
+-------+---+----------+-------+
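
The `how`, `thresh` and `subset` parameters all reduce to one rule: count the non-NULL values in the checked columns and keep the row if the count is high enough. The sketch below is a pure-Python model of that rule, not PySpark code; `drop_rows`, `COLUMNS` and `ROWS` are hypothetical names, and the rows are copied from the table above with `None` standing in for NULL.

```python
COLUMNS = ["name", "age", "experience", "salary"]
ROWS = [
    ("Htet", 21, 5, 600000),
    ("Aung", 29, 4, 300000),
    ("Lynn", 30, 9, 1000000),
    ("Zaw", 25, 5, 400000),
    ("Su", 31, 8, 800000),
    ("Thida", 26, 7, 1200000),
    ("Maung", None, None, 200000),
    (None, 26, 3, 400000),
    ("May", 28, 5, 600000),
    ("Thawdar", 32, 7, None),
    (None, 35, None, None),
]

def drop_rows(rows, how="any", thresh=None, subset=None):
    """Model of na.drop: keep rows with enough non-None values."""
    cols = subset if subset is not None else COLUMNS
    idx = [COLUMNS.index(c) for c in cols]
    if thresh is None:
        # how='any' requires every checked value to be present;
        # how='all' requires at least one checked value to be present.
        thresh = len(idx) if how == "any" else 1
    return [r for r in rows if sum(r[i] is not None for i in idx) >= thresh]

kept_any = drop_rows(ROWS)                            # 7 rows
kept_all = drop_rows(ROWS, how="all")                 # 11 rows
kept_t2 = drop_rows(ROWS, thresh=2)                   # 10 rows
kept_t3 = drop_rows(ROWS, thresh=3)                   # 9 rows
kept_sub = drop_rows(ROWS, subset=["experience"])     # 9 rows

print(len(kept_any), len(kept_all), len(kept_t2), len(kept_t3), len(kept_sub))
```

The row counts agree with the five outputs shown in this part. When `thresh` is given it replaces whatever `how` would have implied, which is why `how='any'` has no visible effect in the `thresh` cells.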



In [15]:
df.dtypes

[('name', 'string'), ('age', 'int'), ('experience', 'int'), ('salary', 'int')]

In [9]:
# fill missing values
df.na.fill(value='missing value').show()
# a string value fills only the string columns

+-------------+----+----------+-------+
|         name| age|experience| salary|
+-------------+----+----------+-------+
|         Htet|  21|         5| 600000|
|         Aung|  29|         4| 300000|
|         Lynn|  30|         9|1000000|
|          Zaw|  25|         5| 400000|
|           Su|  31|         8| 800000|
|        Thida|  26|         7|1200000|
|        Maung|NULL|      NULL| 200000|
|missing value|  26|         3| 400000|
|          May|  28|         5| 600000|
|      Thawdar|  32|         7|   NULL|
|missing value|  35|      NULL|   NULL|
+-------------+----+----------+-------+



In [17]:
df.na.fill(value=0).show()
# a numeric value fills only the numeric columns; the string column keeps its NULLs

+-------+---+----------+-------+
|   name|age|experience| salary|
+-------+---+----------+-------+
|   Htet| 21|         5| 600000|
|   Aung| 29|         4| 300000|
|   Lynn| 30|         9|1000000|
|    Zaw| 25|         5| 400000|
|     Su| 31|         8| 800000|
|  Thida| 26|         7|1200000|
|  Maung|  0|         0| 200000|
|   NULL| 26|         3| 400000|
|    May| 28|         5| 600000|
|Thawdar| 32|         7|      0|
|   NULL| 35|         0|      0|
+-------+---+----------+-------+
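
Both `fill` calls follow the same type-matching rule: a NULL is replaced only when the fill value's type fits the column's type. Below is a simplified pure-Python model of that behaviour; `fill_row` and `COLUMN_TYPES` are hypothetical names, and the model only distinguishes `str` from `int`, whereas Spark has a richer type system.

```python
# Column types taken from the df.dtypes output above.
COLUMN_TYPES = {"name": str, "age": int, "experience": int, "salary": int}

def fill_row(row, value):
    """Replace None only in columns whose type matches the fill value."""
    return {
        col: value if cell is None and isinstance(value, COLUMN_TYPES[col]) else cell
        for col, cell in row.items()
    }

# The last row of the table, with None standing in for NULL.
last_row = {"name": None, "age": 35, "experience": None, "salary": None}

filled_str = fill_row(last_row, "missing value")
filled_int = fill_row(last_row, 0)

print(filled_str)
print(filled_int)
```

The string fill touches only `name`, and the integer fill touches only `experience` and `salary`, matching the last row of the two outputs above.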



In [25]:
# Handling missing values
from pyspark.ml.feature import Imputer

# Impute with the mean value
cols = ['age', 'experience', 'salary']
imputer = Imputer(inputCols=cols,
                  outputCols=[f"{c}_imputed" for c in cols]).setStrategy('mean')

# Add imputation cols to df
imputer.fit(df).transform(df).show()

+-------+----+----------+-------+-----------+------------------+--------------+
|   name| age|experience| salary|age_imputed|experience_imputed|salary_imputed|
+-------+----+----------+-------+-----------+------------------+--------------+
|   Htet|  21|         5| 600000|         21|                 5|        600000|
|   Aung|  29|         4| 300000|         29|                 4|        300000|
|   Lynn|  30|         9|1000000|         30|                 9|       1000000|
|    Zaw|  25|         5| 400000|         25|                 5|        400000|
|     Su|  31|         8| 800000|         31|                 8|        800000|
|  Thida|  26|         7|1200000|         26|                 7|       1200000|
|  Maung|NULL|      NULL| 200000|         28|                 5|        200000|
|   NULL|  26|         3| 400000|         26|                 3|        400000|
|    May|  28|         5| 600000|         28|                 5|        600000|
|Thawdar|  32|         7|   NULL|       
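
The imputed values in the `Maung` row can be checked by hand. The sketch below recomputes the column means in plain Python from the values in the table, with `None` standing in for NULL. The `impute_mean` helper is hypothetical, and truncating the mean to an integer is an assumption inferred from the output above (28 for a mean of 28.3, and 5 for a mean of about 5.89), not a documented guarantee.

```python
import statistics

# Column values copied from the table; None marks a missing value.
age = [21, 29, 30, 25, 31, 26, None, 26, 28, 32, 35]
experience = [5, 4, 9, 5, 8, 7, None, 3, 5, 7, None]

def impute_mean(values):
    """Fill None with the mean of the present values, truncated to int."""
    present = [v for v in values if v is not None]
    mean = statistics.mean(present)
    fill = int(mean)  # assumption: truncate to keep the integer column type
    return [fill if v is None else v for v in values], mean, fill

age_imputed, age_mean, age_fill = impute_mean(age)
exp_imputed, exp_mean, exp_fill = impute_mean(experience)

print(age_mean, age_fill)
print(exp_mean, exp_fill)
```

Rounding instead of truncating would give 6 for `experience`, which does not match the 5 shown in the `experience_imputed` column.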

In [26]:
# Impute with the median value
cols = ['age', 'experience', 'salary']
imputer = Imputer(inputCols=cols,
                  outputCols=[f"{c}_imputed" for c in cols]).setStrategy('median')

# Add imputation cols to df
imputer.fit(df).transform(df).show()

+-------+----+----------+-------+-----------+------------------+--------------+
|   name| age|experience| salary|age_imputed|experience_imputed|salary_imputed|
+-------+----+----------+-------+-----------+------------------+--------------+
|   Htet|  21|         5| 600000|         21|                 5|        600000|
|   Aung|  29|         4| 300000|         29|                 4|        300000|
|   Lynn|  30|         9|1000000|         30|                 9|       1000000|
|    Zaw|  25|         5| 400000|         25|                 5|        400000|
|     Su|  31|         8| 800000|         31|                 8|        800000|
|  Thida|  26|         7|1200000|         26|                 7|       1200000|
|  Maung|NULL|      NULL| 200000|         28|                 5|        200000|
|   NULL|  26|         3| 400000|         26|                 3|        400000|
|    May|  28|         5| 600000|         28|                 5|        600000|
|Thawdar|  32|         7|   NULL|       