In [0]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Path of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table into a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns with a single Spark job instead of one
# collect() per column, and fail with a clear message when the table is empty
# (the previous `.collect()[0]` raised a bare IndexError in that case).
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError(
        f"No AWS credentials found in Delta table at {delta_table_path}"
    )

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key (every reserved character is escaped, including "/").
# NOTE(review): the Kinesis reader below takes the raw SECRET_KEY; the encoded
# value is not used in this cell and is kept only in case a later cell needs it.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Open a streaming read on the Kinesis stream, starting from the latest records.
df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'Kinesis-Prod-Stream')
    .option('initialPosition', 'latest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

In [0]:
# Schema of the JSON payload carried by "pin-Data" records.
# "index" is the only integer field; every other field is parsed as a string.
# All fields are nullable.
Pin_Structure = StructType(
    [StructField("index", IntegerType(), True)]
    + [
        StructField(field_name, StringType(), True)
        for field_name in (
            "unique_id",
            "title",
            "description",
            "poster_name",
            "follower_count",
            "tag_list",
            "is_image_or_video",
            "image_src",
            "downloaded",
            "save_location",
            "category",
        )
    ]
)

# Schema of the JSON payload carried by "geo-Data" records.
# "ind" is an integer; the remaining fields are parsed as strings.
# All fields are nullable.
geo_Structure = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("ind", IntegerType()),
            ("timestamp", StringType()),
            ("latitude", StringType()),
            ("longitude", StringType()),
            ("country", StringType()),
        )
    ]
)

# Schema of the JSON payload carried by "user-Data" records.
# "ind" and "age" are integers; names and the join date are parsed as strings.
# All fields are nullable.
user_Structure = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("ind", IntegerType()),
            ("first_name", StringType()),
            ("last_name", StringType()),
            ("age", IntegerType()),
            ("date_joined", StringType()),
        )
    ]
)


In [0]:
from pyspark.sql.functions import regexp_replace, col


# Keep only the pin records, decode the Kinesis payload to text and parse it
# as JSON using Pin_Structure, then flatten the parsed struct into columns.
df_pin = (
    df.filter(df.partitionKey == "pin-Data")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", Pin_Structure).alias("data"))
    .select("data.*")
)

# Turn empty strings and known placeholder values into nulls.
# NOTE(review): the rendered output still contains the description
# "No description available"; only the longer "... Story format" variant is
# nulled here - confirm whether the short form should be treated the same way.
df_pinclean = (
    df_pin
    .replace({'': None})
    .replace({'User Info Error': None}, subset=['poster_name', 'follower_count'])
    .replace({'No description available Story format': None}, subset=['description'])
    .replace({'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e': None}, subset=['tag_list'])
)

# Expand the "k" / "M" abbreviations so follower_count can be cast to an integer.
# NOTE(review): a fractional value such as "1.5k" becomes "1.5000", which does
# not represent 1500 - confirm such values never occur in the stream.
df_pinclean = (
    df_pinclean
    .withColumn("follower_count", regexp_replace("follower_count", '[k]', "000"))
    .withColumn("follower_count", regexp_replace("follower_count", '[M]', "000000"))
)

# Cast the numeric columns to int.
df_pinclean = (
    df_pinclean
    .withColumn("follower_count", df_pinclean["follower_count"].cast("int"))
    .withColumn("downloaded", df_pinclean["downloaded"].cast("int"))
    .withColumn("index", df_pinclean["index"].cast("int"))
)

# Drop rows in which any of the numeric columns is null after the cast.
df_pinclean = (
    df_pinclean
    .filter(col("follower_count").isNotNull())
    .filter(col("downloaded").isNotNull())
    .filter(col("index").isNotNull())
)

# Strip everything up to and including the first "/" from save_location.
# A raw string is used so that "\/" is not an invalid Python escape sequence;
# the pattern passed to Spark is unchanged.
df_pinclean = df_pinclean.withColumn(
    "save_location", regexp_replace("save_location", r'^.*?\/', "")
)

