In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib
import urllib.parse
import json

# Define the path to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the same row with a single Spark job, so the
# access key and the secret key can never come from two different rows.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with an explicit message
    # instead of an opaque IndexError.
    raise ValueError("No AWS credentials found in Delta table at " + delta_table_path)

# Get the AWS access key and secret key from the fetched row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# Percent-encode the secret key; safe="" means "/" is encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# Import streaming data from kinesis as a dataframe
def read_kinesis_stream(stream_name):
    """Open a Kinesis stream as a streaming DataFrame.

    Every stream in this notebook is read with the same region, starting
    position and credentials; only the stream name differs.

    Args:
        stream_name: Name of the Kinesis stream to read.

    Returns:
        The streaming DataFrame produced by ``spark.readStream...load()``.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', 'earliest')
        .option('region', 'us-east-1')
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )


df_pin = read_kinesis_stream('streaming-12f6b2c1ae4f-pin')
df_geo = read_kinesis_stream('streaming-12f6b2c1ae4f-geo')
df_user = read_kinesis_stream('streaming-12f6b2c1ae4f-user')

# Keep only the 'data' column of each stream, cast to a string, so the JSON
# payload it carries can be parsed in the following steps.
df_pin, df_geo, df_user = [
    raw_stream.selectExpr("CAST(data as STRING)")
    for raw_stream in (df_pin, df_geo, df_user)
]

# Schema of the JSON payload in the pin stream; every field is nullable.
pin_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("follower_count", StringType()),
        ("poster_name", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("save_location", StringType()),
        ("category", StringType()),
        ("downloaded", IntegerType()),
        ("description", StringType()),
    )
])

# Parse the JSON string into a struct, then expand the struct's fields into
# top-level columns.
df_pin = (
    df_pin
    .withColumn("data", from_json(df_pin["data"], pin_schema))
    .selectExpr("data.*")
)


# Schema of the JSON payload in the geo stream; every field is nullable.
geo_schema = (
    StructType()
    .add("index", IntegerType(), True)
    .add("country", StringType(), True)
    .add("latitude", StringType(), True)
    .add("longitude", StringType(), True)
    .add("timestamp", StringType(), True)
)

# Parse the JSON string into a struct, then expand the struct's fields into
# top-level columns.
df_geo = (
    df_geo
    .withColumn("data", from_json(df_geo["data"], geo_schema))
    .selectExpr("data.*")
)


# Schema of the JSON payload in the user stream; every field is nullable.
user_fields = [
    StructField("index", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True),
]
user_schema = StructType(user_fields)

# Parse the JSON string into a struct, then expand the struct's fields into
# top-level columns.
df_user = (
    df_user
    .withColumn("data", from_json(df_user["data"], user_schema))
    .selectExpr("data.*")
)


In [0]:
# df_pin.display()
# df_geo.display()
# df_user.display()

## Cleaning the streaming dataframe containing Pinterest data

In [0]:
from pyspark.sql.functions import col,when
from pyspark.sql.functions import regexp_extract

def _null_where_contains(column_name, marker):
    """Build a column expression that blanks out placeholder values.

    Args:
        column_name: Name of the string column to inspect.
        marker: Placeholder text that signals "no real data" in that column.

    Returns:
        A Column that is NULL wherever the column contains ``marker`` and the
        original value otherwise.
    """
    # A real NULL (not the literal string "None") so that isNull()/isNotNull()
    # filters and aggregations treat these rows as missing.
    return when(col(column_name).contains(marker), lit(None).cast("string")).otherwise(col(column_name))


df_pin = (
    df_pin
    # Replace the placeholder messages with NULL.
    .withColumn("description", _null_where_contains("description", "No description available"))
    .withColumn("image_src", _null_where_contains("image_src", "Image src error"))
    .withColumn("follower_count", _null_where_contains("follower_count", "User Info Error"))
    # Expand the "M" / "k" suffixes into zeros so the value can be cast to an integer.
    .withColumn("follower_count", regexp_replace(col("follower_count"), "M", "000000"))
    .withColumn("follower_count", regexp_replace(col("follower_count"), "k", "000"))
    .withColumn("follower_count", col("follower_count").cast("int"))
    .withColumn("index", col("index").cast("int"))
    .withColumnRenamed("index", "ind")
    # Strip the "Local save in " prefix so only the path remains.
    .withColumn("save_location", regexp_replace(col("save_location"), "Local save in ", ""))
    # 'downloaded' is not part of the output, so it is no longer cast beforehand.
    .select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")
)

# display(df_pin)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,,,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),,/data/mens-fashion,mens-fashion
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",43000.0,Thrive Causemetics,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,/data/beauty,beauty
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,Nissan GT-R. Sick.,437.0,Ray Uyemura,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,/data/vehicles,vehicles
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,6000000.0,Style Me Pretty,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,/data/event-planning,event-planning
2923,52fa3af5-24a4-4ccb-8f17-9c3eb12327ee,UFO Paper Plate Craft,A fun space activity for kids. Preshoolers and kindergartners will love making their own alien spacecraft!,192000.0,The Crafting Chicks,"Paper Plate Crafts For Kids,Fun Crafts For Kids,Summer Crafts,Toddler Crafts,Art For Kids,Outer Space Crafts For Kids,Kid Crafts,Space Kids,Back To School Crafts For Kids",image,https://i.pinimg.com/originals/6f/e8/aa/6fe8aa405513c6d2f77b5f47d17cdce8.jpg,/data/diy-and-crafts,diy-and-crafts
3089,88f9227e-88d0-4b1c-b0be-bcfc3028b8e2,No Title Data Available,,,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),,/data/diy-and-crafts,diy-and-crafts
6063,60693727-4927-4bd6-a8c5-096a392d63e6,41 Gorgeous Fall Decor Ideas For Your Home - Chaylor & Mads,"Beautiful and easy ways to update every room in your home with fall decor. Plus, my favorite finds in fall decor for 2020!",92000.0,"Kristen | Lifestyle, Mom Tips & Teacher Stuff Blog","Fall Home Decor,Autumn Home,Fall Decor Outdoor,Front Porch Fall Decor,Home Decor Ideas,Porch Ideas For Fall,Fall Outdoor Decorating,Decorating Ideas For Fall,Fall Front Doors",image,https://i.pinimg.com/originals/e5/ae/dc/e5aedc14ce557e3a69f672e0f8c88f6e.png,/data/home-decor,home-decor
10625,d31885b7-742a-4e2a-bbb7-ac5f9d334340,Jaguar Type E,"1937 Jaguar SS100 - 2 1/2 Litre Roadster, one of the most sought after pre-war sports cars",8000.0,hobbyDB,"Jaguar Type E,Jaguar Xk,Jaguar Cars,Jaguar Roadster,Jaguar Sport,Retro Cars,Vintage Cars,Antique Cars,British Sports Cars",image,https://i.pinimg.com/originals/26/81/a7/2681a71bd0c8f7fd0ab79c455338a49a.jpg,/data/vehicles,vehicles
9875,782dcbad-ff91-40a6-ba60-216efe29adb7,European Bucket List: 35 Things NOT To Miss When Traveling Europe,"35 European bucket list destinations for any traveler heading to Europe. From Cinque Terre, Italy to Iceland - there are so many amazing sites to see in Europe.",28000.0,Nicki,"Backpacking Europe,Europe Travel Guide,Travel Guides,Travel Packing,Traveling Europe,Travelling,Travel Backpack,Budget Travel,2 Week Europe Itinerary",image,https://i.pinimg.com/originals/71/04/1a/71041ad83ede43d9665741e719c58a86.jpg,/data/travel,travel
2418,da8745a6-5160-46c4-877d-181d50a729fd,100 DIY Christmas Centerpieces You'll Love To Decorate Your Home With For The Christmas Season - Hike n Dip,Here are the best DIY Christmas Centerpieces ideas perfect for your Christmas & holiday season home decor. From Christmas Vignettes to Table Centerpieces.,500000.0,HikenDip,"Farmhouse Christmas Decor,Rustic Christmas,Christmas Time,Vintage Christmas,Xmas,Primitive Christmas Crafts,Christmas Vignette,Indoor Christmas Decorations,Diy Christmas Ornaments",image,https://i.pinimg.com/originals/aa/6d/0f/aa6d0f44d7c1c96b998cb9aa6c4446b8.png,/data/christmas,christmas


In [0]:
# Build the cleaned geolocation frame in one projection: 'index' is cast to int
# and renamed to 'ind', latitude and longitude are combined into a single
# 'coordinates' array, and 'timestamp' is cast to a timestamp type.
df_geo = df_geo.select(
    col("index").cast("int").alias("ind"),
    col("country"),
    array("latitude", "longitude").alias("coordinates"),
    col("timestamp").cast("timestamp").alias("timestamp"),
)

# display(df_geo)

ind,country,coordinates,timestamp
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47.000+0000
2863,Armenia,"List(-5.34445, -177.924)",2020-04-27T13:34:16.000+0000
5730,Colombia,"List(-77.015, -101.437)",2021-04-19T17:37:03.000+0000
8304,French Guiana,"List(-28.8852, -164.87)",2019-09-13T04:50:29.000+0000
8731,Aruba,"List(-83.104, -171.302)",2020-07-17T04:39:09.000+0000
1313,Maldives,"List(77.0447, 61.9119)",2018-06-26T02:39:25.000+0000
4315,Cote d'Ivoire,"List(-45.8508, 66.1003)",2019-12-15T03:51:28.000+0000
10794,Cocos (Keeling) Islands,"List(-89.5236, -154.567)",2022-01-01T02:26:50.000+0000
5494,Bulgaria,"List(-82.6768, -129.202)",2021-07-21T02:02:35.000+0000
5069,Azerbaijan,"List(-63.0063, -157.474)",2021-03-20T09:32:44.000+0000


## Cleaning the streaming dataframe containing geolocation data

## Cleaning the streaming dataframe containing user data

In [0]:
# Build the cleaned user frame in one projection: 'index' is renamed to 'ind',
# first and last name are joined with a space into 'user_name', and
# 'date_joined' is cast to a timestamp type.
df_user = df_user.select(
    col("index").alias("ind"),
    concat(col("first_name"), lit(" "), col("last_name")).alias("user_name"),
    col("age"),
    col("date_joined").cast("timestamp").alias("date_joined"),
)

# display(df_user)

ind,user_name,age,date_joined
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
2863,Dylan Holmes,32,2016-10-23T14:06:51.000+0000
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
8731,Andrea Alexander,21,2015-11-10T09:27:42.000+0000
1313,Brittany Jones,32,2016-04-02T03:51:23.000+0000
4315,Michelle Prince,36,2015-12-20T16:38:13.000+0000
10794,Thomas Turner,34,2016-12-22T00:02:02.000+0000
5494,Anne Allen,27,2015-12-16T15:20:05.000+0000
5069,Amanda Ball,25,2016-01-13T17:36:30.000+0000


## Writing streaming data to Databricks Delta tables

In [0]:
# Before running the 'writeStream' function, delete the checkpoint folder
# (recursively, so the per-table sub-folders created below are removed too).
checkpoint_root = "/tmp/kinesis/_checkpoints/"
dbutils.fs.rm(checkpoint_root, True)

# Each streaming query gets its OWN checkpoint directory under the root.
# Sharing one checkpointLocation between several queries makes them overwrite
# each other's progress metadata.
streaming_queries = {}
for stream_df, table_name in (
    (df_pin, "12f6b2c1ae4f_pin_table"),
    (df_geo, "12f6b2c1ae4f_geo_table"),
    (df_user, "12f6b2c1ae4f_user_table"),
):
    # Keep whatever the writer returns, keyed by table name, so the started
    # queries remain reachable from later cells.
    streaming_queries[table_name] = (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_root + table_name + "/")
        .table(table_name)
    )

