In [0]:
# pyspark functions
from pyspark.sql.functions import *
# URL processing
import urllib

In [0]:
# Define the path to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

In [0]:
# Get the AWS access key and secret key from the spark dataframe
ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']
# Encode the secrete key
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- # Disable format checks during the reading of Delta tables
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
df_pin = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-124f98f775af-pin') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

In [0]:
display(df_pin)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
partition-key,eyJpbmRleCI6NzUyOCwidW5pcXVlX2lkIjoiZmJlNTNjNjYtMzQ0Mi00NzczLWIxOWUtZDNlYzZmNTRkZGRmIiwidGl0bGUiOiJObyBUaXRsZSBEYXRhIEF2YWlsYWJsZSIsImRlc2NyaXB0aW9uIjoiTm8gZGVzY3JpcHRpb24= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612267573778609903714676848459810,2024-09-27T09:52:40.150+0000
partition-key,eyJpbmRleCI6Mjg2MywidW5pcXVlX2lkIjoiOWJmMzk0MzctNDJhNi00ZjAyLTk5YTAtOWEwMzgzZDhjZDcwIiwidGl0bGUiOiIyNSBTdXBlciBGdW4gU3VtbWVyIENyYWZ0cyBmb3IgS2lkcyAtIE9mIExpZmUgYW5kIExpc2E= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612270014599839705651186738659362,2024-09-27T09:52:42.576+0000
partition-key,eyJpbmRleCI6NTczMCwidW5pcXVlX2lkIjoiMWUxZjBjOGItOWZjZi00NjBiLTkxNTQtYzc3NTgyNzIwNmViIiwidGl0bGUiOiJJc2xhbmQgT2FzaXMgQ291cG9uIE9yZ2FuaXplciIsImRlc2NyaXB0aW9uIjoiRGVzY3JpcHQ= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612272246276902714256780685213730,2024-09-27T09:52:44.841+0000
partition-key,eyJpbmRleCI6ODMwNCwidW5pcXVlX2lkIjoiNWI2ZDA5MTMtMjVlNC00M2FiLTgzOWQtODVkNTUxNmY3OGE0IiwidGl0bGUiOiJUaGUgIzEgUmVhc29uIFlvdeKAmXJlIE5vdCBIaXMgUHJpb3JpdHkgQW55bW9yZSAtIE1hdHQ= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612274967568922666787259107246114,2024-09-27T09:52:47.430+0000
partition-key,eyJpbmRleCI6ODczMSwidW5pcXVlX2lkIjoiZWE3NjBmNzEtZmViZi00MDIzLWI1OTItZDE3Mzk2NjU5MDM5IiwidGl0bGUiOiIyMCBLb2kgRmlzaCBUYXR0b29zIEZvciBMdWNreSBNZW4iLCJkZXNjcmlwdGlvbiI6IktvaSA= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612276297387324242879420003516450,2024-09-27T09:52:48.754+0000
partition-key,eyJpbmRleCI6MTMxMywidW5pcXVlX2lkIjoiNDQ2NjIwNDUtZTg5MS00ODIxLThhMTktZWJlN2VlZGQzNzFhIiwidGl0bGUiOiJMaXF1aWQgTGFzaCBFeHRlbnNpb25zIE1hc2NhcmEiLCJkZXNjcmlwdGlvbiI6Ikluc3RhbnQ= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612277694905571717390814683332642,2024-09-27T09:52:50.166+0000
partition-key,eyJpbmRleCI6NDMxNSwidW5pcXVlX2lkIjoiMjFiNTliYTktODI5ZC00YzMzLThjMjctNGNkNGM1NmQyNmI4IiwidGl0bGUiOiJQb2RjYXN0cyBmb3IgVGVhY2hlcnMgb3IgUGFyZW50cyBvZiBUZWVuYWdlcnMiLCJkZXNjcmk= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612280216724831433507547998322722,2024-09-27T09:52:52.564+0000
partition-key,eyJpbmRleCI6MTA3OTQsInVuaXF1ZV9pZCI6ImM0YmQyNTc3LWE3YmItNDQwOS1iYjdhLTE3ZDVlZDdlMWNmMSIsInRpdGxlIjoiVGlyZUJ1eWVyIiwiZGVzY3JpcHRpb24iOiJOaXNzYW4gR1QtUi4gU2ljay4iLCJwb3N0ZXI= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612281808880135865974239805833250,2024-09-27T09:52:53.977+0000
partition-key,eyJpbmRleCI6NTQ5NCwidW5pcXVlX2lkIjoiOGZiMmFmNjgtNTQzYi00NjM5LTgxMTktZGUzM2QyODcwNmVkIiwidGl0bGUiOiJEYXZlIFJhbXNleSdzIDcgQmFieSBTdGVwczogV2hhdCBBcmUgVGhleSBBbmQgV2lsbCBUaGU= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612283316410632925416889383911458,2024-09-27T09:52:55.400+0000
partition-key,eyJpbmRleCI6NTA2OSwidW5pcXVlX2lkIjoiYjc1YjZmODctZGViMy00NDRmLWIyOWUtY2U5MTYxYjJkZjQ5IiwidGl0bGUiOiJUaGUgVmF1bHQ6IEN1cmF0ZWQgJiBSZWZpbmVkIFdlZGRpbmcgSW5zcGlyYXRpb24iLCJkZXM= (truncated),streaming-124f98f775af-pin,shardId-000000000002,49655440148212112345281612284675243254172260150473130018,2024-09-27T09:52:56.633+0000


