In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Define the path to the Delta table containing AWS keys
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table into a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Get the AWS access key and secret key from the Spark DataFrame
ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0][0]
SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0][0]

# Encode the secret key for security purposes
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")



In [0]:
# Define the schema for Pinterest data
pin_schema = StructType([
    StructField("index", IntegerType()),
    StructField("unique_id", StringType()),
    StructField("title", StringType()),
    StructField("description", StringType()),
    StructField("poster_name", StringType()),
    StructField("follower_count", StringType()),  # Stored as string to process "k" and "M"
    StructField("tag_list", StringType()),
    StructField("is_image_or_video", StringType()),
    StructField("image_src", StringType()),
    StructField("downloaded", IntegerType()),
    StructField("save_location", StringType()),
    StructField("category", StringType())
])

# Define the schema for Geolocation data
geo_schema = StructType([
    StructField("ind", IntegerType()),
    StructField("timestamp", TimestampType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("country", StringType())
])

# Define the schema for User data
user_schema = StructType([
    StructField("ind", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", IntegerType()),  # Assuming age is integer
    StructField("date_joined", TimestampType())
])

In [0]:
# Reading streaming data from Kinesis - Pinterest Data
pin_df = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", "streaming-12b83b649269-pin") \
  .option("initialPosition", "earliest") \
  .option("region", "us-east-1") \
  .option("awsAccessKey", ACCESS_KEY) \
  .option("awsSecretKey", SECRET_KEY) \
  .load() \
  .selectExpr("CAST(data AS STRING)") \
  .select(from_json(col("data"), pin_schema).alias("pin_data")) \
  .select("pin_data.*")

# Cleaning Logic for Pinterest Data (Version 1)
df_pin_clean = pin_df.withColumn("follower_count",
                                 regexp_replace(col("follower_count"), "M", "000000")) \
                     .withColumn("follower_count",
                                 regexp_replace(col("follower_count"), "K", "000")) \
                     .withColumn("follower_count",
                                 regexp_replace(col("follower_count"), "[^0-9]", "")) \
                     .withColumn("follower_count", col("follower_count").cast("int"))

df_pin_clean = df_pin_clean.select(
    col("index").alias("ind"),
    col("unique_id"),
    when(col("title").contains("No Title Data Available"), None).otherwise(col("title")).alias("title"),
    when(col("description").contains("No description available Story format"), None).otherwise(col("description")).alias("description"),
    col("follower_count"),
    when(col("poster_name").contains("User Info Error"), None).otherwise(col("poster_name")).alias("poster_name"),
    when(col("tag_list").contains("N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"), None).otherwise(col("tag_list")).alias("tag_list"),
    col("is_image_or_video"),
    when(col("image_src").contains("Image src error."), None).otherwise(col("image_src")).alias("image_src"),
    regexp_replace(col("save_location"), "[^A-Za-z0-9_/\.]", "").alias("save_location"),
    col("category")
)

# Handle empty strings
for column in df_pin_clean.columns:
    df_pin_clean = df_pin_clean.withColumn(column, when(col(column) == "", None).otherwise(col(column)))

# Display cleaned Pinterest data
display(df_pin_clean)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
8024,bf5f57dc-0270-460a-ac7c-6632ac28c5bb,Relation à distance : Les 3 clés pour la réussir à la perfection,#citation #citationdujour #proverbe #quote #frenchquote #pensées #phrases #french #français #amour,17.0,Alexandre Cormont,"Love Quotes For Him Boyfriend,Future Husband Quotes,Love Quotes For Him Funny,Love Quotes For Crush,Love Quotes For Him Romantic,Love Yourself Quotes,Funny Quotes,Hard Quotes,Romantic Pictures",image,https://i.pinimg.com/originals/f9/b8/dd/f9b8dddc1559ed684a2945ae105993e8.jpg,Localsavein/data/quotes,quotes
8210,070ef1bc-ab96-4e1d-9bed-30cae672a14a,31 Quotes That'll Get You Over Any Break Up,Dealing with a break up or getting the strength to leave a bad relationship can be one of the most emotionally painful experiences in life but it doesn't have to be. Whether you…,67.0,Twins Dish,"Quotes Thoughts,True Quotes,Words Quotes,Motivational Quotes,Quotes Inspirational,Budist Quotes,Dating Quotes,Deep Quotes,Wisdom Thoughts",image,https://i.pinimg.com/originals/75/75/f9/7575f984db8fb70762ed5ec24f6e4915.jpg,Localsavein/data/quotes,quotes
4079,980fae1e-76c4-407a-8451-a9df487425c2,"Communications in Computer and Information Science: Research and Education in Robotics - Eurobot 2011 : International Conference, Prague, Czech Republic, June 15-17, 2011 Proceedings (Series #161)…","This book constitutes the proceedings of the International Conference on Research and Education in Robotics, EUROBOT 2011, held in Prague, Czech Republic, in June 2011. The 28 r…",2000000.0,Walmart,"Educational Robots,Intelligent Robot,Prague Czech Republic,Robot Design,Research,Conference,Behavior,Hold On,University",image,https://i.pinimg.com/originals/34/b5/dc/34b5dc646c71566af38c6d54ba68a7c7.jpg,Localsavein/data/education,education
5215,84d90bd4-c2fb-4d8a-a903-97fcfb73545b,Power of Compounding Interest,,2.0,JG Finance,"Goal Digger,Money Quotes,Budgeting Finances,Good Habits,Self Care Routine,Finance Tips,Money Management,Money Tips,Personal Finance",multi-video(story page format),https://i.pinimg.com/videos/thumbnails/originals/b5/f1/a1/b5f1a1c20c734b591113b43876bf2a23.0000001.jpg,Localsavein/data/finance,finance
4357,ccf116e9-9096-4943-a344-1960ce216445,First Steps in Launching Your Own Event Business - Learn About Event Planning,"Updated: January 25, 2017 You’ve organized some events for your family, friends or community and you have gained a budding reputation for knowing how put events together. You’ve…",4.0,EventPlanning.com | Learn How To Become An Event Planner,"Event Planning Quotes,Event Planning Checklist,Event Planning Business,Business Events,Business Ideas,Business Names,Business Opportunities,Corporate Events,Wedding Event Planner",image,https://i.pinimg.com/originals/c3/2b/c6/c32bc6ad263857cb0eea19f9cd12beb9.jpg,Localsavein/data/eventplanning,event-planning
830,1cf9adb2-cd65-4991-9d50-976381036257,51 Enigmatic Forest Concept Art That Will Amaze You,51 Enigmatic Forest Concept Art That Will Amaze You | Homesthetics - Inspiring ideas for your home.,556.0,Homesthetics.net,"Art And Illustration,Art Illustrations,Concept Art Landscape,Fantasy Landscape,Landscape Art,Art Environnemental,Fantasy Kunst,Posca Art,Art Disney",image,https://i.pinimg.com/originals/52/0a/e8/520ae83082ec78dca19efb10e2ff7bd8.jpg,Localsavein/data/art,art
10909,7528924a-a9e2-4dc2-85ac-06e08fd0f25b,RBP 94R-1890-73-12C - RBP 94R Chrome & Black Wheels,"Add the perfect blend of flash and class to your ride with RBP 94R Chrome and Black Wheels. Boasting a chrome finish with glossy black inserts, these all-aluminum dubs turn head…",6.0,AutoAnything,"Auto Jeep,Jeep Jk,Jeep Truck,Chevy Trucks,Dually Trucks,Pickup Trucks,Maserati Merak,Maserati Granturismo,Wrangler Jeep",image,https://i.pinimg.com/originals/8d/51/b0/8d51b0b325982b61ebe71d4146ea5a23.jpg,Localsavein/data/vehicles,vehicles
3525,d16327f7-cc51-4ae1-bf6f-dee2a880a9d7,Feed the Dog Sight Word Activity,"FREE sight word recognition activity for kids to read sight words while feeding bones to the dog. Fun and motivational literacy game for pre-k, kindergarten and first grade kids.",161.0,Totschooling | Toddler & Preschool Printable Activities,"Literacy Games,Phonics Activities,Kindergarten Activities,Literacy Centers,First Grade Activities,Math Games,First Grade Crafts,Student Games,Spelling Games",image,https://i.pinimg.com/originals/63/f9/c4/63f9c40584ba824df8bfe3589a0ecf57.jpg,Localsavein/data/education,education
10639,5435fc5c-ac4c-424c-9c39-5e2cfac77e55,Adam Phillips: Former Lego Designer Wins Pilkington Vehicle Design Awards,"A former toy designer has scooped top honours at this year's Pilkington Vehicle Design Awards at the Royal College of Art, for his new concept on the modern family car that uses…",35.0,Dexigner,"Car Interior Sketch,Bus Interior,Car Interior Design,Car Design Sketch,Interior Concept,Automotive Design,Airplane Interior,Lego,Adam Phillips",image,https://i.pinimg.com/originals/a2/d6/1d/a2d61d30ae1c74c32af370fac5e25040.jpg,Localsavein/data/vehicles,vehicles
2663,8aa16e94-eec5-4fa2-8011-ea36ad20f476,cereal box monster jaws | fun & easy big kid craft,"This is a perfect craft for big kids or older children - cardboard monster jaws. Made from cereal boxes, this is a fun, easy craft activity for summer.",498.0,It's Always Autumn,"Craft Projects For Kids,Fun Crafts For Kids,Easy Crafts For Kids,Craft Activities For Kids,Arts And Crafts,Children Crafts,Creative Crafts,Quick Crafts,Cereal Box Craft For Kids",image,https://i.pinimg.com/originals/99/92/3b/99923b1997c81f5eabb636e7ae3733da.jpg,Localsavein/data/diyandcrafts,diy-and-crafts


In [0]:
# Reading streaming data from Kinesis - Geolocation Data
geo_df = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", "streaming-12b83b649269-geo") \
  .option("initialPosition", "earliest") \
  .option("region", "us-east-1") \
  .option("awsAccessKey", ACCESS_KEY) \
  .option("awsSecretKey", SECRET_KEY) \
  .load() \
  .selectExpr("CAST(data AS STRING)") \
  .select(from_json(col("data"), geo_schema).alias("geo_data")) \
  .select("geo_data.*")

# Step 1: Create `coordinates` array from `latitude` and `longitude`
df_geo_clean = geo_df.withColumn("coordinates", array(col("latitude"), col("longitude")))

# Step 2: Drop `latitude` and `longitude` columns
df_geo_clean = df_geo_clean.drop("latitude", "longitude")

# Step 3: Ensure `timestamp` is properly formatted
df_geo_clean = df_geo_clean.withColumn("timestamp", to_timestamp(col("timestamp")))

# Step 4: Select the relevant columns for the final cleaned DataFrame
df_geo_clean = df_geo_clean.select(
    col("ind"),
    col("country"),
    col("coordinates"),
    col("timestamp")
)

# Display the schema and some sample data for verification
df_geo_clean.printSchema()
display(df_geo_clean)



ind,country,coordinates,timestamp
8574,Slovenia,"List(-62.9968, -78.0337)",2019-03-05T06:31:38.000+0000
8437,Burundi,"List(-83.2308, -116.587)",2019-08-15T23:57:05.000+0000
2587,Antarctica (the territory South of 60 deg S),"List(-39.3929, -177.136)",2019-07-09T21:57:35.000+0000
8671,Oman,"List(-81.4468, 39.6109)",2022-10-03T19:04:53.000+0000
5929,Germany,"List(-16.4612, 123.389)",2021-06-18T17:51:11.000+0000
1600,Georgia,"List(66.903, 39.6908)",2020-03-26T23:51:33.000+0000
1289,Sierra Leone,"List(-38.9289, -12.2407)",2021-03-24T21:44:14.000+0000
9040,Aruba,"List(61.0557, 74.8338)",2020-12-04T05:27:06.000+0000
1038,Monaco,"List(82.5827, 66.7698)",2018-07-29T22:17:45.000+0000
10201,Kuwait,"List(-42.3822, -118.628)",2021-04-08T01:45:13.000+0000


In [0]:
# Reading streaming data from Kinesis - User Data
user_df = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", "streaming-12b83b649269-user") \
  .option("initialPosition", "earliest") \
  .option("region", "us-east-1") \
  .option("awsAccessKey", ACCESS_KEY) \
  .option("awsSecretKey", SECRET_KEY) \
  .load() \
  .selectExpr("CAST(data AS STRING)") \
  .select(from_json(col("data"), user_schema).alias("user_data")) \
  .select("user_data.*")

# Step 1: Concatenate first and last names into user_name
df_user_clean = user_df.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))

