Follow the instructions here https://medium.com/@naomi.fridman/install-pyspark-to-run-on-jupyter-notebook-on-windows-4ec2009de21f
to install PySpark

In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'E:\\spark\\spark-2.4.5-bin-hadoop2.7'

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession

In [3]:
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

### Spark SQL
The dataset size may not be too big but the intent here is to get familiarized with the Spark SQL interface which scales easily to huge datasets, without having to worry about changing SQL queries.

In [4]:
# Checking Existence of Spark Environment Variables
spark

In [5]:
sqlContext

<pyspark.sql.context.SQLContext at 0x188c85de0f0>

Creating a utility function to run SQL commands. 
Instead of typing the same python functions repeatedly, we build a small function where you can just pass your query to get results.
Remember we are using Spark SQL in PySpark,
we can't run multiple SQL statements in one go (no semi-colon ';' separated SQL statements),
we can run multi-line SQL queries (but still has to be a single statement).

In [6]:
def run_sql(statement):
    try:
        result = sqlContext.sql(statement)
    except Exception as e:
        print(e.desc, '\n', e.stackTrace)
        return
    return result

Creating the Database.

In [7]:
run_sql('drop database if exists country_club cascade')
run_sql('create database country_club')
dbs = run_sql('show databases')
dbs.toPandas()

Unnamed: 0,databaseName
0,country_club
1,default


Creating the Tables.

In [8]:
# File location and type
file_location_bookings = "./Bookings.csv"
file_location_facilities = "./Facilities.csv"
file_location_members = "./Members.csv"

file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bookings_df = (spark.read.format(file_type) 
                    .option("inferSchema", infer_schema) 
                    .option("header", first_row_is_header) 
                    .option("sep", delimiter) 
                    .load(file_location_bookings))

facilities_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_facilities))

members_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_members))

Viewing the dataframe schemas.

In [9]:
print('Bookings Schema')
bookings_df.printSchema()
print('Facilities Schema')
facilities_df.printSchema()
print('Members Schema')
members_df.printSchema()

Bookings Schema
root
 |-- bookid: integer (nullable = true)
 |-- facid: integer (nullable = true)
 |-- memid: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- slots: integer (nullable = true)

Facilities Schema
root
 |-- facid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- membercost: double (nullable = true)
 |-- guestcost: double (nullable = true)
 |-- initialoutlay: integer (nullable = true)
 |-- monthlymaintenance: integer (nullable = true)

Members Schema
root
 |-- memid: integer (nullable = true)
 |-- surname: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- telephone: string (nullable = true)
 |-- recommendedby: integer (nullable = true)
 |-- joindate: timestamp (nullable = true)



Create permanent tables.

In [10]:
permanent_table_name_bookings = "country_club.Bookings"
bookings_df.write.format("parquet").saveAsTable(permanent_table_name_bookings)

permanent_table_name_facilities = "country_club.Facilities"
facilities_df.write.format("parquet").saveAsTable(permanent_table_name_facilities)

permanent_table_name_members = "country_club.Members"
members_df.write.format("parquet").saveAsTable(permanent_table_name_members)

Refresh tables and check them.

In [11]:
run_sql('use country_club')
run_sql('REFRESH table bookings')
run_sql('REFRESH table facilities')
run_sql('REFRESH table members')
tbls = run_sql('show tables')
tbls.toPandas()

Unnamed: 0,database,tableName,isTemporary
0,country_club,bookings,False
1,country_club,facilities,False
2,country_club,members,False


Testing a sample SQL query.

In [12]:
result = run_sql('''
                    SELECT * 
                    FROM bookings 
                    LIMIT 3
                 ''')
result.toPandas()

Unnamed: 0,bookid,facid,memid,starttime,slots
0,0,3,1,2012-07-03 11:00:00,2
1,1,4,1,2012-07-03 08:00:00,2
2,2,6,0,2012-07-03 18:00:00,2


Some of the facilities charge a fee to members, but some do not. Listing the names of the facilities that do.

In [13]:
result = run_sql('''
                    SELECT name
                    FROM facilities
                    WHERE membercost > 0 AND membercost IS NOT NULL 
                 ''')
result.toPandas()

Unnamed: 0,name
0,Tennis Court 1
1,Tennis Court 2
2,Massage Room 1
3,Massage Room 2
4,Squash Court


How many facilities do not charge a fee to members?

