In [0]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Location of the Delta table that stores the AWS authentication credentials.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame; the credential columns are
# read from it in the next cell.
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Pull both AWS credential columns from the Spark DataFrame with a single
# action (one Spark job instead of two separate collect() calls).
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message
    # instead of an opaque IndexError from collect()[0].
    raise ValueError("aws_keys_df is empty: no AWS credentials row to read")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key. safe="" means "/" is percent-encoded as well,
# so the encoded value can be embedded in a URL without ambiguity.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Set the Delta format-check flag to false.
-- NOTE(review): the reason this check must be disabled is not visible in this
-- notebook -- confirm it is still required before keeping it.
SET spark.databricks.delta.formatCheck.enabled=false


key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
from pyspark.sql.types import StructType, StructField, StringType

def _read_kinesis_stream(stream_name, payload_schema):
    """Open a Kinesis stream and return its JSON payload as typed columns.

    Args:
        stream_name: Name of the Kinesis stream to subscribe to.
        payload_schema: StructType describing the JSON document carried in
            each record's ``data`` field.

    Returns:
        A streaming DataFrame with one column per field of ``payload_schema``.
        Fields that cannot be parsed as the declared type come back as null.
    """
    raw_stream = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', 'earliest')
        .option('region', 'us-east-1')
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )
    return (
        raw_stream
        # The Kinesis record body arrives in `data`; cast it to text so it can be parsed as JSON.
        .selectExpr("CAST(data as STRING)")
        .withColumn("data", from_json("data", payload_schema))
        .select(col('data.*'))
    )


pin_schema = StructType(
    [
        StructField('index', IntegerType(), True),
        StructField('unique_id', StringType(), True),
        StructField('title', StringType(), True),
        StructField('description', StringType(), True),
        StructField('poster_name', StringType(), True),
        # Kept as a string on purpose: the cleaning cell string-matches values
        # such as 'User Info Error' and a 'k' suffix on this column, which an
        # IntegerType field would already have turned into null at parse time.
        # The cleaning cell casts the column to int once it is normalised.
        StructField('follower_count', StringType(), True),
        StructField('tag_list', StringType(), True),
        StructField('is_image_or_video', StringType(), True),
        StructField('image_src', StringType(), True),
        StructField('downloaded', BooleanType(), True),
        StructField('save_location', StringType(), True),
        StructField('category', StringType(), True),
    ]
)

geo_schema = StructType(
    [
        StructField('ind', IntegerType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('latitude', FloatType(), True),
        StructField('longitude', FloatType(), True),
        StructField('country', StringType(), True),
    ]
)

user_schema = StructType(
    [
        StructField('ind', IntegerType(), True),
        StructField('first_name', StringType(), True),
        StructField('last_name', StringType(), True),
        StructField('age', IntegerType(), True),
        StructField('date_joined', TimestampType(), True),
    ]
)

# One streaming DataFrame per Kinesis stream, each parsed with its own schema.
df_pin = _read_kinesis_stream('streaming-12acc47946a5-pin', pin_schema)
df_geo = _read_kinesis_stream('streaming-12acc47946a5-geo', geo_schema)
df_user = _read_kinesis_stream('streaming-12acc47946a5-user', user_schema)

# Backward compatibility: this cell used to leave `schema` bound to the user
# schema, so keep that name available for any cell that still reads it.
schema = user_schema


In [0]:

# Placeholder strings that stand for "no data", keyed by the column they
# appear in. Each one is replaced with null. DataFrame.replace matches whole
# cell values only, which is exactly what is wanted for these placeholders.
pin_placeholders = {
    'description': 'No description available Story format',
    'follower_count': 'User Info Error',
    'image_src': 'Image src error.',
    'poster_name': 'User Info Error',
    'tag_list': 'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
    'title': 'No Title Data Available',
}

cleaned_df_pin = df_pin
for column_name, placeholder in pin_placeholders.items():
    cleaned_df_pin = cleaned_df_pin.replace({placeholder: None}, subset=[column_name])

# Expand the magnitude suffixes inside follower_count before casting to int.
# This must be a regex (substring) replacement: DataFrame.replace only matches
# whole cell values, so replace({'k': '000'}) never touches a value like '10k'.
# Both 'm' and 'M' are accepted for millions.
follower_count_text = col("follower_count").cast("string")
follower_count_text = regexp_replace(follower_count_text, "k", "000")
follower_count_text = regexp_replace(follower_count_text, "[mM]", "000000")
cleaned_df_pin = cleaned_df_pin.withColumn("follower_count", follower_count_text.cast("int"))

# Keep only the path part of save_location.
cleaned_df_pin = cleaned_df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# Rename `index` to `ind` and fix the output column order; `downloaded` is
# dropped because it is not part of the selected columns.
cleaned_df_pin = cleaned_df_pin.withColumnRenamed("index", "ind")
cleaned_df_pin = cleaned_df_pin.select("ind", "unique_id", "title","description","follower_count","poster_name","tag_list","is_image_or_video","image_src","save_location","category")
display(cleaned_df_pin)

# Every streaming query needs its OWN checkpoint directory: the checkpoint
# stores that query's offsets and commit log, so sharing one directory
# between the pin, geo and user writers corrupts their progress tracking.
# NOTE: a fresh checkpoint directory means the query re-reads the stream from
# its configured initial position on first start.
cleaned_df_pin.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12acc47946a5_pin_table/_checkpoints/") \
  .table("12acc47946a5_pin_table")


# Combine latitude and longitude into a single "lat,long" text column.
geo_df_clean = df_geo.withColumn("coordinates", concat("latitude", lit(","), "longitude"))
# Make sure `timestamp` is a timestamp column regardless of how it was parsed upstream.
geo_df_clean = geo_df_clean.withColumn("timestamp", geo_df_clean["timestamp"].cast("timestamp"))

# Keep only the columns written to the table, in their final order.
geo_df_clean = geo_df_clean.select("ind","country","coordinates","timestamp")

display(geo_df_clean)

# Every streaming query needs its OWN checkpoint directory: the checkpoint
# stores that query's offsets and commit log, so sharing one directory
# between the pin, geo and user writers corrupts their progress tracking.
# NOTE: a fresh checkpoint directory means the query re-reads the stream from
# its configured initial position on first start.
geo_df_clean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12acc47946a5_geo_table/_checkpoints/") \
  .table("12acc47946a5_geo_table")

# Build a single `user_name` column from first and last name.
user_df_clean = df_user.withColumn("user_name", concat("first_name", lit(" "), "last_name"))
# Make sure `date_joined` is a timestamp column regardless of how it was parsed upstream.
user_df_clean = user_df_clean.withColumn("date_joined", user_df_clean["date_joined"].cast("timestamp"))
# Keep only the columns written to the table, in their final order.
user_df_clean = user_df_clean.select("ind","user_name","age","date_joined")


# Every streaming query needs its OWN checkpoint directory: the checkpoint
# stores that query's offsets and commit log, so sharing one directory
# between the pin, geo and user writers corrupts their progress tracking.
# NOTE: a fresh checkpoint directory means the query re-reads the stream from
# its configured initial position on first start.
user_df_clean.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/12acc47946a5_user_table/_checkpoints/") \
  .table("12acc47946a5_user_table")