In [0]:
# Location of the Delta table holding the AWS authentication credentials.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
# Batch-read that table into a Spark DataFrame (format passed as a keyword to load()).
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# pyspark functions (star import kept: later cells use col, from_json, regexp_replace, ... unqualified)
from pyspark.sql.functions import *
# URL processing. Import the submodule explicitly: a bare `import urllib` does not
# guarantee that `urllib.parse` has been loaded.
import urllib.parse

# Fetch the access key and secret key from ONE row with a single Spark job, so the two
# values are guaranteed to belong together.
# NOTE(review): storing keys in a plain Delta table is risky; consider Databricks secrets.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError("No AWS credentials found: aws_keys_df contains no rows.")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" makes quote() encode "/" as well.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable the Delta format check for the current Spark session.
-- Intent (from the original note): avoid format-check errors when reading Delta tables.
-- NOTE(review): this SET is session-wide, not scoped to a single read; confirm it is still required.
SET spark.databricks.delta.formatCheck.enabled=false

In [0]:
from pyspark.sql import *
from pyspark.sql.types import *
# Module alias so the Spark round() used below is explicit: the earlier star import of
# pyspark.sql.functions shadows Python's builtin round, which is easy to misread.
from pyspark.sql import functions as F

# Every streaming query needs its OWN checkpoint directory: the query id and processed
# source offsets are stored there, so the pin, user and geo streams must not share one.
# It lives under /tmp/kinesis/_checkpoints/ so removing that parent still cleans it up.
PIN_CHECKPOINT_LOCATION = "/tmp/kinesis/_checkpoints/pin/"

# String values treated as missing/invalid in the pin data; they are replaced with null.
PIN_INVALID_VALUES = [
    "",
    "N/A",
    "User Info Error",
    "No description available Story format",
    "Image src error",
    "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "No Title Data Available",
]