In [0]:
df_user = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-124f98f775af-user') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

In [0]:
df_geo = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-124f98f775af-geo') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()



In [0]:
df_geo = df_geo.selectExpr("CAST(data as STRING)")

In [0]:
display(df_geo)

data
"{""ind"":7528,""timestamp"":""2020-08-28T03:52:47"",""latitude"":-89.9787,""longitude"":-173.293,""country"":""Albania""}"
"{""ind"":2863,""timestamp"":""2020-04-27T13:34:16"",""latitude"":-5.34445,""longitude"":-177.924,""country"":""Armenia""}"
"{""ind"":5730,""timestamp"":""2021-04-19T17:37:03"",""latitude"":-77.015,""longitude"":-101.437,""country"":""Colombia""}"
"{""ind"":8304,""timestamp"":""2019-09-13T04:50:29"",""latitude"":-28.8852,""longitude"":-164.87,""country"":""French Guiana""}"
"{""ind"":8731,""timestamp"":""2020-07-17T04:39:09"",""latitude"":-83.104,""longitude"":-171.302,""country"":""Aruba""}"
"{""ind"":1313,""timestamp"":""2018-06-26T02:39:25"",""latitude"":77.0447,""longitude"":61.9119,""country"":""Maldives""}"
"{""ind"":4315,""timestamp"":""2019-12-15T03:51:28"",""latitude"":-45.8508,""longitude"":66.1003,""country"":""Cote d'Ivoire""}"
"{""ind"":10794,""timestamp"":""2022-01-01T02:26:50"",""latitude"":-89.5236,""longitude"":-154.567,""country"":""Cocos (Keeling) Islands""}"
"{""ind"":5494,""timestamp"":""2021-07-21T02:02:35"",""latitude"":-82.6768,""longitude"":-129.202,""country"":""Bulgaria""}"
"{""ind"":5069,""timestamp"":""2021-03-20T09:32:44"",""latitude"":-63.0063,""longitude"":-157.474,""country"":""Azerbaijan""}"


In [0]:
df_pin = df_pin.selectExpr("CAST(data as STRING)")

In [0]:
display(df_pin)

