In [0]:
from pyspark import SparkContext 
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


sc = SparkSession.builder.master("local[1]").appName("spark-sample").getOrCreate()
spark = SQLContext(sc)

spark.sql('drop database if exists country_club cascade;')
spark.sql('create database country_club;')
spark.sql('show databases;' )

# File location and type
file_location_bookings = "dbfs:/user/hive/warehouse/bookings_custom"
file_location_facilities = "dbfs:/user/hive/warehouse/facilities_custom"
file_location_members = "dbfs:/user/hive/warehouse/members_custom"


file_type = "delta"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bookings_df = (spark.read.format(file_type) 
                    .option("inferSchema", infer_schema) 
                    .option("header", first_row_is_header) 
                    .option("sep", delimiter) 
                    .load(file_location_bookings))

facilities_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_facilities))

members_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_members))
     







In [0]:
print('Bookings Schema')
bookings_df.printSchema()
print('Facilities Schema')
facilities_df.printSchema()
print('Members Schema')
members_df.printSchema()

Bookings Schema
root
 |-- bookid: long (nullable = true)
 |-- facid: long (nullable = true)
 |-- memid: long (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- slots: long (nullable = true)

Facilities Schema
root
 |-- facid: long (nullable = true)
 |-- name: string (nullable = true)
 |-- membercost: double (nullable = true)
 |-- guestcost: double (nullable = true)
 |-- initialoutlay: long (nullable = true)
 |-- monthlymaintenance: long (nullable = true)

Members Schema
root
 |-- memid: long (nullable = true)
 |-- surname: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: long (nullable = true)
 |-- telephone: string (nullable = true)
 |-- recommendedby: long (nullable = true)
 |-- joindate: timestamp (nullable = true)



In [0]:
permanent_table_name_bookings = "country_club.Bookings"
bookings_df.write.format("parquet").saveAsTable(permanent_table_name_bookings)

permanent_table_name_facilities = "country_club.Facilities"
facilities_df.write.format("parquet").saveAsTable(permanent_table_name_facilities)

permanent_table_name_members = "country_club.Members"
members_df.write.format("parquet").saveAsTable(permanent_table_name_members)

In [0]:
spark.sql('USE country_club;')
spark.sql('REFRESH TABLE bookings;')
spark.sql('REFRESH TABLE facilities;')
spark.sql('REFRESH TABLE members;')
spark.sql('SHOW TABLES;')

Out[18]: DataFrame[database: string, tableName: string, isTemporary: boolean]

In [0]:
query = spark.sql('select * from bookings limit 3')
query.show()

+------+-----+-----+-------------------+-----+
|bookid|facid|memid|          starttime|slots|
+------+-----+-----+-------------------+-----+
|     0|    3|    1|2012-07-03 11:00:00|    2|
|     1|    4|    1|2012-07-03 08:00:00|    2|
|     2|    6|    0|2012-07-03 18:00:00|    2|
+------+-----+-----+-------------------+-----+



In [0]:
q1_query = spark.sql('''SELECT name, membercost FROM facilities 
                        WHERE membercost != 0''')
q1_query.show()

+--------------+----------+
|          name|membercost|
+--------------+----------+
|Tennis Court 1|       5.0|
|Tennis Court 2|       5.0|
|Massage Room 1|       9.9|
|Massage Room 2|       9.9|
|  Squash Court|       3.5|
+--------------+----------+



In [0]:
q2_query1 = spark.sql('SELECT count(*) FROM facilities WHERE membercost = 0')
q2_query1.show()

q2_query2 = spark.sql('SELECT name, membercost FROM facilities WHERE membercost = 0')
q2_query2.show()
     

+--------+
|count(1)|
+--------+
|       4|
+--------+

+---------------+----------+
|           name|membercost|
+---------------+----------+
|Badminton Court|       0.0|
|   Table Tennis|       0.0|
|  Snooker Table|       0.0|
|     Pool Table|       0.0|
+---------------+----------+



In [0]:

q3_query = spark.sql('''SELECT facid, name, membercost, monthlymaintenance 
                        FROM facilities WHERE membercost != 0 AND 
                        membercost < (monthlymaintenance * 0.2)''')
q3_query.show()

     

+-----+--------------+----------+------------------+
|facid|          name|membercost|monthlymaintenance|
+-----+--------------+----------+------------------+
|    0|Tennis Court 1|       5.0|               200|
|    1|Tennis Court 2|       5.0|               200|
|    4|Massage Room 1|       9.9|              3000|
|    5|Massage Room 2|       9.9|              3000|
|    6|  Squash Court|       3.5|                80|
+-----+--------------+----------+------------------+



In [0]:
q4_query = spark.sql('''SELECT * FROM facilities 
                        WHERE facid in (1, 5)''')
q4_query.show()
     

+-----+--------------+----------+---------+-------------+------------------+
|facid|          name|membercost|guestcost|initialoutlay|monthlymaintenance|
+-----+--------------+----------+---------+-------------+------------------+
|    1|Tennis Court 2|       5.0|     25.0|         8000|               200|
|    5|Massage Room 2|       9.9|     80.0|         4000|              3000|
+-----+--------------+----------+---------+-------------+------------------+



In [0]:
q5_query = spark.sql('''SELECT name, monthlymaintenance, 
                        CASE WHEN (monthlymaintenance > 100) 
                              THEN 'expensive' 
                              ELSE 'cheap' END AS price_guage 
                        FROM facilities; ''')
q5_query.show()
     

+---------------+------------------+-----------+
|           name|monthlymaintenance|price_guage|
+---------------+------------------+-----------+
| Tennis Court 1|               200|  expensive|
| Tennis Court 2|               200|  expensive|
|Badminton Court|                50|      cheap|
|   Table Tennis|                10|      cheap|
| Massage Room 1|              3000|  expensive|
| Massage Room 2|              3000|  expensive|
|   Squash Court|                80|      cheap|
|  Snooker Table|                15|      cheap|
|     Pool Table|                15|      cheap|
+---------------+------------------+-----------+



In [0]:
q6_query = spark.sql('''SELECT firstname,surname FROM members
                        WHERE memid=(SELECT max(memid) FROM members);''')
q6_query.show()

+---------+-------+
|firstname|surname|
+---------+-------+
|   Darren|  Smith|
+---------+-------+



In [0]:
q7_query = spark.sql('''SELECT DISTINCT memid, firstname, surname 
                      FROM members WHERE memid IN 
                        (SELECT memid FROM bookings WHERE facid = 0 OR facid = 1) 
                      ORDER BY surname;''')

q7_query.show(members_df.count(), truncate = False)

+-----+---------+--------+
|memid|firstname|surname |
+-----+---------+--------+
|15   |Florence |Bader   |
|12   |Anne     |Baker   |
|16   |Timothy  |Baker   |
|8    |Tim      |Boothe  |
|5    |Gerald   |Butters |
|22   |Joan     |Coplin  |
|36   |Erica    |Crumpet |
|7    |Nancy    |Dare    |
|13   |Jemima   |Farrell |
|28   |David    |Farrell |
|0    |GUEST    |GUEST   |
|20   |Matthew  |Genting |
|35   |John     |Hunt    |
|11   |David    |Jones   |
|26   |Douglas  |Jones   |
|4    |Janice   |Joplette|
|10   |Charles  |Owen    |
|17   |David    |Pinker  |
|30   |Millicent|Purview |
|3    |Tim      |Rownam  |
|27   |Henrietta|Rumney  |
|24   |Ramnaresh|Sarwin  |
|14   |Jack     |Smith   |
|1    |Darren   |Smith   |
|2    |Tracy    |Smith   |
|9    |Ponder   |Stibbons|
|6    |Burton   |Tracy   |
+-----+---------+--------+



In [0]:
q8_query = spark.sql('''SELECT members.firstname || ' ' || members.surname as fullname, facilities.name as facility,
                        CASE
                            WHEN members.memid = 0 THEN 
                            (facilities.guestcost*bookings.slots)
                            WHEN members.memid != 0 THEN
                            (facilities.membercost*bookings.slots) 
                        END AS cost

           FROM members INNER JOIN bookings ON members.memid = bookings.memid
                        INNER JOIN facilities ON bookings.facid = facilities.facid 

          WHERE (DATE(bookings.starttime) >= '2012-09-14' AND 
                DATE(bookings.starttime) < '2012-09-15') AND (
		            (members.memid = 0 AND bookings.slots*facilities.guestcost >= 30) OR
                 (members.memid != 0 AND bookings.slots*facilities.membercost >= 30)
                 )
          ORDER BY cost DESC;''')
      
q8_query.show()

+--------------+--------------+-----+
|      fullname|      facility| cost|
+--------------+--------------+-----+
|   GUEST GUEST|Massage Room 2|320.0|
|   GUEST GUEST|Massage Room 1|160.0|
|   GUEST GUEST|Massage Room 1|160.0|
|   GUEST GUEST|Massage Room 1|160.0|
|   GUEST GUEST|Tennis Court 2|150.0|
|   GUEST GUEST|Tennis Court 1| 75.0|
|   GUEST GUEST|Tennis Court 1| 75.0|
|   GUEST GUEST|Tennis Court 2| 75.0|
|   GUEST GUEST|  Squash Court| 70.0|
|Jemima Farrell|Massage Room 1| 39.6|
|   GUEST GUEST|  Squash Court| 35.0|
|   GUEST GUEST|  Squash Court| 35.0|
|   David Jones|Tennis Court 2| 30.0|
|    Tim Boothe|Tennis Court 2| 30.0|
+--------------+--------------+-----+



In [0]:
q9_query = spark.sql('''SELECT fullname, facility, cost FROM (
                        SELECT members.firstname || ' ' || members.surname as fullname, 
                        facilities.name as facility,
                        CASE
                            WHEN members.memid = 0 THEN 
                            (facilities.guestcost*bookings.slots)
                            WHEN members.memid != 0 THEN
                            (facilities.membercost*bookings.slots) 
                        END AS cost

            FROM members INNER JOIN bookings ON members.memid = bookings.memid
                         INNER JOIN facilities ON bookings.facid = facilities.facid 
            WHERE 
               DATE(bookings.starttime) >= '2012-09-14' AND 
               DATE(bookings.starttime) < '2012-09-15'
                            ) AS bookings WHERE cost >= 30 ORDER by cost DESC; ''')
            
        
q9_query.show()

+--------------+--------------+-----+
|      fullname|      facility| cost|
+--------------+--------------+-----+
|   GUEST GUEST|Massage Room 2|320.0|
|   GUEST GUEST|Massage Room 1|160.0|
|   GUEST GUEST|Massage Room 1|160.0|
|   GUEST GUEST|Massage Room 1|160.0|
|   GUEST GUEST|Tennis Court 2|150.0|
|   GUEST GUEST|Tennis Court 1| 75.0|
|   GUEST GUEST|Tennis Court 1| 75.0|
|   GUEST GUEST|Tennis Court 2| 75.0|
|   GUEST GUEST|  Squash Court| 70.0|
|Jemima Farrell|Massage Room 1| 39.6|
|   GUEST GUEST|  Squash Court| 35.0|
|   GUEST GUEST|  Squash Court| 35.0|
|   David Jones|Tennis Court 2| 30.0|
|    Tim Boothe|Tennis Court 2| 30.0|
+--------------+--------------+-----+



In [0]:
q10_query1 = spark.sql('''SELECT facilities.name, 
                          sum(CASE
                               WHEN bookings.memid = 0 THEN 
                                  facilities.guestcost*bookings.slots
                               WHEN bookings.memid != 0 THEN
                                  facilities.membercost*bookings.slots END)
                           AS total_revenue 

                      FROM bookings INNER JOIN facilities 
                      ON bookings.facid = facilities.facid 
                      GROUP BY facilities.name HAVING 
                          sum(CASE 
                                WHEN bookings.memid = 0 THEN 
                                  facilities.guestcost*bookings.slots
                               WHEN bookings.memid != 0 THEN
                                  facilities.membercost*bookings.slots END) < 1000
                      ORDER BY total_revenue;''' )
q10_query1.show()
     

+-------------+-------------+
|         name|total_revenue|
+-------------+-------------+
| Table Tennis|        180.0|
|Snooker Table|        240.0|
|   Pool Table|        270.0|
+-------------+-------------+



In [0]:

q10_query2 = spark.sql(''' SELECT name, total_revenue FROM (
                          SELECT facilities.name, 
                          sum(CASE
                               WHEN bookings.memid = 0 THEN 
                                  facilities.guestcost*bookings.slots
                               WHEN bookings.memid != 0 THEN
                                  facilities.membercost*bookings.slots END)
                           AS total_revenue

                      FROM bookings INNER JOIN facilities 
                      ON bookings.facid = facilities.facid GROUP BY facilities.name
                               ) AS agg WHERE total_revenue < 1000 ORDER BY total_revenue;''') 

q10_query2.show()

+-------------+-------------+
|         name|total_revenue|
+-------------+-------------+
| Table Tennis|        180.0|
|Snooker Table|        240.0|
|   Pool Table|        270.0|
+-------------+-------------+

