In [0]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Location of the Delta table that holds the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Read the AWS access key and secret key from the credentials DataFrame.
# Both columns are fetched with a single Spark action (one row) instead of
# running two separate collect() jobs over the whole table.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message
    # rather than an IndexError from indexing an empty list.
    raise ValueError("The AWS credentials Delta table contains no rows")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key; safe="" means "/" is percent-encoded as well.
# NOTE(review): ENCODED_SECRET_KEY is not referenced by the cells visible in
# this notebook section - confirm whether it is still needed.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable the Delta format check (spark.databricks.delta.formatCheck.enabled) applied when reading Delta tables.
-- The cell output echoes the key and the value that was set.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Streaming DataFrame over the Kinesis pin stream, read from the earliest
# available record using the credentials loaded above.
df_pin = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0e284c63dbbf-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Streaming DataFrame over the Kinesis geo stream, read from the earliest
# available record using the credentials loaded above.
df_geo = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0e284c63dbbf-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Streaming DataFrame over the Kinesis user stream, read from the earliest
# available record using the credentials loaded above.
df_user = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0e284c63dbbf-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Preview the raw pin stream; the record payload is still in the undecoded `data` column.
display(df_pin)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
7b809a18-f685-4fce-8ca7-ecf6bf0a8611,eyJpbmRleCI6NTczMCwidW5pcXVlX2lkIjoiMWUxZjBjOGItOWZjZi00NjBiLTkxNTQtYzc3NTgyNzIwNmViIiwidGl0bGUiOiJJc2xhbmQgT2FzaXMgQ291cG9uIE9yZ2FuaXplciIsImRlc2NyaXB0aW9uIjoiRGVzY3JpcHQ= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178113304805666838526717591554,2024-05-06T23:33:02.471+0000
ff93236f-d538-4af5-b98e-cea9b7a87466,eyJpbmRleCI6NTA2OSwidW5pcXVlX2lkIjoiYjc1YjZmODctZGViMy00NDRmLWIyOWUtY2U5MTYxYjJkZjQ5IiwidGl0bGUiOiJUaGUgVmF1bHQ6IEN1cmF0ZWQgJiBSZWZpbmVkIFdlZGRpbmcgSW5zcGlyYXRpb24iLCJkZXM= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178121767286404141824293732354,2024-05-06T23:33:14.870+0000
d9851a83-75bd-431b-b8c2-4bcddb09ac6d,eyJpbmRleCI6MzA4OSwidW5pcXVlX2lkIjoiODhmOTIyN2UtODhkMC00YjFjLWIwYmUtYmNmYzMwMjhiOGUyIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178124185138043371288801574914,2024-05-06T23:33:17.695+0000
e556dbbd-411d-4a46-98a5-473148e069d7,eyJpbmRleCI6NjA2MywidW5pcXVlX2lkIjoiNjA2OTM3MjctNDkyNy00YmQ2LWE4YzUtMDk2YTM5MmQ2M2U2IiwidGl0bGUiOiI0MSBHb3JnZW91cyBGYWxsIERlY29yIElkZWFzIEZvciBZb3VyIEhvbWUgLSBDaGF5bG9yICY= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178125394063862986124134711298,2024-05-06T23:33:20.125+0000
ad2deb81-baf2-46aa-98da-5c0f1f8f7025,eyJpbmRleCI6NzM0MywidW5pcXVlX2lkIjoiYjhjNjNhOTUtNmZlOC00ZDdmLTk0NTUtMWVlZjM0NjJmZWUwIiwidGl0bGUiOiJUaGlzIEZhc2hpb24gUnVsZSBJcyBOb25zZW5zZSwgQW5kIFdlIFNob3VsZCBBbGwgRm9yZ2U= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178137483322059133721551831042,2024-05-06T23:33:39.613+0000
5ed11cd4-8644-48c4-9861-a437872cb7a0,eyJpbmRleCI6OTk3OSwidW5pcXVlX2lkIjoiMmIyYWJjODUtZmM1MS00ODFmLThhZTYtMTc2ODE5OTNkYTI4IiwidGl0bGUiOiJQYXJpcyBpbiB0aGUgU3VtbWVyLiAxMCBmdW4gdGhpbmdzIHRvIGRvIGluIFBhcmlzIGluIHQ= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178139901173698363186059673602,2024-05-06T23:33:43.510+0000
a4fd6f78-54ad-4691-994f-3b65cac07137,eyJpbmRleCI6NzE2NiwidW5pcXVlX2lkIjoiNGE4NDRiMDMtZTE2MS00N2ExLTkwNGItNTkxZWI1ZGM0ZmIxIiwidGl0bGUiOiJUaGUgS2lsbGVycyAtIE1yLiBCcmlnaHRzaWRlIC0gV29tZW4ncyBULVNoaXJ0IC0gSGVhdGg= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178150781506074896223021563906,2024-05-06T23:34:02.824+0000
d04e6e3b-00fb-4ed9-be2b-e67f64af8f91,eyJpbmRleCI6OTU5MCwidW5pcXVlX2lkIjoiYjg0OWU5NzYtNTJmNi00NWQ3LThiMjUtNTE1NTkzMThmMTY3IiwidGl0bGUiOiJUb3AgMjAgVGhpbmdzIFRvIFNlZSBBbmQgRG8gSW4gSXJlbGFuZCIsImRlc2NyaXB0aW9uIjo= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178154408283533740522862542850,2024-05-06T23:34:09.208+0000
ab3e0448-b98b-4e6d-9248-e3e1d46ecf94,eyJpbmRleCI6NDUwOCwidW5pcXVlX2lkIjoiOTA2NGY0YTItMjc1My00NzZjLTgxNWUtZGIzNjBmNDVhOTNlIiwidGl0bGUiOiJDdXN0b20gRXZlbnQgYW5kIFNob3AgTmVvbiBTaWduIExpZ2h0cyAtIEV2ZW50ICYgU2hvcCI= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178156826135172970124809338882,2024-05-06T23:34:13.206+0000
333516ea-5c4f-4137-8190-b7a3fe18279f,eyJpbmRleCI6MTcwNCwidW5pcXVlX2lkIjoiNWZiZjk4NjMtZmI3OS00NzdjLWE1YjYtNTQwYzMwMjBhNTVmIiwidGl0bGUiOiJDaHJpc3RtYXMgVHJlZXMgRnJvbSBQYWxsZXQgV29vZCB8IEhvbGlkYXkgRElZIiwiZGVzY3I= (truncated),streaming-0e284c63dbbf-pin,shardId-000000000000,49651803373326359874034070178168915393369117584787505154,2024-05-06T23:34:31.560+0000


