In [None]:
# pyspark functions
from pyspark.sql.functions import *
# URL processing
import urllib

In [None]:
# Load the AWS credentials export into a Spark DataFrame.
# The file is a CSV whose first row holds the column headers and whose fields are comma-separated.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [None]:
# `import urllib` alone does not guarantee the `parse` submodule is loaded; import it explicitly.
import urllib.parse

# Fetch the credentials row for the "databricks-user" IAM user.
# A single query and collect() replaces the previous two identical filter/collect passes.
_credential_rows = (
    aws_keys_df
    .where(col("User name") == "databricks-user")
    .select("Access key ID", "Secret access key")
    .collect()
)
if not _credential_rows:
    raise ValueError("No row with 'User name' == 'databricks-user' found in the AWS credentials CSV")

ACCESS_KEY = _credential_rows[0]["Access key ID"]
SECRET_KEY = _credential_rows[0]["Secret access key"]

# URL-encode the secret key. safe="" also escapes "/", which appears in AWS secret keys.
# NOTE(review): not used in the cells shown here; presumably consumed elsewhere (e.g. in a URL). Confirm.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
from pyspark.sql.functions import from_json, col, base64
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, FloatType, StringType
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

kinesisStreamName = "streaming-0e2b04098249-pin"
regionName = "us-east-1"

# Read the raw pin records from Kinesis with Structured Streaming.
# The JSON payload of each record is in the "data" column (parsed further below).
# ACCESS_KEY / SECRET_KEY are the values loaded from the credentials CSV in an earlier cell.
streaming_df_pin_kinesis = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesisStreamName)
    # "region" is the option name documented for the Databricks Kinesis source.
    # "regionName" is kept so the reader's options remain a superset of the original ones.
    .option("region", regionName)
    .option("regionName", regionName)
    .option("initialPosition", "TRIM_HORIZON")
    .option("format", "json")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .option("inferSchema", "true")
    .load()
)

# Schema of the JSON pin payload (field names must match the producer's keys)
schema = StructType([
    StructField("index", IntegerType()),
    StructField("unique_id", StringType()),
    StructField("title", StringType()),
    StructField("description", StringType()),
    StructField("poster_name", StringType()),
    StructField("follower_count", StringType()),
    StructField("tag_list", StringType()),
    StructField("is_image_or_video", StringType()),
    StructField("image_src", StringType()),
    StructField("downloaded", IntegerType()),
    StructField("save_location", StringType()),
    StructField("category", StringType())
])

# from_json builds a Column expression (not a Python UDF).
# It parses the payload text into a struct shaped by `schema`.
deserialise_udf_pin = from_json(col("data").cast("string"), schema)

# Flatten the parsed struct into one top-level column per schema field, in schema order
streaming_df_pin = (
    streaming_df_pin_kinesis
    .withColumn("cast_data_pin", deserialise_udf_pin)
    .select([f"cast_data_pin.{field}" for field in schema.fieldNames()])
)

display(streaming_df_pin)

In [None]:
kinesisStreamName = "streaming-0e2b04098249-geo"
regionName = "us-east-1"

# Read the raw geolocation records from Kinesis with Structured Streaming.
# The JSON payload of each record is in the "data" column (parsed further below).
# ACCESS_KEY / SECRET_KEY are the values loaded from the credentials CSV in an earlier cell.
streaming_df_geo_kinesis = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesisStreamName)
    # "region" is the option name documented for the Databricks Kinesis source.
    # "regionName" is kept so the reader's options remain a superset of the original ones.
    .option("region", regionName)
    .option("regionName", regionName)
    .option("initialPosition", "TRIM_HORIZON")
    .option("format", "json")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .option("inferSchema", "true")
    .load()
)


# Schema of the JSON geolocation payload
schema = StructType([
    StructField("ind", IntegerType()),
    StructField("timestamp", TimestampType()),
    StructField("latitude", FloatType()),
    StructField("longitude", FloatType()),
    StructField("country", StringType())
])

# from_json builds a Column expression (not a Python UDF).
# It parses the payload text into a struct shaped by `schema`.
deserialise_udf_geo = from_json(col("data").cast("string"), schema)

