In [0]:
# Import Libraries

import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Setup and Authentication

# Define the path to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from ONE row in a single Spark job. Two separate
# collect() calls would materialise the whole table twice and are not guaranteed
# to return the same first row, which could pair an access key with the wrong secret.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

# Extract the AWS access key and secret key from the fetched row
ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# Percent-encode the secret key; safe="" means "/" is escaped as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Disable format checking for the Delta table
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")

In [0]:
# Read Streaming Data

def read_stream(kinesis_stream_name, region='us-east-1', initial_position='earliest'):
    """
    Read streaming data from Kinesis.

    Authenticates with the module-level ACCESS_KEY / SECRET_KEY and uses the
    notebook's `spark` session.

    Parameters:
    - kinesis_stream_name (str): The name of the Kinesis data stream.
    - region (str): AWS region hosting the stream. Defaults to 'us-east-1'.
    - initial_position (str): Value passed as the source's 'initialPosition'
      option. Defaults to 'earliest'.

    Returns:
    - pyspark.sql.DataFrame: The DataFrame containing the streaming data.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', kinesis_stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

# Open one streaming DataFrame per Kinesis stream (pin, geo, user), in that order
pin_data, geo_data, user_data = (
    read_stream(stream_name)
    for stream_name in (
        "streaming-126802f17de3-pin",
        "streaming-126802f17de3-geo",
        "streaming-126802f17de3-user",
    )
)

In [0]:
# Define Schema and Deserialize Streaming Data

# Schema of the JSON payload on the pin stream.
# follower_count stays a string here; cleaning_pin converts it to int later.
pin_schema = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

# Schema of the JSON payload on the geo stream.
geo_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", TimestampType())
    .add("latitude", FloatType())
    .add("longitude", FloatType())
    .add("country", StringType())
)

# Schema of the JSON payload on the user stream.
user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

def deserialise_data(df, schema):
    """
    Deserialize streaming data using the specified schema.

    The `data` column is cast to a string, parsed as JSON against `schema`,
    and the resulting struct is expanded into one top-level column per field.

    Parameters:
    - df (pyspark.sql.DataFrame): The DataFrame containing streaming data; must have a `data` column.
    - schema (pyspark.sql.types.StructType): The schema to be applied to the data.

    Returns:
    - pyspark.sql.DataFrame: The DataFrame with deserialized data.
    """
    payload_df = df.selectExpr("CAST(data as STRING)")
    # Alias the parsed struct explicitly instead of selecting on Spark's
    # auto-generated column name ("from_json(data)"), which is an internal
    # detail and not a stable name to rely on.
    parsed_df = payload_df.select(from_json("data", schema).alias("parsed"))
    return parsed_df.select("parsed.*")

# Parse each raw stream with its matching schema (pin, geo, user — in that order)
df_pin_uncleaned, df_geo_uncleaned, df_user_uncleaned = (
    deserialise_data(raw_stream, stream_schema)
    for raw_stream, stream_schema in (
        (pin_data, pin_schema),
        (geo_data, geo_schema),
        (user_data, user_schema),
    )
)


In [0]:
# Cleaning Pinterest DataFrame

def cleaning_pin(df):
    """
    Clean the pinterest data DataFrame (df_pin_uncleaned) by performing the following transformations:
    1. Replace known placeholder strings ("no data" markers) with None in specified columns.
    2. Expand the "k" (thousand) and "M" (million) suffixes in follower_count and cast it to int.
    3. Clean the data in the save_location column to include only the save location path.
    4. Rename the index column to ind.
    5. Select and reorder the output columns (the downloaded column is not kept).

    Parameters:
    - df (pyspark.sql.DataFrame): Input DataFrame to be cleaned.

    Returns:
    - pyspark.sql.DataFrame: Cleaned DataFrame.
    """

    # Column -> placeholder string that the source uses to mean "no data".
    null_dict = {
        "description": "No description available Story format",
        "follower_count": "User Info Error",
        "image_src": "Image src error.",
        "poster_name": "User Info Error",
        "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
        "title": "No Title Data Available"
    }

    for key, value in null_dict.items():
        df = df.na.replace(value, None, key)

    # "k" = thousand -> three zeros; "M" = million -> six zeros.
    # (Previously "M" was expanded with only four zeros, so "5M" became 50000.)
    df = df.withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
    df = df.withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
    df = df.withColumn("follower_count", df["follower_count"].cast("int"))
    # Strip the fixed prefix so only the path remains.
    df = df.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    df = df.withColumnRenamed("index", "ind")
    df = df.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list",
                   "is_image_or_video", "image_src", "save_location", "category")
    return df

# Build the cleaned pin DataFrame from the deserialised pin stream;
# df_pin is what the write cell sends to the pin Delta table.
df_pin = cleaning_pin(df_pin_uncleaned)

In [0]:
# Cleaning Geolocation DataFrame

def cleaning_geo(df):
    """
    Clean the geolocation data DataFrame (df_geo_uncleaned).

    Steps, in order:
    1. Add a coordinates column holding an array of [latitude, longitude].
    2. Remove the now-redundant latitude and longitude columns.
    3. Pass the timestamp column through to_timestamp so it is a timestamp type.
    4. Return the columns in the order ind, country, coordinates, timestamp.

    Parameters:
    - df (pyspark.sql.DataFrame): Input DataFrame to be cleaned.

    Returns:
    - pyspark.sql.DataFrame: Cleaned DataFrame.
    """
    return (
        df.withColumn("coordinates", array("latitude", "longitude"))
        .drop("latitude", "longitude")
        .withColumn("timestamp", to_timestamp("timestamp"))
        .select("ind", "country", "coordinates", "timestamp")
    )

        
# Build the cleaned geolocation DataFrame from the deserialised geo stream;
# df_geo is what the write cell sends to the geo Delta table.
df_geo = cleaning_geo(df_geo_uncleaned)

In [0]:
# Cleaning User DataFrame

def cleaning_user(df):
    """
    Clean the user data DataFrame (df_user_uncleaned).

    Steps, in order:
    1. Add a user_name column: first_name, a single space, then last_name.
    2. Remove the first_name and last_name columns.
    3. Pass the date_joined column through to_timestamp so it is a timestamp type.
    4. Return the columns in the order ind, user_name, age, date_joined
       (age is passed through unchanged).

    Parameters:
    - df (pyspark.sql.DataFrame): Input DataFrame to be cleaned.

    Returns:
    - pyspark.sql.DataFrame: Cleaned DataFrame.
    """
    full_name = concat("first_name", lit(" "), "last_name")
    return (
        df.withColumn("user_name", full_name)
        .drop("first_name", "last_name")
        .withColumn("date_joined", to_timestamp("date_joined"))
        .select("ind", "user_name", "age", "date_joined")
    )

        
# Build the cleaned user DataFrame from the deserialised user stream;
# df_user is what the write cell sends to the user Delta table.
df_user = cleaning_user(df_user_uncleaned)

In [0]:
# Write Streaming Data to Databricks Delta Table

def write_stream_to_databricks(df, delta_table_name):
    """
    Write streaming data to a Databricks Delta table.

    The stream is written in append mode, with its checkpoint stored under
    /tmp/kinesis/<delta_table_name>_checkpoints/.

    Parameters:
    - df (pyspark.sql.DataFrame): The DataFrame containing streaming data.
    - delta_table_name (str): The name of the Delta table to which the data should be written.

    Returns:
    - Whatever the writer's .table(...) call returns (presumably the
      StreamingQuery handle for the started stream — TODO confirm). Callers
      can keep it to stop, await, or monitor the stream; callers that ignore
      the return value behave exactly as before.
    """
    checkpoint_location = f"/tmp/kinesis/{delta_table_name}_checkpoints/"
    # Return the result instead of discarding it so the stream stays controllable.
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(delta_table_name)
    )

# The checkpoint folder of each table must be deleted before its writeStream is
# run again, so clear all three checkpoint folders first.
for checkpoint_dir in (
    "/tmp/kinesis/126802f17de3_pin_table_checkpoints/",
    "/tmp/kinesis/126802f17de3_geo_table_checkpoints/",
    "/tmp/kinesis/126802f17de3_user_table_checkpoints/",
):
    dbutils.fs.rm(checkpoint_dir, recurse=True)

# Write streaming data to Databricks Delta tables
for cleaned_df, table_name in (
    (df_pin, "126802f17de3_pin_table"),
    (df_geo, "126802f17de3_geo_table"),
    (df_user, "126802f17de3_user_table"),
):
    write_stream_to_databricks(cleaned_df, table_name)