In [0]:
# Preview the raw user stream; the record payload is still in the undecoded `data` column.
display(df_user)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
239a7f25-5ead-4f52-ba73-41c0309f6fc4,eyJpbmQiOjU3MzAsImZpcnN0X25hbWUiOiJSYWNoZWwiLCJsYXN0X25hbWUiOiJEYXZpcyIsImFnZSI6MzYsImRhdGVfam9pbmVkIjoiMjAxNS0xMi0wOFQyMDowMjo0MyJ9,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447917165132750568572749611010,2024-05-06T23:33:03.236+0000
f28ba07e-9b85-4f9e-8286-5a5ae1bc1e7b,eyJpbmQiOjEzMTMsImZpcnN0X25hbWUiOiJCcml0dGFueSIsImxhc3RfbmFtZSI6IkpvbmVzIiwiYWdlIjozMiwiZGF0ZV9qb2luZWQiOiIyMDE2LTA0LTAyVDAzOjUxOjIzIn0=,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447920791910209412872590589954,2024-05-06T23:33:08.614+0000
2689b15a-2287-43c5-ba2e-506b71f1fb4f,eyJpbmQiOjMwODksImZpcnN0X25hbWUiOiJBYmlnYWlsIiwibGFzdF9uYW1lIjoiQWxpIiwiYWdlIjoyMCwiZGF0ZV9qb2luZWQiOiIyMDE1LTEwLTI0VDExOjIzOjUxIn0=,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447928045465127101266114117634,2024-05-06T23:33:18.403+0000
4258da03-50d1-4867-aa93-5b6253b4c634,eyJpbmQiOjYwNjMsImZpcnN0X25hbWUiOiJDb3JleSIsImxhc3RfbmFtZSI6IkFuZHJld3MiLCJhZ2UiOjIzLCJkYXRlX2pvaW5lZCI6IjIwMTUtMTEtMjVUMTM6MzY6MjIifQ==,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447929254390946716170166730754,2024-05-06T23:33:20.859+0000
aa4346cf-0380-44dd-aba7-c30164b4a762,eyJpbmQiOjEwNjI1LCJmaXJzdF9uYW1lIjoiQ2hyaXN0aWFuIiwibGFzdF9uYW1lIjoiTGFuZyIsImFnZSI6MzIsImRhdGVfam9pbmVkIjoiMjAxNy0xMC0xMFQyMDowOTozMyJ9,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447934090094225175099182415874,2024-05-06T23:33:30.181+0000
d2e7f40a-222c-4f0c-924a-e94b55017116,eyJpbmQiOjI0MTgsImZpcnN0X25hbWUiOiJBbWFuZGEiLCJsYXN0X25hbWUiOiJBZGFtcyIsImFnZSI6MjAsImRhdGVfam9pbmVkIjoiMjAxNS0xMC0yMVQwODoyNzozNiJ9,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447937716871684019536462348290,2024-05-06T23:33:35.815+0000
c20e8704-adcd-4f1b-9882-d19146313847,eyJpbmQiOjk2NzIsImZpcnN0X25hbWUiOiJKZW5uaWZlciIsImxhc3RfbmFtZSI6Ikh1ZHNvbiIsImFnZSI6MjIsImRhdGVfam9pbmVkIjoiMjAxNi0wMi0xMVQyMDo0NjowNCJ9,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447946179352421322765319012354,2024-05-06T23:33:48.380+0000
a09077ea-7bb0-4529-9ec9-9fdcb9ba2a21,eyJpbmQiOjg4ODcsImZpcnN0X25hbWUiOiJBdXN0aW4iLCJsYXN0X25hbWUiOiJSb2RyaWd1ZXoiLCJhZ2UiOjI0LCJkYXRlX2pvaW5lZCI6IjIwMTYtMDMtMzFUMjA6NTY6MzkifQ==,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447948597204060552367265808386,2024-05-06T23:33:53.336+0000
4507906c-67f5-4e61-a19b-ab8cba364071,eyJpbmQiOjQ1MDgsImZpcnN0X25hbWUiOiJNaWNoYWVsIiwibGFzdF9uYW1lIjoiQ2FydGVyIiwiYWdlIjo1OCwiZGF0ZV9qb2luZWQiOiIyMDE2LTA2LTAzVDIzOjM1OjMwIn0=,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447960686462256700102121881602,2024-05-06T23:34:14.015+0000
7eb7349d-1336-49d5-9366-6fd2861eceff,eyJpbmQiOjUwNzYsImZpcnN0X25hbWUiOiJDaHJpc3RvcGhlciIsImxhc3RfbmFtZSI6IkJ1dGxlciIsImFnZSI6MjAsImRhdGVfam9pbmVkIjoiMjAxNS0xMi0wMVQxNTowODozMSJ9,streaming-0e284c63dbbf-user,shardId-000000000000,49651803346565465635797322447963104313895929635349200898,2024-05-06T23:34:16.960+0000