# Rename "index" to "ind" and fix the final column order
# ("downloaded" is intentionally not part of the output).
df_pinclean = (
    df_pinclean
    .withColumnRenamed("index", "ind")
    .select(
        "ind", "unique_id", "title", "description", "follower_count",
        "poster_name", "tag_list", "is_image_or_video", "image_src",
        "save_location", "category",
    )
)

display(df_pinclean)

# Every streaming query needs its own checkpoint directory. The original cells
# all shared "/tmp/kinesis/_checkpoints/" and each one deleted it recursively,
# which removes the checkpoint of any query that is already running.
pin_checkpoint_path = "/tmp/kinesis/pin/_checkpoints/"

# Remove a checkpoint left over from a previous run of this cell.
dbutils.fs.rm(pin_checkpoint_path, True)

# Append the cleaned pin stream to the Delta table. The original wrote `df`,
# i.e. the raw, unparsed Kinesis records, instead of the cleaned DataFrame.
# NOTE(review): if the table was already created from the raw stream, its
# schema will not match df_pinclean - drop or recreate the table first.
df_pinclean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", pin_checkpoint_path) \
  .table("e89446818119_pin_table")

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
778,f82fe1cc-22ce-4c4d-82cc-2e8c9325701d,Your complete guide to watercolors for beginners!,Looking for a complete post on how to watercolor? This guide shows you step-by-step how to learn how to watercolor on your own!,85000,Menucha - Moms and Crafters,"Watercolor Beginner,Watercolor Paintings For Beginners,Step By Step Watercolor,Watercolor Tips,Watercolour Tutorials,Watercolor Techniques,Art Techniques,Simple Watercolor,Tattoo Watercolor",image,https://i.pinimg.com/originals/d9/bf/31/d9bf3191828e8a4d145db739d5fac53e.jpg,data/art,art
480,f5dfee8f-1b62-417d-966c-8cf205418c68,Abstract Portrait,No description available,6000,Ovetta Jackson,"Abstract Portrait,Portrait Art,Arte Black,African Art Paintings,Black Art Painting,Afro Art,Arte Pop,African American Art,Love Art",image,https://i.pinimg.com/originals/05/ed/11/05ed1128fe3a8784ad68900d310654d6.jpg,data/art,art
3011,8d5339e6-ba89-412a-bc61-4d5205fe32f8,Kids paper plate string art for every holiday - My Silly Squirts,Kid's String Art Craft for Every Holiday,63000,JDaniel4's Mom,"Kids Crafts,Preschool Crafts,Easter Crafts,Projects For Kids,Craft Projects,Craft Ideas,Paper Plate Crafts For Kids,Preschool Christmas,Christmas Crafts",image,https://i.pinimg.com/originals/bd/86/74/bd867450a3588b0ce1712f83fb5a58a4.jpg,data/diy-and-crafts,diy-and-crafts
9475,5f389774-c116-4b5d-bd81-763d1e729773,New Zealand North Island 7 Day Itinerary,"One week may feel too short, but with this New Zealand North Island 7 Day Itinerary you can cover a lot of ground! Plus, read about where to stay, self-driving, flights, and more!",1000,Alanna | Periodic Adventures,"Adventure Aesthetic,Travel Aesthetic,Best Places To Vacation,Places To See,North Island New Zealand,New Zealand Adventure,New Zealand Travel Guide,Bali,Beautiful Places To Travel",image,https://i.pinimg.com/originals/2e/0d/fc/2e0dfc4293e6d413d18d601248bbe041.jpg,data/travel,travel
7586,28525ffe-fb38-4e4b-90be-55464575cc3b,64 Bad Bitch Quotes To Awaken Your Inner Savage -Our Mindful Life,A feminist? More like a warrior. These bad bitch quotes might be the perfect slap-in-the-face you'll need to fight self-doubts.,190000,Our Mindful Life,"Bitch Quotes,Sarcastic Quotes,True Quotes,Funny Quotes,Girls Attitude Quotes,Sassy Quotes Bitchy,Bad Words Quotes,Bad Girl Quotes,Girl Qoutes",image,https://i.pinimg.com/originals/43/91/f5/4391f5ce5501ded201706022572514ae.png,data/quotes,quotes
1864,6f1951f0-63be-4c4f-8d21-e4995217f69e,120 Christmas Decorations from the Dollar Store,Love Christmas decorations but hate spending a lot bunch of money? Check out some of these budget DIY decorations you can easily make from the dollar store!,42000,Caroline|CarolineVencil.com | Saving & Making Money | Pro Blogger,"Diy Snowman Decorations,Christmas Candle Decorations,Diy Christmas Ornaments,Christmas Ideas,Christmas Christmas,Snowman Ornaments,Christmas Diy Gifts,Vase Decorations,Diy Christmas Decorations For Home",image,https://i.pinimg.com/originals/30/85/21/3085215db77e55770202724268465490.jpg,data/christmas,christmas
6014,d4c57afb-4775-4482-89c8-71d1bf85b488,Coffee Table Decor Ideas for a Cozy Living Room - Salvaged Living,"Grab these coffee table decor ideas for a cozy living room. This post is awesome, it has a list of must have elements for cozy coffee table styling plus a list of supply ideas f…",40000,Salvaged Living,"Coffee Table Decor Living Room,Coffee Table Vignettes,Coffee Table Centerpieces,Coffee Table Styling,Diy Coffee Table,Decorating Coffee Tables,Cozy Living Rooms,Livingroom Table Decor,Living Room Candles",image,https://i.pinimg.com/originals/77/b2/bb/77b2bb477d1164908048dabcd78cabd5.jpg,data/home-decor,home-decor
1967,0b9d5b95-51a6-465e-ae4a-2cb68ceada29,15 Fun & Festive Christmas Porch Ideas,15 unique Christmas porch ideas that will leave you feeling inspired and help you tackle decorating your own entryway for the holidays! It’s almost time to start decorating for…,19000,Ashley - Modern Glam,"Exterior Christmas Lights,Front Door Christmas Decorations,Christmas Lights Outside,Christmas House Lights,Decorating With Christmas Lights,Porch Decorating,Christmas Porch Decorations,Front Porch Ideas For Christmas,Christmas Lights Outdoor Trees",image,https://i.pinimg.com/originals/ff/f8/3b/fff83b02aeb29e2e9341a56fc5e63345.png,data/christmas,christmas
1699,e930ea57-d34a-499f-9811-126d39ed1fee,Easy to Make Mason Jar Christmas Scenes,ow to make easy and inexpensive Christmas decor with these cute mason jar Christmas scenes. Who doesn't love mason jar crafts for Christmas?,142000,Twelve On Main,"Christmas Decorations Diy Crafts,Christmas Crafts For Gifts,Diy Decoration,Diy Ornaments,Decor Ideas,Gift Ideas,Decorating Ideas,Diy Christmas Room Decor,Diy Christmas Projects",image,https://i.pinimg.com/originals/0c/31/a1/0c31a189ab7e503c035c8af991d5bd29.jpg,data/christmas,christmas
4996,cf6c021f-1f41-47da-9492-cdec9d32fca8,The Secret To Authentic Networking and Building a Professional Network | Career Contessa,Tips on how to build an authentic network and find genuine connections in your professional network.,265,BEIMER,"Event Planning Template,Event Planning Quotes,Event Planning Checklist,Event Planning Business,Event Planning Design,Business Events,Event Decor,Pastel,Career",image,https://i.pinimg.com/originals/ae/25/a7/ae25a72d5584a4c8f75e972fb70d48e2.jpg,data/event-planning,event-planning