data
"{""index"":7528,""unique_id"":""fbe53c66-3442-4773-b19e-d3ec6f54dddf"",""title"":""No Title Data Available"",""description"":""No description available Story format"",""poster_name"":""User Info Error"",""follower_count"":""User Info Error"",""tag_list"":""N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"",""is_image_or_video"":""multi-video(story page format)"",""image_src"":""Image src error."",""downloaded"":0,""save_location"":""Local save in /data/mens-fashion"",""category"":""mens-fashion""}"
"{""index"":2863,""unique_id"":""9bf39437-42a6-4f02-99a0-9a0383d8cd70"",""title"":""25 Super Fun Summer Crafts for Kids - Of Life and Lisa"",""description"":""Keep the kids busy this summer with these easy diy crafts and projects. Creative and…"",""poster_name"":""Of Life & Lisa | Lifestyle Blog"",""follower_count"":""124k"",""tag_list"":""Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg"",""downloaded"":1,""save_location"":""Local save in /data/diy-and-crafts"",""category"":""diy-and-crafts""}"
"{""index"":5730,""unique_id"":""1e1f0c8b-9fcf-460b-9154-c775827206eb"",""title"":""Island Oasis Coupon Organizer"",""description"":""Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the \""basic\"" couponer - holds up to 500 coupons with ease, and is made long enough so that you… "",""poster_name"":""Consuelo Aguirre"",""follower_count"":""0"",""tag_list"":""Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg"",""downloaded"":1,""save_location"":""Local save in /data/finance"",""category"":""finance""}"
"{""index"":8304,""unique_id"":""5b6d0913-25e4-43ab-839d-85d5516f78a4"",""title"":""The #1 Reason You’re Not His Priority Anymore - Matthew Coast"",""description"":""#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself"",""poster_name"":""Commitment Connection"",""follower_count"":""51k"",""tag_list"":""Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png"",""downloaded"":1,""save_location"":""Local save in /data/quotes"",""category"":""quotes""}"
"{""index"":8731,""unique_id"":""ea760f71-febf-4023-b592-d17396659039"",""title"":""20 Koi Fish Tattoos For Lucky Men"",""description"":""Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design."",""poster_name"":""TheTrendSpotter"",""follower_count"":""211k"",""tag_list"":""Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg"",""downloaded"":1,""save_location"":""Local save in /data/tattoos"",""category"":""tattoos""}"
"{""index"":1313,""unique_id"":""44662045-e891-4821-8a19-ebe7eedd371a"",""title"":""Liquid Lash Extensions Mascara"",""description"":""Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!"",""poster_name"":""Thrive Causemetics"",""follower_count"":""43k"",""tag_list"":""N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"",""is_image_or_video"":""video"",""image_src"":""https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg"",""downloaded"":1,""save_location"":""Local save in /data/beauty"",""category"":""beauty""}"
"{""index"":4315,""unique_id"":""21b59ba9-829d-4c33-8c27-4cd4c56d26b8"",""title"":""Podcasts for Teachers or Parents of Teenagers"",""description"":""Podcasts for Teachers or Parents of Teenagers: Teaching teens middle school and high school can feel joyful and rewarding most days, but can also frustrate you with one challeng… "",""poster_name"":""Math Giraffe"",""follower_count"":""25k"",""tag_list"":""Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg"",""downloaded"":1,""save_location"":""Local save in /data/education"",""category"":""education""}"
"{""index"":10794,""unique_id"":""c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1"",""title"":""TireBuyer"",""description"":""Nissan GT-R. Sick."",""poster_name"":""Ray Uyemura"",""follower_count"":""437"",""tag_list"":""Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg"",""downloaded"":1,""save_location"":""Local save in /data/vehicles"",""category"":""vehicles""}"
"{""index"":5494,""unique_id"":""8fb2af68-543b-4639-8119-de33d28706ed"",""title"":""Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You"",""description"":""If you love budgeting, make sure to give Dave Ramsey's 7 Baby Steps a try. Follow these steps to begin your debt snowball, build an emergency fund, invest and reach riches. I ca… "",""poster_name"":""Living Low Key | Save Money, Make Money, & Frugal Living"",""follower_count"":""26k"",""tag_list"":""Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png"",""downloaded"":1,""save_location"":""Local save in /data/finance"",""category"":""finance""}"
"{""index"":5069,""unique_id"":""b75b6f87-deb3-444f-b29e-ce9161b2df49"",""title"":""The Vault: Curated & Refined Wedding Inspiration"",""description"":""Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY"",""poster_name"":""Style Me Pretty"",""follower_count"":""6M"",""tag_list"":""60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design"",""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg"",""downloaded"":1,""save_location"":""Local save in /data/event-planning"",""category"":""event-planning""}"