In [0]:
# Preview the raw geo stream; the record payload is still in the undecoded `data` column.
display(df_geo)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
bd258b97-709a-4ead-8481-7b0fcbcaef06,eyJpbmQiOjgzMDQsInRpbWVzdGFtcCI6IjIwMTktMDktMTNUMDQ6NTA6MjkiLCJsYXRpdHVkZSI6LTI4Ljg4NTIsImxvbmdpdHVkZSI6LTE2NC44NywiY291bnRyeSI6IkZyZW5jaCBHdWlhbmEifQ==,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098049312719341748079659570954242,2024-05-06T00:23:56.511+0000
b48f6b17-de28-4eeb-a867-df6eb8d17779,eyJpbmQiOjEzMTMsInRpbWVzdGFtcCI6IjIwMTgtMDYtMjZUMDI6Mzk6MjUiLCJsYXRpdHVkZSI6NzcuMDQ0NywibG9uZ2l0dWRlIjo2MS45MTE5LCJjb3VudHJ5IjoiTWFsZGl2ZXMifQ==,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098050881905055607868603217477634,2024-05-06T00:24:00.054+0000
88c04590-9984-423b-96e7-fc7c99d7521a,eyJpbmQiOjQzMTUsInRpbWVzdGFtcCI6IjIwMTktMTItMTVUMDM6NTE6MjgiLCJsYXRpdHVkZSI6LTQ1Ljg1MDgsImxvbmdpdHVkZSI6NjYuMTAwMywiY291bnRyeSI6IkNvdGUgZCdJdm9pcmUifQ==,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098052153695017842458632447328258,2024-05-06T00:24:02.701+0000
2e068c55-13fb-414a-835f-e1abc073afad,eyJpbmQiOjEwNzk0LCJ0aW1lc3RhbXAiOiIyMDIyLTAxLTAxVDAyOjI2OjUwIiwibGF0aXR1ZGUiOi04OS41MjM2LCJsb25naXR1ZGUiOi0xNTQuNTY3LCJjb3VudHJ5IjoiQ29jb3MgKEtlZWxpbmcpIElzbGFuZHMifQ==,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098052981809204278479685840535554,2024-05-06T00:24:04.402+0000
1299b3b2-3c68-4525-b90d-0d5890ffbad8,eyJpbmQiOjYwNjMsInRpbWVzdGFtcCI6IjIwMjEtMDctMjBUMDk6MDI6NDciLCJsYXRpdHVkZSI6LTg5LjE3OTcsImxvbmdpdHVkZSI6LTE3NC4wMTUsImNvdW50cnkiOiJBbmd1aWxsYSJ9,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098056913235969665254449179787266,2024-05-06T00:24:13.363+0000
dd2fa5ca-88b5-455c-9d13-64ea1422a178,eyJpbmQiOjk4NzUsInRpbWVzdGFtcCI6IjIwMjAtMDMtMjBUMTM6MDM6MTgiLCJsYXRpdHVkZSI6LTc0LjMzODIsImxvbmdpdHVkZSI6LTExMC40ODQsImNvdW50cnkiOiJCYXJiYWRvcyJ9,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098062053588554666658662103121922,2024-05-06T00:24:26.987+0000
b544358d-2df0-46e1-abab-24e4e527cdce,eyJpbmQiOjg2NTMsInRpbWVzdGFtcCI6IjIwMjItMDQtMTFUMTg6MzA6MTkiLCJsYXRpdHVkZSI6NDguNDU2OSwibG9uZ2l0dWRlIjotMTM5LjY1OCwiY291bnRyeSI6IlNleWNoZWxsZXMifQ==,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098069922486714538281197216202754,2024-05-06T00:24:45.696+0000
de46cf29-5406-42a4-a381-379b63723b97,eyJpbmQiOjc5MjIsInRpbWVzdGFtcCI6IjIwMjEtMDEtMjdUMDk6MTQ6MTkiLCJsYXRpdHVkZSI6LTg4LjA5NzQsImxvbmdpdHVkZSI6LTE3Mi4wNTIsImNvdW50cnkiOiJBbnRpZ3VhIGFuZCBCYXJidWRhIn0=,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098072952054818492542321346740226,2024-05-06T00:24:51.163+0000
53b562d4-3093-486e-86ac-d1e0681cfcb1,eyJpbmQiOjI2OTgsInRpbWVzdGFtcCI6IjIwMjEtMTEtMjRUMDg6MzM6NTEiLCJsYXRpdHVkZSI6LTcyLjcxNzQsImxvbmdpdHVkZSI6MjQuMTY5LCJjb3VudHJ5IjoiRWd5cHQifQ==,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098080351889760353688667954348034,2024-05-06T00:25:09.172+0000
28ece190-8076-4617-948d-6ddece88407b,eyJpbmQiOjgwOCwidGltZXN0YW1wIjoiMjAxOS0wMS0wM1QxNTo0MzoxMiIsImxhdGl0dWRlIjotNzEuNjg1NiwibG9uZ2l0dWRlIjotMTc5LjEyNiwiY291bnRyeSI6IkFsYmFuaWEifQ==,streaming-0e284c63dbbf-geo,shardId-000000000000,49650559150168775755628098092566876241739903979489329154,2024-05-06T00:25:39.293+0000


