In [0]:
#Link Databricks to AWS using an authentication key
# URL processing
# urllib.parse is imported explicitly: a bare `import urllib` does not
# guarantee that the `parse` submodule is loaded.
import urllib
import urllib.parse

# pyspark functions
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Define the path to the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the SAME row with a single Spark job.
# Two independent collect() calls are not guaranteed to return rows in the
# same order, so the access key and secret key could come from different rows.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message
    # instead of an IndexError on collect()[0].
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

# Get the AWS access key and secret key from the fetched row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" means every reserved character is escaped)
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")


In [0]:
# Pin data: read from the Kinesis stream into a streaming DataFrame.
# Later steps in this cell transform it and save it as a Databricks Delta table.

# Turn off the Delta format check (spark.databricks.delta.formatCheck.enabled)
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")

# 1. Open the pin stream, authenticating with the AWS key pair loaded earlier
pin_stream = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12d6e5017cf5-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

# Keep only the record payload (the data column), cast to a string so it can be parsed as JSON
df_pin_data = pin_stream.selectExpr("CAST(data as STRING)")

#Schema for the pin JSON payload: every field is declared as a nullable string
schema = T.StructType([
    T.StructField(field_name, T.StringType(), True)
    for field_name in (
        "index",
        "unique_id",
        "title",
        "description",
        "poster_name",
        "follower_count",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "downloaded",
        "save_location",
        "category",
    )
])

#Parse the JSON string held in the data column into a struct
df2 = df_pin_data.withColumn(
    "data",
    F.from_json(F.col("data"), schema),
)
df2.printSchema()

#Promote each struct field to its own top-level column
df_pin = df2.select("data.*")
df_pin.printSchema()
display(df_pin)


#Clean df_pin dataframe

#Replace placeholder entries that carry no relevant data with null
df_pin = df_pin.replace({
    'No description available Story format':None,
    'User Info Error':None, 
    'Image src error.':None,
    'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e':None, 
    'No Title Data Available':None,
    '':None,
    'null':None
    })
#Replace all null entries with the literal string 'None'
# NOTE(review): this stores the text 'None' rather than a real null in every
# string column - confirm that downstream consumers expect this.
df_pin = df_pin.fillna(value='None')

#Transform follower_count so every entry is a number, then cast to integer.
#'k' (thousands) and 'M' (millions) suffixes are expanded to zeros; without
#the 'M' rule a value such as '1M' would fail the cast and silently become null.
# NOTE(review): assumes 'M' is the only other suffix present in the source data - confirm.
df_pin = df_pin.withColumn(
    'follower_count',
    F.regexp_replace(
        F.regexp_replace('follower_count', 'k', '000'),
        'M',
        '000000',
    ).cast('int'),
)
display(df_pin)

#Cast downloaded and index to integer data type 
df_pin = df_pin.withColumn('downloaded', df_pin['downloaded'].cast('int'))
df_pin = df_pin.withColumn('index', df_pin['index'].cast('int'))

#Strip the 'Local save in ' prefix so save_location holds only the path
df_pin = df_pin.withColumn('save_location', F.regexp_replace('save_location', 'Local save in ', ''))

#Rename the index column to ind.
df_pin = df_pin.withColumnRenamed('index','ind')

#Reorder the DataFrame columns (downloaded is not part of the final layout)
df_pin = df_pin.select(
    'ind',
    'unique_id',
    'title',
    'description',
    'follower_count',
    'poster_name',
    'tag_list',
    'is_image_or_video',
    'image_src',
    'save_location',
    'category',
)

#Print the column names and their data types
df_pin.printSchema()
display(df_pin)


#Save as Databricks Delta table
# The checkpoint directory is dedicated to this query. Structured Streaming
# stores per-query progress there, so it must not be shared with the other
# streaming writes in this notebook.
# NOTE(review): a new checkpoint path has no saved progress, so with the
# reader's initialPosition of 'earliest' the first run re-reads the stream
# from the start and may append records already present in the table.
df_pin.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12d6e5017cf5_pin_table/_checkpoints/") \
  .table("12d6e5017cf5_pin_table")


