<a href="https://colab.research.google.com/github/Amelbnmbh/Introduction-to-Big-Data-with-Spark-and-Hadoop/blob/main/Introduction_to_SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Introduction to SparkSQL**
## Objectives
Spark SQL is a Spark module for structured data processing. It is used to query structured data inside Spark programs, using either SQL or the familiar DataFrame API.

After completing this lab you will be able to:
* Load a data file into a dataframe
* Create a Table View for the dataframe
* Run basic SQL queries and aggregate data on the table view
* Create a Pandas UDF to perform columnar operations
----

## Setup



In [3]:
# Installing required packages
!pip install pyarrow
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
import findspark
findspark.init()

In [5]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Exercise 1 -  Spark session
Create and initialize the Spark session needed to load the DataFrames and operate on them.

#### Task 1: Creating the spark session and context


In [6]:
# Create a Spark context, reusing the active one if this cell is re-run
sc = SparkContext.getOrCreate()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#### Task 2: Initialize Spark session
To work with dataframes we just need to verify that the spark session instance has been created.


In [7]:
spark

## Exercise 2 - Loading the Data and creating a table view
In this section, we will first read the CSV file into a pandas DataFrame and then load it into a Spark DataFrame.

To create the Spark DataFrame we load an external dataset called `mtcars`. It contains 32 observations on 11 variables:

| colIndex | colName | units/description |
| :---: | :--- | :--- |
|[, 1] | mpg |Miles per gallon  |
|[, 2] | cyl | Number of cylinders  |
|[, 3] | disp | Displacement (cu.in.) |  
|[, 4] | hp  | Gross horsepower  |
|[, 5] | drat | Rear axle ratio  |
|[, 6] | wt | Weight (1000 lbs)  |
|[, 7] | qsec | 1/4 mile time (seconds)  |
|[, 8] | vs  | Engine (0 = V-shaped, 1 = straight)  |
|[, 9] | am | Transmission (0 = automatic, 1 = manual)  |
|[,10] | gear | Number of forward gears  |
|[,11] | carb | Number of carburetors |

#### Task 1: Load data into a Pandas DataFrame.

Pandas has a convenient function to load CSV data from a URL directly into a pandas dataframe.



In [8]:
# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

In [9]:
# Preview a few records
mtcars.head()

Unnamed: 0.1,Unnamed: 0,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb
0,Mazda RX4,21.0,6,160.0,110,3.9,2.62,16.46,0,1,4,4
1,Mazda RX4 Wag,21.0,6,160.0,110,3.9,2.875,17.02,0,1,4,4
2,Datsun 710,22.8,4,108.0,93,3.85,2.32,18.61,1,1,4,1
3,Hornet 4 Drive,21.4,6,258.0,110,3.08,3.215,19.44,1,0,3,1
4,Hornet Sportabout,18.7,8,360.0,175,3.15,3.44,17.02,0,0,3,2


In [10]:
mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

#### Task 2: Loading data into a Spark DataFrame

We use the `createDataFrame` function to load the data into a Spark DataFrame.



In [11]:
sdf = spark.createDataFrame(mtcars)

Let us look at the schema of the loaded Spark DataFrame.


In [12]:
sdf.printSchema()

root
 |-- name: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)



#### Task 3: Rename the existing column name "vs" to "versus" and assign the new result DataFrame to a variable, "sdf_new".

The function `withColumnRenamed()` renames an existing column.



In [13]:
sdf_new = sdf.withColumnRenamed("vs", "versus")

Executing the above function does not modify the original DataFrame `sdf`; instead, a new DataFrame `sdf_new` is created with the renamed column.



#### Task 4: View the new dataframe


In [15]:
sdf_new.show(5)

+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
|             name| mpg|cyl| disp| hp|drat|   wt| qsec|versus| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|     0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|     0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|     1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|     1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|     0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+------+---+----+----+
only showing top 5 rows



#### Task 5: Create a Table View
A table view is required to run SQL queries programmatically on a DataFrame. The view acts as a temporary table whose scope is limited to the current Spark session. In this example we create a temporary view using the `createTempView()` function. Note that the view is created from `sdf`, not `sdf_new`, so the column is still named `vs` in the queries that follow.


In [16]:
sdf.createTempView("cars")

## Exercise 3 - Running SQL queries and aggregating data
Once we have a table view, we can query it much like a SQL table. We perform operations similar to the ones in the DataFrames notebook; the difference is that here we write the SQL queries directly.


In [17]:
# Showing the whole table
spark.sql("SELECT * FROM cars").show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

In [20]:
# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(3)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
+----+
only showing top 3 rows



In [21]:
# Basic filtering query to determine cars that have a high mileage and low cylinder count
spark.sql("SELECT * FROM cars where mpg>20 AND cyl < 6").show(5)

+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
|       name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
| Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|  Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|   Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|   Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
+-----------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [23]:
# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()

+--------+---+
|count(1)|cyl|
+--------+---+
|       7|  6|
|      14|  8|
|      11|  4|
+--------+---+



## Exercise 4 - Create a Pandas UDF to apply a columnar operation
Apache Spark has become the de facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDFs). These user-defined functions operate one row at a time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.

Pandas UDFs, built on top of Apache Arrow, bring you the _best of both worlds_: the ability to define low-overhead, high-performance UDFs entirely in Python. In this simple example, we will build a scalar Pandas UDF to convert the `wt` column from imperial units (1000 lbs) to metric units (metric tons).

In addition, a Pandas UDF can be invoked from SQL: define a regular Python function with the `@pandas_udf()` decorator, then register it with `spark.udf.register()`. We can then apply this UDF to our `wt` column.

#### Task 1: Importing libraries and registering a UDF



In [24]:
# import the Pandas UDF function
from pyspark.sql.functions import pandas_udf

In [25]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # Convert weight from 1000 lbs to metric tons (1000 lb is roughly 0.45 t)
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

<pyspark.sql.udf.UserDefinedFunction at 0x7f7cd04df130>

#### Task 2: Applying the UDF to the tableview

We can now apply the `convert_weight` user-defined function to the `wt` column of the `cars` table view with the SQL query shown below. The query returns both the original weight (in 1000 lbs) and the converted weight (in metric tons).


In [27]:
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show(3)

+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|         name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|weight_imperial|weight_metric|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|           2.62|        1.179|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|          2.875|      1.29375|
|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|           2.32|        1.044|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+---------------+-------------+
only showing top 3 rows
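
The `weight_metric` values in the output above can be checked by hand. The sketch below is plain Python rather than Spark: it reproduces the values using the notebook's rounded factor of 0.45 and compares them with the factor implied by the definition 1 lb = 0.45359237 kg. The names `LB_TO_KG`, `EXACT_FACTOR`, `ROUNDED_FACTOR`, and `to_metric_tons` are illustrative and do not appear elsewhere in this notebook.

```python
# Plain-Python check of the weight conversion (no Spark required).
# Assumption: `wt` is expressed in units of 1000 lb, as in the column table.

LB_TO_KG = 0.45359237      # exact by definition of the international pound
EXACT_FACTOR = LB_TO_KG    # 1000 lb -> kg multiplies by 1000, kg -> t divides by 1000
ROUNDED_FACTOR = 0.45      # the factor used by convert_wt in this notebook


def to_metric_tons(wt_thousand_lb: float, factor: float = ROUNDED_FACTOR) -> float:
    """Convert a weight given in 1000 lb to metric tons using `factor`."""
    return wt_thousand_lb * factor


weights = {"Mazda RX4": 2.62, "Mazda RX4 Wag": 2.875, "Datsun 710": 2.32}

for name, wt in weights.items():
    rounded = to_metric_tons(wt)
    exact = to_metric_tons(wt, EXACT_FACTOR)
    print(f"{name}: rounded={rounded:.5f} t, exact={exact:.5f} t")
```

The rounded factor understates each weight by a little under 1%, which is acceptable for a demonstration but worth knowing if the values are reused.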



## Exercise 5 - Combining DataFrames based on a specific condition.

#### Task 1 - Understanding JOIN operation


In [28]:
# define sample DataFrame 1

data = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")]

columns = ["emp_id", "emp_name"]

dataframe_1 = spark.createDataFrame(data, columns)

In [29]:
# define sample DataFrame 2

data = [("A101", 3250), ("A102", 6735), ("A103", 8650)]

columns = ["emp_id", "salary"]

dataframe_2 = spark.createDataFrame(data, columns)

In [30]:
# create a new DataFrame, "combined_df" by performing an inner join

combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")

In [31]:
# Show the data in combined_df as a list of Row.

combined_df.collect()

[Row(emp_id='A101', emp_name='John', salary=3250),
 Row(emp_id='A102', emp_name='Peter', salary=6735),
 Row(emp_id='A103', emp_name='Charlie', salary=8650)]

In [32]:
combined_df.head()

Row(emp_id='A101', emp_name='John', salary=3250)

In [33]:
combined_df.show()

+------+--------+------+
|emp_id|emp_name|salary|
+------+--------+------+
|  A101|    John|  3250|
|  A102|   Peter|  6735|
|  A103| Charlie|  8650|
+------+--------+------+
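
In the example above every `emp_id` appears in both DataFrames, so the effect of `how="inner"` is not visible. As a rough plain-Python sketch of the semantics (not of how Spark executes a join), the function below keeps only the keys present on both sides. The helper `inner_join` and the extra employee `A104` are hypothetical additions for illustration, and the sketch assumes keys are unique on the right-hand side.

```python
# Plain-Python illustration of inner-join semantics (not Spark's implementation).
# `inner_join` and the extra row ("A104", "Dana") are hypothetical.
# Assumption: each key occurs at most once in `right`.

def inner_join(left, right):
    """Join two lists of (key, value) pairs, keeping only keys found in both."""
    right_lookup = dict(right)
    return [
        (key, left_value, right_lookup[key])
        for key, left_value in left
        if key in right_lookup
    ]


names = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie"), ("A104", "Dana")]
salaries = [("A101", 3250), ("A102", 6735), ("A103", 8650)]

for row in inner_join(names, salaries):
    print(row)
# A104 has no salary record, so the inner join drops it.
```

With `how="left"` instead, Spark would keep the unmatched left-hand row and set its `salary` to null.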



#### Task 2 - Filling the missing values


In [34]:
# define sample DataFrame 1 with some missing values

data = [("A101", 1000), ("A102", 2000), ("A103",None)]

columns = ["emp_id", "salary"]

dataframe_3 = spark.createDataFrame(data, columns)


In [36]:
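# fill the missing salary with a specified value; fillna() returns a new DataFrame
filled_df = dataframe_3.fillna({"salary": 3000})
# dataframe_3 itself is unchanged, so it still shows the missing salary;
# inspect filled_df to see the filled value
dataframe_3.head(3)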

[Row(emp_id='A101', salary=1000),
 Row(emp_id='A102', salary=2000),
 Row(emp_id='A103', salary=None)]

### Question 1 - Basic SQL operations
Display all Mercedes car rows from the `cars` table view we created earlier. The Mercedes cars have the prefix "Merc" in the car name column.


In [40]:
spark.sql("SELECT * FROM cars where name like 'Merc%'").show()

+-----------+----+---+-----+---+----+----+----+---+---+----+----+
|       name| mpg|cyl| disp| hp|drat|  wt|qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+----+----+---+---+----+----+
|  Merc 240D|24.4|  4|146.7| 62|3.69|3.19|20.0|  1|  0|   4|   2|
|   Merc 230|22.8|  4|140.8| 95|3.92|3.15|22.9|  1|  0|   4|   2|
|   Merc 280|19.2|  6|167.6|123|3.92|3.44|18.3|  1|  0|   4|   4|
|  Merc 280C|17.8|  6|167.6|123|3.92|3.44|18.9|  1|  0|   4|   4|
| Merc 450SE|16.4|  8|275.8|180|3.07|4.07|17.4|  0|  0|   3|   3|
| Merc 450SL|17.3|  8|275.8|180|3.07|3.73|17.6|  0|  0|   3|   3|
|Merc 450SLC|15.2|  8|275.8|180|3.07|3.78|18.0|  0|  0|   3|   3|
+-----------+----+---+-----+---+----+----+----+---+---+----+----+



### Question 2 - User Defined Functions
In this notebook, we created a UDF to convert weight from imperial to metric units. Now for this exercise, please create a pandas UDF to convert the `mpg` column to `kmpl` (kilometers per liter). You can use the conversion factor of 0.425.
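
The suggested factor of 0.425 can be derived from the unit definitions. A minimal plain-Python sketch, assuming the gallon in `mpg` is the US liquid gallon; the constant and function names are illustrative only:

```python
# Deriving the mpg -> km/L factor from unit definitions (plain Python, no Spark).
# Assumption: "gallon" means the US liquid gallon.

KM_PER_MILE = 1.609344               # exact, international mile
LITERS_PER_US_GALLON = 3.785411784   # exact, US liquid gallon

MPG_TO_KMPL = KM_PER_MILE / LITERS_PER_US_GALLON


def mpg_to_kmpl(mpg: float) -> float:
    """Convert miles per US gallon to kilometers per liter."""
    return mpg * MPG_TO_KMPL


print(round(MPG_TO_KMPL, 3))  # 0.425
print(mpg_to_kmpl(21.0))
```

Because 0.425 is a rounded value, results computed with it differ slightly in the later decimal places from those computed with the unrounded ratio.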


In [41]:
# Code block for learners to answer
from pyspark.sql.functions import pandas_udf

@pandas_udf("float")
def convert_mileage(s: pd.Series) -> pd.Series:
    # Convert miles per gallon to kilometers per liter
    return s * 0.425

spark.udf.register("convert_mileage", convert_mileage)

spark.sql("SELECT *, mpg AS mpg, convert_mileage(mpg) as kmpl FROM cars").show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb| mpg|   kmpl|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+----+-------+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|21.0|  8.925|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|21.0|  8.925|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|22.8|   9.69|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|21.4|  9.095|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|18.7| 7.9475|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|18.1| 7.6925|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|14.3| 6.0775|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|24.4|  10.37|
|           Merc 230|