#### Defining streaming schema using StructType

In [0]:
# Schema for the JSON payload of the pin stream.
# Every field is declared as a nullable string; follower_count is converted
# to an integer later, in the pin cleaning cell.
pin_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "index",
        "unique_id",
        "title",
        "description",
        "poster_name",
        "follower_count",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "downloaded",
        "save_location",
        "category",
    )
])
# During cleaning, "index" is renamed to "ind" and the columns are reordered.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Schema for the JSON payload of the user stream.
# Every field is declared as a nullable string.
user_test_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ind", "first_name", "last_name", "age", "date_joined")
])

# Column order after the user cleaning cell: "ind", "user_name", "age", "date_joined"

In [0]:
# Schema for the JSON payload of the geo stream.
# Every field is declared as a nullable string.
geo_test_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ind", "country", "longitude", "latitude", "timestamp")
])

#### Deserialize the data column of the dataframe to a string

In [0]:
from pyspark.sql.types import StructType,StructField, StringType


# Cast the Kinesis `data` payload to a string, parse it as JSON against
# geo_test_schema, and expand the parsed struct into top-level columns.
df_geo_parsed = (
    df_geo
    .selectExpr("CAST(data AS STRING)")
    .select('data', from_json('data', geo_test_schema).alias('parsed'))
    .select("parsed.*")
)

# Show the parsed geo records
display(df_geo_parsed)

ind,country,longitude,latitude,timestamp
8304,French Guiana,-164.87,-28.8852,2019-09-13T04:50:29
1313,Maldives,61.9119,77.0447,2018-06-26T02:39:25
4315,Cote d'Ivoire,66.1003,-45.8508,2019-12-15T03:51:28
10794,Cocos (Keeling) Islands,-154.567,-89.5236,2022-01-01T02:26:50
6063,Anguilla,-174.015,-89.1797,2021-07-20T09:02:47
9875,Barbados,-110.484,-74.3382,2020-03-20T13:03:18
8653,Seychelles,-139.658,48.4569,2022-04-11T18:30:19
7922,Antigua and Barbuda,-172.052,-88.0974,2021-01-27T09:14:19
2698,Egypt,24.169,-72.7174,2021-11-24T08:33:51
808,Albania,-179.126,-71.6856,2019-01-03T15:43:12


In [0]:
from pyspark.sql.functions import from_json

# Cast the Kinesis `data` payload to a string, parse it as JSON against
# user_test_schema, and expand the parsed struct into top-level columns.
df_user_parsed = (
    df_user
    .selectExpr("CAST(data AS STRING)")
    .select('data', from_json('data', user_test_schema).alias('parsed'))
    .select("parsed.*")
)