In [0]:
from pyspark.sql.functions import array
from pyspark.sql.functions import to_timestamp

# Keep only the geolocation records, decode the Kinesis payload to text and
# parse it as JSON using geo_Structure, then flatten the struct into columns.
df_geo = (
    df.filter(df.partitionKey == "geo-Data")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", geo_Structure).alias("data"))
    .select("data.*")
)

# Combine latitude/longitude into one array column, convert the timestamp
# string to a timestamp, and fix the final column order.
# The column is referred to as "timestamp" (lower case) throughout, matching
# geo_Structure; the original used "Timestamp" in withColumn, which only works
# while Spark resolves column names case-insensitively.
df_geoclean = (
    df_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select("ind", "country", "coordinates", "timestamp")
)

display(df_geoclean)

# Every streaming query needs its own checkpoint directory. The original cells
# all shared "/tmp/kinesis/_checkpoints/" and each one deleted it recursively,
# which removes the checkpoint of any query that is already running.
geo_checkpoint_path = "/tmp/kinesis/geo/_checkpoints/"

# Remove a checkpoint left over from a previous run of this cell.
dbutils.fs.rm(geo_checkpoint_path, True)

# Append the cleaned geolocation stream to the Delta table. The original wrote
# `df`, i.e. the raw, unparsed Kinesis records, instead of the cleaned DataFrame.
# NOTE(review): if the table was already created from the raw stream, its
# schema will not match df_geoclean - drop or recreate the table first.
df_geoclean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", geo_checkpoint_path) \
  .table("e89446818119_geo_table")

