In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib
import urllib.parse



In [None]:
# Location of the Delta table that stores the AWS credentials.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the credentials table into a Spark DataFrame.
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the SAME row with a single Spark job.
# Two independent collect() calls are not guaranteed to return the same first
# row, which could pair a key ID with a secret from a different row.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty table; fail with a clear message
    # instead of an opaque IndexError.
    raise ValueError(f"No AWS credentials found in Delta table {delta_table_path}")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key; safe="" means "/" is percent-encoded as well.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")


In [None]:
# Open a streaming read on the Kinesis stream that carries the pin (post) records.
pin_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-126dc60b95b3-pin')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Schema of the JSON payload expected on the pin stream.
pin_schema = StructType([
    StructField("category", StringType(), True),
    StructField("description", StringType(), True),
    StructField("downloaded", IntegerType(), True),
    # Kept as a string on purpose: the cleaning step expands 'k'/'M' suffixes in
    # this field, and from_json yields null for such values when the field is
    # declared as IntegerType (the column then arrives empty).
    StructField("follower_count", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("index", IntegerType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("title", StringType(), True),
    StructField("unique_id", StringType(), True)
])

# Cast the 'data' column of the stream to a string, parse it as JSON with the
# schema above, and flatten the parsed struct into top-level columns so the
# stream can be queried by field name.
pin_df = (
    pin_df
    .selectExpr("cast(data as STRING) as json_data")
    .select(from_json("json_data", pin_schema).alias("pin"))
    .select("pin.*")
)

display(pin_df)

In [None]:
# Open a streaming read on the Kinesis stream that carries the geolocation records.
geo_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-126dc60b95b3-geo')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Schema of the JSON payload expected on the geo stream.
geo_schema = (
    StructType()
    .add("country", StringType(), True)
    .add("ind", StringType(), True)  # 'ind' is assumed to be a string-type identifier
    .add("latitude", FloatType(), True)
    .add("longitude", FloatType(), True)
    .add("timestamp", TimestampType(), True)
)

# Cast the 'data' column to a string, parse it as JSON with geo_schema, then
# flatten the parsed struct into top-level columns so it can be queried.
geo_df = (
    geo_df
    .select(col("data").cast("string").alias("json_data"))
    .select(from_json(col("json_data"), geo_schema).alias("geo"))
    .select("geo.*")
)

display(geo_df)



In [None]:
# Open a streaming read on the Kinesis stream that carries the user records.
user_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-126dc60b95b3-user')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Schema of the JSON payload expected on the user stream.
user_schema = (
    StructType()
    .add("ind", StringType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("age", IntegerType(), True)
    .add("date_joined", TimestampType(), True)
)

# Cast the 'data' column to a string, parse it as JSON with user_schema, then
# flatten the parsed struct into top-level columns so it can be queried.
user_df = (
    user_df
    .select(col("data").cast("string").alias("json_data"))
    .select(from_json(col("json_data"), user_schema).alias("user"))
    .select("user.*")
)

display(user_df)

In [None]:
# Pinterest Posts DF cleaning

# Placeholder strings that stand for "no value": empty strings, 'N/A' and
# hyphens are all mapped to None (null).
replacements = {'': None, 'N/A': None, "-": None}

# Apply the replacement dictionary across every column of pin_df.
pin_df_cleaned = pin_df.na.replace(replacements, subset=pin_df.columns)

# Expand the thousand/million suffixes in 'follower_count': 'k' -> '000' and
# 'M' -> '000000'. The column is referenced with the exact lower-case spelling
# used by the schema and the final select, so resolution does not depend on
# Spark's case-insensitive column matching.
# NOTE(review): regexp_replace produces a string column; add an explicit cast
# if downstream consumers need a numeric type.
pin_df_cleaned = pin_df_cleaned.withColumn('follower_count', regexp_replace('follower_count', 'k', '000'))
pin_df_cleaned = pin_df_cleaned.withColumn('follower_count', regexp_replace('follower_count', 'M', '000000'))

# Remove the text "Local save in " from 'save_location' so the column is uniform.
pin_df_cleaned = pin_df_cleaned.withColumn('save_location', regexp_replace('save_location', "Local save in ", ""))

# Rename the 'index' column to 'ind'.
pin_df_cleaned = pin_df_cleaned.withColumnRenamed('index', "ind")

# Reorder the columns for the cleaned DataFrame.
pin_df_cleaned_ordered = pin_df_cleaned.select(
    "ind", "unique_id", "title", "description", "follower_count",
    "poster_name", "tag_list", "is_image_or_video", "image_src",
    "save_location", "category", "downloaded"
)



In [None]:
# Preview the cleaned, column-reordered pin DataFrame in the notebook.
display(pin_df_cleaned_ordered)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category,downloaded
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,/data/finance,finance,1
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,/data/quotes,quotes,1
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,Nissan GT-R. Sick.,,Ray Uyemura,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,/data/vehicles,vehicles,1
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,,Style Me Pretty,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,/data/event-planning,event-planning,1
3454,46bd3f86-b09d-4e29-9033-7ff2df595e51,What can you use to color resin?,HELPFUL RESOURCES – Check out my resin colorants resources page here with links to all the products mentioned in this article (and more). Let me know if you have any that you lo…,,Mixed Media Crafts,"Epoxy Resin Art,Diy Resin Art,Diy Resin Crafts,Resin Molds,Ice Resin,Resin Pour,Diy Epoxy,Diy Resin Painting,Diy Resin Dice",image,https://i.pinimg.com/originals/d4/12/78/d4127833023ca32600571ddca16f1556.jpg,/data/diy-and-crafts,diy-and-crafts,1
8731,ea760f71-febf-4023-b592-d17396659039,20 Koi Fish Tattoos For Lucky Men,"Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design.",,TheTrendSpotter,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",image,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,/data/tattoos,tattoos,1
4315,21b59ba9-829d-4c33-8c27-4cd4c56d26b8,Podcasts for Teachers or Parents of Teenagers,"Podcasts for Teachers or Parents of Teenagers: Teaching teens middle school and high school can feel joyful and rewarding most days, but can also frustrate you with one challeng…",,Math Giraffe,"Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom",image,https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg,/data/education,education,1
5494,8fb2af68-543b-4639-8119-de33d28706ed,Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You,"If you love budgeting, make sure to give Dave Ramsey's 7 Baby Steps a try. Follow these steps to begin your debt snowball, build an emergency fund, invest and reach riches. I ca…",,"Living Low Key | Save Money, Make Money, & Frugal Living","Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas",image,https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png,/data/finance,finance,1
2923,52fa3af5-24a4-4ccb-8f17-9c3eb12327ee,UFO Paper Plate Craft,A fun space activity for kids. Preshoolers and kindergartners will love making their own alien spacecraft!,,The Crafting Chicks,"Paper Plate Crafts For Kids,Fun Crafts For Kids,Summer Crafts,Toddler Crafts,Art For Kids,Outer Space Crafts For Kids,Kid Crafts,Space Kids,Back To School Crafts For Kids",image,https://i.pinimg.com/originals/6f/e8/aa/6fe8aa405513c6d2f77b5f47d17cdce8.jpg,/data/diy-and-crafts,diy-and-crafts,1
6063,60693727-4927-4bd6-a8c5-096a392d63e6,41 Gorgeous Fall Decor Ideas For Your Home - Chaylor & Mads,"Beautiful and easy ways to update every room in your home with fall decor. Plus, my favorite finds in fall decor for 2020!",,"Kristen | Lifestyle, Mom Tips & Teacher Stuff Blog","Fall Home Decor,Autumn Home,Fall Decor Outdoor,Front Porch Fall Decor,Home Decor Ideas,Porch Ideas For Fall,Fall Outdoor Decorating,Decorating Ideas For Fall,Fall Front Doors",image,https://i.pinimg.com/originals/e5/ae/dc/e5aedc14ce557e3a69f672e0f8c88f6e.png,/data/home-decor,home-decor,1


In [None]:
# Pinterest Geo DF cleaning
# Build the cleaned geo DataFrame in a single projection:
#   - 'coordinates' is an array of [longitude, latitude]; the two source
#     columns are not carried over, as they are no longer necessary,
#   - 'timestamp' is cast to the timestamp type,
#   - columns come out in the order ind, country, coordinates, timestamp.
cleaned_geo_df = geo_df.select(
    "ind",
    "country",
    array(col("longitude"), col("latitude")).alias("coordinates"),
    col("timestamp").cast("timestamp").alias("timestamp"),
)


In [None]:
# Pinterest User DF Cleaning
# Build the cleaned user DataFrame in a single projection:
#   - 'user_name' joins 'first_name' and 'last_name' with a space; the two
#     source columns are not carried over, as they are no longer necessary,
#   - 'date_joined' is cast to the timestamp type,
#   - columns come out in the order ind, user_name, age, date_joined.
cleaned_user_df = user_df.select(
    "ind",
    concat_ws(" ", col("first_name"), col("last_name")).alias("user_name"),
    "age",
    col("date_joined").cast("timestamp").alias("date_joined"),
)


In [None]:
# Writing the cleaned Pinterest info dataframe to a Delta table.
#
# Every streaming query must own its checkpoint directory: the checkpoint holds
# that query's offset and commit logs, so sharing "/tmp/kinesis/_checkpoints/"
# with the other streaming writes in this notebook lets the queries clash over
# each other's progress. A table-specific directory is used instead.
# NOTE(review): a new checkpoint path means this query starts without prior
# progress and re-reads the stream from its configured initial position.
(
    pin_df_cleaned_ordered.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/126dc60b95b3_pin_table/_checkpoints/")
    .table("126dc60b95b3_pin_table")
)

In [None]:
# Writing the cleaned User dataframe to a Delta table.
#
# Every streaming query must own its checkpoint directory: the checkpoint holds
# that query's offset and commit logs, so sharing "/tmp/kinesis/_checkpoints/"
# with the other streaming writes in this notebook lets the queries clash over
# each other's progress. A table-specific directory is used instead.
# NOTE(review): a new checkpoint path means this query starts without prior
# progress and re-reads the stream from its configured initial position.
(
    cleaned_user_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/126dc60b95b3_user_table/_checkpoints/")
    .table("126dc60b95b3_user_table")
)

In [None]:
# Preview the cleaned geo DataFrame (ind, country, coordinates, timestamp) in the notebook.
display(cleaned_geo_df)

ind,country,coordinates,timestamp
5730,Colombia,"List(-101.437, -77.015)",2021-04-19T17:37:03.000+0000
8304,French Guiana,"List(-164.87, -28.8852)",2019-09-13T04:50:29.000+0000
10794,Cocos (Keeling) Islands,"List(-154.567, -89.5236)",2022-01-01T02:26:50.000+0000
5069,Azerbaijan,"List(-157.474, -63.0063)",2021-03-20T09:32:44.000+0000
3454,Cambodia,"List(49.8106, -0.375174)",2021-07-25T02:20:29.000+0000
8731,Aruba,"List(-171.302, -83.104)",2020-07-17T04:39:09.000+0000
4315,Cote d'Ivoire,"List(66.1003, -45.8508)",2019-12-15T03:51:28.000+0000
5494,Bulgaria,"List(-129.202, -82.6768)",2021-07-21T02:02:35.000+0000
2923,Cote d'Ivoire,"List(-164.507, -84.6302)",2019-09-08T22:53:09.000+0000
6063,Anguilla,"List(-174.015, -89.1797)",2021-07-20T09:02:47.000+0000


In [None]:
# Writing the cleaned Geo dataframe to a Delta table.
#
# Every streaming query must own its checkpoint directory: the checkpoint holds
# that query's offset and commit logs, so sharing "/tmp/kinesis/_checkpoints/"
# with the other streaming writes in this notebook lets the queries clash over
# each other's progress. A table-specific directory is used instead.
# NOTE(review): a new checkpoint path means this query starts without prior
# progress and re-reads the stream from its configured initial position.
(
    cleaned_geo_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/126dc60b95b3_geo_table/_checkpoints/")
    .table("126dc60b95b3_geo_table")
)