# Load the pin records from the Kinesis stream into Databricks.
df_pin = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-12ffc5aba733-pin')
    .option('initialPosition', 'latest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)
# Keep only the record payload, cast to a string so it can be parsed as JSON.
df_pin = df_pin.selectExpr("CAST(data as STRING)")

# Schema used to parse the JSON payload. Everything except `downloaded` is read as a
# string; numeric columns are cleaned and cast explicitly below.
schema = StructType([
    StructField("index", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("downloaded", LongType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

# Parse the JSON string (already a string column) into a struct column.
df_pin = df_pin.withColumn("jsonData", from_json(col("data"), schema))

# Flatten the struct: one top-level column per schema field, in schema order.
df_pin_kinesis = df_pin.select("jsonData.*")

# Replace empty entries or known invalid placeholder strings with null.
df_pin_cleaned = df_pin_kinesis.replace(PIN_INVALID_VALUES, None)

# follower_count is text with optional k / m / M suffixes: strip spaces, expand the
# suffix, and cast to int. The arithmetic is done in double and rounded before the int
# cast. In 32-bit float, counts above 2**24 lose precision, and a bare int cast
# truncates instead of rounding.
follower_count_raw = regexp_replace(col("follower_count"), " ", "")
df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count",
    when(follower_count_raw.contains("k"),
         F.round(regexp_replace(follower_count_raw, "k", "").cast("double") * 1000))
    .when(follower_count_raw.contains("m"),
          F.round(regexp_replace(follower_count_raw, "m", "").cast("double") * 1000000))
    .when(follower_count_raw.contains("M"),
          F.round(regexp_replace(follower_count_raw, "M", "").cast("double") * 1000000))
    .otherwise(F.round(follower_count_raw.cast("double")))
    .cast("int"),
)

# `index` becomes the integer column `ind`.
df_pin_cleaned = df_pin_cleaned.withColumn("index", col("index").cast(IntegerType()))
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")

# Keep only the 4th space-separated token (index 3) of save_location.
df_pin_cleaned = df_pin_cleaned.withColumn("save_location", split(col("save_location"), " ").getItem(3))

# Final column order for the Delta table. `downloaded` is parsed by the schema but is
# not part of the output, so it is not cleaned here.
df_pin_cleaned = df_pin_cleaned.select("ind", "unique_id", "title", "description",
                                       "follower_count", "poster_name", "tag_list",
                                       "is_image_or_video", "image_src", "save_location", "category")
display(df_pin_cleaned)

# Append the cleaned stream to the Delta table; keep the handle so the query can be stopped.
pin_query = (
    df_pin_cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", PIN_CHECKPOINT_LOCATION)
    .table("12ffc5aba733_pin_table")
)


In [0]:
from pyspark.sql import *
from pyspark.sql.types import *

# Dedicated checkpoint directory for this query. A checkpoint holds the query id and the
# processed source offsets, so each of the pin / user / geo queries needs its own. It
# sits under /tmp/kinesis/_checkpoints/ so removing that parent still cleans it up.
USER_CHECKPOINT_LOCATION = "/tmp/kinesis/_checkpoints/user/"

# Empty the target table before the stream starts appending to it.
# NOTE(review): TRUNCATE TABLE fails if the table does not exist yet (e.g. a very first run).
spark.sql("TRUNCATE TABLE default.12ffc5aba733_user_table")

# Load the user records from the Kinesis stream.
df_user = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-12ffc5aba733-user')
    .option('initialPosition', 'latest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)
# Keep only the record payload, cast to a string so it can be parsed as JSON.
df_user = df_user.selectExpr("CAST(data as STRING)")

# Schema used to parse the JSON payload; every field is read as a string.
# (Named schema_user, like schema_geo, so it does not overwrite the pin cell's `schema`.)
schema_user = StructType([
    StructField("ind", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True),
])

df_user = df_user.withColumn("jsonData", from_json(col("data"), schema_user))

# Flatten the struct: one top-level column per schema field, in schema order.
df_user_kinesis = df_user.select("jsonData.*")

df_user_cleaned = (
    df_user_kinesis
    # Combine first and last name into user_name, then drop the two source columns.
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    # Convert date_joined from a string to a timestamp.
    .withColumn("date_joined", col("date_joined").cast(TimestampType()))
    # Final column order for the Delta table.
    .select("ind", "user_name", "age", "date_joined")
)

display(df_user_cleaned)

# Append the cleaned stream to the Delta table; keep the handle so the query can be stopped.
user_query = (
    df_user_cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", USER_CHECKPOINT_LOCATION)
    .table("12ffc5aba733_user_table")
)


In [0]:
from pyspark.sql import *
from pyspark.sql.types import *

# Dedicated checkpoint directory for this query. A checkpoint holds the query id and the
# processed source offsets, so each of the pin / user / geo queries needs its own. It
# sits under /tmp/kinesis/_checkpoints/ so removing that parent still cleans it up.
GEO_CHECKPOINT_LOCATION = "/tmp/kinesis/_checkpoints/geo/"

# Empty the target table before the stream starts appending to it.
# NOTE(review): TRUNCATE TABLE fails if the table does not exist yet (e.g. a very first run).
spark.sql("TRUNCATE TABLE default.12ffc5aba733_geo_table")

# Load the geolocation records from the Kinesis stream.
df_geo = (
    spark.readStream
    .format('kinesis')
    .option('streamName', 'streaming-12ffc5aba733-geo')
    .option('initialPosition', 'latest')
    .option('region', 'us-east-1')
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)
# Keep only the record payload, cast to a string so it can be parsed as JSON.
df_geo = df_geo.selectExpr("CAST(data as STRING)")

# Schema used to parse the JSON payload; every field is read as a string.
schema_geo = StructType([
    StructField("ind", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("country", StringType(), True),
])

df_geo = df_geo.withColumn("jsonData", from_json(col("data"), schema_geo))

# Flatten the struct: one top-level column per schema field, in schema order.
df_geo_kinesis = df_geo.select("jsonData.*")

df_geo_cleaned = (
    df_geo_kinesis
    # Pack latitude and longitude into a single [latitude, longitude] array column,
    # then drop the two source columns.
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    # Convert timestamp from a string to a timestamp.
    .withColumn("timestamp", col("timestamp").cast(TimestampType()))
    # Final column order for the Delta table.
    .select("ind", "country", "coordinates", "timestamp")
)

display(df_geo_cleaned)

# Append the cleaned stream to the Delta table (not a file sink); keep the handle so the
# query can be stopped.
geo_query = (
    df_geo_cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", GEO_CHECKPOINT_LOCATION)
    .table("12ffc5aba733_geo_table")
)


In [0]:
# Recursively delete everything under the Kinesis checkpoint directory, so the streams
# no longer resume from stored offsets the next time they are started.
# NOTE(review): stop the running streaming queries first; removing the checkpoint of an
# active query will break it.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", recurse=True)