# SQL and Spark Recitation Fall 2022



**Before proceeding, make sure you download/make a copy of this Colab notebook!**

A relational database is one type of database. Data in a relational database is organized into tables, and rows in one table can refer to rows in another through shared key values. This lets us identify and access a piece of data in relation to other data in the database.

### SQL recap

Recall that SQL is a language for performing operations on tabular data (e.g., selection, projection, and joins).

- We write queries in SQL to retrieve data and answer questions about it.
- Declarative language (not procedural): you describe the result you want, NOT how to obtain it.

Using SQL queries, you can create, modify, and delete tables, as well as select, insert, update, and delete data in existing tables.

NOTE: The exact SQL syntax may vary depending on the underlying database you are using, but most dialects are very similar.
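
As a quick illustration of the statements listed above, here is a minimal sketch using Python's built-in `sqlite3` module and an in-memory database. The `demo` table, its columns, and the `conn_demo` connection are hypothetical and are not part of the recitation dataset.

```python
import sqlite3

# Throwaway in-memory database; nothing is written to disk.
conn_demo = sqlite3.connect(":memory:")

# Create a (hypothetical) table.
conn_demo.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY, label TEXT)")

# Insert, update, and delete rows.
conn_demo.executemany(
    "INSERT INTO demo VALUES (?, ?)",
    [(1, "alpha"), (2, "beta"), (3, "gamma")],
)
conn_demo.execute("UPDATE demo SET label = 'BETA' WHERE id = 2")
conn_demo.execute("DELETE FROM demo WHERE id = 3")

# Declarative query: we state which rows and columns we want, not how to find them.
rows = conn_demo.execute("SELECT id, label FROM demo ORDER BY id").fetchall()
print(rows)  # [(1, 'alpha'), (2, 'BETA')]

# Modify the table's structure, then delete the table.
conn_demo.execute("ALTER TABLE demo ADD COLUMN note TEXT")
conn_demo.execute("DROP TABLE demo")
conn_demo.close()
```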


In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import sqlite3


In [2]:
!pip install pandasql


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pandasql
  Downloading pandasql-0.7.3.tar.gz (26 kB)
Building wheels for collected packages: pandasql
  Building wheel for pandasql (setup.py) ... done
  Created wheel for pandasql: filename=pandasql-0.7.3-py3-none-any.whl size=26784 sha256=27fa634573ef4d438b5a5b1cd76e419d906678656322f45b98d30f74d982ee61
  Stored in directory: /root/.cache/pip/wheels/5c/4b/ec/41f4e116c8053c3654e2c2a47c62b4fca34cc67ef7b55deb7f
Successfully built pandasql
Installing collected packages: pandasql
Successfully installed pandasql-0.7.3


In [3]:
import pandasql as ps


In [4]:
# Set up a database
conn = sqlite3.connect('recitationTest.db')

### Our Dataset

#### Spaceship Management Database 

We want to keep track of:
- Crew members and their roles (captain, scientist, etc.).
- Equipment (centrifuge, lab gloves, soldering stations, etc.).
- The days crew members worked and how many hours they worked on each day.
- Which crew members manage which equipment.

We then want to ask questions about this data.

### Setting up our data tables
      

In [5]:
### Create Tables with Schema
# Data on Crew
conn.execute("""
DROP TABLE IF EXISTS crew
""")

conn.execute('''
CREATE TABLE crew (
id INTEGER PRIMARY KEY,
name TEXT,
rank INTEGER,
role_id INTEGER
)''')

print('done')

done


In [6]:
# Roles
conn.execute("""
DROP TABLE IF EXISTS roles
""")

conn.execute('''
CREATE TABLE roles (
role_id INTEGER PRIMARY KEY,
name TEXT
)''')

print('done')

done


In [7]:
#Equipment
conn.execute("""
DROP TABLE IF EXISTS equipment
""")

conn.execute('''
CREATE TABLE equipment (
id INTEGER PRIMARY KEY,
name TEXT
)''')

print('done')

done


In [8]:
#Manages 
conn.execute("""
DROP TABLE IF EXISTS manages
""")

conn.execute('''
CREATE TABLE manages (
id INTEGER PRIMARY KEY,
crew_id INTEGER,
equip_id INTEGER
)
''')

print('done')

done


In [9]:
#Worklog
conn.execute("""
DROP TABLE IF EXISTS worklog
""")

conn.execute('''
CREATE TABLE worklog (
id INTEGER PRIMARY KEY,
crew_id INTEGER,
day INTEGER,
hours INTEGER
)
''')

print('done')

done


Now let's insert some rows into each table of the database.

In [10]:
# Standard SQL string literals use single quotes (double quotes are meant for identifiers).
conn.execute('''
INSERT INTO crew VALUES
(1, 'Jane', 10, 1),
(2, 'Dan', 9, 2),
(3, 'Alex', 4, 3),
(4, 'Jen', 4, 4),
(5, 'Brandon', 1, NULL)
''')


conn.execute('''
INSERT INTO roles VALUES
(1, 'captain'),
(2, 'scientist'),
(3, 'engineer'),
(4, 'engineer 2')
''')


conn.execute('''
INSERT INTO equipment VALUES
(1, 'Centrifuge'),
(2, 'Soldering Station'),
(3, 'Notebook'),
(4, 'Chemical Z')
''')


conn.execute('''
INSERT INTO manages VALUES
(1, 2, 1),
(2, 3, 2),
(3, 1, 3),
(4, 2, 4),
(5, 1, 4)
''')


conn.execute('''
INSERT INTO worklog VALUES
(1, 1, 1, 10),
(2, 2, 1, 5),
(3, 3, 1, 8),
(4, 4, 1, 12),
(5, 1, 2, 5),
(6, 2, 2, 8),
(7, 3, 2, 9),
(8, 4, 2, 8),
(9, 4, 2, 2)
''')



<sqlite3.Cursor at 0x7f91e4b3bce0>
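
The cell above writes the values directly into the SQL text. A common alternative, sketched below on a throwaway in-memory database (`db` and `n_roles` are hypothetical names chosen so the notebook's `conn` is left untouched), is to pass the values separately with `?` placeholders and `executemany`, then call `commit()` to make the inserts durable.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE roles (role_id INTEGER PRIMARY KEY, name TEXT)")

roles = [(1, "captain"), (2, "scientist"), (3, "engineer"), (4, "engineer 2")]
# Each '?' is filled in by the driver, which takes care of quoting the values.
db.executemany("INSERT INTO roles VALUES (?, ?)", roles)
# sqlite3 implicitly opens a transaction before the INSERTs; commit() ends it.
db.commit()

n_roles = db.execute("SELECT COUNT(*) FROM roles").fetchone()[0]
print(n_roles)  # 4
db.close()
```

Placeholders also avoid the quoting issues that come with building SQL strings by hand.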

### Examining the Data

In [11]:
#Load the crew table
crew_df = pd.read_sql('''SELECT *
                        FROM crew''', conn) #Selecting all columns here
crew_df

|   | id | name | rank | role_id |
|---|---|---|---|---|
| 0 | 1 | Jane | 10 | 1.0 |
| 1 | 2 | Dan | 9 | 2.0 |
| 2 | 3 | Alex | 4 | 3.0 |
| 3 | 4 | Jen | 4 | 4.0 |
| 4 | 5 | Brandon | 1 | NaN |


In [12]:
crew_df.dtypes

id           int64
name        object
rank         int64
role_id    float64
dtype: object
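
`role_id` comes back as `float64` because Brandon's role is NULL: pandas represents the missing value as NaN, and NaN is a float, so an `int64` column cannot hold it. If you want to keep integers, one option is pandas' nullable `Int64` dtype (capital "I"). A minimal sketch on a two-row, in-memory copy of the crew table (`demo` and `df` are hypothetical names):

```python
import sqlite3
import pandas as pd

# Rebuild a tiny copy of the crew table in memory so this cell is self-contained.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE crew (id INTEGER PRIMARY KEY, name TEXT, rank INTEGER, role_id INTEGER)")
demo.executemany(
    "INSERT INTO crew VALUES (?, ?, ?, ?)",
    [(1, "Jane", 10, 1), (5, "Brandon", 1, None)],
)

df = pd.read_sql("SELECT * FROM crew", demo)
print(df["role_id"].dtype)  # float64: the NULL became NaN

# The nullable integer dtype stores whole numbers and uses <NA> for missing values.
df["role_id"] = df["role_id"].astype("Int64")
print(df["role_id"].dtype)  # Int64
print(df["role_id"].isna().tolist())  # [False, True]
demo.close()
```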

In [13]:
roles_df = pd.read_sql('''SELECT *
                        FROM roles''', conn) #Selecting all columns here
roles_df

|   | role_id | name |
|---|---|---|
| 0 | 1 | captain |
| 1 | 2 | scientist |
| 2 | 3 | engineer |
| 3 | 4 | engineer 2 |


In [14]:
roles_df.dtypes

role_id     int64
name       object
dtype: object

In [15]:
equipment_df = pd.read_sql('''SELECT *
                        FROM equipment''', conn) #Selecting all columns here
equipment_df

|   | id | name |
|---|---|---|
| 0 | 1 | Centrifuge |
| 1 | 2 | Soldering Station |
| 2 | 3 | Notebook |
| 3 | 4 | Chemical Z |


In [16]:
manages_df = pd.read_sql('''SELECT *
                        FROM manages''', conn) #Selecting all columns here
manages_df

|   | id | crew_id | equip_id |
|---|---|---|---|
| 0 | 1 | 2 | 1 |
| 1 | 2 | 3 | 2 |
| 2 | 3 | 1 | 3 |
| 3 | 4 | 2 | 4 |
| 4 | 5 | 1 | 4 |


In [17]:
worklog_df = pd.read_sql('''SELECT *
                        FROM worklog''', conn) #Selecting all columns here
worklog_df

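|   | id | crew_id | day | hours |
|---|---|---|---|---|
| 0 | 1 | 1 | 1 | 10 |
| 1 | 2 | 2 | 1 | 5 |
| 2 | 3 | 3 | 1 | 8 |
| 3 | 4 | 4 | 1 | 12 |
| 4 | 5 | 1 | 2 | 5 |
| 5 | 6 | 2 | 2 | 8 |
| 6 | 7 | 3 | 2 | 9 |
| 7 | 8 | 4 | 2 | 8 |
| 8 | 9 | 4 | 2 | 2 |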


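Now that we have DataFrames for our tables, we can use pandasql.
The idea of pandasql is to make Python speak SQL: you write a SQL query against pandas DataFrames, pandasql runs it with SQLite (by default), and you get the result back as a new DataFrame.
You can find more information here: https://community.alteryx.com/t5/Data-Science-Blog/pandasql-Make-python-speak-SQL/ba-p/138435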
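
To make the idea concrete, here is a rough sketch of that workflow: copy the DataFrames into a temporary in-memory SQLite database, run the query there, and read the result back. This illustrates the idea under that assumption and is not pandasql's actual implementation; `run_sql_on_frames`, `crew_small`, and `small_result` are hypothetical names.

```python
import sqlite3
import pandas as pd

def run_sql_on_frames(query, frames):
    """Hypothetical helper: copy each DataFrame into an in-memory SQLite
    database under the given table name, run `query`, return a DataFrame."""
    tmp = sqlite3.connect(":memory:")
    try:
        for table_name, frame in frames.items():
            frame.to_sql(table_name, tmp, index=False)
        return pd.read_sql(query, tmp)
    finally:
        tmp.close()

crew_small = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Jane", "Dan", "Alex"],
    "rank": [10, 9, 4],
})
small_result = run_sql_on_frames(
    "SELECT name FROM crew_df WHERE rank > 5 ORDER BY id",
    {"crew_df": crew_small},
)
print(small_result["name"].tolist())  # ['Jane', 'Dan']
```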


Suppose we just want to list the names of the crew members. SELECT lets us choose which columns to retrieve (and, combined with WHERE, which rows).

In [18]:
### Select only the names of crew members
query_crew_names = '''SELECT name FROM crew_df'''
crew_names_df = ps.sqldf(query_crew_names, locals())
crew_names_df

|   | name |
|---|---|
| 0 | Jane |
| 1 | Dan |
| 2 | Alex |
| 3 | Jen |
| 4 | Brandon |


#### Conditional Retrieval

We use the WHERE clause to keep only the rows that satisfy a condition.

In [19]:
# Retrieve all tuples where crew members have rank either 10 or 4 and their name starts with the letter J.
# SQLite's LIKE is case-insensitive for ASCII letters, so 'J%' also matches names starting with 'j'.
query_conditional = '''SELECT *
                        FROM crew_df
                        WHERE rank IN (4,10) AND name LIKE 'J%'
                        '''
crew_rank_df = ps.sqldf(query_conditional, locals())  # selecting all columns here
crew_rank_df

|   | id | name | rank | role_id |
|---|---|---|---|---|
| 0 | 1 | Jane | 10 | 1.0 |
| 1 | 4 | Jen | 4 | 4.0 |
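
For comparison, the same filter can be written directly in pandas with a boolean mask. A minimal sketch on a hand-built copy of the crew rows (`crew_demo` and `mask` are hypothetical names). One difference worth knowing: `str.startswith` is case-sensitive, whereas SQLite's `LIKE` ignores case for ASCII letters.

```python
import pandas as pd

# Hand-built stand-in with the same rows as the crew table.
crew_demo = pd.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "name": ["Jane", "Dan", "Alex", "Jen", "Brandon"],
    "rank": [10, 9, 4, 4, 1],
})

# WHERE rank IN (4, 10) AND name LIKE 'J%'
mask = crew_demo["rank"].isin([4, 10]) & crew_demo["name"].str.startswith("J")
print(crew_demo[mask])
```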


#### Ordering

You can order your results by values in the columns.

Let’s retrieve the rows of the manages table sorted by crew_id in ascending order and, for rows with the same crew_id, by equip_id in descending order.

In [20]:
query_ordering2 = '''SELECT * FROM manages_df ORDER BY crew_id ASC, equip_id DESC '''
manages_order_df = ps.sqldf(query_ordering2, locals())
manages_order_df

|   | id | crew_id | equip_id |
|---|---|---|---|
| 0 | 5 | 1 | 4 |
| 1 | 3 | 1 | 3 |
| 2 | 4 | 2 | 4 |
| 3 | 1 | 2 | 1 |
| 4 | 2 | 3 | 2 |


ASC (ascending) is the default; use DESC for descending order.

You can order by multiple columns, listed from highest to lowest priority: when two rows have equal values in one column, the next column in the list breaks the tie, e.g., ORDER BY name, id.
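
The same multi-column ordering in pandas is `sort_values` with one `ascending` flag per column. A minimal sketch on a hand-built copy of the manages rows (`manages_demo` and `ordered` are hypothetical names):

```python
import pandas as pd

manages_demo = pd.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "crew_id": [2, 3, 1, 2, 1],
    "equip_id": [1, 2, 3, 4, 4],
})

# ORDER BY crew_id ASC, equip_id DESC
ordered = manages_demo.sort_values(["crew_id", "equip_id"], ascending=[True, False])
print(ordered["id"].tolist())  # [5, 3, 4, 1, 2]
```

The resulting id order matches the SQL output above.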

#### Distinct Values

SELECT DISTINCT retrieves only unique values. For example, let’s retrieve a list of all ranks that are assigned to our crew members, without any duplicates.

In [21]:
query_allRanks = '''SELECT rank FROM crew_df '''
all_ranks_df = ps.sqldf(query_allRanks, locals())
all_ranks_df

|   | rank |
|---|---|
| 0 | 10 |
| 1 | 9 |
| 2 | 4 |
| 3 | 4 |
| 4 | 1 |


In [22]:
query_distinctRanks = '''SELECT DISTINCT rank FROM crew_df '''
distinct_ranks_df = ps.sqldf(query_distinctRanks, locals())
distinct_ranks_df

|   | rank |
|---|---|
| 0 | 10 |
| 1 | 9 |
| 2 | 4 |
| 3 | 1 |
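
DISTINCT also combines with aggregates: COUNT(DISTINCT rank) counts unique values, while COUNT(rank) counts every non-NULL value. A minimal sketch on an in-memory SQLite copy of the crew ranks (`db`, `distinct_ranks`, `n_distinct`, and `n_rows` are hypothetical names). Without ORDER BY, the order of DISTINCT results is not guaranteed, so the sketch sorts explicitly.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE crew (id INTEGER PRIMARY KEY, name TEXT, rank INTEGER)")
db.executemany(
    "INSERT INTO crew VALUES (?, ?, ?)",
    [(1, "Jane", 10), (2, "Dan", 9), (3, "Alex", 4), (4, "Jen", 4), (5, "Brandon", 1)],
)

distinct_ranks = [r[0] for r in db.execute("SELECT DISTINCT rank FROM crew ORDER BY rank")]
n_distinct = db.execute("SELECT COUNT(DISTINCT rank) FROM crew").fetchone()[0]
n_rows = db.execute("SELECT COUNT(rank) FROM crew").fetchone()[0]
print(distinct_ranks, n_distinct, n_rows)  # [1, 4, 9, 10] 4 5
db.close()
```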


#### Null Values

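Unless the schema says otherwise (e.g., a NOT NULL constraint when creating the table), any column can hold NULL, except the primary key. To find missing values, test with IS NULL (or IS NOT NULL); a comparison such as role_id = NULL is never true.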

In [23]:
query_null = '''SELECT * FROM crew_df WHERE role_id IS NULL'''
null_row_df = ps.sqldf(query_null, locals())
null_row_df

|   | id | name | rank | role_id |
|---|---|---|---|---|
| 0 | 5 | Brandon | 1 |  |
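
Two NULL details are easy to trip over, so here is a minimal sketch on a throwaway in-memory SQLite database (`db`, `crew`, and `strict_crew` are hypothetical names): comparing with `= NULL` matches nothing, because any comparison with NULL yields NULL (unknown) rather than true, and a `NOT NULL` constraint in the schema makes SQLite reject missing values.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE crew (id INTEGER PRIMARY KEY, name TEXT, role_id INTEGER)")
db.executemany("INSERT INTO crew VALUES (?, ?, ?)", [(1, "Jane", 1), (5, "Brandon", None)])

# '= NULL' evaluates to NULL (unknown) for every row, so no row matches.
eq_null = db.execute("SELECT name FROM crew WHERE role_id = NULL").fetchall()
# 'IS NULL' is the correct test for a missing value.
is_null = db.execute("SELECT name FROM crew WHERE role_id IS NULL").fetchall()

# With a NOT NULL constraint, inserting NULL raises sqlite3.IntegrityError.
db.execute("CREATE TABLE strict_crew (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
try:
    db.execute("INSERT INTO strict_crew VALUES (1, NULL)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

print(eq_null, is_null, rejected)  # [] [('Brandon',)] True
db.close()
```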


### Spark SQL

**Note**: the following cells are for Recitation 5, feel free to ignore them for now.

#### Loading Spark

***Warning:*** The following cell will take approx. 2 minutes to run!

In [24]:
!apt install libkrb5-dev
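# Older Spark releases such as 3.2.2 are served from archive.apache.org rather than downloads.apache.org.
!wget https://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz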
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!pip install findspark
!pip install pyspark==3.2.2
!apt update
!apt install gcc python-dev libkrb5-dev

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  comerr-dev krb5-multidev libcom-err2 libgssrpc4 libkadm5clnt-mit11
  libkadm5srv-mit11 libkdb5-9
Suggested packages:
  doc-base krb5-doc krb5-user
The following NEW packages will be installed:
  comerr-dev krb5-multidev libgssrpc4 libkadm5clnt-mit11 libkadm5srv-mit11
  libkdb5-9 libkrb5-dev
The following packages will be upgraded:
  libcom-err2
1 upgraded, 7 newly installed, 0 to remove and 19 not upgraded.
Need to get 358 kB of archives.
After this operation, 1,992 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 libcom-err2 amd64 1.44.1-1ubuntu1.4 [8,696 B]
Get:2 http://archive.ubuntu.com/ubuntu bionic-updates/main amd64 libgssrpc4 amd64 1.16-2ub

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

import os
os.environ['SPARK_HOME'] = "/content/spark-3.2.2-bin-hadoop3.2"
import pyspark
from pyspark.sql import SQLContext



In [26]:
os.environ['SPARK_HOME']

'/content/spark-3.2.2-bin-hadoop3.2'

In [27]:
spark = SparkSession.builder.appName('Recitation5').getOrCreate()

In [28]:
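# getOrCreate() already returns the active session if one exists;
# this guard only matters if `spark` was never defined or was set to None.
try:
    if spark is None:
        spark = SparkSession.builder.appName('Initial').getOrCreate()
        sqlContext = SQLContext(spark.sparkContext)  # legacy API, deprecated since Spark 3.0
except NameError:
    spark = SparkSession.builder.appName('Initial').getOrCreate()
    sqlContext = SQLContext(spark.sparkContext)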


#### Conditional Retrieval

In [29]:
crew_sdf = spark.createDataFrame(crew_df)

In [30]:
crew_sdf.show()

+---+-------+----+-------+
| id|   name|rank|role_id|
+---+-------+----+-------+
|  1|   Jane|  10|    1.0|
|  2|    Dan|   9|    2.0|
|  3|   Alex|   4|    3.0|
|  4|    Jen|   4|    4.0|
|  5|Brandon|   1|    NaN|
+---+-------+----+-------+



In [31]:
crew_sdf.createOrReplaceTempView('crew_spark_table')

In [32]:
# Retrieve all tuples where crew members have rank either 10 or 4 and their name starts with letter J.
# 'J%' means "starts with J" ('%J%' would match a J anywhere); Spark SQL's LIKE is case-sensitive.
crew_rank_sdf = spark.sql("SELECT * FROM crew_spark_table WHERE rank IN (4, 10) AND name LIKE 'J%'")

In [33]:
crew_rank_sdf.show()

+---+----+----+-------+
| id|name|rank|role_id|
+---+----+----+-------+
|  1|Jane|  10|    1.0|
|  4| Jen|   4|    4.0|
+---+----+----+-------+



#### Ordering

In [34]:
# Let’s retrieve the names on the equipment list in increasing lexicographic order.
equipment_sdf = spark.createDataFrame(equipment_df)
equipment_sdf.createOrReplaceTempView('equipment_spark_table')
equip_order_sdf = spark.sql('SELECT name FROM equipment_spark_table ORDER BY name ASC')
equip_order_sdf.show()

+-----------------+
|             name|
+-----------------+
|       Centrifuge|
|       Chemical Z|
|         Notebook|
|Soldering Station|
+-----------------+



#### Where Clause and sdf.count()

In [35]:
# Retrieve the row(s) in crew_rank for Jane
crew_rank_name_row_sdf = crew_rank_sdf.where(crew_rank_sdf.name == 'Jane')
crew_rank_name_row_sdf.show()
print(crew_rank_name_row_sdf.count())

+---+----+----+-------+
| id|name|rank|role_id|
+---+----+----+-------+
|  1|Jane|  10|    1.0|
+---+----+----+-------+

1


#### Aggregate Operation

In [36]:
# Find out the sum and average rank of all crew members
rankSum = crew_sdf.agg({'rank':'sum'})
rankSum.show()
avgRank = crew_sdf.agg({'rank':'avg'})
avgRank.show()

+---------+
|sum(rank)|
+---------+
|       28|
+---------+

+---------+
|avg(rank)|
+---------+
|      5.6|
+---------+
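
The dictionary form of `agg()` corresponds to SQL aggregate functions. A minimal sketch of the same computation written as SQL, run against an in-memory SQLite copy of the crew ranks so it is self-contained (`db`, `total_rank`, and `avg_rank` are hypothetical names); the same query text should also work through `spark.sql(...)` against `crew_spark_table`.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE crew (id INTEGER PRIMARY KEY, name TEXT, rank INTEGER)")
db.executemany(
    "INSERT INTO crew VALUES (?, ?, ?)",
    [(1, "Jane", 10), (2, "Dan", 9), (3, "Alex", 4), (4, "Jen", 4), (5, "Brandon", 1)],
)

# Same aggregates as crew_sdf.agg({'rank': 'sum'}) and crew_sdf.agg({'rank': 'avg'}).
total_rank, avg_rank = db.execute("SELECT SUM(rank), AVG(rank) FROM crew").fetchone()
print(total_rank, avg_rank)  # 28 5.6
db.close()
```

The average is simply 28 / 5 = 5.6, matching the Spark output above.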



#### Renaming Column in sdf

In [37]:
# Rename the column "name" to "Equipment Name" in equipment_sdf
equipment_renamed_sdf = equipment_sdf.withColumnRenamed('name', 'Equipment Name')
equipment_renamed_sdf.show()

+---+-----------------+
| id|   Equipment Name|
+---+-----------------+
|  1|       Centrifuge|
|  2|Soldering Station|
|  3|         Notebook|
|  4|       Chemical Z|
+---+-----------------+



#### Grouping

In [38]:
# Find out the count of each rank, name the columns to be "rank" and "count", order by count descending
rank_count_sdf = (crew_sdf.groupBy('rank')
                  .agg(F.count('name').alias('count'))  # count of non-null names per rank
                  .orderBy(F.desc('count')))
rank_count_sdf.show()

+----+-----+
|rank|count|
+----+-----+
|   4|    2|
|   9|    1|
|   1|    1|
|  10|    1|
+----+-----+
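
The groupBy/agg chain above corresponds to SQL's GROUP BY. A sketch of the SQL version, run on an in-memory SQLite copy of the crew ranks (`db` and `rows` are hypothetical names). Because several ranks tie with a count of 1, the order of those rows in the Spark output is not guaranteed; the sketch adds `rank DESC` as a tie-breaker so its result is deterministic.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE crew (id INTEGER PRIMARY KEY, name TEXT, rank INTEGER)")
db.executemany(
    "INSERT INTO crew VALUES (?, ?, ?)",
    [(1, "Jane", 10), (2, "Dan", 9), (3, "Alex", 4), (4, "Jen", 4), (5, "Brandon", 1)],
)

rows = db.execute("""
    SELECT rank, COUNT(name) AS count
    FROM crew
    GROUP BY rank
    ORDER BY count DESC, rank DESC
""").fetchall()
print(rows)  # [(4, 2), (10, 1), (9, 1), (1, 1)]
db.close()
```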



#### Creating an Empty Spark DataFrame

Create a schema for storing information about engineering buildings. The Classes column should hold an array in which each element is a struct of {class, time, room number}.

In [39]:
schema = StructType([
            StructField("Id", IntegerType(), False),
            StructField("Name", StringType(), True),
            
            # TODO: Include schema for classes
            ])
empty_sdf = spark.createDataFrame([], schema)  # no rows, only the schema
empty_sdf.show()

+---+----+
| Id|Name|
+---+----+
+---+----+



#### Union Operation

In [None]:
conn.execute("""
DROP TABLE IF EXISTS engineering_buildings
""")

# TODO: Complete the code for DType of classes
conn.execute('''
CREATE TABLE engineering_buildings (
id INTEGER PRIMARY KEY,
Name TEXT,
Classes .....)
''')
conn.execute('''
INSERT INTO engineering_buildings VALUES
(1, 'Skirkanich', '[{"CIS 5500", "1:45 PM", "Berger Auditorium"},{"CIS 7000", "3:30 PM", "Berger Auditorium"}]'),
(2, 'Towne', '[{"CIS 5450", "1:45 PM", "Towne 100"},{"CIS 1230", "3:30 PM", "Towne 319"}]'),
(3, 'Levine', '[{"CIS 2340", "1:45 PM", "Levine 101"},{"CIS 3450", "3:30 PM", "Levine 612"}]')
''')
#Load the Building table
building_df = pd.read_sql('''SELECT *
                        FROM engineering_buildings''', conn) #Selecting all columns here
building_sdf = spark.createDataFrame(building_df)
building_sdf.show()

+---+----------+--------------------+
| id|      Name|             Classes|
+---+----------+--------------------+
|  1|Skirkanich|[{"CIS 5500", "1:...|
|  2|     Towne|[{"CIS 5450", "1:...|
|  3|    Levine|[{"CIS 2340", "1:...|
+---+----------+--------------------+



In [None]:
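# union() matches columns by position, not by name: both DataFrames must have the
# same number of columns with compatible types (unionByName() matches by name instead).
empty_sdf = empty_sdf.union(building_sdf)
empty_sdf.show()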


#### Time Example

#### Please download the two CSV files (driver_standings.csv and drivers.csv) and either upload them using the upload button on the left panel or add them to your Google Drive!

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Please update `driver_standings_filepath` and `drivers_filepath` below with the filepaths to `driver_standings.csv` and `drivers.csv` in your Google Drive!

In [None]:
# TODO: update with appropriate filepaths to driver_standings.csv and drivers.csv in your Google Drive!
driver_standings_filepath = '/content/drive/Shareddrives/CIS 5450 2022 Fall/Recitations/Recitation 5/driver_standings.csv'
drivers_filepath = '/content/drive/Shareddrives/CIS 5450 2022 Fall/Recitations/Recitation 5/drivers.csv'

# Without a schema (or the inferSchema option), Spark reads every CSV column as a string.
f1_drivers_standings_sdf = spark.read.format("csv").option("header","true").load(driver_standings_filepath)
f1_drivers_sdf = spark.read.format("csv").option("header","true").load(drivers_filepath)

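Before spark.sql() can refer to a DataFrame by name, we need to register it as a temporary view. A temp view is only a name for the DataFrame: it does not copy or cache the data, and it disappears when the SparkSession ends.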

In [None]:
f1_drivers_standings_sdf.createOrReplaceTempView('driver_standings')
f1_drivers_sdf.createOrReplaceTempView('drivers')

In [None]:
f1_drivers_sdf.show()

+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|driverId| driverRef|number|code| forename|   surname|       dob|nationality|                 url|
+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|       1|  hamilton|    44| HAM|    Lewis|  Hamilton|1985-01-07|    British|http://en.wikiped...|
|       2|  heidfeld|    \N| HEI|     Nick|  Heidfeld|1977-05-10|     German|http://en.wikiped...|
|       3|   rosberg|     6| ROS|     Nico|   Rosberg|1985-06-27|     German|http://en.wikiped...|
|       4|    alonso|    14| ALO| Fernando|    Alonso|1981-07-29|    Spanish|http://en.wikiped...|
|       5|kovalainen|    \N| KOV|   Heikki|Kovalainen|1981-10-19|    Finnish|http://en.wikiped...|
|       6|  nakajima|    \N| NAK|   Kazuki|  Nakajima|1985-01-11|   Japanese|http://en.wikiped...|
|       7|  bourdais|    \N| BOU|Sébastien|  Bourdais|1979-02-28|     French|http://en.wikiped...|
|       8|

In [None]:
f1_drivers_standings_sdf.show()

+-----------------+------+--------+------+--------+------------+----+
|driverStandingsId|raceId|driverId|points|position|positionText|wins|
+-----------------+------+--------+------+--------+------------+----+
|                1|    18|       1|    10|       1|           1|   1|
|                2|    18|       2|     8|       2|           2|   0|
|                3|    18|       3|     6|       3|           3|   0|
|                4|    18|       4|     5|       4|           4|   0|
|                5|    18|       5|     4|       5|           5|   0|
|                6|    18|       6|     3|       6|           6|   0|
|                7|    18|       7|     2|       7|           7|   0|
|                8|    18|       8|     1|       8|           8|   0|
|                9|    19|       1|    14|       1|           1|   1|
|               10|    19|       2|    11|       3|           3|   0|
|               11|    19|       3|     6|       6|           6|   0|
|               12| 

In [None]:
import time
f1_drivers_df = f1_drivers_sdf.toPandas()
f1_drivers_standings_df = f1_drivers_standings_sdf.toPandas()


In [None]:
start_time = time.time()
time_test = ps.sqldf('SELECT * from f1_drivers_standings_df WHERE driverId = 1')
time_elapsed_pandas = time.time() - start_time
time_elapsed_pandas

0.9432384967803955

In [None]:
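start_time = time.time()
# spark.sql() is lazy: it parses and plans the query but reads no rows until an action
# (e.g. .count(), .collect(), .show()) runs, so this measures planning, not execution.
result = spark.sql('SELECT * from driver_standings WHERE driverId = 1')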
time_elapsed_spark = time.time() - start_time
time_elapsed_spark

0.14135122299194336

In [None]:
driver_join_sdf = spark.sql('''
SELECT drivers.driverRef, driver_standings.wins, driver_standings.points,
       driver_standings.position, driver_standings.raceId
FROM driver_standings
JOIN drivers
ON driver_standings.driverId = drivers.driverId
''')
driver_join_sdf.show()

+----------+----+------+--------+------+
| driverRef|wins|points|position|raceId|
+----------+----+------+--------+------+
|  hamilton|   1|    10|       1|    18|
|  heidfeld|   0|     8|       2|    18|
|   rosberg|   0|     6|       3|    18|
|    alonso|   0|     5|       4|    18|
|kovalainen|   0|     4|       5|    18|
|  nakajima|   0|     3|       6|    18|
|  bourdais|   0|     2|       7|    18|
| raikkonen|   0|     1|       8|    18|
|  hamilton|   1|    14|       1|    19|
|  heidfeld|   0|    11|       3|    19|
|   rosberg|   0|     6|       6|    19|
|    alonso|   0|     6|       7|    19|
|kovalainen|   0|    10|       4|    19|
|  nakajima|   0|     3|       9|    19|
|  bourdais|   0|     2|      10|    19|
| raikkonen|   1|    11|       2|    19|
|    kubica|   0|     8|       5|    19|
|    trulli|   0|     5|       8|    19|
|    webber|   0|     2|      11|    19|
| coulthard|   0|     0|      12|    19|
+----------+----+------+--------+------+
only showing top
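
The same JOIN pattern answers one of our spaceship questions: which crew members manage which equipment. A sketch in SQLite with the crew, equipment, and manages rows copied from the tables created earlier (`db`, `pairs`, and `counts` are hypothetical names). The inner join drops crew members who manage nothing; the LEFT JOIN keeps every crew member and reports a count of 0 for them.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE crew (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE equipment (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE manages (id INTEGER PRIMARY KEY, crew_id INTEGER, equip_id INTEGER);
INSERT INTO crew VALUES (1, 'Jane'), (2, 'Dan'), (3, 'Alex'), (4, 'Jen'), (5, 'Brandon');
INSERT INTO equipment VALUES (1, 'Centrifuge'), (2, 'Soldering Station'), (3, 'Notebook'), (4, 'Chemical Z');
INSERT INTO manages VALUES (1, 2, 1), (2, 3, 2), (3, 1, 3), (4, 2, 4), (5, 1, 4);
""")

# Inner join: only crew members with a matching row in manages appear.
pairs = db.execute("""
    SELECT crew.name, equipment.name
    FROM manages
    JOIN crew ON manages.crew_id = crew.id
    JOIN equipment ON manages.equip_id = equipment.id
    ORDER BY crew.name, equipment.name
""").fetchall()

# Left join: every crew member appears; COUNT(manages.id) skips the NULLs
# produced for crew members without a matching manages row.
counts = db.execute("""
    SELECT crew.name, COUNT(manages.id)
    FROM crew
    LEFT JOIN manages ON manages.crew_id = crew.id
    GROUP BY crew.id
    ORDER BY crew.id
""").fetchall()

print(pairs)
print(counts)  # [('Jane', 2), ('Dan', 2), ('Alex', 1), ('Jen', 0), ('Brandon', 0)]
db.close()
```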

#### CASE Statement

Create a new column 'decade' which contains the decade in which each driver was born.
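
A CASE expression works like an if/elif/else chain inside a query: the WHEN branches are checked in order, the first match wins, and ELSE covers everything else. The sketch below applies it to the crew table from the first part of this notebook, using SQLite so it is self-contained; the rank thresholds and the 'senior'/'mid'/'junior' labels are arbitrary, hypothetical choices. Spark SQL accepts the same CASE WHEN ... THEN ... ELSE ... END syntax in `spark.sql(...)`.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE crew (id INTEGER PRIMARY KEY, name TEXT, rank INTEGER)")
db.executemany(
    "INSERT INTO crew VALUES (?, ?, ?)",
    [(1, "Jane", 10), (2, "Dan", 9), (3, "Alex", 4), (4, "Jen", 4), (5, "Brandon", 1)],
)

# Derive a new column from rank; branches are evaluated top to bottom.
rows = db.execute("""
    SELECT name,
           CASE
               WHEN rank >= 9 THEN 'senior'
               WHEN rank >= 4 THEN 'mid'
               ELSE 'junior'
           END AS seniority
    FROM crew
    ORDER BY id
""").fetchall()
print(rows)
db.close()
```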

#### UDF (User-Defined Function)

Create a column named 'Podium' which is 1 if 'position' is 1, 2, or 3 and 0 otherwise.

In [None]:
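def podium(position):
    # 'position' arrives as a string because the CSV was loaded without a schema.
    try:
        return 1 if int(position) in (1, 2, 3) else 0
    except (TypeError, ValueError):
        # Missing or non-numeric positions count as "not on the podium".
        return 0

# Register udf as a SQL function. DO NOT EDIT
spark.udf.register("Podium", podium, IntegerType())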