# Flatten the parsed struct into one top-level column per schema field, in schema order
streaming_df_geo = (
    streaming_df_geo_kinesis
    .withColumn("cast_data_geo", deserialise_udf_geo)
    .select([f"cast_data_geo.{field}" for field in schema.fieldNames()])
)

display(streaming_df_geo)

In [None]:
kinesisStreamName = "streaming-0e2b04098249-user"
regionName = "us-east-1"

# Read the raw user records from Kinesis with Structured Streaming.
# The JSON payload of each record is in the "data" column (parsed further below).
# ACCESS_KEY / SECRET_KEY are the values loaded from the credentials CSV in an earlier cell.
streaming_df_user_kinesis = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesisStreamName)
    # "region" is the option name documented for the Databricks Kinesis source.
    # "regionName" is kept so the reader's options remain a superset of the original ones.
    .option("region", regionName)
    .option("regionName", regionName)
    .option("initialPosition", "TRIM_HORIZON")
    .option("format", "json")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .option("inferSchema", "true")
    .load()
)


# Schema of the JSON user payload
schema = StructType([
    StructField("ind", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", StringType()),
    StructField("date_joined", TimestampType())
])

# from_json builds a Column expression (not a Python UDF).
# It parses the payload text into a struct shaped by `schema`.
deserialise_udf_user = from_json(col("data").cast("string"), schema)

# Flatten the parsed struct into one top-level column per schema field, in schema order
streaming_df_user = (
    streaming_df_user_kinesis
    .withColumn("cast_data_user", deserialise_udf_user)
    .select([f"cast_data_user.{field}" for field in schema.fieldNames()])
)

display(streaming_df_user)

In [None]:
### Transforming and cleaning the data

In [None]:
from pyspark.sql.functions import regexp_replace, col, when

# Placeholder strings that the pin data uses instead of real values, per column.
# Each entry also gives the value that every placeholder in that column is normalised to.
# The replacement is the literal string "None", not SQL NULL; this is unchanged from the
# original cleaning so downstream consumers see the same values.
PIN_PLACEHOLDERS = {
    "description": (
        ["Untitled", "No description available", "No description available Story format"],
        "None",
    ),
    "tag_list": (["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e", "0"], "None"),
    "title": (["No Title Data Available"], "None"),
    "follower_count": (["User Info Error"], "0"),
    "image_src": (["Image src error."], "None"),
    "poster_name": (["User Info Error"], "None"),
}


def _replace_placeholders(df, column, placeholders, replacement):
    """Return ``df`` with values of ``column`` found in ``placeholders`` set to ``replacement``.

    Any other value, including NULL, is passed through unchanged.
    """
    return df.withColumn(
        column,
        when(col(column).isin(*placeholders), replacement).otherwise(col(column)),
    )


def _parse_follower_count(column):
    """Return an int Column parsed from follower counts such as "850", "12k", "1.2k" or "3M".

    A trailing "k" multiplies the number by 1,000 and a trailing "M" by 1,000,000.
    Strings that are not numeric after removing the suffix become NULL.
    The number is parsed as a decimal, so scaling is exact and "1.2k" gives 1200.
    The earlier textual "k" -> "000" replacement turned "1.2k" into "1.2000",
    which Spark's string-to-int cast truncates to 1.
    """
    raw = col(column)
    number = regexp_replace(raw, "[kM]$", "").cast("decimal(18,3)")
    return (
        when(raw.endswith("k"), number * 1000)
        .when(raw.endswith("M"), number * 1000000)
        .otherwise(number)
        .cast("int")
    )


for column, (placeholders, replacement) in PIN_PLACEHOLDERS.items():
    streaming_df_pin = _replace_placeholders(streaming_df_pin, column, placeholders, replacement)

# "downloaded" is deliberately not cleaned or kept.
# It is IntegerType in the ingest schema, so comparing it with the string "None" could never match.
# It is also not part of the final column selection below.

streaming_df_pin = (
    streaming_df_pin
    # "User Info Error" was already mapped to "0" above, so it parses to 0 here
    .withColumn("follower_count", _parse_follower_count("follower_count"))
    .withColumnRenamed("index", "ind")
    # Keep only the path part of values such as "Local save in /data/..."
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    .select(
        col("ind"), col("unique_id"), col("title"), col("description"), col("follower_count"),
        col("poster_name"), col("tag_list"), col("is_image_or_video"), col("image_src"),
        col("save_location"), col("category"),
    )
)

display(streaming_df_pin)

In [None]:
from pyspark.sql.functions import array

# Merge latitude/longitude into a single `coordinates` array column.
# Then cast `timestamp` to a timestamp and fix the output column order.
streaming_df_geo = (
    streaming_df_geo
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .select(col("ind"), col("country"), col("coordinates"), col("timestamp"))
)

display(streaming_df_geo)

In [None]:
from pyspark.sql.functions import concat

# Build `user_name` as "first last" and drop the separate name columns.
# Then cast `date_joined` to a timestamp and fix the output column order.
streaming_df_user = (
    streaming_df_user
    .withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
    .drop("first_name", "last_name")
    .withColumn("date_joined", col("date_joined").cast("timestamp"))
    .select(col("ind"), col("user_name"), col("age"), col("date_joined"))
)

display(streaming_df_user)

In [None]:
### Save the data to Delta Tables on Databricks

In [None]:
# Run the cleaned pin stream into an in-memory sink so it can be queried as a table.
# Stop any still-active query with the same name first.
# Streaming query names must be unique among active queries, so re-running this cell would otherwise fail.
for _active_query in spark.streams.active:
    if _active_query.name == "pin_temp_table":
        _active_query.stop()

pin_temp_query = (
    streaming_df_pin.writeStream
    .format("memory")
    .queryName("pin_temp_table")
    .start()
)

# Take a batch (non-streaming) snapshot of the in-memory table.
# NOTE(review): start() returns immediately, so this snapshot only holds micro-batches processed so far
# (possibly none yet). Call pin_temp_query.processAllAvailable() first if a complete snapshot is needed
# and the producer is not sending data continuously.
not_streaming_df_pin = spark.sql("SELECT * FROM pin_temp_table")

display(not_streaming_df_pin)

In [None]:
# Run the cleaned geo stream into an in-memory sink so it can be queried as a table.
# Stop any still-active query with the same name first.
# Streaming query names must be unique among active queries, so re-running this cell would otherwise fail.
for _active_query in spark.streams.active:
    if _active_query.name == "geo_temp_table":
        _active_query.stop()

geo_temp_query = (
    streaming_df_geo.writeStream
    .format("memory")
    .queryName("geo_temp_table")
    .start()
)

# Take a batch (non-streaming) snapshot of the in-memory table.
# NOTE(review): start() returns immediately, so this snapshot only holds micro-batches processed so far
# (possibly none yet). Call geo_temp_query.processAllAvailable() first if a complete snapshot is needed
# and the producer is not sending data continuously.
not_streaming_df_geo = spark.sql("SELECT * FROM geo_temp_table")

display(not_streaming_df_geo)

In [None]:
# Run the cleaned user stream into an in-memory sink so it can be queried as a table.
# Stop any still-active query with the same name first.
# Streaming query names must be unique among active queries, so re-running this cell would otherwise fail.
for _active_query in spark.streams.active:
    if _active_query.name == "user_temp_table":
        _active_query.stop()

user_temp_query = (
    streaming_df_user.writeStream
    .format("memory")
    .queryName("user_temp_table")
    .start()
)

# Take a batch (non-streaming) snapshot of the in-memory table.
# NOTE(review): start() returns immediately, so this snapshot only holds micro-batches processed so far
# (possibly none yet). Call user_temp_query.processAllAvailable() first if a complete snapshot is needed
# and the producer is not sending data continuously.
not_streaming_df_user = spark.sql("SELECT * FROM user_temp_table")

display(not_streaming_df_user)

In [None]:
# Save each batch snapshot as a Delta table, replacing any existing contents.
# Tables are written in pin, geo, user order.
snapshot_tables = {
    "0e2b04098249_pin_table": not_streaming_df_pin,
    "0e2b04098249_geo_table": not_streaming_df_geo,
    "0e2b04098249_user_table": not_streaming_df_user,
}

for table_name, snapshot_df in snapshot_tables.items():
    snapshot_df.write.format("delta").mode("overwrite").saveAsTable(table_name)