# Show the parsed user records
display(df_user_parsed)


ind,first_name,last_name,age,date_joined
5730,Rachel,Davis,36,2015-12-08T20:02:43
1313,Brittany,Jones,32,2016-04-02T03:51:23
3089,Abigail,Ali,20,2015-10-24T11:23:51
6063,Corey,Andrews,23,2015-11-25T13:36:22
10625,Christian,Lang,32,2017-10-10T20:09:33
2418,Amanda,Adams,20,2015-10-21T08:27:36
9672,Jennifer,Hudson,22,2016-02-11T20:46:04
8887,Austin,Rodriguez,24,2016-03-31T20:56:39
4508,Michael,Carter,58,2016-06-03T23:35:30
5076,Christopher,Butler,20,2015-12-01T15:08:31


In [0]:
from pyspark.sql.functions import from_json, col

# Cast the Kinesis `data` payload to a string, parse it as JSON against
# pin_schema, and expand the parsed struct into top-level columns.
df_pin_parsed = (
    df_pin
    .selectExpr("CAST(data AS STRING)")
    .select('data', from_json('data', pin_schema).alias('parsed'))
    .select("parsed.*")
)

# Show the parsed pin records
display(df_pin_parsed)

index,unique_id,title,description,poster_name,follower_count,tag_list,is_image_or_video,image_src,downloaded,save_location,category
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",Consuelo Aguirre,0,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,1,Local save in /data/finance,finance
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,Style Me Pretty,6M,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,1,Local save in /data/event-planning,event-planning
3089,88f9227e-88d0-4b1c-b0be-bcfc3028b8e2,No Title Data Available,No description available Story format,User Info Error,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,0,Local save in /data/diy-and-crafts,diy-and-crafts
6063,60693727-4927-4bd6-a8c5-096a392d63e6,41 Gorgeous Fall Decor Ideas For Your Home - Chaylor & Mads,"Beautiful and easy ways to update every room in your home with fall decor. Plus, my favorite finds in fall decor for 2020!","Kristen | Lifestyle, Mom Tips & Teacher Stuff Blog",92k,"Fall Home Decor,Autumn Home,Fall Decor Outdoor,Front Porch Fall Decor,Home Decor Ideas,Porch Ideas For Fall,Fall Outdoor Decorating,Decorating Ideas For Fall,Fall Front Doors",image,https://i.pinimg.com/originals/e5/ae/dc/e5aedc14ce557e3a69f672e0f8c88f6e.png,1,Local save in /data/home-decor,home-decor
7343,b8c63a95-6fe8-4d7f-9455-1eef3462fee0,"This Fashion Rule Is Nonsense, And We Should All Forget About It","Black And Brown Outfits You Need To Try This Fall Season. The trick with black and brown outfits is to know how to combine colors, fabrics, and shapes to get a cohesive look. So…",Cultura Colectiva,1M,"Winter Outfits Men,Stylish Mens Outfits,Casual Outfits,Men Casual,Smart Casual,Outfits For Men,Fall Outfits,Mens Winter Boots,Business Casual Men",image,https://i.pinimg.com/originals/3e/49/09/3e4909c0ccc4dbba3cad83d97eab4a61.png,1,Local save in /data/mens-fashion,mens-fashion
9979,2b2abc85-fc51-481f-8ae6-17681993da28,Paris in the Summer. 10 fun things to do in Paris in the Summertime • Petite in Paris,"Are you traveling to Paris during the summer? Find out what to do in Paris, France during the summer. Fun summertime activities in Paris. Enjoy the incredible outdoors when trav…",Petite in Paris,3k,"Torre Eiffel Paris,Tour Eiffel,Picnic In Paris,Hello France,Voyage Europe,Destination Voyage,Beautiful Places To Travel,Travel Aesthetic,Paris Travel",image,https://i.pinimg.com/originals/6c/4c/90/6c4c90bba27ebf8c8bfe4c1acfb9f07a.jpg,1,Local save in /data/travel,travel
7166,4a844b03-e161-47a1-904b-591eb5dc4fb1,The Killers - Mr. Brightside - Women's T-Shirt - Heather Dark Grey / S,"Women's T-shirt. Design inspired by the rock band The Killers' hit ""Mr. Brightside"". One of the greatest song from the album Hot Fuss released in 2004. Soft and light, 100% cott…",Mala Rock | Rock T-shirts,27,"Mr Brightside,Rock T Shirts,Greatest Songs,Timeless Classic,Rock Bands,Album,T Shirts For Women,Inspired,Hot",image,https://i.pinimg.com/originals/8c/42/39/8c42391d35fcad51a4a79f7cd81bf26d.jpg,1,Local save in /data/mens-fashion,mens-fashion
9590,b849e976-52f6-45d7-8b25-51559318f167,Top 20 Things To See And Do In Ireland,Best spots to see for travel through Ireland!,Fun Life Crisis,130k,"Vacation Ideas,Vacation Spots,Emerald Isle,London England,Travel Guides,Travel Tips,Travel Hacks,Travel Packing,Places To Travel",image,https://i.pinimg.com/originals/c7/50/d3/c750d36524856873d64406652d69b4fb.png,1,Local save in /data/travel,travel
4508,9064f4a2-2753-476c-815e-db360f45a93e,Custom Event and Shop Neon Sign Lights - Event & Shop,"Personalize your event or shop with a customized neon sign. Make a statement with your own custom vibes! This light is 32 -40 inches (80cm-100cm) if you need something bigger, p…",Life of Neon | Custom Neon Light Signs | Home Decor Wall Art,111,"Our Wedding,Wedding Venues,Dream Wedding,Wedding Cakes,Church Wedding,Wedding Flowers,Lace Wedding,Wedding Rings,Wedding Dresses",image,https://i.pinimg.com/originals/e9/c0/7c/e9c07cf0cf16cab23764a36718ab76c1.jpg,1,Local save in /data/event-planning,event-planning
1704,5fbf9863-fb79-477c-a5b6-540c3020a55f,Christmas Trees From Pallet Wood | Holiday DIY,Christmas Trees From Pallet Wood | Holiday DIY: Deck the yard with some fun outdoor Christmas Trees! We made these merry and bright decorations from two old pallets we had lying…,Instructables,3M,"Pallet Wood Christmas Tree,Wooden Christmas Crafts,Diy Christmas Tree,Christmas Projects,Holiday Crafts,Wooden Xmas Trees,Different Christmas Trees,Pallet Tree,Christmas Kitchen",image,https://i.pinimg.com/originals/64/7b/ca/647bca35169b7c144604116c64bcba8a.png,1,Local save in /data/christmas,christmas


