In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Location of the Delta table that stores the AWS credentials used by the Kinesis readers
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Batch-load the credentials table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Get the AWS access key and secret key from the credentials DataFrame.
# `import urllib` alone does not guarantee the `parse` submodule is loaded,
# so import it explicitly here.
import urllib.parse

# One .first() fetches both columns in a single Spark job (instead of two collect() calls).
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # Fail with a clear message rather than an opaque IndexError on an empty table.
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key; safe="" makes quote() escape '/' as well.
# NOTE(review): not used by the Kinesis readers below (they take the raw key);
# presumably kept for building S3 URIs elsewhere — confirm before removing.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable Delta's format check for the current Spark session.
-- Per the Databricks docs, this check raises an error when a directory that contains
-- a Delta transaction log is read with a non-Delta format (e.g. parquet);
-- setting it to false suppresses that error.
-- NOTE(review): the reads in this notebook use format("delta") or the Kinesis source,
-- so this setting may no longer be needed — confirm before removing.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Data streaming: one Kinesis source per data category (pin, geo, user).

KINESIS_STREAM_PREFIX = "streaming-12d03c8b5ccd"
KINESIS_REGION = "us-east-1"


def _read_kinesis_stream(stream_name, region=KINESIS_REGION, initial_position="earliest"):
    """Return a streaming DataFrame that reads the given Kinesis data stream.

    Args:
        stream_name: Full name of the Kinesis data stream.
        region: AWS region hosting the stream.
        initial_position: Where a query without an existing checkpoint starts
            reading ('earliest' = oldest record still retained by the stream).

    Authenticates with the module-level ACCESS_KEY / SECRET_KEY loaded from the
    credentials table.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )


# Pin data stream
df_pin = _read_kinesis_stream(f"{KINESIS_STREAM_PREFIX}-pin")

# Geo data stream
df_geo = _read_kinesis_stream(f"{KINESIS_STREAM_PREFIX}-geo")

# User data stream
df_user = _read_kinesis_stream(f"{KINESIS_STREAM_PREFIX}-user")




In [0]:
%run "/Workspace/Users/carla.costan0@gmail.com/data_cleaning"

In [0]:
# Clean the data
#
# Each Kinesis record carries its JSON payload in the raw `data` column (hence the CAST).
# For every stream we cast it to a string, parse it against an explicit schema,
# flatten the resulting struct into top-level columns, and hand the result to the
# matching clean_df_* function from the data_cleaning notebook.
# Results go into new parsed_df_* names instead of overwriting df_*, so re-running
# this cell does not fail on a DataFrame that has already lost its `data` column.

pin_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

geo_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

user_schema = StructType([
    StructField("ind", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("date_joined", StringType(), True)
])


def _parse_kinesis_json(raw_stream_df, schema):
    """Deserialise the Kinesis `data` column into one top-level column per schema field.

    Args:
        raw_stream_df: DataFrame produced by the Kinesis source (must have a `data` column).
        schema: StructType describing the JSON payload.

    Returns:
        A new DataFrame whose columns are the fields of `schema`; the input is not modified.
    """
    as_text = raw_stream_df.selectExpr("CAST(data AS STRING) AS data")
    return as_text.select(from_json(col("data"), schema).alias("payload")).select("payload.*")


# Pin data
parsed_df_pin = _parse_kinesis_json(df_pin, pin_schema)
cleaned_df_pin = clean_df_pin(parsed_df_pin)

# Geo data
parsed_df_geo = _parse_kinesis_json(df_geo, geo_schema)
cleaned_df_geo = clean_df_geo(parsed_df_geo)

# User data
parsed_df_user = _parse_kinesis_json(df_user, user_schema)
cleaned_df_user = clean_df_user(parsed_df_user)


In [0]:
# Write cleaned data to the Delta tables

# Set to True ONLY when the queries must restart from scratch (e.g. after a schema
# change). Deleting a checkpoint makes the next run re-read each Kinesis stream from
# its initialPosition, which appends duplicate rows to the target tables.
RESET_CHECKPOINTS = False

CHECKPOINT_ROOT = "/tmp/kinesis"

# (cleaned streaming DataFrame, checkpoint directory, target Delta table)
stream_sinks = [
    (cleaned_df_pin, f"{CHECKPOINT_ROOT}/_checkpoints_pin/", "12d03c8b5ccd_pin_table"),
    (cleaned_df_geo, f"{CHECKPOINT_ROOT}/_checkpoints_geo/", "12d03c8b5ccd_geo_table"),
    (cleaned_df_user, f"{CHECKPOINT_ROOT}/_checkpoints_user/", "12d03c8b5ccd_user_table"),
]

# Checkpoint cleanup must happen BEFORE the queries start. Removing a running query's
# checkpoint discards its offset/commit log, so it loses track of what was already written.
if RESET_CHECKPOINTS:
    for _, checkpoint_dir, _ in stream_sinks:
        dbutils.fs.rm(checkpoint_dir, True)

# Start one append-mode streaming query per sink; keep the handles so the queries
# can be inspected or stopped later (e.g. query.stop()).
streaming_queries = []
for cleaned_df, checkpoint_dir, table_name in stream_sinks:
    query = (
        cleaned_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
        .table(table_name)
    )
    streaming_queries.append(query)