## SQL at Scale with Spark SQL

Welcome to the SQL mini project. For this project, you will use the Databricks Platform and work through a series of exercises using Spark SQL. The dataset is small, but the goal is to become familiar with the Spark SQL interface, which scales to very large datasets without requiring changes to your SQL queries.

The data you need is in the mini-project folder as three CSV files. You will import this data into Databricks to create the following tables in the __`country_club`__ database.

<br>
1. The __`bookings`__ table,
2. The __`facilities`__ table, and
3. The __`members`__ table.

You will upload these datasets to the Databricks platform shortly and see how quickly a database can be set up. Once the database and the tables are populated, you will focus on the mini-project questions.

In the mini project, you'll be asked a series of questions. You can solve them using the Databricks platform, but for the final deliverable,
please download this notebook as an IPython notebook (__`File -> Export -> IPython Notebook`__) and upload it to your GitHub.

### Creating the Database

We first create the database that will hold our three tables of interest.

In [None]:
import os
from pyspark.sql import SparkSession

# Point Spark at your local Hadoop installation (replace the placeholder path).
# Setting os.environ['hadoop.home.dir'] is not needed: that name is a JVM
# system property, not an environment variable.
os.environ['HADOOP_HOME'] = '/path/to/your/hadoop'

# Create a Spark session
spark = SparkSession.builder.appName("mini-project").getOrCreate()

# Drop the database; CASCADE also drops any tables it contains
spark.sql("DROP DATABASE IF EXISTS country_club CASCADE")

# Create the database
spark.sql("CREATE DATABASE country_club")

# Show databases
spark.sql("SHOW DATABASES").show()



### Creating the Tables

In this section, we create the three tables of interest and populate them with the data from the CSV files already available to you.
To get started, first upload the three CSV files to DBFS, as shown in the following figure.

![](https://i.imgur.com/QcCruBr.png)


Once you have done this, run the following code to build the DataFrames that will be saved as tables in our database.

In [118]:
# Update these paths with your local paths
local_file_location_bookings = "C:/Users/zoyaa/Downloads/mec-5.6.6-sql-with-spark-mini-project-20231102T223441Z-001/mec-5.6.6-sql-with-spark-mini-project/Bookings.csv"
local_file_location_facilities = "C:/Users/zoyaa/Downloads/mec-5.6.6-sql-with-spark-mini-project-20231102T223441Z-001/mec-5.6.6-sql-with-spark-mini-project/Facilities.csv"
local_file_location_members = "C:/Users/zoyaa/Downloads/mec-5.6.6-sql-with-spark-mini-project-20231102T223441Z-001/mec-5.6.6-sql-with-spark-mini-project/Members.csv"

# Use the local file paths in your Spark code
file_location_bookings = local_file_location_bookings
file_location_facilities = local_file_location_facilities
file_location_members = local_file_location_members

# File type
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bookings_df = (spark.read.format(file_type) 
                    .option("inferSchema", infer_schema) 
                    .option("header", first_row_is_header) 
                    .option("sep", delimiter) 
                    .load(file_location_bookings))

facilities_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_facilities))

members_df = (spark.read.format(file_type)
                   .option("inferSchema", infer_schema)
                   .option("header", first_row_is_header)
                   .option("sep", delimiter)
                   .load(file_location_members))
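
The three path variables in the cell above repeat the same long directory prefix. A minimal sketch, assuming a hypothetical base directory `data_dir` (replace it with wherever you unpacked the mini-project folder), builds all three locations from one place using `pathlib`. The dictionary name `file_locations` is also an illustration, not part of the original notebook.

```python
from pathlib import PurePosixPath

# Hypothetical base directory: replace with your own mini-project folder.
# PurePosixPath keeps forward slashes, matching the paths used in the cell above.
data_dir = PurePosixPath("/path/to/mini-project")

# The three CSV file names from the project folder.
csv_names = {
    "bookings": "Bookings.csv",
    "facilities": "Facilities.csv",
    "members": "Members.csv",
}

# Build one string path per table, ready to pass to .load(...).
file_locations = {table: str(data_dir / name) for table, name in csv_names.items()}

for table, location in file_locations.items():
    print(table, "->", location)
```

Each value can then be used in place of the corresponding `file_location_*` variable.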




### Viewing the dataframe schemas

Before writing the tables to the database, we can inspect the schema of each DataFrame.

In [None]:
print('Bookings Schema')
bookings_df.printSchema()
print('Facilities Schema')
facilities_df.printSchema()
print('Members Schema')
members_df.printSchema()

### Create permanent tables
We will now create three permanent tables in our __`country_club`__ database, as discussed previously, with the following code.

In [None]:
# Save the DataFrames loaded above as tables in the country_club database.
# Unlike temporary views, these tables are registered in the database itself.
# mode("overwrite") replaces an existing table, so the cell can be re-run.
bookings_df.write.mode("overwrite").saveAsTable("country_club.bookings")
facilities_df.write.mode("overwrite").saveAsTable("country_club.facilities")
members_df.write.mode("overwrite").saveAsTable("country_club.members")

# Example SQL query against one of the new tables
result = spark.sql("""
    SELECT *
    FROM country_club.bookings
    WHERE memid = 1
""")
result.show()



### Refresh tables and check them

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS country_club")

# Use the database
spark.sql("USE country_club")

# Refresh tables
spark.sql("REFRESH TABLE bookings")
spark.sql("REFRESH TABLE facilities")
spark.sql("REFRESH TABLE members")

# Show tables
tables = spark.sql("SHOW TABLES")
tables.show()


### Test a sample SQL query

__Note:__ On Databricks, you can use __`%sql`__ at the beginning of a cell and write SQL queries directly. Neat, isn't it? In a local Jupyter session, pass the query string to __`spark.sql()`__ instead, as in the following cell.

In [None]:
# Preview a few rows of each table to verify that the data has been loaded
for table in ["bookings", "facilities", "members"]:
    print(f"{table} table:")
    spark.sql(f"SELECT * FROM {table} LIMIT 5").show()



#### Q1: Some of the facilities charge a fee to members, but some do not. Please list the names of the facilities that do.

In [None]:
# List the facilities that charge members a fee
result = spark.sql("SELECT name FROM facilities WHERE membercost > 0")
result.show()





####  Q2: How many facilities do not charge a fee to members?

In [None]:
# Count the facilities that do not charge members a fee
result = spark.sql("SELECT COUNT(*) AS num_facilities_no_fee FROM facilities WHERE membercost = 0")
result.show()





#### Q3: How can you produce a list of facilities that charge a fee to members, where the fee is less than 20% of the facility's monthly maintenance cost? 
#### Return the facid, facility name, member cost, and monthly maintenance of the facilities in question.

In [None]:
# Facilities with a non-zero member fee below 20% of monthly maintenance
result = spark.sql("""
    SELECT facid, name AS facility_name, membercost, monthlymaintenance
    FROM facilities
    WHERE membercost > 0 AND membercost < 0.2 * monthlymaintenance
""")
result.show()



#### Q4: How can you retrieve the details of facilities with ID 1 and 5? Write the query without using the OR operator.

In [None]:
# IN replaces the chained OR conditions
result = spark.sql("""
    SELECT *
    FROM facilities
    WHERE facid IN (1, 5)
""")
result.show()




#### Q5: How can you produce a list of facilities, with each labelled as 'cheap' or 'expensive', depending on if their monthly maintenance cost is more than $100? 
#### Return the name and monthly maintenance of the facilities in question.

In [None]:
from pyspark.sql.functions import col, when

# Load the facilities table as a DataFrame (reuses the existing 'spark' session)
facilities = spark.sql("SELECT * FROM facilities")

# Select the name and monthly maintenance, and add a 'label' column:
# 'expensive' if monthly maintenance is more than 100, otherwise 'cheap'
result = facilities.select(
    col("name").alias("facility_name"),
    col("monthlymaintenance"),
    when(col("monthlymaintenance") > 100, "expensive").otherwise("cheap").alias("label")
)

# Show the result
result.show()
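
The same labelling can also be written in SQL with a `CASE` expression. The sketch below is a stand-in rather than the project solution: it uses Python's built-in `sqlite3` module with three made-up rows (not the project data) so that it runs without a Spark session. The query string uses only standard SQL, so it should also work when passed to `spark.sql()` against the `facilities` table.

```python
import sqlite3

# In-memory stand-in for the facilities table, with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE facilities (name TEXT, monthlymaintenance INTEGER)")
conn.executemany(
    "INSERT INTO facilities VALUES (?, ?)",
    [("Court A", 200), ("Table B", 10), ("Pool C", 100)],
)

# CASE labels each row; exactly 100 is not "more than 100", so it is 'cheap'.
label_query = """
    SELECT name,
           monthlymaintenance,
           CASE WHEN monthlymaintenance > 100 THEN 'expensive'
                ELSE 'cheap' END AS label
    FROM facilities
    ORDER BY name
"""
labelled = conn.execute(label_query).fetchall()
print(labelled)
```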


#### Q6: You'd like to get the first and last name of the last member(s) who signed up. Do not use the LIMIT clause for your solution.

In [128]:
import pandas as pd

# Read the Members.csv file into a DataFrame
members_df = pd.read_csv("Members.csv")

# Convert 'joindate' column to datetime if needed
members_df['joindate'] = pd.to_datetime(members_df['joindate'])

# Find the maximum join date
max_joindate = members_df['joindate'].max()

# Filter members with the maximum join date
result = members_df[members_df['joindate'] == max_joindate][['firstname', 'surname']]

# Display the result
print(result)





   firstname surname
30    Darren   Smith
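
The pandas answer above finds the maximum join date and filters on it. In SQL, the same idea can be expressed with a scalar subquery, which avoids `LIMIT`. The sketch below uses Python's built-in `sqlite3` module and three made-up members (not the project data) so that it runs without Spark; the query itself is standard SQL and should carry over to `spark.sql()` on the `members` table.

```python
import sqlite3

# In-memory stand-in for the members table, with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE members (firstname TEXT, surname TEXT, joindate TEXT)")
conn.executemany(
    "INSERT INTO members VALUES (?, ?, ?)",
    [
        ("Ann", "Lee", "2012-07-01 10:00:00"),
        ("Bob", "Ray", "2012-09-26 18:08:45"),
        ("Cy", "Fox", "2012-08-15 09:30:00"),
    ],
)

# The subquery returns the latest join date; the outer query keeps every
# member who joined on that date, so ties are all returned.
latest_member_query = """
    SELECT firstname, surname
    FROM members
    WHERE joindate = (SELECT MAX(joindate) FROM members)
"""
latest_members = conn.execute(latest_member_query).fetchall()
print(latest_members)
```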


####  Q7: How can you produce a list of all members who have used a tennis court?
- Include in your output the name of the court, and the name of the member formatted as a single column. 
- Ensure no duplicate data
- Also order by the member name.

In [129]:
import pandas as pd

# Load Members.csv, Bookings.csv, and Facilities.csv into Pandas DataFrames
members_df = pd.read_csv("Members.csv")
bookings_df = pd.read_csv("Bookings.csv")
facilities_df = pd.read_csv("Facilities.csv")

# Merge the Members, Bookings, and Facilities DataFrames
merged_df = pd.merge(pd.merge(members_df, bookings_df, on='memid'), facilities_df, on='facid')

# Filter for tennis court bookings; .copy() returns an independent DataFrame,
# so adding a column below does not trigger pandas' SettingWithCopyWarning
tennis_court_bookings = merged_df[merged_df['name'].str.startswith('Tennis Court')].copy()

# Create a formatted column for member names
tennis_court_bookings['member_name'] = tennis_court_bookings['surname'] + ', ' + tennis_court_bookings['firstname']

# Select and display the desired columns
result = tennis_court_bookings[['member_name', 'name']].drop_duplicates().sort_values('member_name')
print(result)



             member_name            name
1425     Bader, Florence  Tennis Court 2
1802     Bader, Florence  Tennis Court 1
1388         Baker, Anne  Tennis Court 2
1773         Baker, Anne  Tennis Court 1
1803      Baker, Timothy  Tennis Court 1
1433      Baker, Timothy  Tennis Court 2
1234         Boothe, Tim  Tennis Court 2
1726         Boothe, Tim  Tennis Court 1
1217     Butters, Gerald  Tennis Court 2
1613     Butters, Gerald  Tennis Court 1
1834        Coplin, Joan  Tennis Court 1
1865      Crumpet, Erica  Tennis Court 1
1223         Dare, Nancy  Tennis Court 2
1701         Dare, Nancy  Tennis Court 1
1855      Farrell, David  Tennis Court 1
1452      Farrell, David  Tennis Court 2
1423     Farrell, Jemima  Tennis Court 2
1779     Farrell, Jemima  Tennis Court 1
1069        GUEST, GUEST  Tennis Court 2
1458        GUEST, GUEST  Tennis Court 1
1833    Genting, Matthew  Tennis Court 1
1454          Hunt, John  Tennis Court 2
1861          Hunt, John  Tennis Court 1
1358        Jone
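
The merge, filter, de-duplicate, and sort steps in the pandas answer map onto a single SQL query with two joins and `DISTINCT`. The sketch below is a stand-in that uses Python's built-in `sqlite3` module with a few made-up rows (not the project data) so that it runs without Spark. It assumes the same column names as the CSV files and uses the `||` concatenation operator, which Spark SQL also accepts.

```python
import sqlite3

# In-memory stand-ins for the three tables, with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE members (memid INTEGER, firstname TEXT, surname TEXT)")
conn.execute("CREATE TABLE facilities (facid INTEGER, name TEXT)")
conn.execute("CREATE TABLE bookings (memid INTEGER, facid INTEGER)")
conn.executemany("INSERT INTO members VALUES (?, ?, ?)",
                 [(1, "Ann", "Lee"), (2, "Bob", "Ray")])
conn.executemany("INSERT INTO facilities VALUES (?, ?)",
                 [(0, "Tennis Court 1"), (1, "Tennis Court 2"), (3, "Table Tennis")])
conn.executemany("INSERT INTO bookings VALUES (?, ?)",
                 [(1, 0), (1, 0), (1, 1), (2, 3), (2, 1)])

# DISTINCT removes repeat bookings of the same court by the same member.
tennis_query = """
    SELECT DISTINCT m.surname || ', ' || m.firstname AS member_name,
           f.name AS facility
    FROM members AS m
    JOIN bookings AS b ON b.memid = m.memid
    JOIN facilities AS f ON f.facid = b.facid
    WHERE f.name LIKE 'Tennis Court%'
    ORDER BY member_name, facility
"""
tennis_players = conn.execute(tennis_query).fetchall()
print(tennis_players)
```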


#### Q8: How can you produce a list of bookings on the day of 2012-09-14 which will cost the member (or guest) more than $30? 

- Remember that guests have different costs to members (the listed costs are per half-hour 'slot')
- The guest user's ID is always 0. 

#### Include in your output the name of the facility, the name of the member formatted as a single column, and the cost.

- Order by descending cost, and do not use any subqueries.

In [None]:
# Cost per booking is slots times the guest rate (memid 0) or the member rate
result = spark.sql('''
                    SELECT name,
                           CONCAT(firstname, ' ', surname) AS member_name,
                           CASE WHEN bookings.memid = 0 THEN guestcost * slots
                           ELSE membercost * slots END AS cost
                    FROM bookings
                    JOIN facilities
                      ON bookings.facid = facilities.facid
                    JOIN members
                      ON bookings.memid = members.memid
                    WHERE starttime LIKE '%2012-09-14%' AND
                          ((guestcost * slots > 30 AND bookings.memid = 0) OR
                          (membercost * slots > 30 AND bookings.memid != 0))
                    ORDER BY cost DESC
                 ''')
result.toPandas()
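
The `CASE` expression in the query applies one rule per booking: multiply the number of half-hour slots by the guest rate when the booker is the guest user (ID 0), and by the member rate otherwise. A minimal Python sketch of that rule follows; the function name `booking_cost` and the rates are made up for illustration, since the real rates live in the `facilities` table.

```python
def booking_cost(memid, slots, membercost, guestcost):
    """Return the cost of one booking: slots times the applicable per-slot rate."""
    # memid 0 is the guest user, who pays the guest rate.
    rate = guestcost if memid == 0 else membercost
    return rate * slots


# Hypothetical rates, for illustration only.
guest_total = booking_cost(memid=0, slots=3, membercost=5, guestcost=25)
member_total = booking_cost(memid=7, slots=3, membercost=5, guestcost=25)
print(guest_total, member_total)
```

With these made-up rates, only the guest booking would pass the "more than $30" filter.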



#### Q9: This time, produce the same result as in Q8, but using a subquery.

In [None]:
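# The subquery computes the cost of every booking (rate times slots);
# the outer query then filters on the date and on cost, matching Q8
result = spark.sql('''
                    SELECT name, member_name, cost
                    FROM (SELECT name,
                                 CONCAT(firstname, ' ', surname) AS member_name,
                                 CASE WHEN memid = 0 THEN guestcost * slots
                                      ELSE membercost * slots END AS cost,
                                 starttime
                          FROM bookings
                          INNER JOIN facilities
                          USING (facid)
                          INNER JOIN members
                          USING (memid)) AS booking_costs
                    WHERE starttime LIKE '%2012-09-14%'
                      AND cost > 30
                    ORDER BY cost DESC
                 ''')
result.toPandas()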


#### Q10: Produce a list of facilities with a total revenue less than 1000.
- The output should have facility name and total revenue, sorted by revenue. 
- Remember that there's a different cost for guests and members!

In [None]:
# The subquery picks the per-slot fee for each booking; the outer query sums by facility
result = spark.sql('''
                    SELECT name,
                           SUM(fee * slots) AS revenue
                    FROM (SELECT CASE WHEN bookings.memid = 0 THEN guestcost
                                 ELSE membercost END AS fee,
                                 slots,
                                 name
                          FROM bookings
                          JOIN facilities
                            ON bookings.facid = facilities.facid) AS booking_fees
                    GROUP BY name
                    HAVING revenue < 1000
                    ORDER BY revenue
                 ''')
result.toPandas()
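
To see how the aggregation behaves, the sketch below runs the same query shape on a tiny made-up dataset using Python's built-in `sqlite3` module, so it works without a Spark session. The rows and rates are hypothetical, and the `HAVING` clause repeats the aggregate instead of using the `revenue` alias, which is an assumption made here for portability across SQL engines.

```python
import sqlite3

# In-memory stand-ins for facilities and bookings, with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE facilities (facid INTEGER, name TEXT, membercost INTEGER, guestcost INTEGER)"
)
conn.execute("CREATE TABLE bookings (facid INTEGER, memid INTEGER, slots INTEGER)")
conn.executemany("INSERT INTO facilities VALUES (?, ?, ?, ?)",
                 [(0, "Court", 5, 25), (1, "Pool", 0, 5)])
conn.executemany("INSERT INTO bookings VALUES (?, ?, ?)",
                 [(0, 0, 30), (0, 1, 60), (1, 0, 10), (1, 2, 4)])

# Court: 30 * 25 + 60 * 5 = 1050 (filtered out); Pool: 10 * 5 + 4 * 0 = 50.
revenue_query = """
    SELECT name, SUM(fee * slots) AS revenue
    FROM (SELECT CASE WHEN bookings.memid = 0 THEN guestcost
                      ELSE membercost END AS fee,
                 slots,
                 name
          FROM bookings
          JOIN facilities
            ON bookings.facid = facilities.facid) AS booking_fees
    GROUP BY name
    HAVING SUM(fee * slots) < 1000
    ORDER BY revenue
"""
low_revenue = conn.execute(revenue_query).fetchall()
print(low_revenue)
```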