In [14]:
result = run_sql('''
                    SELECT COUNT(name)
                    FROM facilities
                    WHERE membercost = 0 AND membercost IS NOT NULL
                 ''')
result.toPandas()

Unnamed: 0,count(name)
0,4


Producing a list of facilities that charge a fee to members, where the fee is less than 20% of the facility's monthly maintenance cost.
Returning the facid, facility name, member cost, and monthly maintenance of the facilities.

In [15]:
result = run_sql('''
                    SELECT facid, name, membercost, monthlymaintenance
                    FROM facilities
                    WHERE membercost > 0 AND membercost IS NOT NULL AND membercost < 0.2 * monthlymaintenance
                ''')
result.toPandas()

Unnamed: 0,facid,name,membercost,monthlymaintenance
0,0,Tennis Court 1,5.0,200
1,1,Tennis Court 2,5.0,200
2,4,Massage Room 1,9.9,3000
3,5,Massage Room 2,9.9,3000
4,6,Squash Court,3.5,80


Retrieve the details of facilities with ID 1 and 5? Write the query without using the OR operator.

In [16]:
result = run_sql('''
                    SELECT *
                    FROM facilities
                    WHERE facid = 1
                    UNION
                    SELECT *
                    FROM facilities
                    WHERE facid = 2
                ''')
result.toPandas()

Unnamed: 0,facid,name,membercost,guestcost,initialoutlay,monthlymaintenance
0,1,Tennis Court 2,5.0,25.0,8000,200
1,2,Badminton Court,0.0,15.5,4000,50


Produce a list of facilities, with each labelled as 'cheap' or 'expensive', depending on if their monthly maintenance cost is more than $100.
Return the name and monthly maintenance of the facilities.

In [17]:
result = run_sql('''
                    SELECT name, monthlymaintenance,
                    CASE WHEN monthlymaintenance > 100 THEN 'expensive'
                         ELSE 'cheap'
                         END
                         AS monthlymaintenance_label
                    FROM facilities
                 ''')
result.toPandas()

Unnamed: 0,name,monthlymaintenance,monthlymaintenance_label
0,Tennis Court 1,200,expensive
1,Tennis Court 2,200,expensive
2,Badminton Court,50,cheap
3,Table Tennis,10,cheap
4,Massage Room 1,3000,expensive
5,Massage Room 2,3000,expensive
6,Squash Court,80,cheap
7,Snooker Table,15,cheap
8,Pool Table,15,cheap


Get the first and last name of the last member(s) who signed up. Do not use the LIMIT clause.

In [18]:
result = run_sql('''
                    SELECT firstname, surname
                    FROM members
                    WHERE joindate IN (
                        SELECT MAX(joindate)
                        FROM members
                        )
                 ''')
result.toPandas()

Unnamed: 0,firstname,surname
0,Darren,Smith


Produce a list of all members who have used a tennis court.
Include in your output the name of the court, and the name of the member formatted as a single column.
Ensure no duplicate data.
Also order by the member name.

In [19]:
result = run_sql('''
                    SELECT name, CONCAT(firstname, " ", surname) AS membername
                    FROM members,
                        (SELECT DISTINCT memid, name
                         FROM bookings
                         INNER JOIN facilities
                         USING (facid)
                         WHERE facilities.name LIKE 'Tennis Court%') AS subquery
                    WHERE members.memid = subquery.memid
                    ORDER BY membername
                 ''')
result.toPandas()

Unnamed: 0,name,membername
0,Tennis Court 1,Anne Baker
1,Tennis Court 2,Anne Baker
2,Tennis Court 2,Burton Tracy
3,Tennis Court 1,Burton Tracy
4,Tennis Court 1,Charles Owen
5,Tennis Court 2,Charles Owen
6,Tennis Court 2,Darren Smith
7,Tennis Court 1,David Farrell
8,Tennis Court 2,David Farrell
9,Tennis Court 2,David Jones


Produce a list of bookings on the day of 2012-09-14 which will cost the member (or guest) more than $30.
Remember that guests have different costs to members (the listed costs are per half-hour 'slot').
The guest user's ID is always 0.
Include the name of the facility, the name of the member formatted as a single column, and the cost.
Order by descending cost, and do not use any subqueries.

