# Missing Values

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse, via getOrCreate) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('MissingValue')
    .getOrCreate()
)

In [3]:
# Load the sample data: column names come from the first line, and column
# types are inferred instead of reading every field as a string.
df = spark.read.csv(
    'data/ContainsNull.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Inspect the raw rows; missing values are printed as `null`.
df.show(n=20)

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



* Use the `.na` accessor to drop, fill, or otherwise handle missing data. With no arguments, `drop()` removes every row that contains at least one null.

In [5]:
# how='any' is drop()'s default: a row is removed if any of its columns is null.
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



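* We can also specify a threshold: `thresh=n` keeps only rows that have at least `n` non-null values (and overrides `how`). Below, `emp2` has a single non-null value, so it is the only row dropped.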

In [6]:
# Keep only rows that have at least two non-null values.
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



* The default is `how='any'` (`df.na.drop(how='any')`), which removes rows containing any null value; `how='all'` removes only rows in which every value is null.

In [7]:
# how='all': drop a row only when every one of its columns is null.
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



* We can also pass `subset`, a list of column names: only nulls in those columns cause a row to be dropped.

In [8]:
# Only nulls in 'Sales' cause a drop; nulls in other columns are ignored.
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [9]:
# A null in either 'Sales' or 'Name' is enough to drop the row.
df.na.drop(subset=['Sales', 'Name']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



## Fill in the missing values

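* `fill(value, subset=None)` replaces nulls with `value`. Only columns whose type matches `value` are filled: a string fills string columns, a number fills numeric columns.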

In [10]:
# A string fill value is applied only to string columns, so 'Sales' keeps its nulls.
df.na.fill(value='Fill Value').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|Fill Value| null|
|emp3|Fill Value|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [11]:
# A numeric fill value is applied only to numeric columns, so 'Name' keeps its nulls.
df.na.fill(value=0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



* It is safer to name the columns to fill explicitly with `subset`, so that no other column is filled by accident.

In [12]:
# Restrict the fill to 'Name' so no other column can be touched by accident.
df.na.fill(value='Fill Value', subset=['Name']).show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|Fill Value| null|
|emp3|Fill Value|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



* Fill nulls with the column's average (mean imputation).

In [13]:
from pyspark.sql.functions import mean

In [14]:
# collect() runs the aggregation and brings the result rows back to the driver
# as a Python list of Row objects.
mean_value = (
    df.select(mean(df['Sales']))
    .collect()
)

In [15]:
# A list holding a single Row; its only field is named after the aggregate expression.
mean_value

[Row(avg(Sales)=400.5)]

In [16]:
# Pull the scalar out of the only Row; the field is named 'avg(Sales)', as printed above.
mean_sales = mean_value[0]['avg(Sales)']

In [17]:
# Mean imputation: nulls in 'Sales' are replaced by the column average computed above.
df.na.fill(mean_sales, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



# Dates and Timestamps

In [19]:
# Load the Apple stock price history. header=True takes column names from the
# first line; inferSchema=True (a real bool, matching the first CSV read) makes
# Spark infer numeric column types. Note that 'Date' still comes back as a
# string (see the head() output below); the date functions used later accept it.
df2 = spark.read.csv('data/appl_stock.csv', header=True, inferSchema=True)

In [20]:
# Peek at the first row only.
df2.show(n=1)

+----------+----------+----------+------------------+----------+---------+---------+
|      Date|      Open|      High|               Low|     Close|   Volume|Adj Close|
+----------+----------+----------+------------------+----------+---------+---------+
|2010-01-04|213.429998|214.499996|212.38000099999996|214.009998|123432400|27.727039|
+----------+----------+----------+------------------+----------+---------+---------+
only showing top 1 row



In [21]:
# Project just the 'Date' column and show its first value.
df2.select('Date').show(n=1)

+----------+
|      Date|
+----------+
|2010-01-04|
+----------+
only showing top 1 row



In [22]:
# Fetch the first row as a list of Row objects (head(n) is take(n)).
df2.take(1)

[Row(Date='2010-01-04', Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [23]:
from pyspark.sql.functions import (dayofmonth, dayofyear,
                                  hour, month,
                                  year, weekofyear,
                                  format_number, date_format)

In [24]:
# Day of the month (1-31) for each trading date.
df2.select(dayofmonth('Date')).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [25]:
# Day of the year for each trading date.
df2.select(dayofyear('Date')).show(n=5)

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
+---------------+
only showing top 5 rows



In [26]:
# 'Date' carries no time-of-day, so the hour is always 0.
df2.select(hour('Date')).show(n=5)

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 5 rows



In [27]:
# Month number for each trading date.
df2.select(month('Date')).show(n=5)

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 5 rows



* Average closing price per year

In [28]:
# Derive a 'Year' column from 'Date' so the rows can be grouped by year.
new_df = df2.withColumn('Year', year('Date'))

In [29]:
# Confirm the new 'Year' column on the first few rows.
new_df.show(n=5)

+----------+----------+----------+------------------+------------------+---------+------------------+----+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
+----------+----------+----------+------------------+------------------+---------+------------------+----+
only showing top 5 rows



In [30]:
# Average closing price per year. Aggregate only 'Close' rather than calling
# mean() on every numeric column (which would also average 'Year' itself) and
# then discarding the extras; the result columns are still 'Year', 'avg(Close)'.
# Groups come back in no particular order; chain .orderBy('Year') for a sorted table.
result = new_df.groupBy('Year').mean('Close')

In [31]:
# Display the per-year averages (one row per year).
result.show(n=20)

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [32]:
# format_number rounds to 2 decimal places and yields a string column,
# which is intended for display only.
result.select(
    'Year',
    format_number('avg(Close)', 2).alias("Closing Average"),
).show()

+----+---------------+
|Year|Closing Average|
+----+---------------+
|2015|         120.04|
|2013|         472.63|
|2014|         295.40|
|2012|         576.05|
|2016|         104.60|
|2010|         259.84|
|2011|         364.00|
+----+---------------+

