In [None]:
# Import pyspark functions
# NOTE(review): the wildcard import shadows Python builtins such as sum/min/max/round;
# consider `from pyspark.sql import functions as F` once later cells are checked.
from pyspark.sql.functions import *
# Import url processing; the `urllib.parse` submodule is imported explicitly because
# `import urllib` on its own does not guarantee that `urllib.parse` is loaded.
import urllib
import urllib.parse

In [None]:
# Delta table holding the AWS credentials used to mount the S3 bucket.
# NOTE(review): keys stored in a plain table are readable by anyone with table access;
# a Databricks secret scope would be safer.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the credentials table into a Spark DataFrame.
aws_keys_df = (
    spark.read
    .format("delta")
    .load(delta_table_path)
)


In [None]:
# Get the AWS access key and secret key from the spark dataframe
ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']
# Encode the secrete key
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0affe012670f-bucket"
# DBFS mount point for the bucket
MOUNT_NAME = "/mnt/user-0affe012670f-mount"
# Source URL with the access key and URL-encoded secret key embedded as credentials.
# NOTE(review): Databricks documents the s3a:// scheme for key-based S3 mounts;
# consider switching from s3n:// after confirming on this cluster.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# dbutils.fs.mount raises if the mount point is already in use, so only mount when it is
# absent. This keeps the cell safe to re-run (e.g. on "Run All").
if any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    print(f"{MOUNT_NAME} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [None]:
# List the JSON files under the geo topic's partition=0 directory of the mounted bucket
# (the path is built from MOUNT_NAME so it cannot drift from the mount cell).
display(dbutils.fs.ls(f"{MOUNT_NAME}/topics/0affe012670f.geo/partition=0/"))

path,name,size,modificationTime
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000000.json,0affe012670f.geo+0+0000000000.json,140,1721851865000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000001.json,0affe012670f.geo+0+0000000001.json,132,1721852019000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000002.json,0affe012670f.geo+0+0000000002.json,132,1721852147000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000003.json,0affe012670f.geo+0+0000000003.json,130,1721854171000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000004.json,0affe012670f.geo+0+0000000004.json,135,1721854833000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000005.json,0affe012670f.geo+0+0000000005.json,141,1721854893000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000006.json,0affe012670f.geo+0+0000000006.json,133,1721854949000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000007.json,0affe012670f.geo+0+0000000007.json,152,1721901787000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000008.json,0affe012670f.geo+0+0000000008.json,133,1721901789000
dbfs:/mnt/user-0affe012670f-mount/topics/0affe012670f.geo/partition=0/0affe012670f.geo+0+0000000009.json,0affe012670f.geo+0+0000000009.json,134,1721901791000


In [None]:
%sql
-- Turn off Delta's format check for the current Spark session.
-- NOTE(review): this check presumably rejects reading a Delta-formatted directory with a
-- non-Delta reader; the JSON reads in this notebook target the mounted S3 topic folders,
-- so confirm whether this setting is still needed.
SET spark.databricks.delta.formatCheck.enabled=false

In [None]:
# Root folder of the topic directories in the mounted S3 bucket, and the file format stored there.
file_location = f"{MOUNT_NAME}/topics"
file_type = "json"

# Each topic lives under <file_location>/<user_id>.<topic>/partition=0/
# NOTE(review): only partition 0 is read; confirm the topics have no other partitions.
user_id = "0affe012670f"
pin_data_path = f"{file_location}/{user_id}.pin/partition=0/"
geo_data_path = f"{file_location}/{user_id}.geo/partition=0/"
user_data_path = f"{file_location}/{user_id}.user/partition=0/"
# Kept for compatibility with any later cells. It is not passed to the reader because
# Spark's JSON source infers the schema whenever no explicit schema is given.
infer_schema = "true"


def read_topic_json(path):
    """Read all files under ``path`` (format ``file_type``) into a Spark DataFrame.

    multiLine=True parses each file as a whole JSON document instead of one record per line.
    mode="PERMISSIVE" tolerates malformed records (null fields / _corrupt_record)
    rather than failing the whole read.
    """
    return spark.read.format(file_type).load(path, multiLine=True, mode="PERMISSIVE")


# Read the three topics from the mounted S3 bucket
df_pin = read_topic_json(pin_data_path)
df_geo = read_topic_json(geo_data_path)
df_user = read_topic_json(user_data_path)

display(df_pin)
display(df_geo)
display(df_user)