## Milestone 8: Read, Transform and Write Kinesis Streams on Databricks

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
import urllib

# `import urllib` alone does not guarantee the `parse` submodule is loaded;
# import it explicitly so `urllib.parse.quote` below cannot fail with AttributeError.
import urllib.parse

# Location of the Delta table that stores the AWS credentials.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table into a Spark DataFrame.
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both keys from the same row in a single Spark job. Two independent
# collect() calls would run two jobs and could pair keys from different rows.
# The column names contain spaces, so the Row is indexed by name.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# URL-encode the secret key; safe="" means "/" is percent-encoded as well.
# NOTE(review): not referenced by the cells in this notebook view; the Kinesis
# reader is given the raw SECRET_KEY.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Session-level setting: turn off Delta format checks
-- (spark.databricks.delta.formatCheck.enabled = false).
-- NOTE(review): the credentials table is already read with format("delta");
-- confirm this override is still required rather than masking a format mismatch.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Connection settings for the Kinesis source: which stream to read, where to
# start, which AWS region, and the credentials loaded earlier.
kinesis_options = {
    "streamName": "Kinesis-Prod-Stream",
    "initialPosition": "earliest",
    "region": "us-east-1",
    "awsAccessKey": ACCESS_KEY,
    "awsSecretKey": SECRET_KEY,
}

# One streaming DataFrame carries the records for every partition key; the
# pin/geo/user streams are later separated by filtering on partitionKey.
df = spark.readStream.format("kinesis").options(**kinesis_options).load()

# Schema of the JSON payload carried by geo records (all fields nullable).
geo_struct = (
    StructType()
    .add("ind", IntegerType(), True)
    .add("timestamp", StringType(), True)
    .add("latitude", StringType(), True)
    .add("longitude", StringType(), True)
    .add("country", StringType(), True)
)

# Keep only geo records, cast the `data` payload to a string, parse it as JSON
# and flatten the parsed struct into top-level columns.
df_geo = (
    df.where(col("partitionKey") == "streaming-57e94de2a910-geo")
    .select(from_json(col("data").cast("string"), geo_struct).alias("parsed_value"))
    .select("parsed_value.*")
)

In [0]:
# Schema of the pin JSON payload: an integer "index" followed by string fields,
# in payload order. All fields are nullable.
_pin_string_fields = [
    "unique_id", "title", "description", "poster_name", "follower_count",
    "tag_list", "is_image_or_video", "image_src", "downloaded",
    "save_location", "category",
]
pin_struct = StructType(
    [StructField("index", IntegerType(), True)]
    + [StructField(name, StringType(), True) for name in _pin_string_fields]
)

# Keep only pin records, decode the JSON payload and flatten it into columns.
df_pin = (
    df.where(col("partitionKey") == "streaming-57e94de2a910-pin")
    .select(from_json(col("data").cast("string"), pin_struct).alias("parsed_value"))
    .select("parsed_value.*")
)

