In [0]:
# pyspark functions
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# URL processing
import urllib
import urllib.parse  # `import urllib` alone does not guarantee the parse submodule is loaded
from pyspark.sql.functions import regexp_replace, col
import pyspark.sql.functions as F


In [0]:
# Location of the Delta table holding the AWS credentials used by the Kinesis readers.
# NOTE(review): keeping AWS keys in a warehouse table exposes them to anyone who can
# read that table; a Databricks secret scope would be a safer store.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Read both AWS keys from the same (first) row of the credentials table.
# A single first() call runs one Spark job and guarantees that the access key and the
# secret key belong to the same record (two separate collect() calls did not).
_credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if _credentials_row is None:
    raise ValueError("authentication_credentials table is empty; cannot read AWS keys")
ACCESS_KEY = _credentials_row['Access key ID']
SECRET_KEY = _credentials_row['Secret access key']
# URL-encode the secret key; safe="" means every reserved character (including '/') is escaped.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
#KINESIS PIN CONNECTION / CLEANING

# JSON payload layout of each record on the pin stream (every field nullable).
pin_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("category", StringType()),
        ("description", StringType()),
        ("downloaded", LongType()),
        ("follower_count", StringType()),
        ("image_src", StringType()),
        ("index", LongType()),
        ("is_image_or_video", StringType()),
        ("poster_name", StringType()),
        ("save_location", StringType()),
        ("tag_list", StringType()),
        ("title", StringType()),
        ("unique_id", StringType()),
    ]
])

# Raw Kinesis records for the pin stream, read from the start of the stream.
df_pin_data = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12f6caff3559-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

# Decode the binary `data` payload to text, parse it as JSON and flatten the struct.
df_pin = (
    df_pin_data
    .selectExpr("CAST(data as STRING)")
    .select(from_json("data", pin_schema).alias("pin_data"))
    .select("pin_data.*")
)

# NOTE(review): on Databricks, display() of a streaming DataFrame runs its own preview query.
display(df_pin)
def pin_cleaner(df_pin):
    """Clean the parsed pin stream.

    Steps:
      * 'Untitled' and empty-string values in string columns become null.
      * follower_count strings such as "437", "124k" or "6M" become integers
        (k = thousand, M = million); values that are not numeric end up null.
      * index is cast to integer and renamed to ind.
      * the "Local save in " prefix is removed from save_location.
      * downloaded is dropped and the remaining columns are put in a fixed order.
      * rows are de-duplicated on description.

    Args:
        df_pin: DataFrame with the columns produced by parsing with pin_schema.

    Returns:
        The cleaned DataFrame.
    """
    # Treat placeholder titles and empty strings as missing values.
    df_pin = df_pin.replace('Untitled', None)
    df_pin = df_pin.replace('', None)

    # Convert abbreviated follower counts to integers. The numeric part is scaled by its
    # suffix rather than textually appending zeros, so "1.5k" becomes 1500 instead of
    # "1.5000" (which the integer cast would truncate to 1). Integer-prefixed values such
    # as "124k" / "6M" give the same result as simple zero-appending.
    follower_count = col("follower_count")
    multiplier = (
        F.when(follower_count.endswith("k"), 1000)
        .when(follower_count.endswith("M"), 1000000)
        .otherwise(1)
    )
    numeric_part = regexp_replace(follower_count, "[kM]$", "").cast("double")
    # Round before the integer cast so binary floating-point error cannot truncate e.g. 1099.999... to 1099.
    df_pin = df_pin.withColumn("follower_count", F.round(numeric_part * multiplier).cast("integer"))
    df_pin = df_pin.withColumn("index", col("index").cast("integer"))

    # Keep only the path part of save_location.
    df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

    # Rename index column to ind (matches the geo and user tables).
    df_pin = df_pin.withColumnRenamed("index", "ind")

    # Drop downloaded column
    df_pin = df_pin.drop("downloaded")

    # Reorder
    df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")

    # NOTE(review): streaming dropDuplicates without a watermark keeps every description seen
    # so far in state, and all rows with a null description collapse into a single row.
    # Confirm description (rather than unique_id / ind) is the intended key.
    df_pin = df_pin.dropDuplicates(['description'])

    return df_pin

clean_pin_data = pin_cleaner(df_pin)

# Every streaming query needs its own checkpoint directory. When the pin, geo and user
# queries share one location, they share one query id and offset log: starting a later
# query can stop an earlier one or resume from another stream's offsets. This directory
# sits under /tmp/kinesis/_checkpoints/ so the cleanup cell still removes it.
clean_pin_data.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin/") \
  .table("12f6caff3559_pin_table")