In [20]:
result = run_sql('''
                    SELECT name AS name_of_facility, CONCAT(firstname, " ", surname) AS name_of_member, cost
                    FROM members
                    INNER JOIN
                    (SELECT name, memid, cost
                    FROM
                    (SELECT name, memid,
                           CASE WHEN memid = 0 THEN slot_guestcost
                                ELSE slot_membercost
                                END
                                AS cost
                    FROM
                    (SELECT memid, name, slots * membercost AS slot_membercost, slots * guestcost AS slot_guestcost
                    FROM facilities
                    INNER JOIN
                    (SELECT memid, facid, slots
                    FROM bookings
                    WHERE DATE(starttime) = '2012-09-14'
                    GROUP BY memid, facid, slots
                    ORDER BY memid)
                    USING (facid)))
                    WHERE cost > 30)
                    USING (memid)
                    ORDER BY cost DESC
                 ''')
result.toPandas()

Unnamed: 0,name_of_facility,name_of_member,cost
0,Massage Room 2,GUEST GUEST,320.0
1,Massage Room 1,GUEST GUEST,160.0
2,Tennis Court 2,GUEST GUEST,150.0
3,Tennis Court 2,GUEST GUEST,75.0
4,Tennis Court 1,GUEST GUEST,75.0
5,Squash Court,GUEST GUEST,70.0
6,Massage Room 1,Jemima Farrell,39.6
7,Squash Court,GUEST GUEST,35.0


Now, solving the same problem as above but using subquery instead of join.

In [21]:
result = run_sql('''
                    SELECT name AS name_of_facility, CONCAT(firstname, " ", surname) AS name_of_member, cost
                    FROM members,
                    (SELECT name, memid, cost
                    FROM
                    (SELECT name, memid,
                           CASE WHEN memid = 0 THEN slot_guestcost
                                ELSE slot_membercost
                                END
                                AS cost
                    FROM
                    (SELECT memid, name, slots * membercost AS slot_membercost, slots * guestcost AS slot_guestcost
                    FROM facilities,
                        (SELECT memid, facid, slots
                         FROM bookings
                         WHERE DATE(starttime) = '2012-09-14'
                         GROUP BY memid, facid, slots
                         ORDER BY memid) AS subquery
                    WHERE facilities.facid = subquery.facid))
                    WHERE cost > 30) AS subquery
                    WHERE members.memid = subquery.memid
                    ORDER BY cost DESC                    
                 ''')
result.toPandas()

Unnamed: 0,name_of_facility,name_of_member,cost
0,Massage Room 2,GUEST GUEST,320.0
1,Massage Room 1,GUEST GUEST,160.0
2,Tennis Court 2,GUEST GUEST,150.0
3,Tennis Court 2,GUEST GUEST,75.0
4,Tennis Court 1,GUEST GUEST,75.0
5,Squash Court,GUEST GUEST,70.0
6,Massage Room 1,Jemima Farrell,39.6
7,Squash Court,GUEST GUEST,35.0


Produce a list of facilities with a total revenue less than 1000.
The output should have facility name and total revenue, sorted by revenue.
Remember that there's a different cost for guests and members!

In [22]:
result = run_sql('''
                    SELECT name AS facility_name, total_revenue
                    FROM
                    (SELECT name, total_profit - initialoutlay AS total_revenue
                    FROM
                    (SELECT name, initialoutlay, SUM(monthly_profit) AS total_profit
                    FROM
                    (SELECT name, year, month, initialoutlay, facility_year_month_profit - monthlymaintenance AS monthly_profit
                    FROM
                    (SELECT name, year, month, initialoutlay, monthlymaintenance, SUM(cost) AS facility_year_month_profit
                    FROM
                    (SELECT name, year, month, initialoutlay, monthlymaintenance,
                           CASE WHEN memid = 0 THEN slot_guestcost
                                ELSE slot_membercost
                                END
                                AS cost
                    FROM
                    (SELECT memid, slots, YEAR(starttime) as year, MONTH(starttime) as month, name, slots * membercost AS slot_membercost, slots * guestcost AS slot_guestcost, initialoutlay, monthlymaintenance
                    FROM bookings
                    INNER JOIN facilities
                    USING (facid)))
                    GROUP BY name, year, month, initialoutlay, monthlymaintenance)
                    ORDER BY year, month)
                    GROUP BY name, initialoutlay))
                    WHERE total_revenue < 1000
                    ORDER BY total_revenue
                 ''')
result.toPandas()

Unnamed: 0,facility_name,total_revenue
0,Badminton Court,-2243.5
1,Snooker Table,-255.0
2,Pool Table,-175.0
3,Table Tennis,-170.0
