### Import required libraries:

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

### Read authentication credentials:

In [0]:
# Reader settings for the credentials file: a CSV whose first row holds the
# column names and whose fields are separated by commas.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Load the AWS credentials CSV from DBFS into a Spark DataFrame.
# NOTE(review): credentials live in a plaintext file here; a Databricks secret
# scope would avoid storing them on DBFS — consider migrating.
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# `import urllib` on its own does not guarantee that the `urllib.parse`
# submodule is loaded, so import it explicitly before calling
# `urllib.parse.quote` below.
import urllib.parse

# Fetch both AWS credentials from the first row of the credentials DataFrame
# with a single Spark job instead of one collect() per column.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').collect()[0]
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']
# URL-encode the secret key; safe="" means every reserved character,
# including '/', is percent-escaped. Presumably used in a URI elsewhere in
# the notebook — confirm.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

### Read data streams from AWS Kinesis into dataframes:

In [0]:
def _read_kinesis_stream(stream_name):
    """Return a streaming DataFrame reading `stream_name` from AWS Kinesis.

    Every stream is read from its earliest record in region us-east-1, using
    the ACCESS_KEY / SECRET_KEY loaded from the credentials file.
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', 'earliest')
        .option('region', 'us-east-1')
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )


# 1. Pinterest post data
df_stream_pin = _read_kinesis_stream('streaming-0a3c6c045333-pin')
# 2. Geolocation data
df_stream_geo = _read_kinesis_stream('streaming-0a3c6c045333-geo')
# 3. User data
df_stream_user = _read_kinesis_stream('streaming-0a3c6c045333-user')

### Data deserialization:

In [0]:
def _parse_json_payload(df_stream, schema, alias):
    """Decode each Kinesis record's `data` field as JSON and flatten it.

    Args:
        df_stream: streaming DataFrame with a `data` column (cast to string here).
        schema: StructType describing the JSON payload.
        alias: temporary struct column name used before flattening.

    Returns:
        A streaming DataFrame with one column per field of `schema`.
    """
    return (
        df_stream
        .selectExpr("CAST(data as STRING)")
        .select(from_json(col("data"), schema).alias(alias))
        .select(f"{alias}.*")
    )


#*************************************************
# 1. PINTEREST DATA:
#*************************************************

df_pin_schema = StructType([
    StructField("index", StringType()),
    StructField("unique_id", StringType()),
    StructField("title", StringType()),
    StructField("description", StringType()),
    StructField("follower_count", StringType()),
    StructField("poster_name", StringType()),
    StructField("tag_list", StringType()),
    StructField("is_image_or_video", StringType()),
    StructField("image_src", StringType()),
    StructField("downloaded", StringType()),  # Assuming downloaded is of StringType
    StructField("save_location", StringType()),
    StructField("category", StringType())
])

df_pin = _parse_json_payload(df_stream_pin, df_pin_schema, "pin_data")

#*************************************************
# 2. GEOLOCATION DATA:
#*************************************************

# `timestamp` is read as a string and converted in the cleaning step.
df_geo_schema = StructType([
    StructField("ind", StringType()),
    StructField("timestamp", StringType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("country", StringType())
])

df_geo = _parse_json_payload(df_stream_geo, df_geo_schema, "geo_data")

#*************************************************
# 3. USER DATA:
#*************************************************

# `date_joined` is read as a string (like geo's `timestamp`) and converted with
# to_timestamp in the cleaning step. Decoding it as TimestampType inside
# from_json instead ties parsing to the JSON reader's timestamp format.
df_user_schema = StructType([
    StructField("ind", StringType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", IntegerType()),
    StructField("date_joined", StringType())
])

df_user = _parse_json_payload(df_stream_user, df_user_schema, "user_data")

### Data cleaning:

In [0]:
#*************************************************
# 1. PINTEREST DATA CLEANING:
#*************************************************

# Replace empty (or whitespace-only) entries in these columns with nulls.
# Each step chains onto df_pin_cleaned so every column's replacement is kept;
# null inputs stay null because the comparison is null, falling to otherwise().
# NOTE(review): column-specific placeholder text ("no relevant data") is not
# detected here; add those literal values per column if required.
pin_columns_to_nullify = ["category", "description", "downloaded", "follower_count"]
df_pin_cleaned = df_pin
for column_name in pin_columns_to_nullify:
    df_pin_cleaned = df_pin_cleaned.withColumn(
        column_name,
        when(trim(col(column_name)) == "", None).otherwise(col(column_name)),
    )

# Turn follower_count into an int: "12k" -> 12000, "1.5M" -> 1500000, "340" -> 340.
# The numeric part is cast to decimal (not int) so a fractional prefix such as
# "1.5k" keeps its fraction before scaling; values that cannot be parsed become
# null under Spark's default (non-ANSI) casting.
df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count",
    when(col("follower_count").endswith("k"),
         regexp_replace(col("follower_count"), "k$", "").cast("decimal(20,3)") * 1000)
    .when(col("follower_count").endswith("M"),
          regexp_replace(col("follower_count"), "M$", "").cast("decimal(20,3)") * 1000000)
    .otherwise(col("follower_count").cast("decimal(20,3)"))
    .cast("int"),
)
# Ensure the other numeric column has a numeric data type
df_pin_cleaned = df_pin_cleaned.withColumn("downloaded", col("downloaded").cast("int"))
# Clean the data in the save_location column to include only the save location path
df_pin_cleaned = df_pin_cleaned.withColumn("save_location", regexp_replace(col("save_location"), "^Local save in ", ""))
# Rename the index column to ind (matching the geo and user streams) and make it an int
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")
df_pin_cleaned = df_pin_cleaned.withColumn("ind", col("ind").cast("int"))
# Reorder the DataFrame columns.
# NOTE(review): "downloaded" is not listed, so it is dropped from the output —
# confirm this is intended.
pin_columns_reorder = [ "ind",
                        "unique_id",
                        "title",
                        "description",
                        "follower_count",
                        "poster_name",
                        "tag_list",
                        "is_image_or_video",
                        "image_src",
                        "save_location",
                        "category"
                    ]
df_pin_cleaned = df_pin_cleaned.select(pin_columns_reorder)

#*************************************************
# 2. GEOLOCATION DATA CLEANING:
#*************************************************

# Column order of the cleaned geolocation stream
geo_columns_reorder = ["ind", "country", "coordinates", "timestamp"]

# Combine latitude/longitude into a single array column `coordinates`, drop the
# two source columns, parse `timestamp` from string to timestamp, then reorder.
df_geo_cleaned = (
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp"))
    .select(geo_columns_reorder)
)

#*************************************************
# 3. USER DATA CLEANING:
#*************************************************

# Column order of the cleaned user stream
user_columns_reorder = ["ind", "user_name", "age", "date_joined"]

# Build `user_name` as "first_name last_name" (concat_ws skips nulls), drop the
# two name columns, convert `date_joined` to a timestamp, then reorder.
df_user_cleaned = (
    df_user
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select(user_columns_reorder)
)

### Display data streams:

In [0]:
# Preview the cleaned Pinterest stream in the notebook.
display(df_pin_cleaned)

In [0]:
# Preview the cleaned geolocation stream in the notebook.
display(df_geo_cleaned)

In [0]:
# Preview the cleaned user stream in the notebook.
display(df_user_cleaned)

### Write data streams to Databricks (Delta Tables):

In [0]:
# Delete the whole checkpoint directory (second argument True = recursive),
# resetting the progress of all three Delta write streams below.
# NOTE(review): the Kinesis readers use initialPosition='earliest' and the
# writers use append mode, so after this reset the streams are re-read from
# the start and rows already in the Delta tables are likely appended again
# (duplicates). Only run this cell when a full re-ingest is intended.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)
#dbutils.fs.rm("/tmp/kinesis/_checkpoints/pin_table", True)
#dbutils.fs.rm("/tmp/kinesis/_checkpoints/geo_table", True)
#dbutils.fs.rm("/tmp/kinesis/_checkpoints/user_table", True)

In [0]:
#*************************************************
# WRITE PINTEREST, GEOLOCATION AND USER DATA TO DELTA TABLES:
#*************************************************

# One entry per stream: (cleaned streaming DataFrame, checkpoint dir, Delta table).
# Streams are started in this order: pin, geo, user.
_stream_sinks = [
    (df_pin_cleaned, "/tmp/kinesis/_checkpoints/pin_table", "0a3c6c045333_pin_table"),
    (df_geo_cleaned, "/tmp/kinesis/_checkpoints/geo_table", "0a3c6c045333_geo_table"),
    (df_user_cleaned, "/tmp/kinesis/_checkpoints/user_table", "0a3c6c045333_user_table"),
]

for _stream_df, _checkpoint_path, _table_name in _stream_sinks:
    # Append each micro-batch to the Delta table, tracking progress in its own
    # checkpoint directory.
    (
        _stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", _checkpoint_path)
        .table(_table_name)
    )