index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category
7102,de70426d-b46f-40f7-af3e-1baf13b2e7f6,Stylish Mens Outfits,No description available,Caleb,49.0,"Stylish Mens Outfits,Casual Summer Outfits,Outfit Summer,Men's Winter Outfits,Fall Hiking Outfit,Nice Casual Outfits For Men,Stylish Jeans For Men,Mens Fashion Summer Outfits,Business Casual Men",image,https://i.pinimg.com/originals/3e/d8/a3/3ed8a3338515e50a3e0a68881318b801.jpg,1,Local save in /data/mens-fashion,mens-fashion
3089,88f9227e-88d0-4b1c-b0be-bcfc3028b8e2,,,,,,multi-video(story page format),,0,Local save in /data/diy-and-crafts,diy-and-crafts
2959,439cac4b-18e8-4aaa-88c5-edb26a701111,The Cutest Toilet Paper Crafts,One thing I love about Crafts is how the evolve! These toilet paper rolls are adorable and modern. I love that something as simple as a recycled toilet paper roll can be taken t…,CraftGossip.com,502000.0,"Toilet Paper Roll Crafts,Cardboard Crafts,Diy Paper,Paper Crafting,Toilet Paper Rolls,Cardboard Playhouse,Paper Art,Kids Crafts,Toddler Crafts",image,https://i.pinimg.com/originals/c6/cd/0d/c6cd0d04da9cfc11566cfd34eeb1f918.jpg,1,Local save in /data/diy-and-crafts,diy-and-crafts
10449,667b4d08-82cc-4fa1-9abd-7d62549b412e,Champagne Wishes and Caviar Dreams,"I'm a 50 year old lady from the United States. I have a 20 year old son, who I am raising alone. My blog has pretty things, fashion, food, , jewelry, beauty, luxury, and glamour…",Dakota Bronson,168.0,"Auto Rolls Royce,Voiture Rolls Royce,Luxury Sports Cars,Sport Cars,Rolls Royce Vintage,Maserati,Ferrari F40,Lamborghini,Automobile",image,https://i.pinimg.com/originals/4c/46/96/4c4696ce02c3447ec4ad2d1df779437d.jpg,1,Local save in /data/vehicles,vehicles
9979,2b2abc85-fc51-481f-8ae6-17681993da28,Paris in the Summer. 10 fun things to do in Paris in the Summertime • Petite in Paris,"Are you traveling to Paris during the summer? Find out what to do in Paris, France during the summer. Fun summertime activities in Paris. Enjoy the incredible outdoors when trav…",Petite in Paris,3000.0,"Torre Eiffel Paris,Tour Eiffel,Picnic In Paris,Hello France,Voyage Europe,Destination Voyage,Beautiful Places To Travel,Travel Aesthetic,Paris Travel",image,https://i.pinimg.com/originals/6c/4c/90/6c4c90bba27ebf8c8bfe4c1acfb9f07a.jpg,1,Local save in /data/travel,travel
8887,5df9f6e5-07f5-4ce8-a82e-96586bbc05d8,25 Ultra Sexy Back Tattoo Ideas For Girls,Tattoos are one of the most efficient ways through which one decides to express themselves…,RapidLeaks,4000.0,"Dream Tattoos,Body Art Tattoos,New Tattoos,Small Tattoos,Cross Tattoos,Random Tattoos,Fashion Tattoos,Bird Tattoos,Fitness Tattoos",image,https://i.pinimg.com/originals/ab/8e/50/ab8e505b04d4abc8f23e273c15f8a65d.jpg,1,Local save in /data/tattoos,tattoos
6582,fc86b899-a5fb-44ff-8ec7-c2a962703fe6,A ‘Hollywood Bungalow’ Style House Has Some Surprisingly Affordable Decor Ideas,"This ""Hollywood bungalow"" style house has some surprisingly affordable decor ideas. | House Tours by Apartment Therapy #bungalow #livingroom #livingroomideas #livingroomdecor #l…",Apartment Therapy,,"Decor Room,Room Decorations,Diy Home Decor,Tv Decor,Wall Decor,Natural Home Decor,Entryway Decor,Natural Interior,Wood Home Decor",image,https://i.pinimg.com/originals/5b/cf/7d/5bcf7d252837d034580b3c0b9017436f.jpg,1,Local save in /data/home-decor,home-decor
8593,dc6e4bff-3bc9-4f74-8c77-06b871ea1936,Sunsum® Manifestation Temporary Tattoos - No. 0 - Soul Tattoos,"Sunsum® Manifestation Temporary Tattoos, No. 0 - Soul. We are connected to all things. The Tree of Life symbol represents our personal development, uniqueness and individual bea…",TheWMarketplace,11.0,"Candle Tattoo,Tree Of Life Symbol,Soul Tattoo,Dog Smells,Candle Branding,Best Candles,Get A Tattoo,Temporary Tattoo,Burning Candle",image,https://i.pinimg.com/originals/6d/38/72/6d3872aef00be9ab8e403d2062ea0285.jpg,1,Local save in /data/tattoos,tattoos
719,d7c53e34-9540-4f48-a31b-89b6ed1852bb,10 Art Sub Lessons that only need a Pencil,10 art sub lessons that only need a pencil. Cover lessons for art teachers. Make the perfect art sub lessson folder with this amazing resources.,The Arty Teacher,25000.0,"Art Lessons For Kids,Art Lessons Elementary,Art For Kids,Art Sub Plans,Art Lesson Plans,Art Substitute Plans,High School Art,Middle School Art,Primary School Art",image,https://i.pinimg.com/originals/ee/a8/78/eea878911033897d981a69d9f6b2fb7c.png,1,Local save in /data/art,art
2579,c5372c45-a226-40a4-b46f-e5bf3824b2a5,60 Best DIY Christmas Garlands,60 Best DIY Christmas Garlands #Christmas #ChristmasDecor #ChristmasDecorations #DIY #ChristmasCrafts,"Prudent Penny Pincher - Home Decor, Organization, Crafts, Recipes",647000.0,"Diy Christmas Garland,Christmas Mantels,Diy Christmas Gifts,Rustic Christmas,All Things Christmas,Christmas Holidays,Christmas Vacation,Christmas Island,Christmas Ideas",image,https://i.pinimg.com/originals/ca/eb/de/caebde40a05dc4f2d668d9d8c1746e3a.jpg,1,Local save in /data/christmas,christmas