#### Cleaning Process for the streaming data 

We start by cleaning the pin data.

In [0]:
# Replace empty entries, and placeholder entries that carry no real data, with nulls
# in every column of the pin DataFrame.

# Values that are treated as "missing"
fields_to_replace = ["", "No description available Story format", "User Info Error", "No Title Data Available", "No description available", "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e", "Image src error."]

# Build all replacement expressions up front and apply them in ONE select.
# Calling withColumn once per column inside a loop adds a projection per call,
# which the PySpark docs warn can produce very large query plans.
# The comprehension variable also avoids rebinding the name `column`, which the
# wildcard import from pyspark.sql.functions brings into scope.
df_pin_parsed = df_pin_parsed.select([
    when(col(column_name).isin(fields_to_replace), None)
    .otherwise(col(column_name))
    .alias(column_name)
    for column_name in df_pin_parsed.columns
])

In [0]:
# Convert follower_count (e.g. "27", "92k", "6M") into an integer column.
# The numeric part and the optional k/M suffix are extracted separately and
# multiplied. A plain text substitution of the suffix would turn "1.5k" into
# "1.5000", which then casts to 1 instead of 1500.
# Values that do not match the pattern (including nulls) become null.
from pyspark.sql.functions import col, lit, regexp_extract, regexp_replace, when

follower_count_pattern = r"^\s*([0-9]+(?:\.[0-9]+)?)([kM]?)\s*$"
# Decimal (not double) keeps the multiplication exact before truncating to int.
follower_count_number = regexp_extract(col("follower_count"), follower_count_pattern, 1).cast("decimal(20,4)")
follower_count_suffix = regexp_extract(col("follower_count"), follower_count_pattern, 2)
follower_count_multiplier = (
    when(follower_count_suffix == "k", lit(1000))
    .when(follower_count_suffix == "M", lit(1000000))
    .otherwise(lit(1))
)
df_pin_parsed = df_pin_parsed.withColumn(
    "follower_count", (follower_count_number * follower_count_multiplier).cast("int")
)

# NOTE(review): pin_schema declares every field, including "index" and "downloaded",
# as StringType, so those two columns are still strings at this point. They are left
# unchanged here so the schema written to the Delta table stays the same - confirm
# whether they should be cast to a numeric type.

# Clean the data in the save_location column to include only the save location path
df_pin_parsed = df_pin_parsed.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Rename the index column to ind
df_pin_parsed = df_pin_parsed.withColumnRenamed("index", "ind")

# Reorder the DataFrame columns
df_pin_parsed = df_pin_parsed.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "downloaded", "save_location", "category")