category,description,downloaded,follower_count,image_src,index,is_image_or_video,poster_name,save_location,tag_list,title,unique_id
mens-fashion,No description available Story format,0,User Info Error,Image src error.,7528,multi-video(story page format),User Info Error,Local save in /data/mens-fashion,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",No Title Data Available,fbe53c66-3442-4773-b19e-d3ec6f54dddf
diy-and-crafts,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,1,124k,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,2863,image,Of Life & Lisa | Lifestyle Blog,Local save in /data/diy-and-crafts,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",25 Super Fun Summer Crafts for Kids - Of Life and Lisa,9bf39437-42a6-4f02-99a0-9a0383d8cd70
finance,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",1,0,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,5730,image,Consuelo Aguirre,Local save in /data/finance,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",Island Oasis Coupon Organizer,1e1f0c8b-9fcf-460b-9154-c775827206eb
quotes,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,1,51k,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,8304,image,Commitment Connection,Local save in /data/quotes,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",The #1 Reason You’re Not His Priority Anymore - Matthew Coast,5b6d0913-25e4-43ab-839d-85d5516f78a4
tattoos,"Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design.",1,211k,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,8731,image,TheTrendSpotter,Local save in /data/tattoos,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",20 Koi Fish Tattoos For Lucky Men,ea760f71-febf-4023-b592-d17396659039
beauty,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",1,43k,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,1313,video,Thrive Causemetics,Local save in /data/beauty,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",Liquid Lash Extensions Mascara,44662045-e891-4821-8a19-ebe7eedd371a
education,"Podcasts for Teachers or Parents of Teenagers: Teaching teens middle school and high school can feel joyful and rewarding most days, but can also frustrate you with one challeng…",1,25k,https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg,4315,image,Math Giraffe,Local save in /data/education,"Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom",Podcasts for Teachers or Parents of Teenagers,21b59ba9-829d-4c33-8c27-4cd4c56d26b8
vehicles,Nissan GT-R. Sick.,1,437,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,10794,image,Ray Uyemura,Local save in /data/vehicles,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",TireBuyer,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1
finance,"If you love budgeting, make sure to give Dave Ramsey's 7 Baby Steps a try. Follow these steps to begin your debt snowball, build an emergency fund, invest and reach riches. I ca…",1,26k,https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png,5494,image,"Living Low Key | Save Money, Make Money, & Frugal Living",Local save in /data/finance,"Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas",Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You,8fb2af68-543b-4639-8119-de33d28706ed
event-planning,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,1,6M,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,5069,image,Style Me Pretty,Local save in /data/event-planning,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",The Vault: Curated & Refined Wedding Inspiration,b75b6f87-deb3-444f-b29e-ce9161b2df49


In [0]:
#KINESIS GEO DATA CONNECTION / CLEANING

# JSON payload layout of each record on the geo stream (StructField is nullable by default).
geo_schema = StructType([
    StructField("country", StringType()),
    StructField("ind", LongType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("timestamp", StringType()),
])

# Raw Kinesis records for the geo stream, read from the start of the stream.
df_geo_data = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12f6caff3559-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

# Decode the binary `data` payload to text, parse it as JSON and flatten the struct.
df_geo = (
    df_geo_data
    .selectExpr("CAST(data as STRING)")
    .select(from_json("data", geo_schema).alias("geo_data"))
    .select("geo_data.*")
)


def geo_cleaner(df_geo):
    """Clean the parsed geo stream.

    Combines latitude/longitude into a two-element `coordinates` array and fixes the
    column order to ind, country, coordinates, timestamp. It then parses `timestamp`
    into a timestamp type and de-duplicates rows on `ind`.

    Args:
        df_geo: DataFrame with the columns produced by parsing with geo_schema.

    Returns:
        The cleaned DataFrame.
    """
    cleaned = (
        df_geo
        .withColumn("coordinates", array("latitude", "longitude"))
        .drop("latitude", "longitude")
        .select("ind", "country", "coordinates", "timestamp")
        .withColumn("timestamp", to_timestamp("timestamp"))
        # NOTE(review): no watermark, so streaming dedup state grows with every distinct ind.
        .dropDuplicates(['ind'])
    )
    return cleaned


clean_geo_data = geo_cleaner(df_geo)

# Every streaming query needs its own checkpoint directory. When the pin, geo and user
# queries share one location, they share one query id and offset log: starting a later
# query can stop an earlier one or resume from another stream's offsets. This directory
# sits under /tmp/kinesis/_checkpoints/ so the cleanup cell still removes it.
clean_geo_data.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/geo/") \
  .table("12f6caff3559_geo_table")




In [0]:
#KINESIS USER DATA CONNECTION / CLEANING

# JSON payload layout of each record on the user stream (StructField is nullable by default).
user_schema = StructType([
    StructField("age", LongType()),
    StructField("date_joined", StringType()),
    StructField("first_name", StringType()),
    StructField("ind", LongType()),
    StructField("last_name", StringType()),
])

# Raw Kinesis records for the user stream, read from the start of the stream.
df_user_data = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12f6caff3559-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

# Decode the binary `data` payload to text, parse it as JSON and flatten the struct.
df_user = (
    df_user_data
    .selectExpr("CAST(data as STRING)")
    .select(from_json("data", user_schema).alias("user_data"))
    .select("user_data.*")
)

def user_data_cleaner(df_user):
    """Clean the parsed user stream.

    Builds `user_name` as "<first_name> <last_name>" and drops the two name columns.
    It then parses `date_joined` into a timestamp type and returns the columns in the
    order ind, user_name, age, date_joined.

    Args:
        df_user: DataFrame with the columns produced by parsing with user_schema.

    Returns:
        The cleaned DataFrame.
    """
    # NOTE(review): concat yields null if either name part is null; confirm that is intended.
    full_name = concat(col("first_name"), lit(" "), col("last_name"))
    return (
        df_user
        .withColumn("user_name", full_name)
        .drop("first_name", "last_name")
        .withColumn("date_joined", to_timestamp("date_joined"))
        .select("ind", "user_name", "age", "date_joined")
    )


clean_user_data = user_data_cleaner(df_user)

# Every streaming query needs its own checkpoint directory. When the pin, geo and user
# queries share one location, they share one query id and offset log: starting a later
# query can stop an earlier one or resume from another stream's offsets. This directory
# sits under /tmp/kinesis/_checkpoints/ so the cleanup cell still removes it.
clean_user_data.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/user/") \
  .table("12f6caff3559_user_table")



In [0]:
# Recursively delete the streaming checkpoint directory used by the write cells above,
# so a later run does not resume from the stored offsets.
# NOTE(review): stop the active streaming queries before clearing their checkpoints.
checkpoint_root = "/tmp/kinesis/_checkpoints/"
dbutils.fs.rm(checkpoint_root, True)