In [0]:
import json
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib
import urllib.parse

In [0]:
# Location of the Delta table holding the AWS credentials used by the Kinesis readers
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that table as a (batch) Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Pull both AWS credential columns from the first row of the credentials table in a
# single Spark action, so the access key and the secret key are guaranteed to come
# from the same row (two separate collect() calls made no such guarantee).
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # .first() returns None on an empty table; fail with a readable message instead
    # of an IndexError from indexing an empty collect() result.
    raise ValueError("The AWS credentials Delta table is empty; cannot read access keys")
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" makes quote() escape "/" as well.
# NOTE(review): ENCODED_SECRET_KEY is not used elsewhere in this notebook.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Turn off the Delta format check for the current Spark session (SET changes a
-- session-level configuration, so it stays in effect for later cells).
-- NOTE(review): presumably set to avoid format-mismatch errors when reading Delta
-- data; confirm it is still required, since it disables a safety check.
SET spark.databricks.delta.formatCheck.enabled=false

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

In [0]:
# Schema of the JSON payload carried in each pin Kinesis record.
# Every field is parsed as a nullable string; types are converted in the cleaning cell.
# Includes "downloaded" and "unique_id", which the cleaning cell reads, so this
# definition matches the schema used to parse the pin stream.
pin_streaming_schema = StructType([
    StructField("index", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("downloaded", StringType(), True),
    StructField("unique_id", StringType(), True),
])

In [0]:
# Streaming source for the pin Kinesis stream, replayed from the earliest record.
# TODO: the stream name is blank here and must be filled in before running.
df_pin = (
    spark.readStream
    .format('kinesis')
    .options(**{
        'streamName': '',
        'initialPosition': 'earliest',
        'region': 'us-east-1',
        'awsAccessKey': ACCESS_KEY,
        'awsSecretKey': SECRET_KEY,
    })
    .load()
)

In [0]:
# Keep only the record payload, cast to a string column still named "data"
# (presumably it arrives from the Kinesis source as binary).
df_pin_cast = df_pin.select(col("data").cast("string").alias("data"))

display(df_pin_cast)

In [0]:
# Every field of the pin JSON payload is parsed as a nullable string;
# the cleaning cell converts types afterwards.
pin_streaming_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "index", "title", "description", "poster_name", "tag_list",
        "is_image_or_video", "image_src", "save_location", "category",
        "follower_count", "downloaded", "unique_id",
    )
])

# Parse the JSON string in "data" and flatten its fields into top-level columns.
df_test = (
    df_pin_cast
    .select(from_json(col("data"), pin_streaming_schema).alias("data"))
    .select("data.*")
)

In [0]:
# Show the parsed (pre-cleaning) pin records.
# NOTE(review): df_test derives from a readStream source, so this display presumably
# runs as its own streaming query; confirm it is stopped when no longer needed.
display(df_test)

In [0]:
# 1. Replace the placeholder strings that stand in for missing data with nulls.
cleaned_pin1 = df_test.replace({'User Info Error': None,
                               'No description available Story format': None,
                               'Image src error.': None,
                               'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e': None,
                               'No Title Data Available': None
                               })

# 2. Expand abbreviated follower counts ("12k", "1.5M") to plain integers.
#    The numeric part is cast to an exact decimal before scaling: casting a string
#    such as "1.5" straight to int truncates the fraction (or fails under ANSI mode),
#    which turned "1.5k" into 1000 instead of 1500.
cleaned_pin2 = cleaned_pin1.withColumn(
    'follower_count',
    when(col('follower_count').endswith('k'),
         (regexp_replace(col('follower_count'), 'k$', '').cast('decimal(18,4)') * 1000).cast('int'))
    .when(col('follower_count').endswith('M'),
          (regexp_replace(col('follower_count'), 'M$', '').cast('decimal(18,4)') * 1000000).cast('int'))
    .otherwise(col('follower_count').cast('int')))

# 3. Cast numeric columns and expose "index" as an integer "ind" column.
#    NOTE(review): "downloaded" is cast here but not kept by the final select below.
cleaned_pin3 = (cleaned_pin2
                .withColumn("downloaded", cleaned_pin2["downloaded"].cast('int'))
                .withColumn("ind", cleaned_pin2["index"].cast('int')))

# 4. Keep only the path part of save_location.
cleaned_pin4 = cleaned_pin3.withColumn('save_location', regexp_replace('save_location', 'Local save in ', ""))

