# **SQL Options in Spark**

PySpark provides two main options for running plain SQL: Spark SQL and SQLTransformer.

### **1) Spark SQL**

A Spark DataFrame provides two methods that register it as a temporary view, which you can then query with **SQL**.

- **createOrReplaceTempView:** The lifetime of this temporary view is tied to the SparkSession that was used to create the DataFrame. It creates (or replaces, if that view name already exists) a lazily evaluated "view" that you can then use like a Hive table in Spark SQL. It is not persisted to memory unless you cache the DataFrame that underpins the view.
- **createGlobalTempView:** The lifetime of this temporary view is tied to the Spark application. It is useful when you want to share data among different sessions and keep it alive until the application ends. By default, global temporary views are registered in the reserved `global_temp` database, so queries refer to them as `global_temp.<view name>`.
  
**Spark Session vs. Spark Application:**

A **Spark application** can:
- be a single batch job;
- be an interactive session with multiple jobs;
- be a long-lived server continually satisfying requests;
- run jobs that consist of more than just a single map and reduce;
- consist of more than one Spark Session.
  
A **Spark Session**, on the other hand:
- is the entry point through which your code interacts with Spark (DataFrames, Spark SQL, temporary views);
- can be created without explicitly creating a SparkConf, SparkContext, or SQLContext (they are encapsulated within the SparkSession, which was introduced in Spark 2.0).

### **2) SQL Transformer**

You can also use `SQLTransformer` from `pyspark.ml.feature`, which lets you write a free-form SQL statement in which the placeholder `__THIS__` stands for the input DataFrame.

# **SQL Options within regular PySpark calls**

1. The `expr` function in the PySpark SQL functions library;
2. The PySpark `selectExpr` function.

We will go over all of these in detail, so buckle up!

Let's start with Spark SQL. But first we need to create a Spark Session.

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
spark

In [6]:
path = './data/'

crime = spark.read.csv(path+'rec-crime-pfa.csv', header=True, inferSchema=True)
crime.printSchema()

root
 |-- 12 months ending: string (nullable = true)
 |-- PFA: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Offence: string (nullable = true)
 |-- Rolling year total number of offences: integer (nullable = true)



In [7]:
crime.limit(5).toPandas()

|   | 12 months ending | PFA | Region | Offence | Rolling year total number of offences |
|---|---|---|---|---|---|
| 0 | 31/03/2003 | Avon and Somerset | South West | All other theft offences | 25959 |
| 1 | 31/03/2003 | Avon and Somerset | South West | Bicycle theft | 3090 |
| 2 | 31/03/2003 | Avon and Somerset | South West | Criminal damage and arson | 26202 |
| 3 | 31/03/2003 | Avon and Somerset | South West | Death or serious injury caused by illegal driving | 2 |
| 4 | 31/03/2003 | Avon and Somerset | South West | Domestic burglary | 14561 |


In [9]:
df = crime.withColumnRenamed('12 months ending', '12_month_ending') \
        .withColumnRenamed('Rolling year total number of offences', 'crime_count')

df.printSchema()

root
 |-- 12_month_ending: string (nullable = true)
 |-- PFA: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Offence: string (nullable = true)
 |-- crime_count: integer (nullable = true)



In [10]:
df.createOrReplaceTempView('tempview')

In [13]:
spark.sql('SELECT * FROM tempview WHERE crime_count > 1000').limit(5).toPandas()

|   | 12_month_ending | PFA | Region | Offence | crime_count |
|---|---|---|---|---|---|
| 0 | 31/03/2003 | Avon and Somerset | South West | All other theft offences | 25959 |
| 1 | 31/03/2003 | Avon and Somerset | South West | Bicycle theft | 3090 |
| 2 | 31/03/2003 | Avon and Somerset | South West | Criminal damage and arson | 26202 |
| 3 | 31/03/2003 | Avon and Somerset | South West | Domestic burglary | 14561 |
| 4 | 31/03/2003 | Avon and Somerset | South West | Drug offences | 2308 |


In [14]:
spark.sql('SELECT Region, Offence FROM tempview WHERE crime_count > 1000').limit(5).toPandas()

|   | Region | Offence |
|---|---|---|
| 0 | South West | All other theft offences |
| 1 | South West | Bicycle theft |
| 2 | South West | Criminal damage and arson |
| 3 | South West | Domestic burglary |
| 4 | South West | Drug offences |


In [None]:
sql_results = spark.sql('SELECT Region, Offence FROM tempview WHERE crime_count > 2000')
sql_results.limit(5).toPandas()

In [15]:
spark.sql('SELECT Region, sum(crime_count) AS Total FROM tempview GROUP BY Region').toPandas()

|   | Region | Total |
|---|---|---|
| 0 | Fraud: CIFAS | 7678981 |
| 1 | North West | 30235732 |
| 2 | British Transport Police | 3029117 |
| 3 | Wales | 11137260 |
| 4 | London | 42691902 |
| 5 | South East | 30911995 |
| 6 | Fraud: Action Fraud | 5921984 |
| 7 | Fraud: UK Finance | 2925861 |
| 8 | South West | 17985880 |
| 9 | East | 19890612 |


In [16]:
from pyspark.ml.feature import SQLTransformer