# Step 2: Drop first_name and last_name columns
df_user_clean = df_user_clean.drop("first_name", "last_name")

# Step 3: Ensure date_joined is properly formatted as a timestamp
df_user_clean = df_user_clean.withColumn("date_joined", to_timestamp(col("date_joined")))

# Step 4: Select the relevant columns for the final cleaned DataFrame
df_user_clean = df_user_clean.select(
  col("ind"),
  col("user_name"),
  col("age"),
  col("date_joined")
)

# Display the schema and some sample data for verification
df_user_clean.printSchema()
display(df_user_clean)


ind,user_name,age,date_joined
2346,Joseph Green,59,2016-02-08T06:04:31.000+0000
1171,Jessica Smith,58,2016-05-01T06:14:36.000+0000
4049,Gregory Bray,20,2016-11-10T12:44:31.000+0000
1172,Anne Hunt,30,2015-12-10T12:01:22.000+0000
8501,April Barr,26,2016-02-06T19:00:11.000+0000
899,Brenda Brown,36,2015-10-27T03:03:39.000+0000
10888,Chad Smith,32,2017-10-07T11:21:43.000+0000
5880,Andrea Blankenship,27,2016-04-11T21:27:52.000+0000
2672,Kelli Simmons,37,2016-02-13T12:02:10.000+0000
5993,Aaron Anderson,21,2015-10-23T03:43:54.000+0000


In [0]:
# Write cleaned Pinterest stream data to Delta Table
df_pin_clean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12b83b649269_pin_table_checkpoint") \
  .table("12b83b649269_pin_table")



In [0]:
# Write cleaned Geolocation stream data to Delta Table
df_geo_clean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12b83b649269_geo_table_checkpoint") \
  .table("12b83b649269_geo_table")



In [0]:
# Write cleaned User stream data to Delta Table
df_user_clean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12b83b649269_user_table_checkpoint") \
  .table("12b83b649269_user_table")

