In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Reader settings for the credentials CSV
file_type = "csv"              # input format
first_row_is_header = "true"   # first row holds the column names
delimiter = ","                # field delimiter

# Load the credentials file into a Spark DataFrame
aws_keys_df = (
    spark.read
    .format(file_type)
    .options(header=first_row_is_header, sep=delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# `import urllib` on its own does not load the `parse` submodule, so import it explicitly.
import urllib.parse

# Look up the row for the notebook's IAM user once and read both keys from it
# (previously the same filter + collect job was executed twice).
_credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()   # None when no row matches
)
if _credentials_row is None:
    # Fail with an explicit message instead of an IndexError on an empty collect()
    raise ValueError("No row with 'User name' == 'databricks-user' found in aws_keys_df")

ACCESS_KEY = _credentials_row['Access key ID']
SECRET_KEY = _credentials_row['Secret access key']
# URL-encoded copy of the secret key; safe="" means no character is left unescaped
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")
# NOTE(review): the keys are read from a plain CSV file; consider a secret scope instead.

In [0]:
df_geo_streaming = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a5afda0229f-geo') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

In [0]:
(display(df_geo_streaming))

data
"{""index"":470,""timestamp"":""2019-03-19 05:00:26"",""latitude"":87.8892,""longitude"":35.6314,""country"":""Djibouti""}"
"{""index"":974,""timestamp"":""2022-03-17 09:24:07"",""latitude"":-56.9556,""longitude"":94.6183,""country"":""Svalbard & Jan Mayen Islands""}"
"{""index"":722,""timestamp"":""2021-08-13 07:42:02"",""latitude"":-53.6534,""longitude"":-149.875,""country"":""Antigua and Barbuda""}"
"{""index"":749,""timestamp"":""2021-01-04 00:08:03"",""latitude"":67.5426,""longitude"":61.4646,""country"":""Sudan""}"
"{""index"":443,""timestamp"":""2017-12-28 04:23:13"",""latitude"":-53.7383,""longitude"":-86.0636,""country"":""Saint Helena""}"
"{""index"":545,""timestamp"":""2019-12-27 14:15:05"",""latitude"":82.2559,""longitude"":-160.379,""country"":""Grenada""}"
"{""index"":82,""timestamp"":""2022-05-19 07:17:10"",""latitude"":29.9602,""longitude"":-101.96,""country"":""India""}"
"{""index"":269,""timestamp"":""2018-10-02 05:44:38"",""latitude"":-88.8298,""longitude"":-170.188,""country"":""Albania""}"
"{""index"":674,""timestamp"":""2020-01-16 04:13:22"",""latitude"":81.1893,""longitude"":-112.978,""country"":""New Caledonia""}"
"{""index"":343,""timestamp"":""2018-05-12 08:22:11"",""latitude"":-87.0574,""longitude"":-164.826,""country"":""Bahamas""}"


In [0]:
df_pin_streaming = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a5afda0229f-pin') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

In [0]:
(display(df_pin_streaming))

In [0]:
df_user_streaming = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0a5afda0229f-user') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

In [0]:
(display(df_user_streaming))

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
age,eyJpbmRleCI6NDcwLCJmaXJzdF9uYW1lIjoiRXJpYyIsImxhc3RfbmFtZSI6IlF1aW5uIiwiYWdlIjoyNywiZGF0ZV9qb2luZWQiOiIyMDE1LTExLTA3IDAwOjU5OjA3In0=,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494051517784695168794005340178,2023-10-21T10:41:41.751+0000
age,eyJpbmRleCI6OTc0LCJmaXJzdF9uYW1lIjoiRGF2aWQiLCJsYXN0X25hbWUiOiJKb25lcyIsImFnZSI6NDAsImRhdGVfam9pbmVkIjoiMjAxNi0wNi0zMCAwMDo1MDozMSJ9,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494194171031409695174059622418,2023-10-21T10:41:43.309+0000
age,eyJpbmRleCI6NzIyLCJmaXJzdF9uYW1lIjoiQmV0aCIsImxhc3RfbmFtZSI6IkFybXN0cm9uZyIsImFnZSI6MjIsImRhdGVfam9pbmVkIjoiMjAxNy0wMy0yMyAwMzo0MToxNCJ9,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494258244099849270520319049746,2023-10-21T10:41:43.825+0000
age,eyJpbmRleCI6NzQ5LCJmaXJzdF9uYW1lIjoiQW5nZWxhIiwibGFzdF9uYW1lIjoiTWFjZG9uYWxkIiwiYWdlIjo0NCwiZGF0ZV9qb2luZWQiOiIyMDE2LTEyLTI1IDE4OjQyOjA2In0=,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494426284788775704044322684946,2023-10-21T10:41:45.386+0000
age,eyJpbmRleCI6NDQzLCJmaXJzdF9uYW1lIjoiQ2FybGEiLCJsYXN0X25hbWUiOiJBdGtpbnMiLCJhZ2UiOjQ5LCJkYXRlX2pvaW5lZCI6IjIwMTYtMDUtMDggMjA6Mjk6MzMifQ==,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494590698700243293818241155090,2023-10-21T10:41:46.911+0000
age,eyJpbmRleCI6NTQ1LCJmaXJzdF9uYW1lIjoiS2VsbHkiLCJsYXN0X25hbWUiOiJCcm93biIsImFnZSI6MjksImRhdGVfam9pbmVkIjoiMjAxNy0wMy0wMSAyMToyMzoxNSJ9,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494664443175239786197898231826,2023-10-21T10:41:47.455+0000
age,eyJpbmRleCI6ODIsImZpcnN0X25hbWUiOiJBbmRyZXMiLCJsYXN0X25hbWUiOiJDb3J0ZXoiLCJhZ2UiOjI2LCJkYXRlX2pvaW5lZCI6IjIwMTUtMTEtMjAgMjE6NTA6MzkifQ==,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494727307317859746983702429714,2023-10-21T10:41:48.004+0000
age,eyJpbmRleCI6MjY5LCJmaXJzdF9uYW1lIjoiQWRhbSIsImxhc3RfbmFtZSI6IkFjb3N0YSIsImFnZSI6MjAsImRhdGVfam9pbmVkIjoiMjAxNS0xMC0yMSAyMToyNjo0NSJ9,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494891721229327336620181946386,2023-10-21T10:41:49.553+0000
age,eyJpbmRleCI6Njc0LCJmaXJzdF9uYW1lIjoiVGltb3RoeSIsImxhc3RfbmFtZSI6IldhcmQiLCJhZ2UiOjM3LCJkYXRlX2pvaW5lZCI6IjIwMTYtMTAtMDYgMjI6MDE6MzgifQ==,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985494953376446127682708091961362,2023-10-21T10:41:50.068+0000
age,eyJpbmRleCI6MzQzLCJmaXJzdF9uYW1lIjoiQW5kcmV3IiwibGFzdF9uYW1lIjoiQnVya2UiLCJhZ2UiOjIwLCJkYXRlX2pvaW5lZCI6IjIwMTUtMTEtMTQgMTc6Mzg6MzEifQ==,streaming-0a5afda0229f-user,shardId-000000000001,49645209141473545910680985495025911995304560527293808658,2023-10-21T10:41:50.741+0000


In [0]:
# TASK 1
# Preprocessing of the pin stream: print the schema of df_pin_streaming as it
# comes out of the Kinesis reader, before any column is cast or parsed.
df_pin_streaming.printSchema()

In [0]:
## TO SEE THE DATA CONTAINED IN THE STREAMS, THE `data` COLUMN HAS TO BE DESERIALIZED FIRST

In [0]:
# geo_streaming deserialization
df_geo_streaming = df_geo_streaming.selectExpr("CAST(data as STRING)")

In [0]:
(display(df_geo_streaming))

In [0]:
# pin_streaming deserialization
df_pin_streaming = df_pin_streaming.selectExpr("CAST(data as STRING)")

In [0]:
(display(df_pin_streaming))

data
"{""index"":470,""unique_id"":""b12a8c2c-8498-4de5-abe6-fed29da8a93b"",""title"":""Silhouette su sfondi in gradazione cromatica"",""description"":""Gli studenti della 3B hanno imparato ad usare i colori acrilici mescolandoli tra loro per ottenere sfumature in gradazione cromatica. La sagoma nera è sdipinta successivamente c… "",""poster_name"":""Danielle's Taste Bud Ticklers"",""follower_count"":""5k"",""tag_list"":[""Oil Pastel Art"",""Oil Pastel Drawings"",""Art Drawings Sketches"",""Colorful Drawings"",""Colourful Art"",""Oil Pastels"",""Acrylic Paintings"",""Art Paintings"",""Sunset Paintings""],""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/41/2f/4e/412f4e281f2b2c3b42e48b2f9c4380a4.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":974,""unique_id"":""343517b0-1180-4828-be98-ac356b01b0ef"",""title"":""Natural Homemade Zit Zapper Cream Sticks"",""description"":""Homemade Zit Zapper Sticks. Super easy to make with only 2 ingredients! All natural and they actually work!"",""poster_name"":""Merissa (Little House Living) | Simple Living + Frugal Living"",""follower_count"":""152k"",""tag_list"":[""Do It Yourself Nails"",""Do It Yourself Home"",""Acne Remedies"",""Natural Remedies"",""Beauty Secrets"",""Beauty Hacks"",""Homemade Acne Treatment"",""Diy Beauté"",""Homemade Beauty Products""],""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/d5/46/01/d546011a8782a22dd26c44932f59fe4c.jpg"",""downloaded"":1,""save_location"":""Local save in /data/beauty"",""category"":""beauty""}"
"{""index"":722,""unique_id"":""106b90c8-f4da-4dec-957f-6f7e42fb4d43"",""title"":""How to paint easy water texture with watercolor pt. 2"",""description"":""No description available Story format"",""poster_name"":""Kelley Vivian | New England Landscape Original Art & Prints"",""follower_count"":""21k"",""tag_list"":[""Watercolor Paintings For Beginners"",""Watercolor Art Lessons"",""Watercolour Tutorials"",""Watercolor Techniques"",""Watercolor Landscape Tutorial"",""Watercolor Landscape Paintings"",""Acrylic Painting Techniques"",""Watercolor Flowers"",""How To Watercolor""],""is_image_or_video"":""multi-video(story page format)"",""image_src"":""https://i.pinimg.com/videos/thumbnails/originals/e8/b3/1a/e8b31ae96b57643a300e2c3f49e729b7.0000001.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":749,""unique_id"":""ba0d42fe-9d11-4321-8d73-169262c16a42"",""title"":""In My Sketchbook"",""description"":""Leaf and forest sketchbook drawings and doodles by Kate Hadfield (inspired by the Quirky Heart Digest prompts). Love this mixed media art journal project!"",""poster_name"":""Kate Hadfield"",""follower_count"":""36k"",""tag_list"":[""Doodle Art Drawing"",""Mandala Drawing"",""Leaf Drawing"",""Doodle Art Journals"",""Art Journal Pages"",""Journal Prompts"",""Pen Art"",""Marker Art"",""Jackson's Art""],""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/63/69/0d/63690dc3a75529228090f1b0a24286fc.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":443,""unique_id"":""a0fd8d86-7530-47f1-a354-8860b7decea4"",""title"":""Book Art Is Awesome: Drawn Edition"",""description"":""I am so sorry. The pieces in the previous book art posts were largely unattainable. We were talking large-scale sculptures or art gallery pieces. These weren't things you could… "",""poster_name"":""Book Riot"",""follower_count"":""154k"",""tag_list"":[""Sketch Painting"",""Drawing Sketches"",""Art Drawings"",""Sketch Ink"",""Drawing Ideas"",""Shadow Painting"",""Collage Drawing"",""Book Drawing"",""Shadow Art""],""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/99/6c/cb/996ccbf53f699adfada0e3a8263dbe14.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":545,""unique_id"":""bd51d340-f99d-4126-87b4-5f3667d93979"",""title"":""A cup of fantasy!"",""description"":""Funny cups painted with tempera! This is a good exercise to learn how many textures you can paint with tempera colours, and how many ways there are to use use a paintbrush. Enjo… "",""poster_name"":""Snezana Katanic"",""follower_count"":""13"",""tag_list"":[""Winter Art Projects"",""School Art Projects"",""Winter Project"",""Art 2nd Grade"",""Square 1 Art"",""Arte Elemental"",""Classe D'art"",""Pop Art"",""Ecole Art""],""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/d7/39/82/d73982741e6bf9708be421232791f2cf.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":82,""unique_id"":""f62e3bd2-cf77-44c8-ae64-81273f901592"",""title"":""🍂watercolor autumn leaves 🍂"",""description"":""No description available Story format"",""poster_name"":""Zezè"",""follower_count"":""150k"",""tag_list"":[""Watercolor Art Lessons"",""Watercolor Painting Techniques"",""Painting Tips"",""Easy Watercolor Paintings"",""Watercolor Tips"",""Watercolor Tutorials"",""Painting Tutorials"",""Watercolor Flowers"",""Watercolors""],""is_image_or_video"":""multi-video(story page format)"",""image_src"":""https://i.pinimg.com/videos/thumbnails/originals/c1/3f/d4/c13fd4de97106b98970b81402a4ca9f3.0000001.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":269,""unique_id"":""5f694a45-9b10-4634-b5fa-ced839985aab"",""title"":""Meet The Enigmatic Aykut Aydoğdu Illustrations"",""description"":""This Turkish, Istanbul-based illustrator Aykut Aydoğdu has developed a unique style creating surreal, enigmatic illustrations that reflect the most intense and incomprehensible… "",""poster_name"":""Bored Panda"",""follower_count"":""2M"",""tag_list"":[""Surreal Artwork"",""Fantasy Artwork"",""Portraits"",""Portrait Art"",""Trucage Photo"",""Surealism Art"",""Art Et Design"",""Graphic Design"",""Surrealism Photography""],""is_image_or_video"":""image"",""image_src"":""https://i.pinimg.com/originals/b1/f7/2e/b1f72e09dd91719ae802bdf4b2d4aa6b.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":674,""unique_id"":""db168555-2951-4295-8961-317efe30e0bb"",""title"":""Regenbogen-Regen - Tanzendes Mädchen (inkl. Download)"",""description"":""Mit der Puste-Technik kann man einfach wunderschöne Kunstwerke gestalten! Lade dir einfach das Mädchen mit Schirm (auch verfügbar als Junge mit Schirm) herunter und los geht der… "",""poster_name"":""Faminino | Basteln mit Kinder, Ratgeber und easy DIY-Ideen "",""follower_count"":""71k"",""tag_list"":[""Crayon Art"",""Melted Crayon Crafts"",""Art Drawings Sketches Simple"",""Colorful Drawings"",""Diy Canvas Art"",""Art Club"",""Art Plastique"",""Art Activities"",""Diy Art""],""is_image_or_video"":""video"",""image_src"":""https://i.pinimg.com/videos/thumbnails/originals/76/53/0b/76530b1d736cea566c9a68dd8d96d3f7.0000001.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"
"{""index"":343,""unique_id"":""c5496429-6d97-4861-99cc-cfe97307ca6f"",""title"":""Color Mixing with Crayons- Positive and Negative Leaf Shapes"",""description"":""Try color mixing with crayons with this poitive and negative leaf shape project."",""poster_name"":""The Kitchen Table Classroom"",""follower_count"":""221k"",""tag_list"":[""Color Art Lessons"",""Art Lessons For Kids"",""Art Lessons Elementary"",""Art For Children"",""Easy Art For Kids"",""Kindergarten Art Lessons"",""Fall Art Projects"",""Line Art Projects"",""Art Education Projects""],""is_image_or_video"":""video"",""image_src"":""https://i.pinimg.com/videos/thumbnails/originals/99/eb/97/99eb97fcb0ad221ff9d599518b0e8ab4.0000001.jpg"",""downloaded"":1,""save_location"":""Local save in /data/art"",""category"":""art""}"


In [0]:
# user_streaming deserialization
df_user_streaming = df_user_streaming.selectExpr("CAST(data as STRING)")

In [0]:
(display(df_user_streaming))

data
"{""index"":470,""first_name"":""Eric"",""last_name"":""Quinn"",""age"":27,""date_joined"":""2015-11-07 00:59:07""}"
"{""index"":974,""first_name"":""David"",""last_name"":""Jones"",""age"":40,""date_joined"":""2016-06-30 00:50:31""}"
"{""index"":722,""first_name"":""Beth"",""last_name"":""Armstrong"",""age"":22,""date_joined"":""2017-03-23 03:41:14""}"
"{""index"":749,""first_name"":""Angela"",""last_name"":""Macdonald"",""age"":44,""date_joined"":""2016-12-25 18:42:06""}"
"{""index"":443,""first_name"":""Carla"",""last_name"":""Atkins"",""age"":49,""date_joined"":""2016-05-08 20:29:33""}"
"{""index"":545,""first_name"":""Kelly"",""last_name"":""Brown"",""age"":29,""date_joined"":""2017-03-01 21:23:15""}"
"{""index"":82,""first_name"":""Andres"",""last_name"":""Cortez"",""age"":26,""date_joined"":""2015-11-20 21:50:39""}"
"{""index"":269,""first_name"":""Adam"",""last_name"":""Acosta"",""age"":20,""date_joined"":""2015-10-21 21:26:45""}"
"{""index"":674,""first_name"":""Timothy"",""last_name"":""Ward"",""age"":37,""date_joined"":""2016-10-06 22:01:38""}"
"{""index"":343,""first_name"":""Andrew"",""last_name"":""Burke"",""age"":20,""date_joined"":""2015-11-14 17:38:31""}"


In [0]:
# TASK 1
# Start of the geo transformation section.
# NOTE(review): this call prints the *pin* stream's schema even though the section
# is about geo data; the geo schema is printed by the following cell.
df_pin_streaming.printSchema()

In [0]:
df_geo_streaming.printSchema()

In [0]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType

json_schema = StructType([
    StructField("index", StringType(), True),  # Change StringType() to the appropriate data type if needed
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("timestamp", StringType(), True)  # Assuming it's a string; adjust if different
    # Add other expected fields here if necessary
])

# Parse the JSON string
df_geo_streaming = df_geo_streaming.withColumn("parsed_data", from_json(col("data"), json_schema))


In [0]:
from pyspark.sql.functions import array

# Create 'coordinates' column using the parsed data
df_geo_streaming = df_geo_streaming.withColumn("coordinates", array("parsed_data.latitude", "parsed_data.longitude"))

In [0]:
from pyspark.sql.functions import unix_timestamp

df_geo_streaming = df_geo_streaming.withColumn("timestamp", unix_timestamp(col("parsed_data.timestamp"), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

In [0]:
df_geo_streaming.printSchema()

In [0]:
df_geo_streaming = df_geo_streaming.drop("latitude", "longitude")

In [0]:
from pyspark.sql.functions import unix_timestamp, from_unixtime

# Assuming the timestamp column is in the format 'yyyy-MM-dd HH:mm:ss'
df_geo_streaming = df_geo_streaming.withColumn("timestamp", unix_timestamp(col("timestamp"), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

In [0]:
df_geo_streaming.printSchema()

In [0]:
### TRANSFORMING AND CLEANING PIN DATA

In [0]:
df_pin_streaming.printSchema()

In [0]:
## DATA DESERIALIZATION
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, BooleanType

# Define the expected schema of the JSON data
pin_json_schema = StructType([
    StructField("ind", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("follower_count", IntegerType(), True),
    StructField("poster_name", StringType(), True),
    StructField("tag_list", ArrayType(StringType()), True),  # Assuming this is a list of strings
    StructField("is_image_or_video", BooleanType(), True),  # Assuming this is a boolean
    StructField("image_src", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

In [0]:
from pyspark.sql.functions import from_json

# Parse the JSON string
df_pin_streaming = df_pin_streaming.withColumn("parsed_data", from_json(col("data"), pin_json_schema))

In [0]:
df_pin_streaming = df_pin_streaming.select(
    col("parsed_data.ind").alias("ind"),
    col("parsed_data.unique_id").alias("unique_id"),
    col("parsed_data.title").alias("title"),
    col("parsed_data.description").alias("description"),
    col("parsed_data.follower_count").alias("follower_count"),
    col("parsed_data.poster_name").alias("poster_name"),
    col("parsed_data.tag_list").alias("tag_list"),
    col("parsed_data.is_image_or_video").alias("is_image_or_video"),
    col("parsed_data.image_src").alias("image_src"),
    col("parsed_data.save_location").alias("save_location"),
    col("parsed_data.category").alias("category")
)

In [0]:
df_pin_streaming.printSchema()

In [0]:
from pyspark.sql.functions import when

# A function to safely cast to a specified type, returning original string if cast fails
def safe_cast(column, to_type):
    return when(column.cast(to_type).isNotNull(), column.cast(to_type)).otherwise(column)

In [0]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import DataType

def safe_cast(column: col, target_type: DataType):
    """
    Safely cast a column to a target type.
    If the cast fails, it returns a default value.
    """
    default_value = {
        IntegerType: lit(0),
        StringType: lit(""),
        BooleanType: lit(False)
        # You can add more types and their default values as needed
    }.get(target_type, lit(None))
    
    return when(column.cast(target_type).isNotNull(), column.cast(target_type)).otherwise(default_value)

In [0]:
df_pin_streaming = df_pin_streaming.withColumn("is_image_or_video", safe_cast(col("is_image_or_video"), IntegerType()))
df_pin_streaming = df_pin_streaming.withColumn("unique_id", safe_cast(col("unique_id"), IntegerType()))

In [0]:
df_pin_streaming.printSchema()

In [0]:
### TRANSFORMING AND CLEANING PIN DATA (CONTINUED)

In [0]:
from pyspark.sql.functions import regexp_replace

# Assuming df_pin_streaming is your streaming DataFrame
df_pin_streaming = df_pin_streaming.withColumn("save_location", 
                                              regexp_replace(col("save_location"), "^Local save in ", ""))

In [0]:
df_pin_streaming = df_pin_streaming.withColumnRenamed("index", "ind")

In [0]:
df_pin_streaming = df_pin_streaming.select(
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
)

In [0]:
df_pin_streaming.printSchema()

In [0]:
### USER DATA CLEANING AND TRANSFORMATION

In [0]:
df_user_streaming.printSchema()

In [0]:
# deserialization of user data
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Define the expected schema of the JSON data
json_schema = StructType([
    StructField("ind", LongType(), True),  # Using LongType for the ID; adjust if needed
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("date_joined", StringType(), True)  # Assuming it's a string; adjust if different
])

# Parse the JSON string
df_user_streaming = df_user_streaming.withColumn("parsed_data", from_json(col("data"), json_schema))

# Flatten the data
df_user_streaming = df_user_streaming.select("parsed_data.*")


In [0]:
# Create a new column user_name that concatenates first_name and last_name
from pyspark.sql.functions import concat_ws

df_user_streaming = df_user_streaming.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))

In [0]:
# drop first name and last name
df_user_streaming = df_user_streaming.drop("first_name", "last_name")

In [0]:
# Convert date joined to timestamp
from pyspark.sql.functions import to_timestamp

df_user_streaming = df_user_streaming.withColumn("date_joined", to_timestamp(col("date_joined")))

In [0]:
# Re-order
df_user_streaming = df_user_streaming.select(
    "ind",
    "user_name",
    "age",
    "date_joined"
)


In [0]:
df_user_streaming.printSchema()

In [0]:
### WRITE STREAM

In [0]:
# WRITE GEO DATA
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

In [0]:
df_geo_streaming.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("0a5afda0229f_geo_table")

In [0]:
df_pin_streaming.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("0a5afda0229f_pin_table")

In [0]:
df_user_streaming.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
  .table("0a5afda0229f_user_table")