# Read streaming data from Kinesis

NOTE: 
- Replace '<your_UserId>' with your own user ID
- The writeStreams must be interrupted before the next one can run

Start ingesting data into the Kinesis Data Streams using your preferred method - for example, by sending data to an API with a Kinesis proxy integration.

Once data is seen arriving in the Kinesis data streams, read it into Databricks.

First, read the Delta table containing the AWS Access Key and Secret Access Key.

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import urllib
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType # help with schemas set up

# Location of the Delta table that holds the AWS credential information
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

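Extract the ACCESS_KEY and SECRET_KEY from the Spark DataFrame created above. The secret access key is also URL-encoded with urllib.parse.quote so that it can be embedded safely in a URL; safe="" means that no reserved character (including "/") is left unencoded. Note that URL-encoding is not encryption, and the Kinesis reads below use the unencoded SECRET_KEY.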

In [None]:
# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.parse` has been loaded.
import urllib.parse

# Fetch the first credentials row once, so both keys come from the same row
# and only one Spark job is run.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError("aws_keys_df contains no rows: no AWS credentials available")

# Get the AWS access key and secret key from that row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means "/" is percent-encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
%sql
-- Set spark.databricks.delta.formatCheck.enabled to false, i.e. disable the
-- format check that is otherwise applied when Delta tables are read
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


# Cleaning methods

In [None]:
def cleaned_pin_df(df_to_clean):
    """Clean a DataFrame of pin (post) records.

    Steps, in order:
      1. Replace known "no data" placeholder strings with nulls.
      2. Convert ``follower_count`` text such as ``"613k"``, ``"1.5k"`` or
         ``"5M"`` to a whole number.
      3. Strip the ``"Local save in "`` prefix from ``save_location``.
      4. Rename ``index`` to ``ind`` and cast it to int.
      5. Keep only the cleaned columns, in a fixed order.

    Args:
        df_to_clean: DataFrame with at least the columns ``index``,
            ``unique_id``, ``title``, ``description``, ``follower_count``,
            ``poster_name``, ``tag_list``, ``is_image_or_video``,
            ``image_src``, ``save_location`` and ``category``.

    Returns:
        A new DataFrame with the columns ``ind``, ``unique_id``, ``title``,
        ``description``, ``follower_count``, ``poster_name``, ``tag_list``,
        ``is_image_or_video``, ``image_src``, ``save_location``, ``category``.
    """
    # Placeholder strings that stand for "no data", grouped by column
    placeholders_by_column = {
        "description": [
            "No description available Story format",
            "No description available",
            "Untitled",
        ],
        "follower_count": ["User Info Error"],
        "image_src": ["Image src error."],
        "poster_name": ["User Info Error"],
        "tag_list": ["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"],
        "title": ["No Title Data Available"],
    }

    pin_df = df_to_clean
    for column_name, placeholders in placeholders_by_column.items():
        pin_df = pin_df.replace(
            {placeholder: None for placeholder in placeholders},
            subset=[column_name],
        )

    # A trailing "k" / "M" scales the number in front of it.  Multiplying the
    # numeric part (instead of appending zeros to the text) keeps fractional
    # values correct: "1.5k" with zeros appended is "1.5000", which casts to 1.
    raw_count = trim(col("follower_count"))
    multiplier = (
        when(raw_count.endswith("k"), lit(1000))
        .when(raw_count.endswith("M"), lit(1000000))
        .otherwise(lit(1))
    )
    # Decimal (not double) arithmetic so that e.g. 2.3 * 1000 is exactly 2300
    numeric_part = regexp_replace(raw_count, "[kM]$", "").cast("decimal(18,3)")
    pin_df = pin_df.withColumn("follower_count", (numeric_part * multiplier).cast("int"))

    # Clean the data to include only the save location path
    pin_df = pin_df.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

    # Rename index -> ind and make sure it is an int
    pin_df = pin_df.withColumnRenamed("index", "ind")
    pin_df = pin_df.withColumn("ind", col("ind").cast("int"))

    # Reorder the DataFrame columns (this also drops any column not listed)
    return pin_df.select(
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category",
    )

def cleaned_geo_df(df_to_clean):
    """Clean a DataFrame of geolocation records.

    Combines ``latitude`` and ``longitude`` into one ``coordinates`` array
    column, converts ``timestamp`` with ``to_timestamp`` and returns the
    columns in a fixed order.

    Args:
        df_to_clean: DataFrame with the columns ``ind``, ``country``,
            ``latitude``, ``longitude`` and ``timestamp``.

    Returns:
        A new DataFrame with the columns ``ind``, ``country``,
        ``coordinates`` and ``timestamp``.
    """
    # Pair latitude and longitude into a single array column
    coordinates = array("latitude", "longitude").alias("coordinates")

    # Convert the timestamp column to a timestamp data type
    event_time = to_timestamp("timestamp").alias("timestamp")

    # Selecting only the wanted columns also leaves out latitude and longitude
    return df_to_clean.select("ind", "country", coordinates, event_time)


def cleaned_user_df(df_to_clean):
    """Clean a DataFrame of user records.

    Builds ``user_name`` from ``first_name`` and ``last_name`` (separated by
    a single space), converts ``date_joined`` with ``to_timestamp`` and
    returns the columns in a fixed order.

    Args:
        df_to_clean: DataFrame with the columns ``ind``, ``first_name``,
            ``last_name``, ``age`` and ``date_joined``.

    Returns:
        A new DataFrame with the columns ``ind``, ``user_name``, ``age``
        and ``date_joined``.
    """
    # Full name: "<first_name> <last_name>"
    full_name = concat("first_name", lit(" "), "last_name").alias("user_name")

    # Convert the date_joined column to a timestamp data type
    joined_at = to_timestamp("date_joined").alias("date_joined")

    # Selecting only the wanted columns also leaves out first_name and last_name
    return df_to_clean.select("ind", full_name, "age", joined_at)

# Pull [PIN stream]
 - Defining schema for stream table
 - Reading in streamed data
 - Deserialising stream data
 - Cleaning streamed data
 - Writing the streamed data to Delta tables
 - NOTE: the writeStreams must be interrupted before the next one can run

In [None]:
# Pin schema

# Define the structured streaming schema used to parse the JSON payload of each pin record
pin_df_schema = StructType([
    StructField("category", StringType(), True),
    StructField("description", StringType(), True),
    StructField("downloaded", IntegerType(), True),
    StructField("follower_count", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("index", IntegerType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("title", StringType(), True),
    StructField("unique_id", StringType(), True),
])

# Pin stream
# Read the streaming data from Kinesis using ACCESS_KEY and SECRET_KEY
# (make sure data is being sent to the stream before running this cell).
pin_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-/<your_UserId>-pin')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# To look at the raw Kinesis records:
# display(pin_df)

# Cast the data column to string to deserialise
pin_df = pin_df.selectExpr("CAST(data AS STRING) AS jsonData")

# To look at the JSON strings:
# display(pin_df)

# Deserialise the JSON data using the schema
# https://www.databricks.com/blog/2017/08/09/apache-sparks-structured-streaming-with-amazon-kinesis-on-databricks.html to help with transformation
pin_df = pin_df.select(from_json("jsonData", pin_df_schema).alias("parsed_data"))

# Expand the parsed_data struct column into individual columns
pin_df = pin_df.select("parsed_data.*")

pin_df = cleaned_pin_df(pin_df)

display(pin_df)

# A checkpoint directory records the progress of one specific streaming query,
# so this query gets a directory of its own instead of one shared with other
# streams.
# To reprocess the stream from scratch, remove the checkpoint folder first:
# dbutils.fs.rm("/tmp/kinesis/pin/_checkpoints/", True)

(
    pin_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/pin/_checkpoints/")
    .table("/<your_UserId>_pin_table_kinesis")
)

ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
6788,2c4458d6-d86f-46c1-a80a-46955406c453,Copy These 8 Street Style Looks To Look Sharp,Steal these looks..,613000.0,Mens Fashion - LIFESTYLE BY PS,"Stylish Mens Outfits,Casual Summer Outfits,Summer Outfits For Guys,Men's Spring Outfits,Best Winter Outfits Men,Nice Outfits For Men,Outfits For Teenage Guys,Mens Spring Fashion Outfits,Fashion Boots",image,https://i.pinimg.com/originals/be/2e/51/be2e51449fb41a79084f61515f4c4a2c.jpg,/data/mens-fashion,mens-fashion
4876,6c6ba449-6530-4cb8-bb72-d08f3a2a5e78,Happy Birthday Balloons Gifts Home Party Decorations - Pink / 50 pack,Imagine your birthday or wedding being illuminated with these beautiful LED balloons in the evening. What a wonderful atmosphere this would create and how surprised your guests…,3000.0,Lasercutwraps Shop,"Light Up Balloons,Led Balloons,Balloons Online,Round Balloons,White Balloons,Baby Shower Balloons,Balloon Chandelier,Balloon Garland,Broderie Simple",image,https://i.pinimg.com/originals/79/16/78/7916782bd9bd01c1853ccd6a0db76bfc.jpg,/data/event-planning,event-planning
470,b12a8c2c-8498-4de5-abe6-fed29da8a93b,Silhouette su sfondi in gradazione cromatica,Gli studenti della 3B hanno imparato ad usare i colori acrilici mescolandoli tra loro per ottenere sfumature in gradazione cromatica. La sagoma nera è sdipinta successivamente c…,5000.0,Danielle's Taste Bud Ticklers,"Oil Pastel Art,Oil Pastel Drawings,Art Drawings Sketches,Colorful Drawings,Colourful Art,Oil Pastels,Acrylic Paintings,Art Paintings,Sunset Paintings",image,https://i.pinimg.com/originals/41/2f/4e/412f4e281f2b2c3b42e48b2f9c4380a4.jpg,/data/art,art
10155,514986a6-a6f5-461d-a491-6e858381d481,Greece : 20 Photos to Inspire You to Visit Athens - Hedonisitit,"Athens, Greece - Photography guide to the city Greece Travel Honeymoon Backpack Backpacking Vacation #travel #honeymoon #vacation #backpacking #budgettravel #offthebeatenpath #b…",40000.0,Hedonistit | Blogging + Content Creation Tips,"Greece Photography,Photography Guide,Travel Photography,Light Photography,Cool Places To Visit,Places To Travel,Places To Go,Travel Destinations,Greece Places To Visit",image,https://i.pinimg.com/originals/0c/a4/be/0ca4bed1043243180abe35e4375cf2b1.jpg,/data/travel,travel
303,8d5372f8-b199-4936-9f8d-6df54e8c40e6,Watercolor Leaf Print,"Hello everyone! Bethany here from Whistle and Ivy, and I am so excited to share this lovely and EASY watercolor leaf print today. In the corner of my yard, I have a beautifully…",106000.0,PinkWhen | Easy Recipes For Everyone,"Leaf Crafts Kids,Fall Crafts For Kids,Projects For Kids,Art For Kids,Art Projects,Kids Diy,Leaf Projects,Ecole Art,Watercolor Projects",image,https://i.pinimg.com/originals/05/b6/4f/05b64fed4d6baeefe85dd216483d1b99.jpg,/data/art,art
8369,7e47f434-3da8-4dbd-b627-99045486e9fc,300+ Best Love Quotes By Rumi Will Inspire You To Find Clarity In Your Life & Relationships,Rumi is known as one of the greatest poets of all time. These powerful Rumi quotes about love and inspiration will bring clarity to even the most confusing relationships.,942000.0,YourTango,"Best Rumi Quotes,Best Love Quotes,Wisdom Quotes,Inspirational Quotes,Rumi Quotes On Life,Crush Quotes,Quotes Quotes,Wise Quotes About Love,Sufi Quotes",image,https://i.pinimg.com/originals/8b/01/bb/8b01bb02cf4696146414c2c0a6d569cf.jpg,/data/quotes,quotes
7195,24403f06-ede6-4ddb-8be6-5d327248e890,Amazing outfit idea,,6000.0,Billonaire dreamz,,multi-video(story page format),https://i.pinimg.com/videos/thumbnails/originals/b1/2c/3a/b12c3af8417bc5b8064bee1308abb349.0000001.jpg,/data/mens-fashion,mens-fashion
1663,61d05454-85e9-4505-99e3-64eae6611c82,Everything Christmas!!,,216.0,⭒ Maci Jordan⭒,"Christmas Feeling,Noel Christmas,Merry Little Christmas,Christmas Treats,All Things Christmas,Winter Christmas,Christmas Decorations,Xmas Holidays,Christmas Cookies",multi-video(story page format),,/data/christmas,christmas
1535,b3bb4c9c-e8e8-40c4-9e67-8e45bd720ab6,20 Of The Best Hair Tips You'll Ever Read,Hair mask,307000.0,Listotic - Easy Ideas | Fun DIY Projects | Easy Recipes | Lists,"Natural Hair Styles,Long Hair Styles,Wedding Hair Inspiration,Style Inspiration,Hair Remedies,Natural Remedies,Tips Belleza,Hair Health,About Hair",image,https://i.pinimg.com/originals/fa/9a/38/fa9a3809703c38e77b87fefd86856ad8.jpg,/data/beauty,beauty
6802,27dde4cc-ff40-4b55-b3ad-00e9df088901,Fit,,34.0,Therealmvpgabriel,"Trendy Mens Fashion,Look Fashion,Male Winter Fashion,Mens Streetwear Fashion,Mens Grunge Fashion,Korean Male Fashion,Street Fashion Men,Rustic Mens Fashion,Male Streetwear",multi-video(story page format),,/data/mens-fashion,mens-fashion


# Pull [GEO stream]
 - Defining schema for stream table
 - Reading in streamed data
 - Deserialising stream data
 - Cleaning streamed data
 - Writing the streamed data to Delta tables
 - NOTE: the writeStreams must be interrupted before the next one can run

In [None]:
# Geo schema
# Define the structured streaming schema used to parse the JSON payload of each geo record
geo_df_schema = StructType([
    StructField("country", StringType(), True),
    StructField("ind", IntegerType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("timestamp", StringType(), True),
])

# Geo stream
# Read the streaming data from Kinesis using ACCESS_KEY and SECRET_KEY
geo_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-/<your_UserId>-geo')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Cast the data column to string to deserialize
geo_df = geo_df.selectExpr("CAST(data AS STRING) AS jsonData")

# Deserialize the JSON data using the schema
geo_df = geo_df.select(from_json("jsonData", geo_df_schema).alias("parsed_data"))

# Expand the parsed_data struct column into individual columns
geo_df = geo_df.select("parsed_data.*")

geo_df = cleaned_geo_df(geo_df)

display(geo_df)

# A checkpoint directory records the progress of one specific streaming query,
# so this query gets a directory of its own instead of one shared with other
# streams.
# To reprocess the stream from scratch, remove the checkpoint folder first:
# dbutils.fs.rm("/tmp/kinesis/geo/_checkpoints/", True)

(
    geo_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/geo/_checkpoints/")
    .table("/<your_UserId>_geo_table_kinesis")
)

ind,country,coordinates,timestamp
6788,Algeria,"List(-86.4791, -169.547)",2019-01-04T02:30:10.000+0000
4876,French Polynesia,"List(-50.8134, -160.465)",2022-01-23T15:52:52.000+0000
470,Djibouti,"List(87.8892, 35.6314)",2019-03-19T05:00:26.000+0000
10155,Mongolia,"List(-16.8008, -60.9122)",2020-05-04T16:34:26.000+0000
303,United States Virgin Islands,"List(31.1892, -163.128)",2022-08-17T16:18:21.000+0000
8369,Algeria,"List(-89.5173, -179.689)",2022-02-27T02:15:43.000+0000
7195,Lebanon,"List(-62.0138, 107.319)",2020-12-16T07:09:08.000+0000
1663,Comoros,"List(-11.0835, -57.025)",2018-10-07T00:54:16.000+0000
1535,Cuba,"List(-75.7236, -156.761)",2018-11-07T04:56:49.000+0000
6802,Comoros,"List(-50.2113, -67.5862)",2018-10-31T16:37:03.000+0000


# Pull [USER stream]
 - Defining schema for stream table
 - Reading in streamed data
 - Deserialising stream data
 - Cleaning streamed data
 - Writing the streamed data to Delta tables
 - NOTE: the writeStreams must be interrupted before the next one can run

In [None]:
# User schema
# Define the structured streaming schema used to parse the JSON payload of each user record
# NOTE(review): cleaned_user_df only reads ind, first_name, last_name, age and
# date_joined; the remaining fields look copied from the pin schema - confirm
# whether they are needed.
user_df_schema = StructType([
    StructField("age", StringType(), True),
    StructField("category", StringType(), True),
    StructField("date_joined", StringType(), True),
    StructField("description", StringType(), True),
    StructField("downloaded", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("ind", IntegerType(), True),
    StructField("index", IntegerType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("title", StringType(), True),
    StructField("unique_id", StringType(), True),
])

# User stream
# Read the streaming data from Kinesis using ACCESS_KEY and SECRET_KEY
user_df = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-/<your_UserId>-user')
    .option('initialPosition', 'earliest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Cast the data column to string to deserialize
user_df = user_df.selectExpr("CAST(data AS STRING) AS jsonData")

# Deserialize the JSON data using the schema
user_df = user_df.select(from_json("jsonData", user_df_schema).alias("parsed_data"))

# Expand the parsed_data struct column into individual columns
user_df = user_df.select("parsed_data.*")

user_df = cleaned_user_df(user_df)

display(user_df)

# A checkpoint directory records the progress of one specific streaming query,
# so this query gets a directory of its own instead of one shared with other
# streams.
# To reprocess the stream from scratch, remove the checkpoint folder first:
# dbutils.fs.rm("/tmp/kinesis/user/_checkpoints/", True)

(
    user_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/user/_checkpoints/")
    .table("/<your_UserId>_user_table_kinesis")
)

ind,user_name,age,date_joined
470,Eric Quinn,27,2015-11-07T00:59:07.000+0000
10155,Barbara Harris,28,2016-01-18T00:02:03.000+0000
303,Natasha Cortez,51,2016-04-22T18:18:45.000+0000
8369,Aaron Abbott,20,2015-10-23T16:08:41.000+0000
7195,Amber Barnes,32,2017-08-14T00:57:09.000+0000
1663,Christopher Yoder,34,2017-06-23T05:46:55.000+0000
1535,Anthony Brown,33,2015-10-24T16:39:50.000+0000
6802,Tiffany Ward,51,2016-09-07T16:46:41.000+0000
7103,Amber Ashley,22,2016-09-05T00:23:48.000+0000
940,James Jones,21,2016-08-11T00:12:59.000+0000
