# **Lab 11: PySpark Dataframes and Spark SQL**

# [PySpark DataFrames](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html)

A DataFrame is a distributed collection of data grouped into named columns. It is equivalent to a relational table in Spark SQL and can be created using various functions in SparkSession. Once created, it can be manipulated using the various domain-specific language (DSL) functions.  

Credit goes to this [website](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/) for the examples of PySpark DataFrame operations. 

### So, are there any differences between Lab10's PySpark RDDs and PySpark DataFrames?  
According to this [article](https://sachee.medium.com/apache-spark-dataframe-vs-rdd-24a04d2eb1b9):

*   PySpark DataFrames require a schema, and you can think of them as “tables” of data. RDDs are less structured and closer to Scala collections or lists.
*   Operations on PySpark DataFrames can be optimized by Spark's query optimizer before they run, whereas operations on RDDs are imperative and run through the transformations and actions in the order they are written.





## Spark dataframes are immutable
When a Spark DataFrame is created, its content cannot be changed; any modification produces a new DataFrame. Internally, a command such as filter() does not change the DataFrame it is called on. Instead, it creates a new DataFrame from the result of executing the command. 

Like an RDD, a DataFrame is an immutable distributed collection of data. Unlike an RDD, the data is organized into named columns, like a table in a relational database. Designed to make processing large data sets even easier, a DataFrame lets developers impose a structure onto a distributed collection of data, which allows a higher level of abstraction. It provides a domain-specific language API to manipulate the distributed data, and it makes Spark accessible to a wider audience beyond specialized data engineers.

### What about the dataframes that we used in previous labs?
In previous labs we did work with dataframes, but those were pandas DataFrames, created after importing the pandas library. They were not PySpark DataFrames.  



In [None]:
!pip install pyspark --quiet


Download Java and Apache Spark with Hadoop. There are many releases of Apache Spark. If you are running in a non-Colab environment, find the release that suits your machine. Here is the link to the Apache Spark [releases](https://spark.apache.org/downloads.html).

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz


Now it’s time to set the environment paths. Then we install and import the findspark library, which locates Spark on the system and makes it importable as a regular library.

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
!pip install -q findspark
import findspark
findspark.init()

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark. Ok, we are done with PySpark setup.

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()  # local[*] uses all available cores; local[3] would use three threads

In [5]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("IMDB").setMaster("local[*]")
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate(conf=conf)  # returns the SparkContext that is already running, if any

Let's read the data that we have used in Lab03. 

In [9]:
imdb_df = spark.read.csv("IMDB_top250.csv",header=True)

Let's try to understand the data. First, we need to know the column names and types. 

In [None]:
imdb_df.printSchema()

In [None]:
imdb_df.show(5)  # Which function retrieves the first 5 rows of a pandas DataFrame?

Let's calculate the total number of rows.

In [None]:
imdb_df.count()

In [None]:
imdb_df.show(truncate=False)
# What happens when truncate=True?

# Let's use Spark SQL functions to perform operations on a PySpark DataFrame.

# Queries: select and Like functions


In [None]:
imdb_df.select("Title","Year","Rated").show(5)  # view the first five rows based on selected columns

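Inside the brackets of the like() function, the % character is a wildcard that matches any sequence of characters, so "%Godfather%" matches every title that contains the word "Godfather". If we are looking for an exact match, no % character should be used. Note that when like() is used inside select(), it does not remove any rows; it adds a true/false column that shows whether each title matches.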

In [None]:
imdb_df.select(
    "Title",
    imdb_df.Title.like("%Godfather%")
).show(15)

# Inspection: distinct() and show()

In [None]:
imdb_df.select("Rated").distinct().show()
#The distinct() will come in handy when you want to determine the unique values in the categorical columns in the dataframe.

# Multiple View: groupBy function 

How many rows of data are associated with each rating category? We can use the groupBy function to group the rows by the values of a column and then apply an aggregate function to each group to derive some useful insight.



In [None]:
from pyspark.sql import functions as F  # import the functions module under the alias F
imdb_df.groupBy("Rated").agg(F.count("Rated").alias("Total-Row")).sort(F.desc("Total-Row")).show()


# Descriptive Statistics

Often when we are working with numeric features, we want to look at summary statistics for the DataFrame. The describe() function is well suited to this purpose. It is similar to pandas' describe function, but it reports fewer statistics and it describes the string columns as well.

In [None]:
imdb_df.describe().show()

# Data Cleaning

Can data cleaning operations be applied to a PySpark DataFrame? Yes, of course. Let's look at some examples of data cleaning operations. We will use the Automobile data from Lab04.

In [20]:
import numpy as np
import pandas as pd
cols = ['symboling', 'normalized-losses', 'make', 'fuel-type', 'aspiration', 'num-of-doors', 'body-style', 'drive-wheels', 'engine-location', 'wheel-base', 'length', 'width', 'height', 'curb-weight', 'engine-type', 'num-of-cylinders', 'engine-size', 'fuel-system', 'bore', 'stroke', 'compression-ratio', 'horsepower', 'peak-rpm', 'city-mpg', 'highway-mpg', 'price']
cars = pd.read_csv('imports-85.data.txt', names=cols)
 

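Once we have read the Automobile data, we convert it to a PySpark DataFrame. Casting every column to str first means that every column of the resulting DataFrame has type String.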

In [None]:
cars_df = spark.createDataFrame(cars.astype(str))
cars_df.show()

At first glance, the output above contains many "?" values where the data is missing. Let's count the number of "?" in each column of the PySpark DataFrame.

In [None]:
from pyspark.sql import functions as F
cars_df.agg(*[
    F.count(F.when(F.col(str(c)) == "?",1)).alias(c) for c in cars_df.columns]).show()

The column *normalized-losses* has the most "?" values. Let's replace the missing values in this column with the most frequent value. The code below shows the frequency of each distinct value in the column *normalized-losses*.

In [None]:
from pyspark.sql import functions as F
cars_df.groupBy("normalized-losses").agg(F.count("normalized-losses").alias("Total")).sort(F.desc("Total")).show()

From the above table, ignoring the "?" placeholder itself, the value 161 is the most frequent value in the column *normalized-losses*. Let's replace the "?" with the value 161. The value must be given as the string '161' because every column of the PySpark DataFrame is of type String (the DataFrame was created from `cars.astype(str)`).

In [29]:
cars_df = cars_df.replace('?', '161', subset=["normalized-losses"])


Check whether the replacement has been done. It looks like the replacement was successful.

In [None]:
cars_df.show()

Other columns contain "?" as well, though not as many as the column *normalized-losses*. Let's simply replace those with a default value of 0. 

In [None]:
cars_df = cars_df.replace('?', '0')
cars_df.show()

Let's check again. Are there any columns that still have "?" as a value? No, none of the columns contain "?" any more. 

In [None]:
cars_df.agg(*[
    F.count(F.when(F.col(str(c)) == "?", 1)).alias(c) for c in cars_df.columns]).show()

We can convert the column type too. Suppose that we would like to convert the column *symboling* from String to Integer.

In [33]:
from pyspark.sql.types import IntegerType
cars_df = cars_df.withColumn("symboling", F.col("symboling").astype(IntegerType()))

Let's double-check whether the conversion is done. Yes, the conversion is successful. 

In [None]:
cars_df.printSchema()

We can create a new column. Let's name the column *double-symboling*. It holds the value of the column *symboling* multiplied by 2. Use the show() command to check that the new column and its values have been computed. Because the result is not assigned back to cars_df, cars_df itself is unchanged. 

In [None]:
cars_df.withColumn("double-symboling", cars_df.symboling*2).show()

# Output File

There are many ways to export a PySpark DataFrame as CSV. The following cells show two methods. 

**Method 1**: Convert it to a pandas DataFrame and save that. You will see the file appear in the folder of this Colab session. Once the data is in a pandas DataFrame, you can also use the pandas DataFrame operations on it. 

In [36]:
cars_df.toPandas().to_csv('mycsv.csv')

**Method 2:** Write it out as CSV with Spark. Spark creates a folder named mycsv1.csv, and if you check that folder, there are two CSV files. Why are there two CSV files? 

In [37]:
cars_df.write.csv('mycsv1.csv')

Remember, we are still in the PySpark environment. Let's find out the number of partitions. It turns out that we have two partitions. PySpark writes out one file per partition, which is why we have two CSV files. PySpark performs the write in parallel, and we do not have control over the file naming.   

In [None]:
cars_df.rdd.getNumPartitions()

Since we want only a single CSV file, we can reduce the DataFrame to one partition with coalesce(1) before writing it out.

In [39]:
cars_df.coalesce(1).write.csv("/content/mycsv2", header=True)



In [40]:
spark.stop()