### This notebook contains the Python code that was run inside a Databricks notebook to mount an S3 bucket onto the cluster

#### Step 1: Importing Packages

In [1]:
# Run findspark first: it locates the Spark installation and adds it to the
# Python path, so the `pyspark` import in the next cell can be resolved.
# NOTE(review): Databricks clusters normally have pyspark importable already —
# confirm whether findspark is actually required in this environment.
import findspark

findspark.init()

In [2]:
# URL processing within Python. The `parse` submodule is imported explicitly because
# `import urllib` alone does not guarantee that `urllib.parse` is loaded.
import urllib
import urllib.parse

# Spark has now been initiated and we can import the necessary functions
from pyspark.sql.functions import *

#### Step 2: Read the table containing AWS authentication credentials

In [None]:
# DBFS location of the Delta table that holds the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame; passing format= to load() is
# equivalent to chaining .format("delta") before .load()
aws_keys_df = spark.read.load(delta_table_path, format="delta")

#### Step 3: Extracting both the Access & Secret Keys from the Spark DataFrame & URL-encoding the Secret Key so it can be safely embedded in the mount URL

In [None]:
# Extraction of the AWS Access & Secret Keys from the DataFrame.
# Both columns are fetched with a single Spark action so that the two keys are
# guaranteed to come from the same row (and only one job is launched instead of two).
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()

# first() returns None for an empty DataFrame; fail here with a clear message
# rather than with an opaque indexing error.
if credentials_row is None:
    raise ValueError("aws_keys_df is empty: no AWS credentials available to mount the bucket")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# Percent-encode the secret key so that characters such as '/' or '+' cannot break
# the mount URL; safe="" makes quote() escape '/' as well.
# Note: this is URL escaping only, it does not encrypt or hide the key.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

#### Step 4: Mounting the S3 bucket 

In [None]:
# Input the AWS S3 Bucket's name
AWS_S3_BUCKET = "<bucket_name>"

# Create a name for the mount
MOUNT_NAME = "/mnt/<mount_name>"

# Amazon S3 Source URL for accessing the data stored inside the bucket.
# The URL embeds the credentials, so never print or display SOURCE_URL.
# NOTE(review): current Databricks documentation uses the "s3a://" scheme for S3
# mounts — confirm whether "s3n://" should be migrated.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# dbutils.fs.mount raises an error when the mount point is already in use, which
# made this cell fail on every re-run (e.g. "Run All"). Only mount when needed.
already_mounted = any(
    mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()
)

if already_mounted:
    print("{0} is already mounted; skipping mount".format(MOUNT_NAME))
else:
    # Mounting the drive
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

#### Step 5: Checking that the S3 bucket was mounted successfully

In [None]:
# List the contents of the mounted topic directory and render the listing as a table.
# The listing should be inclusive of the .json files, i.e. the files that were
# created when records were sent via the API from the Python script.
topic_files = dbutils.fs.ls("/mnt/<mount_name>/topics/<topic_name>/partition=0/")

display(topic_files)

#### Step 5 (continued): Further check by reading each topic's JSON files into a Spark DataFrame

In [None]:
# Disable the Delta format check for the current Spark session.
# This is set through spark.conf in Python rather than through a %sql cell: a
# Databricks language magic must be the first line of its cell, and the original
# cell placed Python comments before it.
# NOTE(review): this setting is understood to stop Spark from rejecting non-Delta
# reads (such as the JSON reads in the cells that follow) on paths it detects as
# Delta tables; it is not a performance option — confirm it is still required.
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")

#### Step 5.1: Checking the .pin table

In [None]:
# Load every JSON file of the .pin topic from the mounted bucket.
file_location = "/mnt/0e4a38902653_mount/topics/0e4a38902653.pin/partition=0/*.json"
file_type = "json"
infer_schema = "true"

# load() takes the source format and reader options as keyword arguments, which is
# equivalent to chaining .format(...).option("inferSchema", ...).load(...)
df_pin = spark.read.load(file_location, format=file_type, inferSchema=infer_schema)

display(df_pin)

#### Step 5.2: Checking the .geo table

In [None]:
# Load every JSON file of the .geo topic from the mounted bucket.
file_location = "/mnt/0e4a38902653_mount/topics/0e4a38902653.geo/partition=0/*.json"
file_type = "json"
infer_schema = "true"

# load() takes the source format and reader options as keyword arguments, which is
# equivalent to chaining .format(...).option("inferSchema", ...).load(...)
df_geo = spark.read.load(file_location, format=file_type, inferSchema=infer_schema)

display(df_geo)

#### Step 5.3: Checking the .user table

In [None]:
# Load every JSON file of the .user topic from the mounted bucket.
file_location = "/mnt/0e4a38902653_mount/topics/0e4a38902653.user/partition=0/*.json"
file_type = "json"
infer_schema = "true"

# load() takes the source format and reader options as keyword arguments, which is
# equivalent to chaining .format(...).option("inferSchema", ...).load(...)
df_user = spark.read.load(file_location, format=file_type, inferSchema=infer_schema)

display(df_user)