## SQL at Scale with Spark SQL

Welcome to the SQL mini project. For this project, you will use the Databricks Platform and work through a series of exercises using Spark SQL. The dataset is small, but the aim is to get familiar with the Spark SQL interface, which scales to very large datasets without requiring any changes to your SQL queries.

The data you need is in the mini-project folder as three CSV files. You will import this data into Databricks to create the following tables under the __`country_club1`__ database.

<br>
1. The __`bookings`__ table,
2. The __`facilities`__ table, and
3. The __`members`__ table.

You will upload these datasets to the Databricks platform shortly and see how a database can be created within minutes. Once the database and the tables are populated, you will focus on the mini-project questions.

In the mini project, you'll be asked a series of questions. You can solve them on the Databricks platform, but for the final deliverable,
please download this notebook as an IPython notebook (__`File -> Export -> IPython Notebook`__) and upload it to your GitHub.

### Creating the Database

We will first create the database that will hold our three tables of interest.

In [0]:
%sql 
drop database if exists country_club1 cascade;
create database country_club1;
show databases;

databaseName
country_club
country_club1
default


### Creating the Tables

In this section, we will create the three tables of interest and populate them with the data from the CSV files already available to you.
To get started, first upload the three CSV files to the DBFS as depicted in the following figure.

![](https://i.imgur.com/QcCruBr.png)


Once you have done this, run the following code to build the dataframes that will be saved as tables in our database.

In [0]:
# File location and type
file_location_bookings = "/FileStore/tables/Bookings.csv"
file_location_facilities = "/FileStore/tables/Facilities.csv"
file_location_members = "/FileStore/tables/Members.csv"

file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bookings_df = (spark.read.format(file_type) 
                    .option("inferSchema", infer_schema) 
                    .option("header", first_row_is_header) 
                    .option("sep", delimiter) 
                    .load(file_location_bookings))

facilities_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_facilities))

members_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_members))

### Viewing the dataframe schemas

We can take a look at the schemas of the dataframes before they are written to our database as tables.

In [0]:
print('Bookings Schema')
bookings_df.printSchema()
print('Facilities Schema')
facilities_df.printSchema()
print('Members Schema')
members_df.printSchema()

Bookings Schema
root
 |-- bookid: integer (nullable = true)
 |-- facid: integer (nullable = true)
 |-- memid: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- slots: integer (nullable = true)

Facilities Schema
root
 |-- facid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- membercost: double (nullable = true)
 |-- guestcost: double (nullable = true)
 |-- initialoutlay: integer (nullable = true)
 |-- monthlymaintenance: integer (nullable = true)

Members Schema
root
 |-- memid: integer (nullable = true)
 |-- surname: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- telephone: string (nullable = true)
 |-- recommendedby: integer (nullable = true)
 |-- joindate: timestamp (nullable = true)



### Create permanent tables
We will create three permanent tables in our __`country_club1`__ database, as discussed previously, with the following code.

In [0]:
permanent_table_name_bookings = "country_club1.Bookings"
bookings_df.write.format("parquet").saveAsTable(permanent_table_name_bookings)

permanent_table_name_facilities = "country_club1.Facilities"
facilities_df.write.format("parquet").saveAsTable(permanent_table_name_facilities)

permanent_table_name_members = "country_club1.Members"
members_df.write.format("parquet").saveAsTable(permanent_table_name_members)

### Refresh tables and check them

In [0]:
%sql
use country_club1;
REFRESH table bookings;
REFRESH table facilities;
REFRESH table members;
show tables;

database,tableName,isTemporary
country_club1,bookings,False
country_club1,facilities,False
country_club1,members,False


### Test a sample SQL query

__Note:__ You can use __`%sql`__ at the beginning of a cell and write SQL queries directly, as seen in the following cell. Neat, isn't it?

In [0]:
%sql
select * from bookings limit 3

bookid,facid,memid,starttime,slots
0,3,1,2012-07-03T11:00:00.000+0000,2
1,4,1,2012-07-03T08:00:00.000+0000,2
2,6,0,2012-07-03T18:00:00.000+0000,2


In [0]:
bookings_df.limit(3).toPandas()

Unnamed: 0,bookid,facid,memid,starttime,slots
0,0,3,1,2012-07-03 11:00:00,2
1,1,4,1,2012-07-03 08:00:00,2
2,2,6,0,2012-07-03 18:00:00,2


#### Q1: Some of the facilities charge a fee to members, but some do not. Please list the names of the facilities that do.

In [0]:
%sql
select name from facilities where membercost > 0

name
Tennis Court 1
Tennis Court 2
Massage Room 1
Massage Room 2
Squash Court


In [0]:
facilities_df.filter(facilities_df["membercost"] > 0 ).select("name").toPandas()

Unnamed: 0,name
0,Tennis Court 1
1,Tennis Court 2
2,Massage Room 1
3,Massage Room 2
4,Squash Court


#### Q2: How many facilities do not charge a fee to members?

In [0]:
%sql
select count(1) as non_fee_facilities from facilities where membercost = 0

non_fee_facilities
4


In [0]:
facilities_df.filter(facilities_df["membercost"] == 0).count()

Out[67]: 4

#### Q3: How can you produce a list of facilities that charge a fee to members, where the fee is less than 20% of the facility's monthly maintenance cost? 
#### Return the facid, facility name, member cost, and monthly maintenance of the facilities in question.

In [0]:
%sql
select facid, name, membercost, monthlymaintenance from facilities where membercost / monthlymaintenance < .2 and membercost > 0

facid,name,membercost,monthlymaintenance
0,Tennis Court 1,5.0,200
1,Tennis Court 2,5.0,200
4,Massage Room 1,9.9,3000
5,Massage Room 2,9.9,3000
6,Squash Court,3.5,80


In [0]:
facilities_df.filter(facilities_df.membercost / facilities_df.monthlymaintenance < .2) \
    .filter(facilities_df.membercost > 0) \
    .select("facid", "name","membercost","monthlymaintenance") \
    .toPandas()

Unnamed: 0,facid,name,membercost,monthlymaintenance
0,0,Tennis Court 1,5.0,200
1,1,Tennis Court 2,5.0,200
2,4,Massage Room 1,9.9,3000
3,5,Massage Room 2,9.9,3000
4,6,Squash Court,3.5,80
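
As a sanity check on the 20% rule, the arithmetic can be replayed in plain Python on the rows returned above. This is only a sketch: the tuples are copied from the query output, and `is_under_threshold` is a hypothetical helper that is not part of the notebook.

```python
# (facid, name, membercost, monthlymaintenance) copied from the output above
rows = [
    (0, "Tennis Court 1", 5.0, 200),
    (1, "Tennis Court 2", 5.0, 200),
    (4, "Massage Room 1", 9.9, 3000),
    (5, "Massage Room 2", 9.9, 3000),
    (6, "Squash Court", 3.5, 80),
]


def is_under_threshold(membercost, monthlymaintenance, ratio=0.2):
    """Hypothetical helper: True when a non-zero fee is below `ratio` of the maintenance cost."""
    return membercost > 0 and membercost / monthlymaintenance < ratio


# Fee as a fraction of monthly maintenance, per facility
ratios = {name: round(cost / maint, 5) for _, name, cost, maint in rows}
all_pass = all(is_under_threshold(cost, maint) for _, _, cost, maint in rows)
print(ratios)
print(all_pass)
```

Every fee-charging facility sits well below the 20% mark, which is why the result matches the list from Q1.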


#### Q4: How can you retrieve the details of facilities with ID 1 and 5? Write the query without using the OR operator.

In [0]:
%sql
select * from facilities where facid in (1, 5)

facid,name,membercost,guestcost,initialoutlay,monthlymaintenance
1,Tennis Court 2,5.0,25.0,8000,200
5,Massage Room 2,9.9,80.0,4000,3000


In [0]:

# filter is a method, so it is called with parentheses rather than subscripted with brackets
# (the brackets caused "TypeError: 'method' object is not subscriptable").
# facid is an integer column, so isin takes integers.
facilities_df.filter(facilities_df.facid.isin(1, 5)).toPandas()


Unnamed: 0,facid,name,membercost,guestcost,initialoutlay,monthlymaintenance
0,1,Tennis Court 2,5.0,25.0,8000,200
1,5,Massage Room 2,9.9,80.0,4000,3000


#### Q5: How can you produce a list of facilities, with each labelled as 'cheap' or 'expensive', depending on whether their monthly maintenance cost is more than $100?
#### Return the name and monthly maintenance of the facilities in question.

In [0]:
%sql
select 
  name, monthlymaintenance, 
case 
  when monthlymaintenance <= 100 then 'Cheap' 
  else 'Expensive' 
  end as maintenance_type
  from facilities

name,monthlymaintenance,maintenance_type
Tennis Court 1,200,Expensive
Tennis Court 2,200,Expensive
Badminton Court,50,Cheap
Table Tennis,10,Cheap
Massage Room 1,3000,Expensive
Massage Room 2,3000,Expensive
Squash Court,80,Cheap
Snooker Table,15,Cheap
Pool Table,15,Cheap


In [0]:
from pyspark.sql import functions as F
# Alias the CASE expression so the output column gets a readable name.
maintenance_type = F.when(facilities_df.monthlymaintenance <= 100, "Cheap").otherwise("Expensive").alias("maintenance_type")
facilities_df.select("name", "monthlymaintenance", maintenance_type).toPandas()

Unnamed: 0,name,monthlymaintenance,maintenance_type
0,Tennis Court 1,200,Expensive
1,Tennis Court 2,200,Expensive
2,Badminton Court,50,Cheap
3,Table Tennis,10,Cheap
4,Massage Room 1,3000,Expensive
5,Massage Room 2,3000,Expensive
6,Squash Court,80,Cheap
7,Snooker Table,15,Cheap
8,Pool Table,15,Cheap
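
The boundary of the `CASE` expression is worth checking: "more than $100" means a facility costing exactly 100 is still cheap. The sketch below runs the same `CASE` logic in Python's built-in `sqlite3`, used here only as a stand-in so the example runs without a Spark cluster. Three rows are copied from the output above, and the `Boundary Case` row is made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table facilities (name text, monthlymaintenance integer)")
conn.executemany(
    "insert into facilities values (?, ?)",
    [
        ("Tennis Court 1", 200),
        ("Badminton Court", 50),
        ("Squash Court", 80),
        ("Boundary Case", 100),  # hypothetical row, not in the dataset
    ],
)

# Same CASE logic as the notebook query: <= 100 is cheap, anything above is expensive
labels = dict(
    conn.execute(
        """
        select name,
               case when monthlymaintenance <= 100 then 'Cheap' else 'Expensive' end
        from facilities
        """
    ).fetchall()
)
print(labels)
```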


#### Q6: You'd like to get the first and last name of the last member(s) who signed up. Do not use the LIMIT clause for your solution.

In [0]:
%sql
select firstname, surname from members where joindate = (select max(joindate) from members)

firstname,surname
Darren,Smith


In [0]:
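from pyspark.sql import functions as F

# Join against the latest join date rather than sorting and taking the first row,
# so members tied for the latest date are all returned and no LIMIT-style shortcut is needed.
latest = members_df.agg(F.max("joindate").alias("joindate"))
members_df.join(latest, "joindate") \
    .select(F.col("firstname").alias("First name"), F.col("surname").alias("Last Name")) \
    .toPandas()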

Unnamed: 0,First name,Last Name
0,Darren,Smith
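
One reason to prefer the `max(joindate)` subquery over sorting and taking the first row is that it keeps ties. The sketch below illustrates this with Python's built-in `sqlite3` as a stand-in for Spark SQL; all names and timestamps in it are made up, with two members deliberately sharing the latest join date.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table members (firstname text, surname text, joindate text)")
# Hypothetical rows: two members are tied for the latest join date
conn.executemany(
    "insert into members values (?, ?, ?)",
    [
        ("Ada", "Early", "2012-07-01 09:00:00"),
        ("Ben", "Late", "2012-09-26 18:08:45"),
        ("Cy", "Later", "2012-09-26 18:08:45"),
    ],
)

# Subquery approach: every member whose joindate equals the maximum
with_subquery = conn.execute(
    "select firstname, surname from members "
    "where joindate = (select max(joindate) from members) "
    "order by firstname"
).fetchall()

# Sort-and-take-first approach: only a single row survives
with_sort = conn.execute(
    "select firstname, surname from members order by joindate desc, firstname"
).fetchone()

print(with_subquery)
print(with_sort)
```

With the real data only one member joined last, so both approaches agree, but the subquery form stays correct if that ever changes.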


#### Q7: How can you produce a list of all members who have used a tennis court?
- Include in your output the name of the court, and the name of the member formatted as a single column.
- Ensure there is no duplicate data.
- Also order by the member name.

In [0]:
%sql
select facility_name, member_name from (
  select distinct 
    facilities.name as facility_name, 
    concat(firstname, ' ', surname) as member_name,
    firstname,
    surname
  from 
    bookings 
      inner join facilities on bookings.facid = facilities.facid and name like 'Tennis Court%'
      inner join members on members.memid = bookings.memid
)
  order by firstname, surname


facility_name,member_name
Tennis Court 1,Anne Baker
Tennis Court 2,Anne Baker
Tennis Court 2,Burton Tracy
Tennis Court 1,Burton Tracy
Tennis Court 1,Charles Owen
Tennis Court 2,Charles Owen
Tennis Court 2,Darren Smith
Tennis Court 1,David Farrell
Tennis Court 2,David Farrell
Tennis Court 2,David Jones


In [0]:
from pyspark.sql.functions import concat, lit

bookings_df \
    .join(facilities_df, [facilities_df.facid == bookings_df.facid, facilities_df.name.like("Tennis Court%") ]) \
    .join(members_df, members_df.memid == bookings_df.memid) \
    .select(facilities_df.name, concat(members_df.firstname, lit(' '), members_df.surname).alias("member_name") ) \
    .distinct() \
    .orderBy(["firstname","surname"]) \
    .toPandas()

Unnamed: 0,name,member_name
0,Tennis Court 1,Anne Baker
1,Tennis Court 2,Anne Baker
2,Tennis Court 1,Burton Tracy
3,Tennis Court 2,Burton Tracy
4,Tennis Court 1,Charles Owen
5,Tennis Court 2,Charles Owen
6,Tennis Court 2,Darren Smith
7,Tennis Court 1,David Farrell
8,Tennis Court 2,David Farrell
9,Tennis Court 1,David Jones
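
To see why `distinct` is needed, note that a member who books the same court several times would otherwise appear once per booking. The sketch below reproduces the join on a tiny table in Python's built-in `sqlite3`, as a stand-in for Spark SQL. The member IDs and the bookings are invented for illustration; only the facility names and the two member names come from the output above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table facilities (facid integer, name text);
    create table members (memid integer, firstname text, surname text);
    create table bookings (facid integer, memid integer);
    insert into facilities values (0, 'Tennis Court 1'), (1, 'Tennis Court 2'), (6, 'Squash Court');
    -- memid values below are hypothetical
    insert into members values (1, 'Anne', 'Baker'), (2, 'David', 'Jones');
    -- Anne books Tennis Court 1 twice; David books Tennis Court 2 and the squash court
    insert into bookings values (0, 1), (0, 1), (1, 2), (6, 2);
    """
)

rows = conn.execute(
    """
    select distinct f.name as facility_name,
           m.firstname || ' ' || m.surname as member_name
    from bookings b
      inner join facilities f on b.facid = f.facid
      inner join members m on m.memid = b.memid
    where f.name like 'Tennis Court%'
    order by member_name
    """
).fetchall()
print(rows)
```

Anne's repeated booking collapses to a single row, and the squash court booking is filtered out by the `like` condition.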


#### Q8: How can you produce a list of bookings on the day of 2012-09-14 which will cost the member (or guest) more than $30? 

- Remember that guests have different costs to members (the listed costs are per half-hour 'slot')
- The guest user's ID is always 0. 

#### Include in your output the name of the facility, the name of the member formatted as a single column, and the cost.

- Order by descending cost, and do not use any subqueries.
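
The pricing rule described in the bullets above can be sketched in plain Python before writing the query: a booking by the guest user (ID 0) is charged `guestcost` per slot, and any other booking is charged `membercost` per slot. `booking_cost` is a hypothetical helper, and the member ID 7 is made up; the Tennis Court 2 rates are taken from the Q4 output.

```python
GUEST_ID = 0  # the guest user's ID, as stated in the question


def booking_cost(memid, slots, membercost, guestcost):
    """Hypothetical helper: cost of one booking, charged per half-hour slot."""
    rate = guestcost if memid == GUEST_ID else membercost
    return rate * slots


# Tennis Court 2 rates from the Q4 output: membercost 5.0, guestcost 25.0
guest_total = booking_cost(memid=0, slots=3, membercost=5.0, guestcost=25.0)
member_total = booking_cost(memid=7, slots=3, membercost=5.0, guestcost=25.0)  # memid 7 is made up

print(guest_total, guest_total > 30)
print(member_total, member_total > 30)
```

The same three-slot booking clears the $30 threshold for a guest but not for a member, so the `CASE` expression and the `WHERE` clause both have to branch on `memid = 0` in the same direction.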

In [0]:
%sql
select facilities.name, concat(firstname, ' ', surname) member_name,
  case 
    when bookings.memid = 0 then (guestcost * slots)
    else (membercost * slots) 
  end as cost
from 
  bookings 
    inner join facilities on bookings.facid = facilities.facid
    inner join members on members.memid = bookings.memid
where 
  cast(starttime as date) = '2012-09-14'
  and ((guestcost * slots > 30 AND bookings.memid = 0) OR (membercost * slots > 30 AND bookings.memid > 0))
order by 
  cost desc


In [0]:
from pyspark.sql import functions as F

# guestcost and membercost are columns of facilities_df, not bookings_df,
# which is why bookings_df.guestcost raised an AttributeError.
# Guests (memid 0) pay guestcost per slot; members pay membercost per slot.
cost = F.when(bookings_df.memid == 0, facilities_df.guestcost * bookings_df.slots) \
        .otherwise(facilities_df.membercost * bookings_df.slots)

bookings_df \
    .join(facilities_df, facilities_df.facid == bookings_df.facid) \
    .join(members_df, members_df.memid == bookings_df.memid) \
    .filter(F.to_date(bookings_df.starttime) == '2012-09-14') \
    .select(facilities_df["name"],
            F.concat(members_df.firstname, F.lit(' '), members_df.surname).alias("member_name"),
            cost.alias("cost")) \
    .filter(F.col("cost") > 30) \
    .orderBy(F.col("cost").desc()) \
    .toPandas()


#### Q9: This time, produce the same result as in Q8, but using a subquery.

In [0]:
%sql
select 
  (select max(concat(firstname, ' ', surname)) from members where members.memid = bookings.memid) as member_name,
  facilities.name,
  case 
    when bookings.memid = 0 then (guestcost * slots)
    else (membercost * slots) 
  end as cost
from 
  bookings 
    inner join facilities on bookings.facid = facilities.facid    
where 
  cast(starttime as date) = '2012-09-14'
  and ((guestcost * slots > 30 AND bookings.memid = 0) OR (membercost * slots > 30 AND bookings.memid > 0))
order by 
  cost desc

In [0]:
sql = '''
select 
  (select max(concat(firstname, ' ', surname)) from members where members.memid = bookings.memid) as member_name,
  facilities.name,
  case 
    when bookings.memid = 0 then (guestcost * slots)
    else (membercost * slots) 
  end as cost
from 
  bookings 
    inner join facilities on bookings.facid = facilities.facid    
where 
  cast(starttime as date) = '2012-09-14'
  and ((guestcost * slots > 30 AND bookings.memid = 0) OR (membercost * slots > 30 AND bookings.memid > 0))
order by 
  cost desc
'''

df = spark.sql(sql)
df.toPandas()


#### Q10: Produce a list of facilities with a total revenue less than 1000.
- The output should have facility name and total revenue, sorted by revenue. 
- Remember that there's a different cost for guests and members!

In [0]:
%sql

select facilities.name, sum(  case 
    when bookings.memid = 0 then (guestcost * slots)
    else (membercost * slots) 
  end ) as total_revenue
from 
  bookings 
    inner join facilities on bookings.facid = facilities.facid
  group by
    facilities.name
  having 
    total_revenue < 1000
  order by 
    total_revenue

name,total_revenue
Table Tennis,180.0
Snooker Table,240.0
Pool Table,270.0


In [0]:
sql = '''
select facilities.name, sum(  case 
    when bookings.memid = 0 then (guestcost * slots)
    else (membercost * slots) 
  end ) as total_revenue
from 
  bookings 
    inner join facilities on bookings.facid = facilities.facid
  group by
    facilities.name
  having 
    total_revenue < 1000
  order by 
    total_revenue
'''

df = spark.sql(sql)
df.toPandas()

Unnamed: 0,name,total_revenue
0,Table Tennis,180.0
1,Snooker Table,240.0
2,Pool Table,270.0
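
The revenue query combines a conditional sum with `group by` and `having`. The sketch below runs the same shape of query on a tiny table in Python's built-in `sqlite3`, as a stand-in for Spark SQL. All facilities and bookings in it are hypothetical, chosen so that one total lands on each side of 1000. It repeats the aggregate in the `having` clause instead of referring to the `total_revenue` alias; Spark SQL accepted the alias above, but repeating the expression is the more portable form.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table facilities (facid integer, name text, membercost real, guestcost real);
    create table bookings (facid integer, memid integer, slots integer);
    -- Hypothetical facilities and bookings (memid 0 is the guest user)
    insert into facilities values (0, 'Court A', 5.0, 25.0), (1, 'Table B', 0.0, 5.0);
    insert into bookings values (0, 0, 40), (0, 3, 10), (1, 0, 6), (1, 2, 4);
    """
)

# Guests pay guestcost per slot, members pay membercost per slot
revenue_expr = (
    "sum(case when b.memid = 0 then f.guestcost * b.slots "
    "else f.membercost * b.slots end)"
)

rows = conn.execute(
    f"""
    select f.name, {revenue_expr} as total_revenue
    from bookings b
      inner join facilities f on b.facid = f.facid
    group by f.name
    having {revenue_expr} < 1000
    order by total_revenue
    """
).fetchall()
print(rows)
```

Court A totals 1050 (1000 from guests plus 50 from members) and is filtered out, while Table B totals 30 and is kept.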