ind,country,coordinates,timestamp
480,Mozambique,"List(-30.7158, -167.461)",2019-02-08T03:34:58Z
3011,Kuwait,"List(-76.4675, 28.8054)",2019-12-17T17:46:18Z
9475,Panama,"List(-84.1953, 164.362)",2019-03-19T08:26:35Z
7586,Andorra,"List(-84.7363, -179.087)",2021-02-07T22:17:31Z
1864,Algeria,"List(-72.3958, -164.414)",2020-04-24T17:08:13Z
6014,French Southern Territories,"List(-26.6026, 155.206)",2019-04-30T12:33:13Z
1967,Australia,"List(32.74, -179.581)",2021-02-05T10:37:28Z
1699,Canada,"List(-54.3706, -127.783)",2022-07-03T13:25:48Z
4996,Poland,"List(39.9625, -118.569)",2019-04-16T23:49:42Z
10168,Norway,"List(-14.7211, 137.812)",2019-10-14T00:19:33Z


In [0]:
from pyspark.sql.functions import array
from pyspark.sql.functions import to_timestamp

# Keep only the user records, decode the Kinesis payload to text and parse it
# as JSON using user_Structure, then flatten the parsed struct into columns.
df_user = (
    df.filter(df.partitionKey == "user-Data")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", user_Structure).alias("data"))
    .select("data.*")
)

# Pack first and last name into a single array column, convert date_joined
# to a timestamp, and fix the final column order.
df_userclean = (
    df_user
    .withColumn("user_name", array("first_name", "last_name"))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select("ind", "user_name", "age", "date_joined")
)

display(df_userclean)

# Every streaming query needs its own checkpoint directory. The original cells
# all shared "/tmp/kinesis/_checkpoints/" and each one deleted it recursively,
# which removes the checkpoint of any query that is already running.
user_checkpoint_path = "/tmp/kinesis/user/_checkpoints/"

# Remove a checkpoint left over from a previous run of this cell.
dbutils.fs.rm(user_checkpoint_path, True)

# Append the cleaned user stream to the Delta table. The original wrote `df`,
# i.e. the raw, unparsed Kinesis records, instead of the cleaned DataFrame.
# NOTE(review): if the table was already created from the raw stream, its
# schema will not match df_userclean - drop or recreate the table first.
df_userclean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", user_checkpoint_path) \
  .table("e89446818119_user_table")

ind,user_name,age,date_joined
1967,"List(Jay, Flynn)",24,2016-05-12T16:28:27Z
1699,"List(Brandon, Cummings)",22,2016-05-11T01:36:00Z
4996,"List(John, Anderson)",27,2016-03-03T07:28:39Z
10168,"List(Nicole, Gray)",53,2016-02-20T13:03:00Z
5333,"List(Aaron, Clark)",35,2016-11-19T22:36:49Z
2565,"List(Anthony, Adkins)",21,2015-10-28T03:59:42Z
9778,"List(Holly, Campbell)",30,2016-07-08T19:10:37Z
3145,"List(Crystal, Schmidt)",22,2017-04-14T13:57:52Z
1341,"List(Abigail, Ali)",20,2015-10-24T11:23:51Z
5162,"List(James, Jacobson)",28,2017-04-30T16:41:26Z