In [0]:
# 2. Geo data: read from the Kinesis stream; later steps in this cell transform
#    it and write it to a Databricks Delta table
geo_stream = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12d6e5017cf5-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

# Keep only the record payload (the data column), cast to a string so it can be parsed as JSON
df_geo_data = geo_stream.selectExpr("CAST(data as STRING)")

#Schema for the geo JSON payload: every field is declared as a nullable string
schema = T.StructType([
    T.StructField(field_name, T.StringType(), True)
    for field_name in ("ind", "timestamp", "latitude", "longitude", "country")
])

#Parse the JSON string held in the data column into a struct
df2 = df_geo_data.withColumn(
    "data",
    F.from_json(F.col("data"), schema),
)
df2.printSchema()

#Promote each struct field to its own top-level column
df_geo = df2.select("data.*")
df_geo.printSchema()


#Clean the df_geo dataframe

#One projection does all of the following:
# - builds the coordinates array from latitude and longitude (both are then dropped)
# - replaces the 'T' in timestamp with a space and converts it to a timestamp type
# - fixes the column order: ind, country, coordinates, timestamp
df_geo = df_geo.select(
    'ind',
    'country',
    F.array('latitude', 'longitude').alias('coordinates'),
    F.to_timestamp(F.regexp_replace('timestamp', 'T', ' ')).alias('timestamp'),
)

#View the final table and data types
display(df_geo)
df_geo.printSchema()