In [17]:
sqlTrans = SQLTransformer(statement='SELECT PFA, Region, Offence FROM __THIS__')

In [19]:
sqlTrans.transform(df).show(5, truncate=False)

+-----------------+----------+-------------------------------------------------+
|PFA              |Region    |Offence                                          |
+-----------------+----------+-------------------------------------------------+
|Avon and Somerset|South West|All other theft offences                         |
|Avon and Somerset|South West|Bicycle theft                                    |
|Avon and Somerset|South West|Criminal damage and arson                        |
|Avon and Somerset|South West|Death or serious injury caused by illegal driving|
|Avon and Somerset|South West|Domestic burglary                                |
+-----------------+----------+-------------------------------------------------+
only showing top 5 rows



In [20]:
type(sqlTrans)

pyspark.ml.feature.SQLTransformer

In [29]:
sqlTransGroup = SQLTransformer(statement='SELECT Region, sum(crime_count) AS Total FROM __THIS__ GROUP BY Region')

In [30]:
sqlTransGroup.transform(df).show(5, truncate=False)

+------------------------+--------+
|Region                  |Total   |
+------------------------+--------+
|Fraud: CIFAS            |7678981 |
|North West              |30235732|
|British Transport Police|3029117 |
|Wales                   |11137260|
|London                  |42691902|
+------------------------+--------+
only showing top 5 rows



In [31]:
from pyspark.sql.functions import expr

In [33]:
spark.sql('SELECT sum(crime_count) from tempview').show()

+----------------+
|sum(crime_count)|
+----------------+
|       244720928|
+----------------+



In [46]:
df_percent = df.withColumn('percent', expr('round((crime_count/244720928)*100, 2)'))

df_percent.show(truncate=False)

+---------------+-----------------+----------+-------------------------------------------------+-----------+-------+
|12_month_ending|PFA              |Region    |Offence                                          |crime_count|percent|
+---------------+-----------------+----------+-------------------------------------------------+-----------+-------+
|31/03/2003     |Avon and Somerset|South West|All other theft offences                         |25959      |0.01   |
|31/03/2003     |Avon and Somerset|South West|Bicycle theft                                    |3090       |0.0    |
|31/03/2003     |Avon and Somerset|South West|Criminal damage and arson                        |26202      |0.01   |
|31/03/2003     |Avon and Somerset|South West|Death or serious injury caused by illegal driving|2          |0.0    |
|31/03/2003     |Avon and Somerset|South West|Domestic burglary                                |14561      |0.01   |
|31/03/2003     |Avon and Somerset|South West|Drug offences     

In [48]:
df_percent.orderBy(df_percent['percent'].desc()).show(truncate=False)

+---------------+------------+-------------------+------------+-----------+-------+
|12_month_ending|PFA         |Region             |Offence     |crime_count|percent|
+---------------+------------+-------------------+------------+-----------+-------+
|31/12/2018     |Action Fraud|Fraud: Action Fraud|Action Fraud|306126     |0.13   |
|31/03/2017     |CIFAS       |Fraud: CIFAS       |CIFAS       |306195     |0.13   |
|30/06/2016     |CIFAS       |Fraud: CIFAS       |CIFAS       |308901     |0.13   |
|30/09/2015     |CIFAS       |Fraud: CIFAS       |CIFAS       |283654     |0.12   |
|30/06/2018     |Action Fraud|Fraud: Action Fraud|Action Fraud|285070     |0.12   |
|31/03/2016     |CIFAS       |Fraud: CIFAS       |CIFAS       |298968     |0.12   |
|31/12/2015     |CIFAS       |Fraud: CIFAS       |CIFAS       |295525     |0.12   |
|30/12/2017     |CIFAS       |Fraud: CIFAS       |CIFAS       |283288     |0.12   |
|30/09/2018     |Action Fraud|Fraud: Action Fraud|Action Fraud|295549     |0

In [51]:
df.selectExpr('*', 'round((crime_count/244720928)*100, 2) as percent').filter("Region = 'South West'").toPandas()

|   | 12_month_ending | PFA | Region | Offence | crime_count | percent |
|---|---|---|---|---|---|---|
| 0 | 31/03/2003 | Avon and Somerset | South West | All other theft offences | 25959 | 0.01 |
| 1 | 31/03/2003 | Avon and Somerset | South West | Bicycle theft | 3090 | 0.00 |
| 2 | 31/03/2003 | Avon and Somerset | South West | Criminal damage and arson | 26202 | 0.01 |
| 3 | 31/03/2003 | Avon and Somerset | South West | Death or serious injury caused by illegal driving | 2 | 0.00 |
| 4 | 31/03/2003 | Avon and Somerset | South West | Domestic burglary | 14561 | 0.01 |
| ... | ... | ... | ... | ... | ... | ... |
| 5265 | 31/12/2018 | Wiltshire | South West | Stalking and harassment | 2380 | 0.00 |
| 5266 | 31/12/2018 | Wiltshire | South West | Theft from the person | 347 | 0.00 |
| 5267 | 31/12/2018 | Wiltshire | South West | Vehicle offences | 2895 | 0.00 |
| 5268 | 31/12/2018 | Wiltshire | South West | Violence with injury | 5701 | 0.00 |
