In [0]:
# List the files under /FileStore/tables in DBFS; the credentials CSV loaded
# later in this notebook is read from this directory.
dbutils.fs.ls("/FileStore/tables")

In [0]:
# URL processing (the `parse` submodule must be imported explicitly;
# `import urllib` alone does not guarantee `urllib.parse` is loaded)
import urllib
import urllib.parse

# pyspark functions
from pyspark.sql.functions import *

In [0]:
# Reader settings for the credentials file: a comma-delimited CSV whose
# first row holds the column names.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Load the uploaded credentials CSV into a Spark dataframe.
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# Fetch the credentials row for the Databricks user once, instead of running
# the same filter (and a separate Spark job) for each credential column.
_credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
# first() returns None when no row matches; fail with a clear message rather
# than an opaque IndexError from indexing an empty collect() result.
if _credentials_row is None:
    raise ValueError(
        "No row with User name 'databricks-user' found in the credentials CSV"
    )

# AWS access key and secret key taken from the matched row
ACCESS_KEY = _credentials_row['Access key ID']
SECRET_KEY = _credentials_row['Secret access key']
# URL-encode the secret key; safe="" means "/" is percent-encoded as well,
# which is needed because the key is later embedded in a URL.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# AWS S3 bucket name:
# edit this to match your own bucket!
AWS_S3_BUCKET = "user-0ea9a6e05a33-bucket"
# DBFS mount point under which the bucket is exposed
MOUNT_NAME = "/mnt/pinpip_mount"
# Source URL with the access key and URL-encoded secret key embedded.
# It contains credentials, so avoid printing or displaying it.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the bucket only if the mount point is not already in use:
# dbutils.fs.mount fails on an existing mount point, which would break a
# re-run of this notebook.
# NOTE(review): an existing mount at MOUNT_NAME is assumed to point at this
# bucket; unmount it first (dbutils.fs.unmount) if the source has changed.
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [0]:
# Display a directory listing. The path walks two levels up ("../..") from the
# mount point, so it resolves to the DBFS root rather than the bucket contents
# (the recorded output lists dbfs:/FileStore/, dbfs:/delta/, ...).
# NOTE(review): if the intent was to inspect the mounted bucket, the path
# should probably be the mount point itself -- confirm.
display(dbutils.fs.ls("/mnt/pinpip_mount/../.."))

path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,1702488399102
dbfs:/Volume/,Volume/,0,0
dbfs:/Volumes/,Volumes/,0,0
dbfs:/checkpoint/,checkpoint/,0,1702488399102
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/delta/,delta/,0,1702488399102
dbfs:/df_pin.csv/,df_pin.csv/,0,1702488399102
dbfs:/df_pin.parquet/,df_pin.parquet/,0,1702488399102
dbfs:/geo_dirty.csv/,geo_dirty.csv/,0,1702488399102


In [0]:
# Read the three topic datasets (.pin, .geo, .user) from the mounted S3 bucket.
# The same reader is reused for each topic; only the topic suffix in the path
# changes, so every dataframe is loaded from its own topic directory.
file_type = "json"
# Ask Spark to infer the schema
infer_schema = "true"
# The first asterisk matches any topic prefix; "*.json" matches every JSON
# file stored under partition 0 of that topic.
file_location_template = "/mnt/pinpip_mount/topics/*.{topic}/partition=0/*.json"
# Kept so the previously defined name still points at the .user topic files.
file_location = file_location_template.format(topic="user")


def read_topic(topic):
    """Load every JSON file of one topic from the mounted bucket.

    Args:
        topic: Topic suffix used in the directory name, e.g. "pin", "geo"
            or "user".

    Returns:
        A Spark dataframe built from the JSON files matching the topic path.
    """
    return (
        spark.read.format(file_type)
        .option("inferSchema", infer_schema)
        .load(file_location_template.format(topic=topic))
    )


# Read in JSONs from mounted S3 bucket: df_user
df_user = read_topic("user")
# Display Spark dataframe to check its content
display(df_user)

# Read in JSONs from mounted S3 bucket: df_geo
# NOTE(review): assumes the .geo and .pin topics use the same directory
# layout as .user -- confirm against the bucket.
df_geo = read_topic("geo")
# Display Spark dataframe to check its content
display(df_geo)

# Read in JSONs from mounted S3 bucket: df_pin
df_pin = read_topic("pin")
# Display Spark dataframe to check its content
display(df_pin)

age,date_joined,first_name,ind,last_name
27,2016-03-08 13:38:37,Christopher,2015,Bradshaw
59,2017-05-12 21:22:17,Alexander,10673,Cervantes
27,2016-03-08 13:38:37,Christopher,2015,Bradshaw
59,2017-05-12 21:22:17,Alexander,10673,Cervantes
48,2016-02-27 16:57:44,Christopher,1857,Hamilton
39,2016-06-29 20:43:59,Christina,6398,Davenport
20,2015-10-23 04:13:23,Alexandria,3599,Alvarado
39,2016-06-29 20:43:59,Christina,6398,Davenport
20,2015-10-23 04:13:23,Alexandria,3599,Alvarado
20,2015-10-23 04:13:23,Alexandria,4256,Alvarado
