# **Introduction to SparkSQL**

## Objectives

*   Load a data file into a dataframe
*   Create a Table View for the dataframe
*   Run basic SQL queries and aggregate data on the table view
*   Create a Pandas UDF to perform columnar operations

## Setup


In [1]:
#Installing required packages

!pip install pandas
!pip install pyspark
!pip install findspark
!pip install numpy==1.19.5
!pip install pyarrow==1.0.0



In [2]:
import findspark
findspark.init()

In [3]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Exercise 1 - Spark session


#### Task 1: Creating the Spark session and context

In [4]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

22/02/15 07:30:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Task 2: Initialize Spark session

Before working with DataFrames, we just need to verify that the Spark session instance has been created. Evaluating `spark` in a cell displays the session details.

In [5]:
spark

## Exercise 2 - Loading the Data and creating a table view
- Pandas is a library used for data manipulation and analysis. We first read the CSV file into a Pandas DataFrame and then load that into a Spark DataFrame.

- To create the Spark DataFrame we use an external dataset called mtcars. It includes 32 observations on 11 variables:


In [6]:
# Read the CSV file into a Pandas DataFrame using the `read_csv` function in pandas
# (it is loaded into a Spark DataFrame in a later step)

mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')


In [7]:
# Preview a few records
mtcars.head()

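|   | Unnamed: 0 | mpg | cyl | disp | hp | drat | wt | qsec | vs | am | gear | carb |
| - | ---------- | --- | --- | ---- | -- | ---- | -- | ---- | -- | -- | ---- | ---- |
| 0 | Mazda RX4 | 21.0 | 6 | 160.0 | 110 | 3.9 | 2.62 | 16.46 | 0 | 1 | 4 | 4 |
| 1 | Mazda RX4 Wag | 21.0 | 6 | 160.0 | 110 | 3.9 | 2.875 | 17.02 | 0 | 1 | 4 | 4 |
| 2 | Datsun 710 | 22.8 | 4 | 108.0 | 93 | 3.85 | 2.32 | 18.61 | 1 | 1 | 4 | 1 |
| 3 | Hornet 4 Drive | 21.4 | 6 | 258.0 | 110 | 3.08 | 3.215 | 19.44 | 1 | 0 | 3 | 1 |
| 4 | Hornet Sportabout | 18.7 | 8 | 360.0 | 175 | 3.15 | 3.44 | 17.02 | 0 | 0 | 3 | 2 |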


In [8]:
mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

#### Task 2: Loading data into a Spark DataFrame

- We use the `createDataFrame` function to load the Pandas data into a Spark DataFrame.


In [9]:
sdf = spark.createDataFrame(mtcars)

In [10]:
#Let us look at the schema of the loaded spark dataframe

sdf.printSchema()

root
 |-- name: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)



#### Task 3: Create a Table View

To run SQL queries programmatically on a DataFrame, we first need to register it as a table view in Spark SQL. A temporary view gives the DataFrame a table name that SQL queries can refer to. It is scoped to the current Spark session, so it is no longer available once that session ends.

In [11]:
#Create a temporary view using the `createTempView()` function

sdf.createTempView("cars")

## Exercise 3 - Running SQL queries and aggregating data

- Running queries on the table view is similar to querying a SQL table. The difference here is that we pass the SQL query directly, as a string, to `spark.sql()`.

In [12]:
# Showing the whole table
spark.sql("SELECT * FROM cars").show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|               name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

In [13]:
#Showing a specific column

spark.sql("SELECT mpg FROM cars").show(10)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
|18.1|
|14.3|
|24.4|
|22.8|
|19.2|
+----+
only showing top 10 rows



In [14]:
# Basic filtering query to determine cars that have a high mileage and low cylinder count

spark.sql("SELECT * FROM cars where mpg>20 AND cyl <6").show(10)


+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          name| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|      Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|      Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|   Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
| Toyota Corona|21.5|  4|120.1| 97| 3.7|2.465|20.01|  1|  0|   3|   1|
|     Fiat X1-9|27.3|  4| 79.0| 66|4.08|1.935| 18.9|  1|  1|   4|   1|
| Porsche 914-2|26.0|  4|120.3| 91|4.43| 2.14| 16.7|  0|  1|   5|   2|
|  Lotus Europa|30.4|  4| 95.1|113|3.77|1.513| 16.9|  1|  1|   5|   2|
+--------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only s

In [15]:
# Aggregating data and grouping by cylinders

spark.sql("SELECT count(*), cyl from cars GROUP BY cyl").show()



+--------+---+
|count(1)|cyl|
+--------+---+
|       7|  6|
|      14|  8|
|      11|  4|
+--------+---+


## Exercise 4 - Create a Pandas UDF to apply a columnar operation

- A regular Python function decorated with `@pandas_udf()` can be registered with the Spark session and then invoked directly in SQL. We can then apply this UDF to our `wt` column.
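
A Series-to-Series pandas UDF receives each batch of column values as a `pandas.Series` and is expected to return a Series of the same length. Because of that, the conversion logic can be tried on a plain Series before it is wrapped in `@pandas_udf`. The function name `to_metric_tons` below is illustrative only, and the sample weights are the first three `wt` values from the table.

```python
import pandas as pd

def to_metric_tons(s: pd.Series) -> pd.Series:
    # Same element-wise arithmetic the UDF body would perform on one batch
    return s * 0.45

# First three wt values from the cars table, used as a stand-in batch
batch = pd.Series([2.62, 2.875, 2.32])
converted = to_metric_tons(batch)

print(converted.tolist())
```

Checking the function this way needs no Spark session, which makes mistakes in the arithmetic easier to spot.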

#### Task 1: Importing libraries and registering a UDF


In [22]:
# Import the Pandas UDF function
# (PandasUDFType is not needed when the UDF is declared with Python type hints)

from pyspark.sql.functions import pandas_udf

In [41]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # The formula for converting from imperial to metric tons
    return s*0.45

spark.udf.register("convert_weight", convert_wt)

<function __main__.convert_wt(s: pandas.core.series.Series) -> pandas.core.series.Series>

#### Task 2: Applying the UDF to the tableview

- We can apply the convert_weight user-defined-function to our wt column from the cars table view. This is done very simply using the SQL query shown below.

In [None]:
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()


## Practice Questions

### Question 1 - Basic SQL operations

- Display all Mercedes car rows from the cars table view we created earlier. The Mercedes cars have the prefix "Merc" in the car name column.

In [59]:
# Code answer

spark.sql("SELECT * FROM cars where name like 'Merc%' ").show()


+-----------+----+---+-----+---+----+----+----+---+---+----+----+
|       name| mpg|cyl| disp| hp|drat|  wt|qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+----+----+---+---+----+----+
|  Merc 240D|24.4|  4|146.7| 62|3.69|3.19|20.0|  1|  0|   4|   2|
|   Merc 230|22.8|  4|140.8| 95|3.92|3.15|22.9|  1|  0|   4|   2|
|   Merc 280|19.2|  6|167.6|123|3.92|3.44|18.3|  1|  0|   4|   4|
|  Merc 280C|17.8|  6|167.6|123|3.92|3.44|18.9|  1|  0|   4|   4|
| Merc 450SE|16.4|  8|275.8|180|3.07|4.07|17.4|  0|  0|   3|   3|
| Merc 450SL|17.3|  8|275.8|180|3.07|3.73|17.6|  0|  0|   3|   3|
|Merc 450SLC|15.2|  8|275.8|180|3.07|3.78|18.0|  0|  0|   3|   3|
+-----------+----+---+-----+---+----+----+----+---+---+----+----+



### Question 2 - User Defined Functions
- Create a pandas UDF to convert the mpg column to kmpl (kilometers per liter). You can use the conversion factor of 0.425.
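
The factor 0.425 can be checked with a short calculation. This is a sketch that assumes `mpg` means miles per US gallon, and the constant names are made up for illustration.

```python
# Exact definitions of the two units involved
KM_PER_MILE = 1.609344               # international mile in kilometers
LITERS_PER_US_GALLON = 3.785411784   # US liquid gallon in liters

# 1 mile per gallon expressed in kilometers per liter
kmpl_per_mpg = KM_PER_MILE / LITERS_PER_US_GALLON

# Example: the first car in the table has mpg = 21.0
example_kmpl = 21.0 * kmpl_per_mpg

print(round(kmpl_per_mpg, 3))
print(round(example_kmpl, 2))
```

Under that assumption the ratio rounds to 0.425, so the suggested factor is accurate to three decimal places.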


In [None]:
# Code answer

@pandas_udf("float")
def convert_mileage(s: pd.Series) -> pd.Series:
    # Convert miles per gallon (mpg) to kilometers per liter (kmpl)
    return s * 0.425

# Register the UDF under the name used in the SQL query
spark.udf.register("convert_mileage", convert_mileage)

spark.sql("SELECT *, convert_mileage(mpg) AS kmpl FROM cars").show()


## **The End**

|  Compiled By      |    Last Update    |    Social Media   |
| ----------------- | ----------------- | ----------------- |
|  Jeremias Tivane  |    12-02-2022     | [Linkedin](https://www.linkedin.com/in/jeremiastivane/) | 
|  Jeremias Tivane  |    14-02-2022     | [Github](https://github.com/Jeremias-Tivane) | 
      