# Reading data streams with Kinesis

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Location of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the credentials table into a Spark DataFrame
aws_keys_df = (
    spark.read
    .format("delta")
    .load(delta_table_path)
)

In [None]:
# `urllib.parse` is a submodule: a bare `import urllib` does not guarantee it
# is loaded, so import it explicitly before using `urllib.parse.quote`.
import urllib.parse

# Fetch both credential columns with a single Spark job instead of two
# separate collect() calls.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message
    # rather than an IndexError on collect()[0].
    raise ValueError("The AWS credentials table is empty")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# Percent-encode the secret key; safe="" means "/" is encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
%sql
-- Turn off the Delta format check for this session.
-- NOTE(review): presumably needed so the non-Delta reads below are not rejected; confirm it is still required.
SET spark.databricks.delta.formatCheck.enabled=false

## Reading data from Kinesis Data Streams


In [None]:
# Open a streaming reader on the Pinterest post Kinesis stream, starting from
# the earliest available record and authenticating with the AWS keys.
pin_df_stream = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-124714cdee67-pin")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

In [None]:
# Preview the raw records as delivered by the stream
display(pin_df_stream)

# Keep only the record payload, cast to a string so it can be parsed later
pin_payload_expr = "CAST(data as STRING)"
pin_df = pin_df_stream.selectExpr(pin_payload_expr)
display(pin_df)

Parse the JSON string data into DataFrame columns


In [None]:
# JSON fields to extract from the payload; the same names are used, in the
# same order, as the resulting column names.
pin_fields = [
    "index",
    "unique_id",
    "title",
    "description",
    "poster_name",
    "follower_count",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "downloaded",
    "save_location",
    "category",
]
pin_df = pin_df.select(json_tuple(col("data"), *pin_fields)).toDF(*pin_fields)

display(pin_df)

In [None]:
# Open a streaming reader on the geolocation Kinesis stream, starting from
# the earliest available record and authenticating with the AWS keys.
geo_df_stream = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-124714cdee67-geo")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

In [None]:
# Preview the raw records as delivered by the stream
display(geo_df_stream)

# Keep only the record payload, cast to a string so it can be parsed later
geo_payload_expr = "CAST(data as STRING)"
geo_df = geo_df_stream.selectExpr(geo_payload_expr)
display(geo_df)

In [None]:
# JSON fields to extract from the payload; the same names are used, in the
# same order, as the resulting column names.
geo_fields = ["ind", "timestamp", "latitude", "longitude", "country"]
geo_df = geo_df.select(json_tuple(col("data"), *geo_fields)).toDF(*geo_fields)

display(geo_df)

In [None]:
# Open a streaming reader on the user Kinesis stream, starting from the
# earliest available record and authenticating with the AWS keys.
user_df_stream = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-124714cdee67-user")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

In [None]:
# Preview the raw records as delivered by the stream
display(user_df_stream)

# Keep only the record payload, cast to a string so it can be parsed later
user_payload_expr = "CAST(data as STRING)"
user_df = user_df_stream.selectExpr(user_payload_expr)
display(user_df)

In [None]:
# JSON fields to extract from the payload; the same names are used, in the
# same order, as the resulting column names.
user_fields = ["ind", "first_name", "last_name", "age", "date_joined"]
user_df = user_df.select(json_tuple(col("data"), *user_fields)).toDF(*user_fields)

display(user_df)

## Cleaning Data

### Pinterest Post Data

In [None]:
# Explicit alias so it is unambiguous that Spark's column-wise round is used,
# not Python's builtin round.
from pyspark.sql.functions import round as spark_round

# Drop duplicate rows
pin_df = pin_df.dropDuplicates()

# Placeholder strings that stand for "no data", keyed by the column they occur
# in; each one is replaced with None (null).
pin_placeholders = {
    'follower_count': 'User Info Error',
    'description': 'No description available Story format',
    'image_src': 'Image src error.',
    'tag_list': 'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
    'title': 'No Title Data Available',
}
clean_pin_df = pin_df
for column_name, placeholder in pin_placeholders.items():
    clean_pin_df = clean_pin_df.replace({placeholder: None}, subset=[column_name])

# Numeric part of follower_count, e.g. "1.5" from "1.5k". The pattern is a raw
# string (so "\d" is not an invalid Python escape) and the decimal point is
# escaped so it matches only a literal ".".
follower_digits = regexp_extract(col("follower_count"), r"(\d+(\.\d+)?)", 1)
follower_number = follower_digits.cast(DoubleType())

