# **Introduction to SparkSQL**


This lab goes over the basic operations of Apache SparkSQL.


![](http://spark.apache.org/images/spark-logo.png)


## Objectives


Spark SQL is a Spark module for structured data processing. It is used to query structured data inside Spark programs, using either SQL or the familiar DataFrame API.

After completing this lab you will be able to:


* Load a data file into a dataframe
* Create a Table View for the dataframe
* Run basic SQL queries and aggregate data on the table view
* Create a Pandas UDF to perform columnar operations


----


## Setup


For this lab, we use Python and Spark (PySpark). These libraries should already be installed in your lab environment or in SN Labs. Pandas is a popular data science package for Python; here we use it to load a CSV file from disk into an in-memory pandas DataFrame. PySpark is the Spark API for Python; here we use it to initialize the Spark context and session.


In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pyarrow==0.14.1 
!pip install pandas
!pip install numpy==1.19.5

In [2]:
import findspark
findspark.init()

In [3]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

##   Spark session


Create and initialize the Spark session needed to load the DataFrames and operate on them.


####  Creating the spark session and context


In [4]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/04/12 19:24:13 WARN Utils: Your hostname, Mehran resolves to a loopback address: 127.0.0.1; using 192.168.1.106 instead (on interface wlan0)
24/04/12 19:24:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/12 19:24:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


####  Initialize Spark session
To work with dataframes we just need to verify that the spark session instance has been created.


In [5]:
spark

##  Loading the Data and creating a table view


In this section, you will first read the CSV file into a Pandas DataFrame and then read it into a Spark DataFrame.
Pandas is a library used for data manipulation and analysis. Pandas offers data structures and operations for creating and manipulating Data Series and DataFrame objects. Data can be imported from various data sources, e.g., Numpy arrays, Python dictionaries, and CSV files. Pandas allows you to manipulate, organize and display the data.
To create a Spark DataFrame, we first load an external dataset called `autompg`. It contains 398 observations of the following variables:


| colIndex | colName | units/description |
| :---: | :--- | :--- |
| [1] | mpg | The fuel economy of the car in terms of miles travelled per gallon of gasoline |
| [2] | cylinders | The number of cylinders in the car's engine |
| [3] | displacement | The volume of air displaced by all the pistons of a piston engine |
| [4] | horsepower | A measure of the power the engine produces |
| [5] | weight | The total weight of the car |
| [6] | acceleration | The time in seconds it takes for the car to reach 60 miles per hour |
| [7] | model year | The year (in the 20th century) the car model was released. For example, 80 means the car was released in 1980. |
| [8] | origin | The region where the car was manufactured. 1 - USA. 2 - Europe. 3 - Japan |
| [9] | car name | The name of the car model |

####  Load data into a Pandas DataFrame.

Pandas has a convenient function, `read_csv`, that loads CSV data from a local file path or a URL directly into a pandas DataFrame. Here we read a local file.


In [6]:
# Read the file using `read_csv` function in pandas
autompg = pd.read_csv('Datasets/auto-mpg.csv')

In [7]:
# Preview a few records
autompg.head()

Unnamed: 0,mpg,cylinders,displacement,horsepower,weight,acceleration,model year,origin,car name
0,18.0,8,307.0,130,3504,12.0,70,1,chevrolet chevelle malibu
1,15.0,8,350.0,165,3693,11.5,70,1,buick skylark 320
2,18.0,8,318.0,150,3436,11.0,70,1,plymouth satellite
3,16.0,8,304.0,150,3433,12.0,70,1,amc rebel sst
4,17.0,8,302.0,140,3449,10.5,70,1,ford torino


####  Loading data into a Spark DataFrame


We use the `createDataFrame` function to load the pandas DataFrame into a Spark DataFrame.


In [8]:
sdf = spark.createDataFrame(autompg) 

Let us look at the schema of the loaded spark dataframe


In [9]:
sdf.printSchema()

root
 |-- mpg: double (nullable = true)
 |-- cylinders: long (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- weight: long (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- model year: long (nullable = true)
 |-- origin: long (nullable = true)
 |-- car name: string (nullable = true)
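
The schema reports `horsepower` as a string rather than a number, which usually means the column contains at least one non-numeric entry. The sketch below shows one way to coerce such a column in pandas before handing it to Spark. The sample data and the `"?"` placeholder are assumptions made for illustration; the lab does not show which value causes the string type.

```python
import pandas as pd

# Hypothetical miniature of the dataset; the "?" entry is an assumed placeholder.
sample = pd.DataFrame({
    "mpg": [18.0, 25.0, 21.0],
    "horsepower": ["130", "?", "95"],
})

# errors="coerce" turns entries that cannot be parsed into NaN instead of raising.
sample["horsepower"] = pd.to_numeric(sample["horsepower"], errors="coerce")

print(sample.dtypes)
print(sample)
```

After the conversion, the column is numeric and the unparseable entry becomes a missing value, so numeric comparisons and aggregates behave as expected.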



####  Rename the existing column "cylinders" to "cyl" and assign the resulting DataFrame to a variable, "sdf_new". 

The function `withColumnRenamed()` renames an existing column.  
 


In [10]:
sdf_new = sdf.withColumnRenamed("cylinders", "cyl")

Executing the function above does not modify the original DataFrame `sdf`; instead, a new DataFrame `sdf_new` is created with the renamed column. 


#### View the new dataframe


In [11]:
sdf_new.head(5)

                                                                                

[Row(mpg=18.0, cyl=8, displacement=307.0, horsepower='130', weight=3504, acceleration=12.0, model year=70, origin=1, car name='chevrolet chevelle malibu'),
 Row(mpg=15.0, cyl=8, displacement=350.0, horsepower='165', weight=3693, acceleration=11.5, model year=70, origin=1, car name='buick skylark 320'),
 Row(mpg=18.0, cyl=8, displacement=318.0, horsepower='150', weight=3436, acceleration=11.0, model year=70, origin=1, car name='plymouth satellite'),
 Row(mpg=16.0, cyl=8, displacement=304.0, horsepower='150', weight=3433, acceleration=12.0, model year=70, origin=1, car name='amc rebel sst'),
 Row(mpg=17.0, cyl=8, displacement=302.0, horsepower='140', weight=3449, acceleration=10.5, model year=70, origin=1, car name='ford torino')]

Observe how `cylinders` has now been renamed to `cyl` in this DataFrame.


####  Create a Table View
To run SQL queries against a DataFrame, you first register it as a table view. A temporary view behaves like a table for querying and is scoped to the current Spark session. In this example, we create a temporary view using the `createTempView()` function. Note that the view is created from `sdf`, so it keeps the original column name `cylinders`.


In [12]:
sdf.createTempView("cars")

##  Running SQL queries and aggregating data


Once we have a table view, we can query it much like a SQL table. We perform operations similar to those in the DataFrames notebook; the difference is that here we write the SQL queries directly. 


In [13]:
# Select all columns; show() prints only the first 20 rows by default
spark.sql("SELECT * FROM cars").show()

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         ford torino|
|15.0|        8|       429.0|       198|  4341|        10.0|        70|     1|    ford galaxie 500|
|14.0|        8|       454.0|       220|  4354|         9.0|        70|     1|    chevrolet impala|


In [14]:
# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)

+----+
| mpg|
+----+
|18.0|
|15.0|
|18.0|
|16.0|
|17.0|
+----+
only showing top 5 rows



In [25]:
# Basic filtering query to find cars from Japan (origin=3) that have a high mileage and a low cylinder count
spark.sql("SELECT * FROM cars where mpg>20 AND cylinders < 6 AND origin=3").show(5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|24.0|        4|       113.0|        95|  2372|        15.0|        70|     3|toyota corona mar...|
|27.0|        4|        97.0|        88|  2130|        14.5|        70|     3|        datsun pl510|
|27.0|        4|        97.0|        88|  2130|        14.5|        71|     3|        datsun pl510|
|25.0|        4|       113.0|        95|  2228|        14.0|        71|     3|       toyota corona|
|31.0|        4|        71.0|        65|  1773|        19.0|        71|     3| toyota corolla 1200|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



In [24]:
# Use the DataFrame where() method to list cars whose acceleration value is less than 10
sdf.where(sdf['acceleration'] < 10).show(3) 

+----+---------+------------+----------+------+------------+----------+------+------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|          car name|
+----+---------+------------+----------+------+------------+----------+------+------------------+
|14.0|        8|       454.0|       220|  4354|         9.0|        70|     1|  chevrolet impala|
|14.0|        8|       440.0|       215|  4312|         8.5|        70|     1| plymouth fury iii|
|15.0|        8|       390.0|       190|  3850|         8.5|        70|     1|amc ambassador dpl|
+----+---------+------------+----------+------+------------+----------+------+------------------+
only showing top 3 rows



In [23]:
# Aggregating data and grouping by cylinders
spark.sql("SELECT  cylinders, count(cylinders), avg(weight) from cars GROUP BY cylinders").show()

+---------+----------------+------------------+
|cylinders|count(cylinders)|       avg(weight)|
+---------+----------------+------------------+
|        6|              84|3198.2261904761904|
|        8|             103| 4114.718446601942|
|        4|             204| 2308.127450980392|
|        3|               4|            2398.5|
|        5|               3|3103.3333333333335|
+---------+----------------+------------------+



## Create a Pandas UDF to apply a columnar operation
Apache Spark has become the de-facto standard in processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDF). These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.

Pandas UDFs, built on top of Apache Arrow, bring you the _best of both worlds_: the ability to define low-overhead, high-performance UDFs entirely in Python. In this simple example, we build a scalar Pandas UDF to convert the `weight` column from imperial units (pounds) to metric units (kilograms).

In addition, a Pandas UDF can be invoked from SQL: define a regular Python function with the `@pandas_udf()` decorator, then register it with `spark.udf.register()`. We can then apply this UDF to our `weight` column. 
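
Because the body of a scalar Pandas UDF is ordinary pandas code that receives a whole `pd.Series` at a time, its logic can be checked on a plain pandas Series before it is wrapped in the decorator. The sketch below does that with a hypothetical helper, `convert_wt_plain`; the factor 0.45 is the rounded value used in this lab, not an exact conversion.

```python
import pandas as pd

LB_TO_KG = 0.45  # rounded factor used in this lab; 1 lb is about 0.4536 kg


def convert_wt_plain(s: pd.Series) -> pd.Series:
    # Vectorised: one multiplication over the whole batch, no per-row Python loop.
    return s * LB_TO_KG


# Weights (in pounds) taken from the first rows shown earlier.
weights_lb = pd.Series([3504, 3693, 3436])
weights_kg = convert_wt_plain(weights_lb)
print(weights_kg.tolist())
```

Testing the plain function first keeps debugging simple, since errors surface in pandas rather than inside a Spark task.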


####  Importing libraries and registering a UDF


In [26]:
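# Import the Pandas UDF decorator (the return type is given as a string and the
# UDF kind is inferred from the Python type hints, so PandasUDFType is not needed)
from pyspark.sql.functions import pandas_udf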

In [27]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # Approximate conversion from pounds to kilograms (1 lb is about 0.4536 kg)
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)

<pyspark.sql.udf.UserDefinedFunction at 0x7f6f59bb8a90>

#### Applying the UDF to the tableview

We can now apply the `convert_weight` user-defined function to the `weight` column of the `cars` table view using the SQL query shown below. The query returns both the original weight (in pounds) and the converted weight (in kilograms). 


In [29]:
spark.sql("SELECT *, weight AS weight_imperial, convert_weight(weight) as weight_metric FROM cars").show()

+----+---------+------------+----------+------+------------+----------+------+--------------------+---------------+-------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|weight_imperial|weight_metric|
+----+---------+------------+----------+------+------------+----------+------+--------------------+---------------+-------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|           3504|       1576.8|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|           3693|      1661.85|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|           3436|       1546.2|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|           3433|      1544.85|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         for

                                                                                

## Authors


[Mehran Morabbi Pazoki](https://www.linkedin.com/in/mehran-pazoki-6a3372175/)