In [0]:
# Schema of the user JSON payload as (name, type) pairs; every field is nullable.
_user_fields = [
    ("ind", IntegerType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("age", IntegerType()),
    ("date_joined", StringType()),
]
user_struct = StructType([StructField(name, dtype, True) for name, dtype in _user_fields])

# Keep only user records, decode the JSON payload and flatten it into columns.
df_user = (
    df.where(col("partitionKey") == "streaming-57e94de2a910-user")
    .select(from_json(col("data").cast("string"), user_struct).alias("parsed_value"))
    .select("parsed_value.*")
)

In [0]:
# Clean pin data

from pyspark.sql.functions import when, col, lit, regexp_replace

# Placeholder values that carry no information and should become null.
# The empty string is in this list, so no separate == "" check is needed.
replace_values = ["", "N/A", "null", "No description available", "No description available Story format", "User Info Error", "Image src error.", "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e", "No Title Data Available"]

# Only string columns can hold these placeholders. Comparing a non-string column
# (e.g. the integer "index") with string literals can never match, and with ANSI
# mode enabled Spark may try to cast the literals to the column type and fail.
string_columns = {
    field.name for field in df_pin.schema.fields if isinstance(field.dataType, StringType)
}

# Null out placeholder values in string columns; pass other columns through unchanged.
df_pin = df_pin.select([
    when(col(c).isin(replace_values), lit(None)).otherwise(col(c)).alias(c)
    if c in string_columns
    else col(c)
    for c in df_pin.columns
])

# Keep only rows whose 'downloaded' flag is 1 or 0.
df_pin = df_pin.filter((col("downloaded") == 1) | (col("downloaded") == 0))


In [0]:

# Step 1: Normalise follower_count to a whole number of followers.
# Raw values are plain numbers ("25") or abbreviated with a "k" (thousands) or
# "M" (millions) suffix. Strip the suffix and multiply instead of appending zeros
# as text: appending turns "1.5k" into "1.5000" (= 1.5) rather than 1500.
# Integer prefixes such as "5k" give the same result either way.
from pyspark.sql.functions import round as spark_round

follower_raw = col("follower_count")
df_pin = df_pin.withColumn(
    "follower_count",
    # spark_round protects scaled values such as 4099.9999... from being
    # truncated by the integer cast below.
    when(
        follower_raw.endswith("k"),
        spark_round(regexp_replace(follower_raw, "k$", "").cast("double") * 1000),
    )
    .when(
        follower_raw.endswith("M"),
        spark_round(regexp_replace(follower_raw, "M$", "").cast("double") * 1000000),
    )
    # Anything else (e.g. "25") is cast directly; unparseable values end up null.
    .otherwise(follower_raw.cast("double")),
)

# Step 2: Drop rows whose follower_count is null or could not be interpreted as a number.
df_pin = df_pin.filter(col("follower_count").isNotNull())

# Store the count as an integer.
df_pin = df_pin.withColumn("follower_count", col("follower_count").cast(IntegerType()))

# Rename the index column to ind, matching the key name used in the geo and user schemas.
df_pin = df_pin.withColumnRenamed("index", "ind")


In [0]:
# Final column layout of the pin table. 'downloaded' is not in this list, so
# the select below drops it.
column_order = [
    "ind", "unique_id", "title", "description", "follower_count",
    "poster_name", "tag_list", "is_image_or_video", "image_src",
    "save_location", "category",
]

df_pin = df_pin.select(*column_order)


In [0]:
# Append the cleaned pin stream to a Delta table.
# NOTE(review): the checkpoint lives under /tmp; confirm this location is durable
# enough for the streaming query's needs.
table_name = "57e94de2a910_pin_table"
pin_checkpoint_dir = "/tmp/kinesis/_checkpoints/57e94de2a910_pin/"
(
    df_pin.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", pin_checkpoint_dir)
    .table(table_name)
)


<pyspark.sql.streaming.query.StreamingQuery at 0x7f2ed8d7b400>

In [0]:
from pyspark.sql.functions import array, col, to_timestamp

# Combine latitude and longitude into a two-element `coordinates` array column,
# then drop the two original columns.
df_geo = (
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
)

# Parse the timestamp string into a timestamp type and discard rows where
# parsing produced null.
df_geo = df_geo.withColumn(
    "timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss")
)
df_geo = df_geo.where(col("timestamp").isNotNull())

# Final column layout of the geo table.
column_order = ["ind", "country", "coordinates", "timestamp"]
df_geo = df_geo.select(*column_order)

In [0]:
# Append the cleaned geo stream to a Delta table.
# NOTE(review): the checkpoint lives under /tmp; confirm this location is durable
# enough for the streaming query's needs.
table_name = "57e94de2a910_geo_table"
geo_checkpoint_dir = "/tmp/kinesis/_checkpoints/57e94de2a910_geo/"
(
    df_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", geo_checkpoint_dir)
    .table(table_name)
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f2ec8bc87c0>

In [0]:
from pyspark.sql.functions import concat, lit, col

# Build `username` as "<first_name> <last_name>", then drop the two name columns.
df_user = df_user.withColumn(
    "username", concat(col("first_name"), lit(" "), col("last_name"))
).drop("first_name", "last_name")

# Parse date_joined into a timestamp type and discard rows where parsing
# produced null.
df_user = df_user.withColumn(
    "date_joined", to_timestamp(col("date_joined"), "yyyy-MM-dd'T'HH:mm:ss")
)
df_user = df_user.where(col("date_joined").isNotNull())

# Final column layout of the user table.
column_order = ["ind", "username", "age", "date_joined"]
df_user = df_user.select(*column_order)

In [0]:
# Append the cleaned user stream to a Delta table.
# NOTE(review): the checkpoint lives under /tmp; confirm this location is durable
# enough for the streaming query's needs.
table_name = "57e94de2a910_user_table"
user_checkpoint_dir = "/tmp/kinesis/_checkpoints/57e94de2a910_user/"
(
    df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", user_checkpoint_dir)
    .table(table_name)
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f2ec86a17b0>