# **Spark**

## **Setting Up Spark in Colab**

**Checking the version of Java Installed in the system**

In [1]:
!java -version

openjdk version "11.0.7" 2020-04-14
OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu218.04)
OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu218.04, mixed mode, sharing)


**Installing Java8**

In [2]:
!apt-get install openjdk-8-jdk-headless -qq> /dev/null

**Downloading Spark**

In [3]:
!wget -q http://apachemirror.wuchna.com/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz

**Installing findspark**

In [4]:
!pip install findspark



**Setting path variables for Java and Spark**

In [5]:
import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

In [6]:
import findspark
findspark.init()

## **Basics of Spark DataFrames**

- Spark DataFrames store data in rows and columns.
- Each column represents a feature or variable, and each row represents an individual data point.
- Spark DataFrames can read from and write to a variety of data sources, such as CSV and JSON.
- We can perform various transformations on the data and collect the results to visualize them or to pass them on to some other processing.
- In order to begin working with Spark DataFrames, we need to create a Spark Session.
- A Spark Session is the unified entry point of a Spark application; it provides a way to interact with the various Spark functionalities.

In [7]:
# importing SparkSession
from pyspark.sql import SparkSession

In [8]:
# Creating a SparkSession
spark = SparkSession.builder.appName('MyFirstSparkSession').getOrCreate()

- In this manner, we can create our spark session.
- We can use the session variable inside our scripts.

In order to work with real data, we first need to read a dataset.
- For this we can use the `read` attribute of the Spark session.
- We can also select the type of data file we need to load; for this, `read` offers methods such as `csv` and `json`.

We can load a CSV file present in our filesystem as follows:

<code>
dataFrame_name = spark_session_variable.read.csv('filename')
</code>

For example, let's load the dataset employee.csv.

In [9]:
employee_df = spark.read.csv('/content/drive/My Drive/Repos/Git/Integrating Machine Learning with Big Data/PySpark/Dataset/employee.csv')

- Additional parameters such as header and inferSchema can be passed to the read.csv() method.
- The ***header*** parameter takes either True or False. With True, the first row of the dataset is used as the column titles. With False, the first row is treated as data.
- The ***inferSchema*** parameter also takes either True or False. With True, Spark identifies and assigns data types to the columns of the DataFrame based on the dataset's column values. Otherwise, all the columns are read as strings.

In [10]:
employee_df = spark.read.csv('/content/drive/My Drive/Repos/Git/Integrating Machine Learning with Big Data/PySpark/Dataset/employee.csv',header=True,inferSchema=True)

- To see the contents of the DataFrame we created and what the data looks like, we can use the ***show()*** method.
- Example: `df.show()`

In [11]:
employee_df.show()

+-----------+-------------+---+----------+-----+
|employee_id|employee_name|age|  location|hours|
+-----------+-------------+---+----------+-----+
|       G001|       Pichai| 47|California|   14|
|       M002|         Bill| 64|Washington|   10|
|       A003|         Jeff| 56|Washington|   11|
|       A004|         Cook| 59|California|   12|
+-----------+-------------+---+----------+-----+



- If we want to know the features or columns of the dataframe, we can get the list of columns by executing `df.columns`

In [12]:
employee_df.columns

['employee_id', 'employee_name', 'age', 'location', 'hours']

- We can also use ***df.printSchema()*** to list the columns and their data types, where ***df*** is the DataFrame that we created.

In [13]:
# Shows the datatype of the variables of the dataset
employee_df.printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- hours: integer (nullable = true)



## **Working with Rows and Columns in Spark DataFrame**

- Here, we will try to familiarize ourselves with the Spark DataFrame.
- We will try to get the rows and columns data and perform some operations on them.

### **Working with Columns in Spark**

- Using the DataFrame to get to the columns directly.
  - To reach a column through the DataFrame, we can index the DataFrame with the column name, that is `df['column_name']`. Accessing a column this way returns the Column object itself rather than its data.

In [14]:
employee_df['age']

Column<b'age'>

- Using the select option in the DataFrame.
  - Instead of indexing the DataFrame with the column name, we can use the select method of the DataFrame, which returns the required column as a DataFrame.
  - **Syntax:** `df.select('column_name')`
  - We can also select multiple columns by passing the required column names as a list to the select function.
  - **Syntax:** `df.select(['column1','column2'])`

In [15]:
# Selecting and Displaying a single column
employee_df.select('age').show()

+---+
|age|
+---+
| 47|
| 64|
| 56|
| 59|
+---+



In [16]:
# Selecting and displaying employee_name and age
employee_df.select(['employee_name','age']).show()

+-------------+---+
|employee_name|age|
+-------------+---+
|       Pichai| 47|
|         Bill| 64|
|         Jeff| 56|
|         Cook| 59|
+-------------+---+



- Adding new Columns
  - We can add new columns using the method **withColumn()**. This function returns a new DataFrame with the column added, or with an existing column replaced if it has the same name as the new column we specify.
  - **Syntax:** `df.withColumn('new_column_name',column_itself)`
  - To create a new column with **withColumn()**, we pass the new column name as the first parameter and the column itself (a Column object) as the second parameter.

In [17]:
employee_df.show()

+-----------+-------------+---+----------+-----+
|employee_id|employee_name|age|  location|hours|
+-----------+-------------+---+----------+-----+
|       G001|       Pichai| 47|California|   14|
|       M002|         Bill| 64|Washington|   10|
|       A003|         Jeff| 56|Washington|   11|
|       A004|         Cook| 59|California|   12|
+-----------+-------------+---+----------+-----+



In [18]:
df_new = employee_df.withColumn('overtime_time',employee_df['hours'])
df_new.show()

+-----------+-------------+---+----------+-----+-------------+
|employee_id|employee_name|age|  location|hours|overtime_time|
+-----------+-------------+---+----------+-----+-------------+
|       G001|       Pichai| 47|California|   14|           14|
|       M002|         Bill| 64|Washington|   10|           10|
|       A003|         Jeff| 56|Washington|   11|           11|
|       A004|         Cook| 59|California|   12|           12|
+-----------+-------------+---+----------+-----+-------------+



- We can rename columns using the **withColumnRenamed()** method.
  - This is done by passing the old column name as the first parameter and the new name as the second parameter.
  - **Syntax:** `df.withColumnRenamed('old_column_name','new_column_name')`
  - It returns a new DataFrame.

In [19]:
df_new = df_new.withColumnRenamed('hours','working_hours')

In [20]:
df_new.show()

+-----------+-------------+---+----------+-------------+-------------+
|employee_id|employee_name|age|  location|working_hours|overtime_time|
+-----------+-------------+---+----------+-------------+-------------+
|       G001|       Pichai| 47|California|           14|           14|
|       M002|         Bill| 64|Washington|           10|           10|
|       A003|         Jeff| 56|Washington|           11|           11|
|       A004|         Cook| 59|California|           12|           12|
+-----------+-------------+---+----------+-------------+-------------+



- Dropping a column
  - We can drop a column using the **drop()** method.
  - **Syntax:** `df_new = df.drop('column_name')`

### **Working with Rows in Spark**

- Getting the rows and their contents.
  - We can get row data from the DataFrame in various ways. One of them is the **head()** function.
  - We can use the head() function to get the topmost records or rows as a list of Row objects. The head() function takes one parameter, which determines how many records from the top should be selected.
  - **Syntax:** `df.head(number_of_records_required)`

In [21]:
df_new.head(3)

[Row(employee_id='G001', employee_name='Pichai', age=47, location='California', working_hours=14, overtime_time=14),
 Row(employee_id='M002', employee_name='Bill', age=64, location='Washington', working_hours=10, overtime_time=10),
 Row(employee_id='A003', employee_name='Jeff', age=56, location='Washington', working_hours=11, overtime_time=11)]

### **Getting to know the statistical detail of the dataset**

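- We can get a statistical summary of the dataset's columns by using the **describe()** method. It reports the count, mean, standard deviation, minimum and maximum of each column; for string columns, the mean and standard deviation are shown as null.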
- **Syntax:** `df.describe().show()`

In [22]:
df_new.describe().show()

+-------+-----------+-------------+----------------+----------+-----------------+-----------------+
|summary|employee_id|employee_name|             age|  location|    working_hours|    overtime_time|
+-------+-----------+-------------+----------------+----------+-----------------+-----------------+
|  count|          4|            4|               4|         4|                4|                4|
|   mean|       null|         null|            56.5|      null|            11.75|            11.75|
| stddev|       null|         null|7.14142842854285|      null|1.707825127659933|1.707825127659933|
|    min|       A003|         Bill|              47|California|               10|               10|
|    max|       M002|       Pichai|              64|Washington|               14|               14|
+-------+-----------+-------------+----------------+----------+-----------------+-----------------+



## **Using SQL in Spark DataFrames**

- Spark DataFrames give us the option to use SQL queries to work and interact with the DataFrames directly.
- In order to do this, we first have to register our DataFrame as a SQL temporary view.
- We can do that by using the method **createOrReplaceTempView()** and passing the preferred view name as its parameter.

In [23]:
employee_df.createOrReplaceTempView('associates')

- On executing this statement, we have successfully registered the DataFrame as SQL temporary view and now we are allowed to use the direct SQL queries on the newly registered view.
- We can do that by using the **sql()** method in the created spark session.
- **Syntax:** `variable = spark_session_variable.sql('SQL Query')`


In [24]:
sql_result = spark.sql('select * from associates')

- In this way we can use complex SQL queries to get data from the dataset. The advantage of this method is that, if we are familiar with SQL, we can simply use Spark's SQL features to get the required data rather than using the DataFrame operations themselves.

In [25]:
sql_result.show()

+-----------+-------------+---+----------+-----+
|employee_id|employee_name|age|  location|hours|
+-----------+-------------+---+----------+-----+
|       G001|       Pichai| 47|California|   14|
|       M002|         Bill| 64|Washington|   10|
|       A003|         Jeff| 56|Washington|   11|
|       A004|         Cook| 59|California|   12|
+-----------+-------------+---+----------+-----+



In [26]:
sql_result2 = spark.sql('select * from associates where age between 45 and 60 and location="California"')

In [27]:
sql_result2.show()

+-----------+-------------+---+----------+-----+
|employee_id|employee_name|age|  location|hours|
+-----------+-------------+---+----------+-----+
|       G001|       Pichai| 47|California|   14|
|       A004|         Cook| 59|California|   12|
+-----------+-------------+---+----------+-----+



## **Operations and Functions with Spark DataFrames**

- Once we have the data with us, we may have to perform several operations such as filter, etc.

### **Filter Method**

- This method lets us filter out some data from the dataset based on some conditions.
- To apply it, we simply call the **filter()** method on our DataFrame. We can use the filter() method in various ways.
  - **filter() method using direct SQL type Syntax.**
    - We can pass SQL-like conditions as a string to the filter() method, and it filters the data for us.
    - ***Example:*** Suppose we want to get all the items whose total amount is greater than 1000. We can pass this condition directly in this format: `df.filter('total_amount>1000').show()`
  - **filter() method using DataFrame Objects.**
    - We can perform the same operation as in the above example by using DataFrame objects instead of SQL-like syntax. That is, we fetch the column from the DataFrame and apply Python's comparison operators to it to get the same result.
    - ***Example:*** `df.filter(df['total_amount']>1000).show()`
  - **filter() method based on multiple conditions**
    - We can filter based on multiple conditions as well.
    - To do that, we separate the conditions with the ampersand (&) operator, with each condition enclosed in its own set of parentheses.
    - ***Example:*** Suppose we want to see all the items whose item price is greater than 250 and whose tax amount is less than 25. We can combine the conditions within filter as shown below.
    - `df.filter((df['item_price']>250)&(df['tax_amount']<25)).show()`
- To pick out a specific data point or row, convert it into a dictionary and access its values, we can call filter() with a condition that matches a single data point.
- Instead of ending the filter() call with show(), we can use the **collect()** method. The collect() method returns the result as a list of Row objects, which we can store in a variable. We can later use this variable to select specific values and use them in our logic or scripts.
  - ***Example:*** `resulting_row = df.filter('total_amount = 1924.74').collect()`
  - Now that we have the data point in a list called `resulting_row`, we can fetch the data using ordinary list indexing.
  - Suppose we want to get the date of the resulting item. We can get it with `resulting_date = resulting_row[0]['date']`.
  - We can also convert the row into a dictionary and get the data from it: `resulting_row[0].asDict()` $\rightarrow$ `resulting_date_from_dictionary = resulting_row[0].asDict()['date']`

In [28]:
df = spark.read.csv('/content/drive/My Drive/Repos/Git/Integrating Machine Learning with Big Data/PySpark/Dataset/items_bought.csv',inferSchema=True,header=True)

In [29]:
df.show(4)

+----------+---------+----------+--------+----------+------------+
|      date|item_name|item_price|quantity|tax_amount|total_amount|
+----------+---------+----------+--------+----------+------------+
|11-10-2018|     Beer|     110.5|       2|     53.04|      163.54|
|14-02-2018|   Whisky|    1250.0|       1|     300.0|      1550.0|
|23-03-2020|   Whisky|    1300.5|       2|    624.24|     1924.74|
|05-10-2018|      Rum|     550.0|       2|     264.0|       814.0|
+----------+---------+----------+--------+----------+------------+
only showing top 4 rows



In [30]:
df.filter(df['total_amount']>1500).show()

+----------+---------+----------+--------+----------+------------+
|      date|item_name|item_price|quantity|tax_amount|total_amount|
+----------+---------+----------+--------+----------+------------+
|14-02-2018|   Whisky|    1250.0|       1|     300.0|      1550.0|
|23-03-2020|   Whisky|    1300.5|       2|    624.24|     1924.74|
+----------+---------+----------+--------+----------+------------+



In [31]:
df.filter('total_amount > 1500').show()

+----------+---------+----------+--------+----------+------------+
|      date|item_name|item_price|quantity|tax_amount|total_amount|
+----------+---------+----------+--------+----------+------------+
|14-02-2018|   Whisky|    1250.0|       1|     300.0|      1550.0|
|23-03-2020|   Whisky|    1300.5|       2|    624.24|     1924.74|
+----------+---------+----------+--------+----------+------------+



In [32]:
df.filter('item_price>1000 and tax_amount>500').show() # Filtering based on multiple conditions with SQL syntax

+----------+---------+----------+--------+----------+------------+
|      date|item_name|item_price|quantity|tax_amount|total_amount|
+----------+---------+----------+--------+----------+------------+
|23-03-2020|   Whisky|    1300.5|       2|    624.24|     1924.74|
+----------+---------+----------+--------+----------+------------+



In [33]:
df.filter((df['item_price']>1000)&(df['tax_amount']>500)).show() # Filtering based on multiple conditions without SQL syntax

+----------+---------+----------+--------+----------+------------+
|      date|item_name|item_price|quantity|tax_amount|total_amount|
+----------+---------+----------+--------+----------+------------+
|23-03-2020|   Whisky|    1300.5|       2|    624.24|     1924.74|
+----------+---------+----------+--------+----------+------------+



In [34]:
result_data = df.filter('total_amount = 1924.74').collect() # Collecting record based on a condition

In [35]:
result_data[0]['date'] # Fetching Data from collected Result

'23-03-2020'

In [36]:
result_data[0].asDict() # Converting the result into a dictionary.

{'date': '23-03-2020',
 'item_name': 'Whisky',
 'item_price': 1300.5,
 'quantity': 2,
 'tax_amount': 624.24,
 'total_amount': 1924.74}

### **groupBy method**

- **groupBy()** allows us to group rows together based on some common value.
- If a column contains repeating values, we can apply groupBy() on that column so that each distinct value becomes its own group, and then apply logic or requirements to each group.
- Suppose we have revenue or sales data that contains columns like company name, products and sales figures. In this scenario we can group by the company name column to get an overview of a specific company's products and sales figures.
- Let us apply **groupBy()** to the company name column and use the sum function to get the total sales revenue of each company. For this we can write the following code - `df.groupBy('company_name').sum().show()`
- There are various other methods we can use with the groupBy function:
  - **count()** - It returns the count of rows for each group.
  - **mean()** - It returns the mean of the values in each group.
  - **max()** - It returns the max value in each group.
  - **min()** - It returns the min value in each group.
  - **sum()** - It returns the sum of the values in each group.
  - **avg()** - It returns the average of the values in each group (the same result as mean()).
  - **agg()** - It can be used to calculate more than one aggregate at a time. We will see more about the agg() method in the next topic and also learn how to use it with the groupBy() method.

### **Aggregate Function**

- An aggregate function performs an aggregation operation such as sum or average on the values of a column and outputs a single consolidated result.
- It can be used to aggregate across all rows of the DataFrame, and it can also be used with the groupBy function to aggregate within the rows of each group.
- One important point is that the **agg()** function, in the form used here, takes a dictionary as its parameter. The key of the dictionary is the column name and the value is the operation name (like sum or mean).
- Let us use the aggregate function on the entire DataFrame and later see how it can be combined with the groupBy function.
- Suppose we want the total revenue or sales of all companies in the dataset, and we also want to know the maximum revenue or sales among all companies. We can get both by using the **agg()** function on the DataFrame as shown below,
  - `df.agg({'revenue_sales':'sum'}).show()` $\rightarrow$ Total Revenue
  - `df.agg({'revenue_sales':'max'}).show()` $\rightarrow$ Max Revenue
- Now let's see how we can use the **agg()** function with the **groupBy()** function.
- Suppose we want the total revenue or sales of each company present in our DataFrame. To achieve this, we first group the rows by the `company_name` column and then apply the aggregate function to the groups.
- `df.groupBy('company_name').agg({'revenue_sales':'sum'}).show()`

### **orderBy Function**

- The **orderBy()** function allows us to order and sort the data in our DataFrame. It is very useful when we need to quickly analyze and visualize our data.
- It is applied to one or more columns of the DataFrame and sorts the rows in ascending or descending order of those columns.
- Suppose we want to order our `'revenue_sales'` column data in ascending order. We can achieve this by calling the orderBy() function on the DataFrame and passing the column to sort by as its parameter.
- `df.orderBy(df['revenue_sales']).show()`$\rightarrow$ Ascending Order
- `df.orderBy(df['revenue_sales'].desc()).show()`$\rightarrow$ Descending Order

In [37]:
df = spark.read.csv('/content/drive/My Drive/Repos/Git/Integrating Machine Learning with Big Data/PySpark/Dataset/company_product_revenue.csv',inferSchema=True,header=True)

In [38]:
df.show(4)

+-------------+------------+-------------+
| company_name|product_name|revenue_sales|
+-------------+------------+-------------+
|         Audi|          A4|          450|
|Mercedes Benz|     G Class|         1200|
|          BMW|          X1|          425|
|     Mahindra|     XUV 500|          850|
+-------------+------------+-------------+
only showing top 4 rows



In [39]:
print('Total Revenue Sales per Company')
df.groupBy('company_name').sum().show()

Total Revenue Sales per Company
+-------------+------------------+
| company_name|sum(revenue_sales)|
+-------------+------------------+
|          Kia|              1140|
|         Audi|              2275|
|     Mahindra|              1640|
|          BMW|              1975|
|Mercedes Benz|              2570|
+-------------+------------------+



In [40]:
print('Total Revenue Sale for the entire Data')
df.agg({'revenue_sales':'sum'}).show()

Total Revenue Sale for the entire Data
+------------------+
|sum(revenue_sales)|
+------------------+
|              9600|
+------------------+



In [41]:
print('Max revenue sales per Company')
df.groupBy('company_name').agg({'revenue_sales':'max'}).show()

Max revenue sales per Company
+-------------+------------------+
| company_name|max(revenue_sales)|
+-------------+------------------+
|          Kia|               690|
|         Audi|               725|
|     Mahindra|               850|
|          BMW|               850|
|Mercedes Benz|              1200|
+-------------+------------------+



In [42]:
print('Order the data based on revenue_sales in ascending order')
df.orderBy('revenue_sales').show()

Order the data based on revenue_sales in ascending order
+-------------+------------+-------------+
| company_name|product_name|revenue_sales|
+-------------+------------+-------------+
|          BMW|          X1|          425|
|         Audi|          A4|          450|
|          Kia|    Carnival|          450|
|Mercedes Benz|     C Class|          470|
|         Audi|          Q7|          500|
|         Audi|          A6|          600|
|          Kia|      Seltos|          690|
|          BMW|          X3|          700|
|         Audi|          Q5|          725|
|     Mahindra|     XUV 300|          790|
|          BMW|          X5|          850|
|     Mahindra|     XUV 500|          850|
|Mercedes Benz|         GLS|          900|
|Mercedes Benz|     G Class|         1200|
+-------------+------------+-------------+



In [43]:
print('Order the data based on revenue_sales in descending order')
df.orderBy(df['revenue_sales'].desc()).show()

Order the data based on revenue_sales in descending order
+-------------+------------+-------------+
| company_name|product_name|revenue_sales|
+-------------+------------+-------------+
|Mercedes Benz|     G Class|         1200|
|Mercedes Benz|         GLS|          900|
|     Mahindra|     XUV 500|          850|
|          BMW|          X5|          850|
|     Mahindra|     XUV 300|          790|
|         Audi|          Q5|          725|
|          BMW|          X3|          700|
|          Kia|      Seltos|          690|
|         Audi|          A6|          600|
|         Audi|          Q7|          500|
|Mercedes Benz|     C Class|          470|
|         Audi|          A4|          450|
|          Kia|    Carnival|          450|
|          BMW|          X1|          425|
+-------------+------------+-------------+



### **Using Standard Functions in our Operations.**

- Spark provides various standard functions that are very useful for building logic and scripts and that help us manipulate the data easily.
- They range from simple functions to more complex ones such as correlation and standard deviation. To use them, we import the functions we want from `pyspark.sql.functions` with the command - `from pyspark.sql.functions import function_name`
- ***Example:*** `from pyspark.sql.functions import stddev,avg,format_number`
- Suppose we want to get the average of the revenue or sales column from our previous dataset, and set the column name of the result to something we want. We can use the standard average function which we have imported and use the alias function to set the preferred column name as shown
- `df.select(avg('revenue_sales').alias('Average Revenue Sales')).show()`
- With alias we define the header name of the resulting column, so that it makes more sense and can be understood easily.
- We can also calculate the standard deviation and format the result based on our requirements.
- For this we can use the standard function `stddev` to calculate the standard deviation and the `format_number` function to format the result.
- `std_dev_result = df.select(stddev('revenue_sales').alias('std'))`
- `std_dev_result.select(format_number('std',3)).show()`
- Here the `format_number()` method takes two parameters: the first is the column name (in our case the standard deviation result present in the `std_dev_result` DataFrame) and the second is the number of decimal places to keep.

In [44]:
df = spark.read.csv('/content/drive/My Drive/Repos/Git/Integrating Machine Learning with Big Data/PySpark/Dataset/company_product_revenue.csv',inferSchema=True,header=True)

In [45]:
df.show(4)

+-------------+------------+-------------+
| company_name|product_name|revenue_sales|
+-------------+------------+-------------+
|         Audi|          A4|          450|
|Mercedes Benz|     G Class|         1200|
|          BMW|          X1|          425|
|     Mahindra|     XUV 500|          850|
+-------------+------------+-------------+
only showing top 4 rows



In [46]:
from pyspark.sql.functions import mean, stddev, format_number, avg # Importing in-built functions

In [47]:
# Using mean functions
df.select(mean('revenue_sales').alias('Mean Revenue Sales')).show()

+------------------+
|Mean Revenue Sales|
+------------------+
| 685.7142857142857|
+------------------+



In [48]:
result_average = df.select(avg('revenue_sales').alias('Average Revenue Sales')) # Using avg Function
print(f'Average Revenue Sales:{result_average.head()[0]}') # Taking the first item in the head list

Average Revenue Sales:685.7142857142857


In [49]:
result_average.select(format_number('Average Revenue Sales',2).alias('Formatted Revenue Sales')).show() # Formatting to two decimal points.

+-----------------------+
|Formatted Revenue Sales|
+-----------------------+
|                 685.71|
+-----------------------+



## **Dealing with Missing Data in Spark DataFrame**

- In practical scenarios the data we work with might have some empty or missing values, which are generally referred to as null values.
- The null values must be handled properly in order to obtain meaningful and accurate results.
- There are various ways we could handle the missing data issue.
- We could either drop the data points which contain the missing values or fill them in with some calculated or default values.
- Where possible, it is usually preferable to replace the missing values with a calculated result or a default value rather than dropping the data point, so that the rest of the data in that row is preserved.


### **Dropping the rows or data-points that contain the null values**

- We can drop rows with missing values from our DataFrame using the **drop()** method, which is accessed through the **na** property of the DataFrame.
- The drop() function returns a new DataFrame omitting the rows with null values.
- ***Example:*** `df.na.drop().show()`
- This drops all the rows of the `df` DataFrame which contain at least one null value.
- The **drop()** function has 3 parameters:
  - ***how:*** 
    - This parameter has two options, `'any'` and `'all'`. By default, `how='any'`, which drops a row if any of its values is null.
    - The second option, `'all'`, drops a row only if all of its values are null.
    - ***Example:*** `df.na.drop(how='all').show()`
    - This keeps the rows that have one or more null values in their columns, but drops the rows in which every column is null.
  - ***thresh:***
    - `thresh` takes a natural number and keeps only the rows whose number of non-null values is equal to or greater than the threshold we have specified.
    - ***Example:*** `df.na.drop(thresh=2).show()`
    - It drops only those rows with fewer non-null values than the specified threshold.
    - By default, thresh is set to None. When it is specified, it takes precedence over `how`.
  - ***subset:***
    - The subset parameter can be used to specify the columns that we want to check for null values.
    - ***Example:*** `df.na.drop(subset=[column_name(s)]).show()`
    - This drops rows using the `'any'` condition applied only to the columns mentioned in the subset parameter. Null values present in the other columns are left in place.

### **Filling the null-values with other values.**

- Instead of dropping the null values, let us see some methods to fill the null values and retain the rows or data points.
- We can replace the null or missing values in our DataFrame with another value using the **fill()** method, which is accessed through the `na` property of the DataFrame.
- **Syntax:** `df.na.fill(value,subset=None)`
- The fill() function replaces null values only in the columns whose data type matches the type of the value passed to it.
- ***Example:*** `df.na.fill('filling_string_value').show()`
- If we apply fill() in the above manner, it replaces the null values in the columns of string data type with 'filling_string_value', the value passed to fill().
- It works similarly with values of other data types.
- We can use the subset parameter to select the columns which we want to fill.
- ***Example:*** `df.na.fill(value,subset=[column_name(s)]).show()`
- **Note:** If we pass a non-string value for a string type column, the column is simply ignored, that is, its missing values will not be filled.

### **Filling Calculated values in place of missing or Null values.**