## Overview

This notebook shows you how to create and query a table or DataFrame loaded from data stored in AWS S3. There are two ways to establish access to S3: [IAM roles](https://docs.databricks.com/user-guide/cloud-configurations/aws/iam-roles.html) and access keys.

*We recommend using IAM roles to specify which cluster can access which buckets. Keys can show up in logs and table metadata and are therefore fundamentally insecure.* If you do use keys, you'll have to escape the `/` in your keys with `%2F`.

This is a **Python** notebook so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` magic command. Python, Scala, SQL, and R are all supported.

#### Connecting to S3 bucket using a secret key and access key

In [0]:
import urllib
access_key = "****"
secret_key = "***"
encoded_secret_key = urllib.parse.quote(secret_key, "")
aws_bucket_name = "dvdrentalbucket"
mount_name = "aws3_data"

dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name), "/mnt/%s" % mount_name)


#### Displaying contents of the mounted bucket

In [0]:
display(dbutils.fs.ls("/mnt/aws3_data"))

path,name,size
dbfs:/mnt/aws3_data/actor.json,actor.json,21199
dbfs:/mnt/aws3_data/address.json,address.json,119744
dbfs:/mnt/aws3_data/category.json,category.json,1230
dbfs:/mnt/aws3_data/city.json,city.json,56557
dbfs:/mnt/aws3_data/country.json,country.json,8932
dbfs:/mnt/aws3_data/customer.json,customer.json,150586
dbfs:/mnt/aws3_data/film.json,film.json,548352
dbfs:/mnt/aws3_data/film_actor.json,film_actor.json,395254
dbfs:/mnt/aws3_data/film_category.json,film_category.json,74316
dbfs:/mnt/aws3_data/inventory.json,inventory.json,424439


In [0]:
actor = spark.read.json("/mnt/aws3_data/actor.json")
address = spark.read.json("/mnt/aws3_data/address.json")
category = spark.read.json("/mnt/aws3_data/category.json")
city = spark.read.json("/mnt/aws3_data/city.json")
country = spark.read.json("/mnt/aws3_data/country.json")
customer = spark.read.json("/mnt/aws3_data/customer.json")
film = spark.read.json("/mnt/aws3_data/film.json")
film_actor = spark.read.json("/mnt/aws3_data/film_actor.json")
film_category = spark.read.json("/mnt/aws3_data/film_category.json")
inventory = spark.read.json("/mnt/aws3_data/inventory.json")
language = spark.read.json("/mnt/aws3_data/language.json")
payment = spark.read.json("/mnt/aws3_data/payment.json")
rental = spark.read.json("/mnt/aws3_data/rental.json")
staff = spark.read.json("/mnt/aws3_data/staff.json")
store = spark.read.json("/mnt/aws3_data/store.json")

#### Display the contents of one file

In [0]:
display(store)

address_id,last_update,manager_staff_id,store_id
1,2006-02-15T09:57:12,1,1
2,2006-02-15T09:57:12,2,2


In [0]:
rental.printSchema()

# Analysis

### What are the top and least rented (in-demand) genres and what are what are their total sales?

Join Category to film_category to film to inventory to rental to customer to payment

In [0]:
# Create temp tables for all the dataframes involved  / try to find a more memory-efficient method
category.createOrReplaceTempView("table_category")
film_category.createOrReplaceTempView("table_film_category")
film.createOrReplaceTempView("table_film")
inventory.createOrReplaceTempView("table_inventory")
rental.createOrReplaceTempView("table_rental")
customer.createOrReplaceTempView("table_customer")
payment.createOrReplaceTempView("table_payment")

In [0]:

popular_genres = spark.sql(
  
"""WITH table1 AS (SELECT name AS Genre, count(customer_id) AS Total_rent_demand
                FROM table_category c
                JOIN table_film_category fc
                USING(category_id)
                JOIN table_film f
                USING(film_id)
                JOIN table_inventory i
                USING(film_id)
                JOIN table_rental r
                USING(inventory_id)
                JOIN table_customer cu
                USING(customer_id)
                GROUP BY 1
                ORDER BY 2 DESC),
              
    table2 AS (SELECT c.name AS Genre, SUM(p.amount) AS total_sales
                FROM table_category c
                JOIN table_film_category fc
                USING(category_id)
                JOIN table_film
                USING(film_id)
                JOIN table_inventory i
                USING(film_id)
                JOIN table_rental r
                USING(inventory_id)
                JOIN table_payment p
                USING(rental_id)
                GROUP BY 1
                ORDER BY 2 DESC)
                
SELECT table1.genre, table1.total_rent_demand, table2.total_sales
FROM table1
JOIN table2
ON table1.genre = table2.genre
ORDER BY 2 DESC; """
)
popular_genres.show()

In [0]:
popular_genres.write.saveAsTable("popular_genre")

In [0]:
# write dataframe to parquet files
#popular_genres.write.parquet("popular_genres.parquet")

### How many distinct users have rented each genre?

Join Category to film_category to film to inventory to rental to customer

In [0]:
unique_users_genre = spark.sql(
  
"""SELECT name AS Genre, count(DISTINCT customer_id) AS Total_rent_demand
FROM table_category c
JOIN table_film_category fc
USING(category_id)
JOIN table_film f
USING(film_id)
JOIN table_inventory i
USING(film_id)
JOIN table_rental r
USING(inventory_id)
JOIN table_customer cu
USING(customer_id)
GROUP BY 1
ORDER BY 2 DESC; """
)

unique_users_genre.show()

In [0]:
unique_users_genre.write.saveAsTable("unique_users_genres")

In [0]:
# write dataframe to parquet files
#unique_users_genre.write.parquet("unique_users_genre.parquet")

### What is the Average rental rate for each genre? 
Join category to film_category to film

In [0]:
avg_rental_rate = spark.sql(
  
"""SELECT c.name AS genre, ROUND(AVG(f.rental_rate),2) AS Average_rental_rate
FROM table_category c
JOIN table_film_category fc
USING(category_id)
JOIN table_film f
USING(film_id)
GROUP BY 1
ORDER BY 2 DESC; """
)

avg_rental_rate.show()

In [0]:
avg_rental_rate.write.saveAsTable("avg_rental_rates")

In [0]:
# write dataframe to parquet files
#avg_rental_rate.write.parquet("avg_rental_rate.parquet")

### How many rented films were returned late, early and on time?

Join film to inventory to rental

In [0]:
return_status_film_count = spark.sql(

"""WITH t1 AS (Select *, DATE_PART('day', CAST(return_date as TIMESTAMP) - CAST(rental_date as TIMESTAMP)) AS date_difference
            FROM table_rental),
t2 AS (SELECT rental_duration, date_difference,
              CASE
                WHEN rental_duration > date_difference THEN 'Returned early'
                WHEN rental_duration = date_difference THEN 'Returned on Time'
                ELSE 'Returned late'
              END AS Return_Status
          FROM table_film f
          JOIN table_inventory i
          USING(film_id)
          JOIN t1
          USING (inventory_id))
SELECT Return_status, count(*) As total_no_of_films
FROM t2
GROUP BY 1
ORDER BY 2 DESC; """)

return_status_film_count.show()

In [0]:
return_status_film_count.write.saveAsTable("returnstatus_filmcount")

In [0]:
# write dataframe to parquet files
#return_status_film_count.write.parquet("return_status_film_count.parquet")

### What is the customer base and total sales in each country? 

Join country to city to address to customer to payment

In [0]:
# Create temp tables for all the dataframes involved  / try to find a more memory-efficient method
country.createOrReplaceTempView("table_country")
city.createOrReplaceTempView("table_city")
address.createOrReplaceTempView("table_address")

In [0]:
customerbase_per_country = spark.sql(

"""SELECT country, count(DISTINCT customer_id) AS customer_base, SUM(amount) AS total_sales
FROM table_country
JOIN table_city
USING(country_id)
JOIN table_address
USING(city_id)
JOIN table_customer
USING (address_id)
JOIN table_payment
USING(customer_id)
GROUP BY 1
ORDER BY 2 DESC; """
)

customerbase_per_country.show()

In [0]:
customerbase_per_country.write.saveAsTable("customer_base_per_country")

In [0]:
# write dataframe to parquet files
#customerbase_per_country.write.parquet("customerbase_per_country.parquet")

### Who are the top 5 customers per total sales and can we get their detail just in case Rent A Film wants to reward them?

Join country to city to address to customer to payment

In [0]:
top_five_customers = spark.sql(

"""WITH t1 AS (SELECT *, first_name || ' ' || last_name AS full_name
		    FROM table_customer)
SELECT full_name, email, address, phone, city, country, sum(amount) AS total_purchase_in_currency
FROM t1
JOIN table_address
USING(address_id)
JOIN table_city
USING (city_id)
JOIN table_country
USING (country_id)
JOIN table_payment
USING(customer_id)
GROUP BY 1,2,3,4,5,6
ORDER BY 7 DESC
LIMIT 5; """)

top_five_customers.show()

In [0]:
top_five_customers.write.saveAsTable("topfive_customers")

In [0]:
# write dataframe to parquet files
#top_five_customers.write.parquet("top_five_customers.parquet")