# Expand the "k" (thousand) and "M" (million) suffixes into plain numbers.
follower_count_scaled = (
    when(col("follower_count").contains("k"), follower_number * 1000)
    .when(col("follower_count").contains("M"), follower_number * 1000000)
    .otherwise(follower_digits.cast("integer"))
)

clean_pin_df = (
    clean_pin_df
    # Round before the integer cast: the cast truncates, so a product that
    # lands just below a whole number in floating point would lose 1.
    .withColumn("follower_count", spark_round(follower_count_scaled).cast("integer"))
    # Cast numeric columns to a numeric data type ("downloaded" is not part of
    # the final select below)
    .withColumn("downloaded", col("downloaded").cast("integer"))
    .withColumn("index", col("index").cast("integer"))
    # Strip the "Local save in " prefix from save_location
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    .withColumnRenamed("index", "ind")
    # Final column order of the cleaned Pinterest post data
    .select(
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category",
    )
)

clean_pin_df.printSchema()

### Write data to delta table


In [None]:
# Append the cleaned Pinterest post stream to a Delta table.
# Each streaming query needs a checkpoint directory of its own; a location
# shared with other queries mixes up their progress tracking, so the path is
# derived from the target table name.
# NOTE(review): the previous location was "/mnt/data checkpoint_dir" (the space
# looks like a missing "/"). With a fresh checkpoint the query starts again
# from the stream's initial position — confirm the target table is empty or
# move the old checkpoint if one exists.
pin_table_name = "124714cdee67_pin_table"
pin_checkpoint_path = f"/mnt/data/checkpoint_dir/{pin_table_name}"
(
    clean_pin_df.writeStream
    .format("delta")
    .option("checkpointLocation", pin_checkpoint_path)
    .outputMode("append")
    .table(pin_table_name)
)

### Geolocation Data


In [None]:
# Drop duplicate rows
geo_df = geo_df.dropDuplicates()

# Build the cleaned geolocation frame in a single projection: latitude and
# longitude are combined into one "coordinates" array column and the timestamp
# string is cast to a timestamp type.
clean_geo_df = geo_df.select(
    "ind",
    "country",
    array("latitude", "longitude").alias("coordinates"),
    col("timestamp").cast(TimestampType()).alias("timestamp"),
)

In [None]:
# Append the cleaned geolocation stream to a Delta table.
# Each streaming query needs a checkpoint directory of its own; a location
# shared with other queries mixes up their progress tracking, so the path is
# derived from the target table name.
# NOTE(review): the previous location was "/mnt/data checkpoint_dir" (the space
# looks like a missing "/"). With a fresh checkpoint the query starts again
# from the stream's initial position — confirm the target table is empty or
# move the old checkpoint if one exists.
geo_table_name = "124714cdee67_geo_table"
geo_checkpoint_path = f"/mnt/data/checkpoint_dir/{geo_table_name}"
(
    clean_geo_df.writeStream
    .format("delta")
    .option("checkpointLocation", geo_checkpoint_path)
    .outputMode("append")
    .table(geo_table_name)
)

### User data

In [None]:
# Drop duplicate rows
user_df = user_df.dropDuplicates()

# Build the cleaned user frame in a single projection: first and last name are
# joined with a space into "user_name" and date_joined is cast to a timestamp
# type.
clean_user_df = user_df.select(
    "ind",
    concat("first_name", lit(" "), "last_name").alias("user_name"),
    "age",
    col("date_joined").cast(TimestampType()).alias("date_joined"),
)

In [None]:
# Append the cleaned user stream to a Delta table.
# Each streaming query needs a checkpoint directory of its own; a location
# shared with other queries mixes up their progress tracking, so the path is
# derived from the target table name.
# NOTE(review): the previous location was "/mnt/data checkpoint_dir" (the space
# looks like a missing "/"). With a fresh checkpoint the query starts again
# from the stream's initial position — confirm the target table is empty or
# move the old checkpoint if one exists.
user_table_name = "124714cdee67_user_table"
user_checkpoint_path = f"/mnt/data/checkpoint_dir/{user_table_name}"
(
    clean_user_df.writeStream
    .format("delta")
    .option("checkpointLocation", user_checkpoint_path)
    .outputMode("append")
    .table(user_table_name)
)