### Setup

In [None]:
import urllib
import urllib.parse

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Load the uploaded credentials CSV and pull out the keys on the
# 'databricks-user' row.
# NOTE(review): keeping AWS keys in a CSV under /FileStore exposes them to anyone
# with workspace file access — consider a Databricks secret scope instead.
aws_keys_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("sep", ",")
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Filter once and read both keys from the same row (one Spark job instead of two).
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    # Fail with an explicit message instead of an opaque IndexError.
    raise ValueError(
        "No row with 'User name' == 'databricks-user' found in "
        "/FileStore/tables/authentication_credentials.csv"
    )

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# Percent-encode the secret (safe="" also escapes '/') so it can be embedded in a URI.
# NOTE(review): not referenced by the cells in this notebook; presumably used for a
# URI-embedded credential elsewhere — confirm before removing.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

### Functions

In [None]:
def get_stream(stream_name: str, region: str = 'us-east-1', initial_position: str = 'earliest'):
    """Open a Kinesis stream as a streaming Spark DataFrame.

    Authenticates with the module-level ``ACCESS_KEY`` / ``SECRET_KEY`` read in
    the setup cell.

    Args:
        stream_name: Name of the Kinesis stream to read.
        region: AWS region hosting the stream. Defaults to ``'us-east-1'``,
            the value that was previously hard-coded.
        initial_position: Passed through as the connector's
            ``initialPosition`` option. Defaults to ``'earliest'``, the value
            that was previously hard-coded.

    Returns:
        The streaming DataFrame produced by ``spark.readStream...load()``.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

def deserialize_stream(stream, schema):
    """Turn the JSON payload of each stream record into top-level columns.

    The ``data`` column of *stream* is cast to a string, parsed with
    ``from_json`` against *schema*, and the resulting struct is expanded so
    that every field of *schema* becomes its own column.

    Args:
        stream: DataFrame with a ``data`` column (presumably the raw Kinesis
            record bytes — confirm against the source).
        schema: ``StructType`` describing the JSON payload.

    Returns:
        A DataFrame whose columns are exactly the fields of *schema*.
    """
    payload_text = stream.selectExpr("CAST(data as STRING)")
    payload_struct = payload_text.withColumn("data", from_json(col("data"), schema))
    return payload_struct.select(col("data.*"))

def add_nulls_to_df_column(df, column, value_to_replace):
    """Return *df* with values of *column* that match *value_to_replace* set to null.

    Matching uses SQL ``LIKE``, so ``%`` and ``_`` inside *value_to_replace*
    act as wildcards. Values that do not match, and values that are already
    null, are left unchanged.

    Args:
        df: Input DataFrame.
        column: Name of the column to clean.
        value_to_replace: ``LIKE`` pattern identifying the placeholder values.

    Returns:
        A new DataFrame with *column* replaced by its cleaned version.
    """
    target = col(column)
    cleaned = when(target.like(value_to_replace), None).otherwise(target)
    return df.withColumn(column, cleaned)

def write_stream_df_to_table(df, name: str, prefix: str = "0ea903d23769"):
    """Start an append-mode Delta streaming write of *df* into a table.

    The target table is ``{prefix}_{name}_table`` and the checkpoint location is
    ``/tmp/kinesis/{prefix}_{name}_table_checkpoints/``. With the default
    *prefix* both names are identical to the previously hard-coded ones.

    Args:
        df: Streaming DataFrame to persist.
        name: Short dataset name (e.g. ``"pin"``) used to build the table and
            checkpoint names.
        prefix: Identifier prepended to the table/checkpoint names.

    Returns:
        The handle returned by ``DataStreamWriter.table`` (a ``StreamingQuery``),
        so callers can ``awaitTermination()`` or ``stop()`` the stream. The
        handle was previously discarded.
    """
    table_name = f"{prefix}_{name}_table"
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"/tmp/kinesis/{table_name}_checkpoints/")
        .table(table_name)
    )

### Schemas

In [None]:
# JSON payload schemas used by deserialize_stream. Each field is a
# (name, Spark type) pair; StructField's default makes every field nullable.
pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("index", IntegerType()),
        ("unique_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("poster_name", StringType()),
        # Kept as a string: raw values are cleaned and cast to int later.
        ("follower_count", StringType()),
        ("tag_list", StringType()),
        ("is_image_or_video", StringType()),
        ("image_src", StringType()),
        ("downloaded", IntegerType()),
        ("save_location", StringType()),
        ("category", StringType()),
    ]
])

geo_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("ind", IntegerType()),
        ("timestamp", TimestampType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
    ]
])

user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", StringType()),
        ("date_joined", TimestampType()),
    ]
])

### Get Streams

In [None]:
# Open the three Kinesis streams, in pin / geo / user order.
pin_stream, geo_stream, user_stream = (
    get_stream(stream_name)
    for stream_name in (
        'streaming-0ea903d23769-pin',
        'streaming-0ea903d23769-geo',
        'streaming-0ea903d23769-user',
    )
)

# Parse each stream's JSON payload with its matching schema.
df_pin_dirty, df_geo_dirty, df_user_dirty = (
    deserialize_stream(raw_stream, raw_schema)
    for raw_stream, raw_schema in (
        (pin_stream, pin_schema),
        (geo_stream, geo_schema),
        (user_stream, user_schema),
    )
)

### Preview Dirty Data

In [None]:
# Show each deserialized stream before cleaning, to inspect the raw values the
# following cells clean up.
# NOTE(review): on a streaming DataFrame each display() presumably starts its own
# streaming query — confirm, and stop them once inspection is done.
display(df_pin_dirty)
display(df_geo_dirty)
display(df_user_dirty)

### Clean df_pin

In [None]:
df_pin = df_pin_dirty

# Replace empty/blank strings and literal 'NULL'/'null' markers with real nulls.
df_pin = df_pin.replace(['', ' ', 'NULL', 'null'], [None] * 4)

# Column-specific placeholder strings that carry no information; an exact match
# is replaced with null.
pin_placeholder_values = {
    "description": "No description available Story format",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available",
}
for column_name, placeholder in pin_placeholder_values.items():
    df_pin = df_pin.withColumn(
        column_name,
        when(col(column_name) == placeholder, None).otherwise(col(column_name)),
    )

# Convert follower_count strings such as "136k", "1.2M" or "1,234" to integers.
# Merely stripping the suffix would turn "136k" into 136, so the k/M suffix is
# mapped to a 1000 / 1000000 multiplier instead. Values with no leading number
# become null (as before).
follower_text = regexp_replace(trim(col("follower_count")), ",", "")
follower_number = regexp_extract(follower_text, r"^([0-9]+(?:\.[0-9]+)?)", 1)
follower_multiplier = (
    when(follower_text.endswith("k") | follower_text.endswith("K"), 1000)
    .when(follower_text.endswith("M"), 1000000)
    .otherwise(1)
)
df_pin = df_pin.withColumn(
    "follower_count",
    when(follower_number == "", None).otherwise(
        # +0.5 before the truncating int cast rounds the (non-negative) product,
        # guarding against float artefacts like 4.35 * 100 = 434.99999999999994.
        (follower_number.cast(DoubleType()) * follower_multiplier + 0.5).cast(IntegerType())
    ),
)

# Ensure that each column containing numeric data has a numeric data type.
df_pin = df_pin.withColumn("downloaded", col("downloaded").cast(IntegerType()))

# Keep only the save location path.
df_pin = df_pin.withColumn("save_location", regexp_replace(col("save_location"), "Local save in ", ""))

# Rename the index column to ind, matching the geo and user streams.
df_pin = df_pin.withColumnRenamed("index", "ind")

# Reorder the DataFrame columns.
# NOTE(review): `downloaded` is cast above but not selected here, so it is dropped
# from the output — confirm this is intended.
df_pin = df_pin.select(["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"])


display(df_pin)

### Clean df_geo

In [None]:
# Build the cleaned geo frame in a single projection:
#   - keep `ind` and `country`,
#   - pack latitude/longitude into one `coordinates` array,
#   - keep `timestamp` as a timestamp.
df_geo = df_geo_dirty.select(
    col("ind"),
    col("country"),
    array(col("latitude"), col("longitude")).alias("coordinates"),
    # geo_schema already parses `timestamp` as TimestampType, so this cast is a
    # no-op safeguard rather than a string-to-timestamp conversion.
    col("timestamp").cast(TimestampType()).alias("timestamp"),
)

display(df_geo)

### Clean df_user

In [None]:
# Build the cleaned user frame in a single projection:
#   - merge first/last name into `user_name` (concat_ws skips null parts),
#   - keep `ind` and `age`,
#   - keep `date_joined` as a timestamp.
df_user = df_user_dirty.select(
    col("ind"),
    concat_ws(" ", col("first_name"), col("last_name")).alias("user_name"),
    col("age"),
    # user_schema already parses `date_joined` as TimestampType, so this cast is
    # a no-op safeguard rather than a string-to-timestamp conversion.
    col("date_joined").cast(TimestampType()).alias("date_joined"),
)

display(df_user)

### Write to Delta Tables

In [None]:
# Start one append-mode Delta streaming write per cleaned DataFrame. The table
# and checkpoint names are built from the second argument inside
# write_stream_df_to_table.
write_stream_df_to_table(df_pin, "pin")
write_stream_df_to_table(df_geo, "geo")
write_stream_df_to_table(df_user, "user")