# SQL Options in Spark

PySpark provides two main options for using straight SQL: Spark SQL and the SQL Transformer.

## 1. Spark SQL

A Spark DataFrame provides two methods that register it as a temporary view, so that you can run **SQL** queries against it:

 - **createOrReplaceTempView:** The lifetime of this temporary view is tied to the SparkSession that was used to create it. It creates (or replaces, if a view with that name already exists) a lazily evaluated view that you can then query like a Hive table in Spark SQL. It is not persisted in memory unless you cache the DataFrame that underpins the view.
 - **createGlobalTempView:** The lifetime of this temporary view is tied to the Spark application. This is useful when you want to share data among different sessions and keep it alive until your application ends. Global temporary views live in the system database `global_temp`, so queries must reference them with that prefix (for example `global_temp.my_view`).
     - Why might we need it?
         - The first obvious use case is when we need to use data coming from different SparkSessions that can't share the same configuration. For instance, the configurations could point to two different Hive metastores whose data must somehow be mixed together.
         - We can launch two different, independent Spark jobs from one common codebase. Using an orchestration tool seems a better idea, though: if the second session fails, you only need to relaunch it, without recomputing the first dataset.
         - Multiple SparkSessions are also useful when some external input determines how many jobs, each with its own configuration, need to be launched.

**SparkSession vs. Spark application**

A **Spark application**:

- can be a single batch job, an interactive session with multiple jobs, or a long-lived server continually satisfying requests
- can run jobs that consist of more than just a single map and reduce step
- can contain more than one SparkSession

A **SparkSession**, on the other hand:

 - is the unified entry point for working with DataFrames and Spark SQL within an application
 - can be created without explicitly creating a SparkConf, SparkContext, or SQLContext (they are encapsulated within the SparkSession, which was introduced in Spark 2.0)


## 2. SQL Transformer

You can also use the SQLTransformer, an ML feature transformer that applies a free-form SQL statement to a DataFrame.

# SQL Options within regular PySpark calls

1. The `expr` function in PySpark's SQL functions library (`pyspark.sql.functions`)
2. PySpark's `selectExpr` function

We will go over all of these in detail, so buckle up!


Let's start with Spark SQL. But first we need to create a SparkSession!

In [1]:
# import findspark
# findspark.init()

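import pyspark  # only run after findspark.init()
from pyspark.sql import SparkSession

# May take a while locally
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Note: this uses a private API and counts block managers (the driver plus
# any executors), not CPU cores, so in local mode it is typically 1.
# spark.sparkContext.defaultParallelism is a public alternative that, in
# local mode, reports the number of cores Spark was given.
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")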
spark

You are working with 1 core(s)


## Let's Read in our DataFrame for this Notebook

### About this data

Recorded crime for the Police Force Areas of England and Wales. The data are rolling 12-month totals, with points at the end of each financial year from the year ending March 2003 to March 2007, and at the end of each quarter from June 2007.

**Source:** https://www.kaggle.com/r3w0p4/recorded-crime-data-at-police-force-area-level

In [2]:
# Start by reading a basic csv dataset
# Let Spark know about the header and infer the Schema types!

path = 'Datasets/'

crime = spark.read.csv(path+"rec-crime-pfa.csv",header=True,inferSchema=True)

In [3]:
# toPandas() renders a much nicer table than .show()
crime.limit(5).toPandas()

|   | 12 months ending | PFA | Region | Offence | Rolling year total number of offences |
|---|---|---|---|---|---|
| 0 | 31/03/2003 | Avon and Somerset | South West | All other theft offences | 25959 |
| 1 | 31/03/2003 | Avon and Somerset | South West | Bicycle theft | 3090 |
| 2 | 31/03/2003 | Avon and Somerset | South West | Criminal damage and arson | 26202 |
| 3 | 31/03/2003 | Avon and Somerset | South West | Death or serious injury caused by illegal driving | 2 |
| 4 | 31/03/2003 | Avon and Somerset | South West | Domestic burglary | 14561 |


In [4]:
# printSchema() prints the schema itself and returns None, so no print() is needed
crime.printSchema()

root
 |-- 12 months ending: string (nullable = true)
 |-- PFA: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Offence: string (nullable = true)
 |-- Rolling year total number of offences: integer (nullable = true)


Column names that contain spaces are awkward to use in SQL calls, so we'll rename them before querying this DataFrame. We won't be using the first column, so I'll leave that one as is, but we will be using the last one, so I'll rename it to Count.

In [5]:
# Rename the column with spaces so it is easy to reference in SQL
df = crime.withColumnRenamed('Rolling year total number of offences', 'Count')
df.printSchema()

root
 |-- 12 months ending: string (nullable = true)
 |-- PFA: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Offence: string (nullable = true)
 |-- Count: integer (nullable = true)


In [7]:
# Create a temporary view of the dataframe
df.createOrReplaceTempView("tempview")

In [11]:
# Then query the temp view
spark.sql("SELECT * FROM tempview WHERE Count > 1000 limit 10").limit(5).toPandas()

|   | 12 months ending | PFA | Region | Offence | Count |
|---|---|---|---|---|---|
| 0 | 31/03/2003 | Avon and Somerset | South West | All other theft offences | 25959 |
| 1 | 31/03/2003 | Avon and Somerset | South West | Bicycle theft | 3090 |
| 2 | 31/03/2003 | Avon and Somerset | South West | Criminal damage and arson | 26202 |
| 3 | 31/03/2003 | Avon and Somerset | South West | Domestic burglary | 14561 |
| 4 | 31/03/2003 | Avon and Somerset | South West | Drug offences | 2308 |


In [12]:
# Or choose which vars you want
spark.sql("SELECT Region, PFA FROM tempview WHERE Count > 1000").limit(5).toPandas()

|   | Region | PFA |
|---|---|---|
| 0 | South West | Avon and Somerset |
| 1 | South West | Avon and Somerset |
| 2 | South West | Avon and Somerset |
| 3 | South West | Avon and Somerset |
| 4 | South West | Avon and Somerset |


In [16]:
# spark.sql() returns a DataFrame, so you can assign the result to a variable
# (no .collect() needed)
sql_results = spark.sql("SELECT * FROM tempview WHERE Count > 1000 AND Region='South West'")
sql_results.limit(5).toPandas()
# type(sql_results)

|   | 12 months ending | PFA | Region | Offence | Count |
|---|---|---|---|---|---|
| 0 | 31/03/2003 | Avon and Somerset | South West | All other theft offences | 25959 |
| 1 | 31/03/2003 | Avon and Somerset | South West | Bicycle theft | 3090 |
| 2 | 31/03/2003 | Avon and Somerset | South West | Criminal damage and arson | 26202 |
| 3 | 31/03/2003 | Avon and Somerset | South West | Domestic burglary | 14561 |
| 4 | 31/03/2003 | Avon and Somerset | South West | Drug offences | 2308 |


In [11]:
# We can even do aggregated "group by" calls like this
spark.sql("SELECT Region, sum(Count) AS Total FROM tempview GROUP BY Region").limit(5).toPandas()

|   | Region | Total |
|---|---|---|
| 0 | Fraud: CIFAS | 7678981 |
| 1 | North West | 30235732 |
| 2 | British Transport Police | 3029117 |
| 3 | Wales | 11137260 |
| 4 | London | 42691902 |


Basically, anything goes: most of the SQL you would write against a regular database works here too.

**Bonus!** <br>
Not included in the lecture, but I thought some of you might enjoy this. If you want to write more free-form SQL, you can enclose your query in triple quotes like this. Here I have shown an example using a CTE, which is a more advanced SQL construct. A common table expression (CTE) defines a temporary result set that you can reference, possibly multiple times, within the scope of a single SQL statement. CTEs are mainly used in SELECT statements, and many people find them super useful.

In [17]:
# More complex SQL, such as a CTE, works too (this one nests a CTE inside another).
# The inner CTE is named tempview, so it shadows our temp view of the same name.
spark.sql("""WITH t AS (
    WITH tempview AS (SELECT 1)
    SELECT * FROM tempview
)
SELECT * FROM t;""").toPandas()

|   | 1 |
|---|---|
| 0 | 1 |


### SQL Transformer

The `SQLTransformer` (from `pyspark.ml.feature`) lets you write free-form SQL statements of the form `SELECT ... FROM __THIS__ ...`, where `__THIS__` stands for the DataFrame being transformed.

In [19]:
# First we need to import SQLTransformer
from pyspark.ml.feature import SQLTransformer

In [20]:
# Then we define the SQL statement
sqlTrans = SQLTransformer(
    statement="SELECT PFA,Region,Offence FROM __THIS__") 
# And use it to transform our df object
sqlTrans.transform(df).show(5)

+-----------------+----------+--------------------+
|              PFA|    Region|             Offence|
+-----------------+----------+--------------------+
|Avon and Somerset|South West|All other theft o...|
|Avon and Somerset|South West|       Bicycle theft|
|Avon and Somerset|South West|Criminal damage a...|
|Avon and Somerset|South West|Death or serious ...|
|Avon and Somerset|South West|   Domestic burglary|
+-----------------+----------+--------------------+
only showing top 5 rows



In [21]:
type(sqlTrans)

pyspark.ml.feature.SQLTransformer

In [22]:
# Note that "__THIS__" is a special placeholder and cannot be changed to __THAT__, for example
sqlTrans = SQLTransformer(
    statement="SELECT PFA,Region,Offence FROM __THAT__") 
# And use it to transform our df object
sqlTrans.transform(df).show(5)

AnalysisException: Table or view not found: __THAT__; line 1 pos 31;
'Project ['PFA, 'Region, 'Offence]
+- 'UnresolvedRelation [__THAT__], [], false


In [23]:
# Also Note that a call like this won't work...
SQLTransformer(statement="SELECT PFA,Region,Offence FROM __THIS__").show()

AttributeError: 'SQLTransformer' object has no attribute 'show'

**Now how about a group by call**

In [23]:
# Note: this only works on df, not on the original crime DataFrame, because Count is the renamed column and does not exist in crime

sqlTrans = SQLTransformer(
    statement="SELECT Offence, SUM(Count) as Total FROM __THIS__ GROUP BY Offence") 
sqlTrans.transform(df).show(5)

+--------------------+--------+
|             Offence|   Total|
+--------------------+--------+
|Public order offe...|10925676|
|       Bicycle theft| 5297006|
|Residential burglary| 1671469|
|Violence without ...|16590158|
|All other theft o...|30979393|
+--------------------+--------+
only showing top 5 rows



**And a where statement**

In [24]:
sqlTrans = SQLTransformer(
    statement="SELECT PFA,Offence FROM __THIS__ WHERE Count > 1000") 
sqlTrans.transform(df).show(5)

+-----------------+--------------------+
|              PFA|             Offence|
+-----------------+--------------------+
|Avon and Somerset|All other theft o...|
|Avon and Somerset|       Bicycle theft|
|Avon and Somerset|Criminal damage a...|
|Avon and Somerset|   Domestic burglary|
|Avon and Somerset|       Drug offences|
+-----------------+--------------------+
only showing top 5 rows



**You can also, of course, read the output into a dataframe**

In [18]:
result = sqlTrans.transform(df)
result.show(5)

+-----------------+--------------------+
|              PFA|             Offence|
+-----------------+--------------------+
|Avon and Somerset|All other theft o...|
|Avon and Somerset|       Bicycle theft|
|Avon and Somerset|Criminal damage a...|
|Avon and Somerset|   Domestic burglary|
|Avon and Somerset|       Drug offences|
+-----------------+--------------------+
only showing top 5 rows



# SQL Options within regular PySpark calls

### The `expr` function in PySpark's SQL functions library

You can also use the `expr` function from the `pyspark.sql.functions` library together with either PySpark's `withColumn` or `select` method.

In [19]:
# First we need to import the function
from pyspark.sql.functions import expr 

Let's add a percent column to the DataFrame. To do this, we first need the sum of the Count column across the whole DataFrame (unfortunately, with this approach we have to hard-code that total into the expression).

In [20]:
sqlTrans = SQLTransformer(
    statement="SELECT SUM(Count) as Total FROM __THIS__") 
sqlTrans.transform(df).show(5)

+---------+
|    Total|
+---------+
|244720928|
+---------+



In [21]:
# We could add a percent column to our df 
# that shows the offence %
# with the "withColumn" command
df.withColumn("percent",expr("round((count/244720928)*100,2)")).show()

+----------------+-----------------+----------+--------------------+-----+-------+
|12 months ending|              PFA|    Region|             Offence|Count|percent|
+----------------+-----------------+----------+--------------------+-----+-------+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|    0.0|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|   0.01|
|      31/03/2003|Avon and Somerset|South West|Death or serious ...|    2|    0.0|
|      31/03/2003|Avon and Somerset|South West|   Domestic burglary|14561|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Drug offences| 2308|    0.0|
|      31/03/2003|Avon and Somerset|South West|      Fraud offences| 5339|    0.0|
|      31/03/2003|Avon and Somerset|South West|            Homicide|   19|    0.0|
|      31/03/2003|Avon and Somerset|South West|Miscellaneous cri...| 1597|    0.0|

In [22]:
# Same thing with the "select" command
df.select("*",expr("round((count/244720928)*100,2) AS percent")).show()

+----------------+-----------------+----------+--------------------+-----+-------+
|12 months ending|              PFA|    Region|             Offence|Count|percent|
+----------------+-----------------+----------+--------------------+-----+-------+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|    0.0|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|   0.01|
|      31/03/2003|Avon and Somerset|South West|Death or serious ...|    2|    0.0|
|      31/03/2003|Avon and Somerset|South West|   Domestic burglary|14561|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Drug offences| 2308|    0.0|
|      31/03/2003|Avon and Somerset|South West|      Fraud offences| 5339|    0.0|
|      31/03/2003|Avon and Somerset|South West|            Homicide|   19|    0.0|
|      31/03/2003|Avon and Somerset|South West|Miscellaneous cri...| 1597|    0.0|

### PySpark's selectExpr function

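Very similar idea, but slightly different syntax: `selectExpr` is a variant of `select` that accepts SQL expression strings directly, so you don't need to wrap them in `expr()`.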

In [23]:
df.selectExpr("*","round((count/244720928)*100,2) AS percent").filter("Region ='South West'").show()

+----------------+-----------------+----------+--------------------+-----+-------+
|12 months ending|              PFA|    Region|             Offence|Count|percent|
+----------------+-----------------+----------+--------------------+-----+-------+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|    0.0|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|   0.01|
|      31/03/2003|Avon and Somerset|South West|Death or serious ...|    2|    0.0|
|      31/03/2003|Avon and Somerset|South West|   Domestic burglary|14561|   0.01|
|      31/03/2003|Avon and Somerset|South West|       Drug offences| 2308|    0.0|
|      31/03/2003|Avon and Somerset|South West|      Fraud offences| 5339|    0.0|
|      31/03/2003|Avon and Somerset|South West|            Homicide|   19|    0.0|
|      31/03/2003|Avon and Somerset|South West|Miscellaneous cri...| 1597|    0.0|

## That's all folks! Great job!

In [None]:
# Speed test

In [25]:
import time

In [29]:
before = time.time()
spark.sql("SELECT * FROM tempview WHERE Count > 1000").show()
after = time.time()
after - before

+----------------+-----------------+----------+--------------------+-----+
|12 months ending|              PFA|    Region|             Offence|Count|
+----------------+-----------------+----------+--------------------+-----+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|
|      31/03/2003|Avon and Somerset|South West|   Domestic burglary|14561|
|      31/03/2003|Avon and Somerset|South West|       Drug offences| 2308|
|      31/03/2003|Avon and Somerset|South West|      Fraud offences| 5339|
|      31/03/2003|Avon and Somerset|South West|Miscellaneous cri...| 1597|
|      31/03/2003|Avon and Somerset|South West|Non-domestic burg...|15621|
|      31/03/2003|Avon and Somerset|South West|Public order offe...| 4025|
|      31/03/2003|Avon and Somerset|South West|             Robbery| 3504|

0.15072178840637207

In [30]:
before = time.time()
# Then we create an SQL call 
sqlTrans = SQLTransformer(
    statement="SELECT * FROM __THIS__ WHERE Count > 1000")
# And use it to transform our df object
sqlTrans.transform(df).show()
after = time.time()
after - before

+----------------+-----------------+----------+--------------------+-----+
|12 months ending|              PFA|    Region|             Offence|Count|
+----------------+-----------------+----------+--------------------+-----+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|
|      31/03/2003|Avon and Somerset|South West|   Domestic burglary|14561|
|      31/03/2003|Avon and Somerset|South West|       Drug offences| 2308|
|      31/03/2003|Avon and Somerset|South West|      Fraud offences| 5339|
|      31/03/2003|Avon and Somerset|South West|Miscellaneous cri...| 1597|
|      31/03/2003|Avon and Somerset|South West|Non-domestic burg...|15621|
|      31/03/2003|Avon and Somerset|South West|Public order offe...| 4025|
|      31/03/2003|Avon and Somerset|South West|             Robbery| 3504|

0.14869451522827148

In [31]:
before = time.time()
# Same filter expressed with DataFrame.filter
df.filter("Count > 1000").show()
#SQLTransformer(statement="SELECT * FROM __THIS__ WHERE Count > 1000").transform(df).show()
after = time.time()
after - before

+----------------+-----------------+----------+--------------------+-----+
|12 months ending|              PFA|    Region|             Offence|Count|
+----------------+-----------------+----------+--------------------+-----+
|      31/03/2003|Avon and Somerset|South West|All other theft o...|25959|
|      31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|
|      31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|
|      31/03/2003|Avon and Somerset|South West|   Domestic burglary|14561|
|      31/03/2003|Avon and Somerset|South West|       Drug offences| 2308|
|      31/03/2003|Avon and Somerset|South West|      Fraud offences| 5339|
|      31/03/2003|Avon and Somerset|South West|Miscellaneous cri...| 1597|
|      31/03/2003|Avon and Somerset|South West|Non-domestic burg...|15621|
|      31/03/2003|Avon and Somerset|South West|Public order offe...| 4025|
|      31/03/2003|Avon and Somerset|South West|             Robbery| 3504|

0.1356346607208252