#Save as Databricks Delta table
# The checkpoint directory is dedicated to this query. Structured Streaming
# stores per-query progress there, so it must not be shared with the other
# streaming writes in this notebook.
# NOTE(review): a new checkpoint path has no saved progress, so with the
# reader's initialPosition of 'earliest' the first run re-reads the stream
# from the start and may append records already present in the table.
df_geo.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12d6e5017cf5_geo_table/_checkpoints/") \
  .table("12d6e5017cf5_geo_table")


ind,country,coordinates,timestamp
3454,Cambodia,"List(-0.375174, 49.8106)",2021-07-25T02:20:29.000+0000
2959,Angola,"List(-68.0095, -157.227)",2019-08-19T08:22:02.000+0000
9275,American Samoa,"List(-88.2286, -178.919)",2017-12-07T02:39:00.000+0000
2698,Egypt,"List(-72.7174, 24.169)",2021-11-24T08:33:51.000+0000
831,Congo,"List(-43.7816, -66.1592)",2021-06-23T23:16:10.000+0000
808,Albania,"List(-71.6856, -179.126)",2019-01-03T15:43:12.000+0000
8856,Burundi,"List(58.9262, -107.682)",2022-02-11T04:37:58.000+0000
7441,Aruba,"List(-86.4063, -136.657)",2020-03-02T20:07:23.000+0000
9795,Ecuador,"List(-82.0334, -110.476)",2019-11-12T22:57:25.000+0000
3120,Bahrain,"List(-50.4777, -76.5482)",2022-01-23T15:53:47.000+0000


In [0]:
# 3. User data: read from the Kinesis stream; later steps in this cell transform
#    it and write it to a Databricks Delta table
user_stream = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12d6e5017cf5-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

# Keep only the record payload (the data column), cast to a string so it can be parsed as JSON
df_user_data = user_stream.selectExpr("CAST(data as STRING)")


#Schema for the user JSON payload: every field is declared as a nullable string
schema = T.StructType([
    T.StructField(field_name, T.StringType(), True)
    for field_name in ("ind", "first_name", "last_name", "age", "date_joined")
])

#Parse the JSON string held in the data column into a struct
df2 = df_user_data.withColumn(
    "data",
    F.from_json(F.col("data"), schema),
)
df2.printSchema()

#Promote each struct field to its own top-level column
df_user = df2.select("data.*")
df_user.printSchema()


#Clean the df_user dataframe

#One projection does all of the following:
# - builds user_name by concatenating first_name and last_name with no separator
#   (both source columns are then dropped)
# - converts date_joined from a string to a timestamp data type
# - fixes the column order: ind, user_name, age, date_joined
df_user = df_user.select(
    'ind',
    F.concat('first_name', 'last_name').alias('user_name'),
    'age',
    F.to_timestamp('date_joined').alias('date_joined'),
)

#View the final table and data types 
df_user.printSchema()
display(df_user)

#Save as Databricks Delta table
# The checkpoint directory is dedicated to this query. Structured Streaming
# stores per-query progress there, so it must not be shared with the other
# streaming writes in this notebook.
# NOTE(review): a new checkpoint path has no saved progress, so with the
# reader's initialPosition of 'earliest' the first run re-reads the stream
# from the start and may append records already present in the table.
df_user.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12d6e5017cf5_user_table/_checkpoints/") \
  .table("12d6e5017cf5_user_table")

ind,user_name,age,date_joined
1313,BrittanyJones,32,2016-04-02T03:51:23.000+0000
1313,BrittanyJones,32,2016-04-02T03:51:23.000+0000
9275,AbigailBates,20,2015-11-07T20:59:32.000+0000
2074,AnnetteForbes,21,2016-01-03T15:42:12.000+0000
8887,AustinRodriguez,24,2016-03-31T20:56:39.000+0000
2698,KaylaBurton,44,2017-06-21T19:53:27.000+0000
10663,JulieCox,23,2016-06-23T14:38:00.000+0000
5468,LisaGamble,20,2016-07-23T20:51:06.000+0000
808,AaronBartlett,21,2015-11-24T02:15:36.000+0000
1094,AnnElliott,20,2016-07-02T02:32:30.000+0000