# 5. Fix the output column set and order.
cleaned_pin5 = cleaned_pin4.select('ind', 'unique_id', 'title', 'description', 'follower_count', 'poster_name', 'tag_list', 'is_image_or_video', 'image_src', 'save_location', 'category')

In [0]:
# Show the cleaned pin records (the DataFrame written to Delta in the next cell).
display(cleaned_pin5)

In [0]:
# Append the cleaned pin stream to a Delta table.
# Each streaming query needs its own checkpoint directory; sharing one between the
# pin, geo and user writers makes their offsets and metadata collide. The path is
# deliberately not nested inside any other query's checkpoint directory.
# TODO: the target table name is blank here and must be filled in.
# NOTE(review): /tmp is not durable storage; consider a persistent checkpoint path.
pin_query = (
    cleaned_pin5.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/pin/_checkpoints/")
    .table("")
)

In [0]:
# Streaming source for the geolocation Kinesis stream, replayed from the earliest record.
# TODO: the stream name is blank here and must be filled in before running.
df_geo = (
    spark.readStream
    .format('kinesis')
    .options(**{
        'streamName': '',
        'initialPosition': 'earliest',
        'region': 'us-east-1',
        'awsAccessKey': ACCESS_KEY,
        'awsSecretKey': SECRET_KEY,
    })
    .load()
)

In [0]:
# Keep only the record payload, cast to a string column still named "data".
df_geo_cast = df_geo.select(col("data").cast("string").alias("data"))

display(df_geo_cast)

In [0]:
# Every field of the geolocation JSON payload is parsed as a nullable string.
geo_streaming_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ind", "timestamp", "title", "latitude", "longitude", "country")
])

# Parse the JSON string in "data" and flatten its fields into top-level columns.
df_geo_final = (
    df_geo_cast
    .select(from_json(col("data"), geo_streaming_schema).alias("data"))
    .select("data.*")
)
display(df_geo_final)

In [0]:
# Combine latitude/longitude into a single "coordinates" array, parse the timestamp,
# and fix the output column order.
clean_geo = (
    df_geo_final
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp(col("timestamp")))
    .select("ind", "country", "coordinates", "timestamp")
)

display(clean_geo)

In [0]:
# Append the cleaned geolocation stream to a Delta table.
# Each streaming query needs its own checkpoint directory; sharing one between the
# pin, geo and user writers makes their offsets and metadata collide. The path is
# deliberately not nested inside any other query's checkpoint directory.
# TODO: the target table name is blank here and must be filled in.
# NOTE(review): /tmp is not durable storage; consider a persistent checkpoint path.
geo_query = (
    clean_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/geo/_checkpoints/")
    .table("")
)

In [0]:
# Streaming source for the user Kinesis stream, replayed from the earliest record.
# TODO: the stream name is blank here and must be filled in before running.
df_user = (
    spark.readStream
    .format('kinesis')
    .options(**{
        'streamName': '',
        'initialPosition': 'earliest',
        'region': 'us-east-1',
        'awsAccessKey': ACCESS_KEY,
        'awsSecretKey': SECRET_KEY,
    })
    .load()
)

In [0]:
# Keep only the record payload, cast to a string column still named "data".
df_user_cast = df_user.select(col("data").cast("string").alias("data"))

display(df_user_cast)

In [0]:
# Every field of the user JSON payload is parsed as a nullable string.
user_streaming_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ind", "first_name", "last_name", "age", "date_joined")
])

# Parse the JSON string in "data" and flatten its fields into top-level columns.
df_user_final = (
    df_user_cast
    .select(from_json(col("data"), user_streaming_schema).alias("data"))
    .select("data.*")
)
display(df_user_final)

In [0]:
# Merge first and last name into "user_name", parse the join date,
# and fix the output column order.
clean_user = (
    df_user_final
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp(col("date_joined")))
    .select("ind", "user_name", "age", "date_joined")
)

display(clean_user)

In [0]:
# Append the cleaned user stream to a Delta table.
# Each streaming query needs its own checkpoint directory; sharing one between the
# pin, geo and user writers makes their offsets and metadata collide. The path is
# deliberately not nested inside any other query's checkpoint directory.
# TODO: the target table name is blank here and must be filled in.
# NOTE(review): /tmp is not durable storage; consider a persistent checkpoint path.
user_query = (
    clean_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/user/_checkpoints/")
    .table("")
)