In [0]:
# Preview the cleaned pin data (columns renamed and reordered by the cell above).
display(df_pin_parsed)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,downloaded,save_location,category
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,1,/data/finance,finance
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,6000000.0,Style Me Pretty,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,1,/data/event-planning,event-planning
3089,88f9227e-88d0-4b1c-b0be-bcfc3028b8e2,,,,,,multi-video(story page format),,0,/data/diy-and-crafts,diy-and-crafts
6063,60693727-4927-4bd6-a8c5-096a392d63e6,41 Gorgeous Fall Decor Ideas For Your Home - Chaylor & Mads,"Beautiful and easy ways to update every room in your home with fall decor. Plus, my favorite finds in fall decor for 2020!",92000.0,"Kristen | Lifestyle, Mom Tips & Teacher Stuff Blog","Fall Home Decor,Autumn Home,Fall Decor Outdoor,Front Porch Fall Decor,Home Decor Ideas,Porch Ideas For Fall,Fall Outdoor Decorating,Decorating Ideas For Fall,Fall Front Doors",image,https://i.pinimg.com/originals/e5/ae/dc/e5aedc14ce557e3a69f672e0f8c88f6e.png,1,/data/home-decor,home-decor
7343,b8c63a95-6fe8-4d7f-9455-1eef3462fee0,"This Fashion Rule Is Nonsense, And We Should All Forget About It","Black And Brown Outfits You Need To Try This Fall Season. The trick with black and brown outfits is to know how to combine colors, fabrics, and shapes to get a cohesive look. So…",1000000.0,Cultura Colectiva,"Winter Outfits Men,Stylish Mens Outfits,Casual Outfits,Men Casual,Smart Casual,Outfits For Men,Fall Outfits,Mens Winter Boots,Business Casual Men",image,https://i.pinimg.com/originals/3e/49/09/3e4909c0ccc4dbba3cad83d97eab4a61.png,1,/data/mens-fashion,mens-fashion
9979,2b2abc85-fc51-481f-8ae6-17681993da28,Paris in the Summer. 10 fun things to do in Paris in the Summertime • Petite in Paris,"Are you traveling to Paris during the summer? Find out what to do in Paris, France during the summer. Fun summertime activities in Paris. Enjoy the incredible outdoors when trav…",3000.0,Petite in Paris,"Torre Eiffel Paris,Tour Eiffel,Picnic In Paris,Hello France,Voyage Europe,Destination Voyage,Beautiful Places To Travel,Travel Aesthetic,Paris Travel",image,https://i.pinimg.com/originals/6c/4c/90/6c4c90bba27ebf8c8bfe4c1acfb9f07a.jpg,1,/data/travel,travel
7166,4a844b03-e161-47a1-904b-591eb5dc4fb1,The Killers - Mr. Brightside - Women's T-Shirt - Heather Dark Grey / S,"Women's T-shirt. Design inspired by the rock band The Killers' hit ""Mr. Brightside"". One of the greatest song from the album Hot Fuss released in 2004. Soft and light, 100% cott…",27.0,Mala Rock | Rock T-shirts,"Mr Brightside,Rock T Shirts,Greatest Songs,Timeless Classic,Rock Bands,Album,T Shirts For Women,Inspired,Hot",image,https://i.pinimg.com/originals/8c/42/39/8c42391d35fcad51a4a79f7cd81bf26d.jpg,1,/data/mens-fashion,mens-fashion
9590,b849e976-52f6-45d7-8b25-51559318f167,Top 20 Things To See And Do In Ireland,Best spots to see for travel through Ireland!,130000.0,Fun Life Crisis,"Vacation Ideas,Vacation Spots,Emerald Isle,London England,Travel Guides,Travel Tips,Travel Hacks,Travel Packing,Places To Travel",image,https://i.pinimg.com/originals/c7/50/d3/c750d36524856873d64406652d69b4fb.png,1,/data/travel,travel
4508,9064f4a2-2753-476c-815e-db360f45a93e,Custom Event and Shop Neon Sign Lights - Event & Shop,"Personalize your event or shop with a customized neon sign. Make a statement with your own custom vibes! This light is 32 -40 inches (80cm-100cm) if you need something bigger, p…",111.0,Life of Neon | Custom Neon Light Signs | Home Decor Wall Art,"Our Wedding,Wedding Venues,Dream Wedding,Wedding Cakes,Church Wedding,Wedding Flowers,Lace Wedding,Wedding Rings,Wedding Dresses",image,https://i.pinimg.com/originals/e9/c0/7c/e9c07cf0cf16cab23764a36718ab76c1.jpg,1,/data/event-planning,event-planning
1704,5fbf9863-fb79-477c-a5b6-540c3020a55f,Christmas Trees From Pallet Wood | Holiday DIY,Christmas Trees From Pallet Wood | Holiday DIY: Deck the yard with some fun outdoor Christmas Trees! We made these merry and bright decorations from two old pallets we had lying…,3000000.0,Instructables,"Pallet Wood Christmas Tree,Wooden Christmas Crafts,Diy Christmas Tree,Christmas Projects,Holiday Crafts,Wooden Xmas Trees,Different Christmas Trees,Pallet Tree,Christmas Kitchen",image,https://i.pinimg.com/originals/64/7b/ca/647bca35169b7c144604116c64bcba8a.png,1,/data/christmas,christmas