In [0]:
from pyspark.sql.functions import regexp_replace, col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def clean_pin_df(df):
    # Define the schema for the JSON data
    schema = StructType([
        StructField("index", IntegerType(), True),
        StructField("unique_id", StringType(), True),
        StructField("title", StringType(), True),
        StructField("description", StringType(), True),
        StructField("follower_count", StringType(), True),
        StructField("poster_name", StringType(), True),
        StructField("tag_list", StringType(), True),
        StructField("is_image_or_video", StringType(), True),
        StructField("image_src", StringType(), True),
        StructField("save_location", StringType(), True),
        StructField("category", StringType(), True)
    ])
    
    # Parse the JSON data and create a DataFrame with the specified schema
    df_parsed = df.withColumn("data", from_json(col("data"), schema)).select("data.*")
    
    # Replace empty entries and entries with no relevant data with None
    cleaned_df = df_parsed.replace(
        {" ": None, "N/A": None, "n/a": None, "nan": None, "null": None, "NULL": None, "None": None, "none": None},
        subset=["unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]
    )
    
    # Ensure every entry in follower_count is a number
    cleaned_df = cleaned_df.withColumn("follower_count", regexp_replace("follower_count", "[^0-9]", ""))
    
    # Cast follower_count to integer
    cleaned_df = cleaned_df.withColumn("follower_count", cleaned_df["follower_count"].cast("int"))
    
    # Clean the save_location column to include only the save location path
    cleaned_df = cleaned_df.withColumn("save_location", regexp_replace("save_location", r".*\/", ""))
    
    # Rename the index column to ind
    cleaned_df = cleaned_df.withColumnRenamed("index", "ind")
    
    # Reorder the DataFrame columns
    cleaned_df = cleaned_df.select(
        "ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"
    )
    
    return cleaned_df

try:
    # Apply the cleaning function to df_pin
    df_pin_cleaned = clean_pin_df(df_pin)
    
    # Display the cleaned DataFrame
    display(df_pin_cleaned)
except Exception as e:
    raise ValueError("An error occurred while cleaning the DataFrame: " + str(e))

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",multi-video(story page format),Image src error.,mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",image,https://i.pinimg.com/originals/b3/bc/e2/b3bce2964e8c8975387b39660eed5f16.jpg,diy-and-crafts,diy-and-crafts
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",image,https://i.pinimg.com/originals/65/bb/ea/65bbeaf458907bb079317d8303c4fa0e.jpg,finance,finance
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,51.0,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",image,https://i.pinimg.com/originals/c6/64/ee/c664ee71524fb5a6e7b7b49233f93b43.png,quotes,quotes
8731,ea760f71-febf-4023-b592-d17396659039,20 Koi Fish Tattoos For Lucky Men,"Koi fish tattoos are a popular choice for men who want to make a statement, thanks to their rich symbolism and bold design.",211.0,TheTrendSpotter,"Dr Tattoo,Wörter Tattoos,Pisces Tattoos,Tatoo Art,Dream Tattoos,Dope Tattoos,Mini Tattoos,Finger Tattoos,Body Art Tattoos",image,https://i.pinimg.com/originals/8a/0c/0a/8a0c0a7b6236565c519acd41ad1a52c0.jpg,tattoos,tattoos
1313,44662045-e891-4821-8a19-ebe7eedd371a,Liquid Lash Extensions Mascara,"Instantly create the look of lash extensions with this award-winning, best-selling mascara that won't clump, flake or smudge. Available in 3 shades!",43.0,Thrive Causemetics,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",video,https://i.pinimg.com/videos/thumbnails/originals/69/84/e2/6984e20f3e262098fa9c0614c3453254.0000001.jpg,beauty,beauty
4315,21b59ba9-829d-4c33-8c27-4cd4c56d26b8,Podcasts for Teachers or Parents of Teenagers,"Podcasts for Teachers or Parents of Teenagers: Teaching teens middle school and high school can feel joyful and rewarding most days, but can also frustrate you with one challeng…",25.0,Math Giraffe,"Middle School Classroom,High School Students,High School Teachers,Middle School Tips,High School Counseling,Ela Classroom,High School Science,Future Classroom,Google Classroom",image,https://i.pinimg.com/originals/50/19/31/501931a27ee4d076658980851b995b2c.jpg,education,education
10794,c4bd2577-a7bb-4409-bb7a-17d5ed7e1cf1,TireBuyer,Nissan GT-R. Sick.,437.0,Ray Uyemura,"Lowrider,Old Vintage Cars,Antique Cars,Austin Martin,Nissan Gtr Black,Jaguar,1959 Cadillac,Cadillac Ct6,Old School Cars",image,https://i.pinimg.com/originals/0d/29/9f/0d299f3df020395aa7ce8387f40fbeed.jpg,vehicles,vehicles
5494,8fb2af68-543b-4639-8119-de33d28706ed,Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You,"If you love budgeting, make sure to give Dave Ramsey's 7 Baby Steps a try. Follow these steps to begin your debt snowball, build an emergency fund, invest and reach riches. I ca…",26.0,"Living Low Key | Save Money, Make Money, & Frugal Living","Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas",image,https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png,finance,finance
5069,b75b6f87-deb3-444f-b29e-ce9161b2df49,The Vault: Curated & Refined Wedding Inspiration,Sacramento California Wedding 2 Chic Events & Design Jodi Yorston Photography Wilson Vineyards Barn Miosa Couture Yellow Barn Vineyard Outdoor Candles DIY,6.0,Style Me Pretty,"60th Anniversary Parties,Anniversary Decorations,Golden Anniversary,25th Wedding Anniversary,Anniversary Pictures,Anniversary Ideas,Birthday Decorations,Event Planning Design,Event Design",image,https://i.pinimg.com/originals/7e/45/90/7e45905fefa36347e83333fd6d091140.jpg,event-planning,event-planning


In [0]:
from pyspark.sql.functions import array, col, to_timestamp, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

def clean_geo_df(df):
    # Define the schema for the JSON data
    schema = StructType([
        StructField("ind", IntegerType(), True),
        StructField("timestamp", StringType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True),
        StructField("country", StringType(), True)
    ])
    
    # Parse the JSON data and create a DataFrame with the specified schema
    df_parsed = df.withColumn("data", from_json(col("data"), schema)).select("data.*")
    
    # Convert the timestamp column from a string to a timestamp data type
    df_parsed = df_parsed.withColumn("timestamp", to_timestamp(col("timestamp")))
    
    # Create a new column 'coordinates' that contains an array based on the latitude and longitude columns
    df_parsed = df_parsed.withColumn("coordinates", array("latitude", "longitude"))
    
    # Drop the latitude and longitude columns
    df_parsed = df_parsed.drop("latitude", "longitude")
    
    # Reorder the DataFrame columns
    df_cleaned = df_parsed.select("ind", "country", "coordinates", "timestamp")
    
    return df_cleaned

try:
    # Apply the cleaning function to df_geo
    df_geo_cleaned = clean_geo_df(df_geo)
    
    # Display the cleaned DataFrame
    display(df_geo_cleaned)
except Exception as e:
    raise ValueError("An error occurred while cleaning the DataFrame: " + str(e))

ind,country,coordinates,timestamp
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47.000+0000
2863,Armenia,"List(-5.34445, -177.924)",2020-04-27T13:34:16.000+0000
5730,Colombia,"List(-77.015, -101.437)",2021-04-19T17:37:03.000+0000
8304,French Guiana,"List(-28.8852, -164.87)",2019-09-13T04:50:29.000+0000
8731,Aruba,"List(-83.104, -171.302)",2020-07-17T04:39:09.000+0000
1313,Maldives,"List(77.0447, 61.9119)",2018-06-26T02:39:25.000+0000
4315,Cote d'Ivoire,"List(-45.8508, 66.1003)",2019-12-15T03:51:28.000+0000
10794,Cocos (Keeling) Islands,"List(-89.5236, -154.567)",2022-01-01T02:26:50.000+0000
5494,Bulgaria,"List(-82.6768, -129.202)",2021-07-21T02:02:35.000+0000
5069,Azerbaijan,"List(-63.0063, -157.474)",2021-03-20T09:32:44.000+0000


In [0]:
df_user = df_user.selectExpr("CAST(data as STRING)")

In [0]:
display(df_user)

data
"{""ind"":7528,""first_name"":""Abigail"",""last_name"":""Ali"",""age"":20,""date_joined"":""2015-10-24T11:23:51""}"
"{""ind"":2863,""first_name"":""Dylan"",""last_name"":""Holmes"",""age"":32,""date_joined"":""2016-10-23T14:06:51""}"
"{""ind"":5730,""first_name"":""Rachel"",""last_name"":""Davis"",""age"":36,""date_joined"":""2015-12-08T20:02:43""}"
"{""ind"":8304,""first_name"":""Charles"",""last_name"":""Berry"",""age"":25,""date_joined"":""2015-12-28T04:21:39""}"
"{""ind"":8731,""first_name"":""Andrea"",""last_name"":""Alexander"",""age"":21,""date_joined"":""2015-11-10T09:27:42""}"
"{""ind"":1313,""first_name"":""Brittany"",""last_name"":""Jones"",""age"":32,""date_joined"":""2016-04-02T03:51:23""}"
"{""ind"":4315,""first_name"":""Michelle"",""last_name"":""Prince"",""age"":36,""date_joined"":""2015-12-20T16:38:13""}"
"{""ind"":10794,""first_name"":""Thomas"",""last_name"":""Turner"",""age"":34,""date_joined"":""2016-12-22T00:02:02""}"
"{""ind"":5494,""first_name"":""Anne"",""last_name"":""Allen"",""age"":27,""date_joined"":""2015-12-16T15:20:05""}"
"{""ind"":5069,""first_name"":""Amanda"",""last_name"":""Ball"",""age"":25,""date_joined"":""2016-01-13T17:36:30""}"


In [0]:
from pyspark.sql.functions import concat_ws, to_timestamp, col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def clean_user_df(df):
    # Define the schema for the JSON data
    schema = StructType([
        StructField("ind", IntegerType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("date_joined", StringType(), True)
    ])
    
    # Cast the data column to string before parsing the JSON data
    df_parsed = df.withColumn("data", from_json(col("data").cast("string"), schema)).select("data.*")
    
    # Create a new column 'user_name' that concatenates the information found in the first_name and last_name columns
    cleaned_df = df_parsed.withColumn("user_name", concat_ws(" ", "first_name", "last_name"))
    
    # Drop the first_name and last_name columns
    cleaned_df = cleaned_df.drop("first_name", "last_name")
    
    # Convert the date_joined column from a string to a timestamp data type
    cleaned_df = cleaned_df.withColumn("date_joined", to_timestamp(col("date_joined")))
    
    # Reorder the DataFrame columns
    cleaned_df = cleaned_df.select("ind", "user_name", "age", "date_joined")
    
    return cleaned_df

# Apply the cleaning function to df_user
df_user_cleaned = clean_user_df(df_user)

# Display the cleaned DataFrame
display(df_user_cleaned)

ind,user_name,age,date_joined
7528,Abigail Ali,20,2015-10-24T11:23:51.000+0000
2863,Dylan Holmes,32,2016-10-23T14:06:51.000+0000
5730,Rachel Davis,36,2015-12-08T20:02:43.000+0000
8304,Charles Berry,25,2015-12-28T04:21:39.000+0000
8731,Andrea Alexander,21,2015-11-10T09:27:42.000+0000
1313,Brittany Jones,32,2016-04-02T03:51:23.000+0000
4315,Michelle Prince,36,2015-12-20T16:38:13.000+0000
10794,Thomas Turner,34,2016-12-22T00:02:02.000+0000
5494,Anne Allen,27,2015-12-16T15:20:05.000+0000
5069,Amanda Ball,25,2016-01-13T17:36:30.000+0000


In [0]:
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)



In [0]:
df_user_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("124f98f775af_user_table")

In [0]:
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)


In [0]:
df_pin_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("124f98f775af_pin_table")

In [0]:
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)


In [0]:
df_geo_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("124f98f775af_geo_table")