In [0]:
# URL processing (import the submodule explicitly: `import urllib` alone
# does not guarantee that `urllib.parse` is loaded)
import urllib
import urllib.parse

# pyspark functions
from pyspark.sql.functions import *

# Path to the Delta table that holds the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table into a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the first row with a single Spark job
# (instead of one collect() per column) and fail with a clear message when
# the table has no rows.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError("No AWS credentials found in {0}".format(delta_table_path))
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" also escapes "/" so the key can be
# embedded in the source URL below
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# AWS S3 bucket name
AWS_S3_BUCKET = "user-12e255fc4fcd-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/mount_name"
# Source URL in the form s3n://<access key>:<encoded secret key>@<bucket>
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the bucket only if the mount point is not already in use, so that
# re-running this cell does not fail on an existing mount.
mounted_points = {mount.mountPoint for mount in dbutils.fs.mounts()}
if MOUNT_NAME not in mounted_points:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

path,name,size,modificationTime
dbfs:/mnt/0a2a5872851b-s3-bucket/,0a2a5872851b-s3-bucket/,0,0
dbfs:/mnt/0a2bc878981f-s3-data/,0a2bc878981f-s3-data/,0,0
dbfs:/mnt/0a2f66c3e41f.df_geo/,0a2f66c3e41f.df_geo/,0,0
dbfs:/mnt/0a2f66c3e41f.df_pin/,0a2f66c3e41f.df_pin/,0,0
dbfs:/mnt/0a2f66c3e41f.df_user/,0a2f66c3e41f.df_user/,0,0
dbfs:/mnt/0a2f66c3e41f.geo/,0a2f66c3e41f.geo/,0,0
dbfs:/mnt/0a4ac73a0561-mount/,0a4ac73a0561-mount/,0,0
dbfs:/mnt/0a4ac73a0561-mounted/,0a4ac73a0561-mounted/,0,0
dbfs:/mnt/0a5afda0229f/,0a5afda0229f/,0,0
dbfs:/mnt/0a65154c50dd-mount/,0a65154c50dd-mount/,0,0


In [0]:
# List the files stored for the geo topic (partition 0) in the mounted bucket
display(dbutils.fs.ls("/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/"))

path,name,size,modificationTime
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001703.json,12e255fc4fcd.geo+0+0000001703.json,108,1703791925000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001704.json,12e255fc4fcd.geo+0+0000001704.json,108,1703791934000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001705.json,12e255fc4fcd.geo+0+0000001705.json,108,1703791947000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001706.json,12e255fc4fcd.geo+0+0000001706.json,113,1703791958000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001707.json,12e255fc4fcd.geo+0+0000001707.json,105,1703791967000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001708.json,12e255fc4fcd.geo+0+0000001708.json,107,1703791972000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001709.json,12e255fc4fcd.geo+0+0000001709.json,113,1703791983000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001710.json,12e255fc4fcd.geo+0+0000001710.json,125,1703791994000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001711.json,12e255fc4fcd.geo+0+0000001711.json,109,1703791998000
dbfs:/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/12e255fc4fcd.geo+0+0000001712.json,12e255fc4fcd.geo+0+0000001712.json,111,1703792013000


In [0]:
%sql
/* Turn off the Delta format check by setting
   spark.databricks.delta.formatCheck.enabled to false.
   NOTE(review): presumably needed so the reads from the mounted bucket are
   not rejected by the check; confirm it is still required. */
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Locations of the JSON files to read, one glob pattern per topic.
# Absolute paths under the mount point, so the reads do not depend on the
# filesystem's working directory.
file_location_geo = "/mnt/mount_name/topics/12e255fc4fcd.geo/partition=0/*.json"
file_location_user = "/mnt/mount_name/topics/12e255fc4fcd.user/partition=0/*.json"
file_location_pin = "/mnt/mount_name/topics/12e255fc4fcd.pin/partition=0/*.json"

file_type = "json"
# Ask Spark to infer the schema
# NOTE(review): "inferSchema" looks like a CSV reader option; the JSON reader
# appears to infer the schema by default. Confirm before removing it.
infer_schema = "true"


def read_topic_files(file_location):
    """Read every file matching ``file_location`` into a Spark DataFrame.

    The reader is configured with the notebook-level ``file_type`` and
    ``infer_schema`` settings, so all topics are loaded the same way.

    Args:
        file_location: Path or glob pattern of the files to load.

    Returns:
        The DataFrame returned by ``spark.read...load(file_location)``.
    """
    return (
        spark.read.format(file_type)
        .option("inferSchema", infer_schema)
        .load(file_location)
    )


# Read in the JSON files from the mounted S3 bucket, one DataFrame per topic
df_geo = read_topic_files(file_location_geo)
df_user = read_topic_files(file_location_user)
df_pin = read_topic_files(file_location_pin)
# Display the geo DataFrame to check its content
display(df_geo)

country,ind,latitude,longitude,timestamp
British Indian Ocean Territory (Chagos Archipelago),9455,-82.9272,-150.346,2022-03-15T01:46:32
British Indian Ocean Territory (Chagos Archipelago),6814,-86.5675,-149.565,2022-09-02T11:34:28
British Indian Ocean Territory (Chagos Archipelago),5111,-83.7472,8.65953,2021-04-01T00:56:57
Antarctica (the territory South of 60 deg S),10073,-32.8885,-170.295,2021-06-29T19:56:04
Antarctica (the territory South of 60 deg S),10073,-32.8885,-170.295,2021-06-29T19:56:04
Antarctica (the territory South of 60 deg S),2418,-88.4642,-171.061,2022-05-27T11:30:59
Antarctica (the territory South of 60 deg S),5162,-71.6607,-149.206,2019-09-27T19:06:43
Antarctica (the territory South of 60 deg S),1335,-77.9931,-175.682,2022-03-19T17:29:42
Antarctica (the territory South of 60 deg S),9185,-10.3764,-22.9809,2019-10-06T18:12:55
Antarctica (the territory South of 60 deg S),9335,-88.4642,-171.061,2020-11-14T23:42:22