Cleaning geo data

In [0]:
# Clean the geo data:
#   1. build a `coordinates` array column from latitude and longitude (in that order),
#   2. drop the two source columns (the original call named "latitude" twice, so
#      "longitude" was never dropped and was only removed by the final select),
#   3. cast `timestamp` from string to a timestamp type,
#   4. fix the column order.
# NOTE(review): latitude/longitude are StringType in geo_test_schema, so
# `coordinates` is an array of strings - confirm whether doubles are wanted.
df_geo_parsed = (
    df_geo_parsed
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .select("ind", "country", "coordinates", "timestamp")
)

In [0]:
# Preview the cleaned geo data (coordinates array plus typed timestamp).
display(df_geo_parsed)

ind,country,coordinates,timestamp
8304,French Guiana,"List(-28.8852, -164.87)",2019-09-13T04:50:29.000+0000
1313,Maldives,"List(77.0447, 61.9119)",2018-06-26T02:39:25.000+0000
4315,Cote d'Ivoire,"List(-45.8508, 66.1003)",2019-12-15T03:51:28.000+0000
10794,Cocos (Keeling) Islands,"List(-89.5236, -154.567)",2022-01-01T02:26:50.000+0000
6063,Anguilla,"List(-89.1797, -174.015)",2021-07-20T09:02:47.000+0000
9875,Barbados,"List(-74.3382, -110.484)",2020-03-20T13:03:18.000+0000
8653,Seychelles,"List(48.4569, -139.658)",2022-04-11T18:30:19.000+0000
7922,Antigua and Barbuda,"List(-88.0974, -172.052)",2021-01-27T09:14:19.000+0000
2698,Egypt,"List(-72.7174, 24.169)",2021-11-24T08:33:51.000+0000
808,Albania,"List(-71.6856, -179.126)",2019-01-03T15:43:12.000+0000


Cleaning user data

In [0]:
# Clean the user data in a single projection:
#   - user_name is first_name and last_name joined by a space
#     (first_name / last_name themselves are not carried forward),
#   - date_joined is cast from string to a timestamp type,
#   - the columns come out in the order ind, user_name, age, date_joined.
df_user_parsed = df_user_parsed.select(
    "ind",
    concat("first_name", lit(" "), "last_name").alias("user_name"),
    "age",
    col("date_joined").cast("timestamp").alias("date_joined"),
)

In [0]:
# Preview the cleaned user data (combined user_name plus typed date_joined).
display(df_user_parsed)

ind,user_name,age,date_joined
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
1313,Brittany Jones,32,2016-04-02T03:51:23.000+0000
3089,Abigail Ali,20,2015-10-24T11:23:51.000+0000
6063,Corey Andrews,23,2015-11-25T13:36:22.000+0000
10625,Christian Lang,32,2017-10-10T20:09:33.000+0000
2418,Amanda Adams,20,2015-10-21T08:27:36.000+0000
9672,Jennifer Hudson,22,2016-02-11T20:46:04.000+0000
8887,Austin Rodriguez,24,2016-03-31T20:56:39.000+0000
4508,Michael Carter,58,2016-06-03T23:35:30.000+0000
5076,Christopher Butler,20,2015-12-01T15:08:31.000+0000


#### Write the streaming data to Delta Tables

In [0]:
# Append the cleaned pin stream to the Delta table 0e284c63dbbf_pin_table.
# Each streaming query needs its OWN checkpoint directory; sharing one directory
# between the pin, geo and user queries mixes up their offset/commit logs.
# NOTE: this query starts with a fresh checkpoint, so it re-reads the Kinesis
# stream from its configured initial position. If the target table already
# contains data from an earlier run, clear it or expect duplicates.
(
    df_pin_parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0e284c63dbbf_pin_table/")
    .table("0e284c63dbbf_pin_table")
)

In [0]:
# Append the cleaned geo stream to the Delta table 0e284c63dbbf_geo_table.
# Each streaming query needs its OWN checkpoint directory; sharing one directory
# between the pin, geo and user queries mixes up their offset/commit logs.
# NOTE: this query starts with a fresh checkpoint, so it re-reads the Kinesis
# stream from its configured initial position. If the target table already
# contains data from an earlier run, clear it or expect duplicates.
(
    df_geo_parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0e284c63dbbf_geo_table/")
    .table("0e284c63dbbf_geo_table")
)

In [0]:
# Append the cleaned user stream to the Delta table 0e284c63dbbf_user_table.
# Each streaming query needs its OWN checkpoint directory; sharing one directory
# between the pin, geo and user queries mixes up their offset/commit logs.
# NOTE: this query starts with a fresh checkpoint, so it re-reads the Kinesis
# stream from its configured initial position. If the target table already
# contains data from an earlier run, clear it or expect duplicates.
(
    df_user_parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/0e284c63dbbf_user_table/")
    .table("0e284c63dbbf_user_table")
)