# Reading the AWS Access Key and Secret Access Key

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Location of the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load that Delta table into a Spark DataFrame
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# `urllib.parse` is a submodule; import it explicitly instead of relying on
# some other library having loaded it as a side effect.
import urllib.parse

# Fetch both credentials from the SAME row in a single Spark job, so the
# access key and the secret key are guaranteed to belong together.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

# Get the AWS access key and secret key from the fetched row
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']
# Percent-encode the secret key (safe="" means '/' is encoded as well)
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Creating Dataframes 

In [0]:
# Streaming read of the 'streaming-12863e427a8f-pin' Kinesis stream,
# starting from the earliest available position.
df_pin = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12863e427a8f-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Streaming read of the 'streaming-12863e427a8f-geo' Kinesis stream,
# starting from the earliest available position.
df_geo = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12863e427a8f-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
# Streaming read of the 'streaming-12863e427a8f-user' Kinesis stream,
# starting from the earliest available position.
df_user = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-12863e427a8f-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

# Defining Schemas

In [0]:

# Schema used to parse the JSON payload of the pin stream.
# Every field is nullable (the StructType.add default).
pin_schema = (
    StructType()
    .add("index", IntegerType())
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)
# Schema used to parse the JSON payload of the geo stream.
# Every field is nullable (the StructType.add default).
geo_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", TimestampType())
    .add("latitude", DoubleType())
    .add("longitude", DoubleType())
    .add("country", StringType())
)
# Schema used to parse the JSON payload of the user stream.
# Every field is nullable (the StructType.add default).
user_schema = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

## Deserialising data

In [0]:

# Read the Kinesis `data` payload as text, parse it as JSON with pin_schema,
# then promote the parsed struct's fields to top-level columns.
parsed_pin = from_json(col("data").cast("string"), pin_schema)
deserialize_df_pin = df_pin.select(parsed_pin.alias("data")).select("data.*")

# Read the Kinesis `data` payload as text, parse it as JSON with geo_schema,
# then promote the parsed struct's fields to top-level columns.
parsed_geo = from_json(col("data").cast("string"), geo_schema)
deserialize_df_geo = df_geo.select(parsed_geo.alias("data")).select("data.*")


# Read the Kinesis `data` payload as text, parse it as JSON with user_schema,
# then promote the parsed struct's fields to top-level columns.
parsed_user = from_json(col("data").cast("string"), user_schema)
deserialize_df_user = df_user.select(parsed_user.alias("data")).select("data.*")

## Deleting duplicates

In [0]:
# Drop rows that are identical across every column of each dataframe.
# NOTE(review): these are streaming dataframes and no watermark is set here,
# so deduplication state may grow without bound — confirm this is acceptable.
df_pin = deserialize_df_pin.drop_duplicates(deserialize_df_pin.columns)
df_geo = deserialize_df_geo.drop_duplicates(deserialize_df_geo.columns)
df_user = deserialize_df_user.drop_duplicates(deserialize_df_user.columns)

In [0]:
from pyspark.sql.functions import col,when
from pyspark.sql.functions import regexp_extract


# Cleaning User dataframe

In [0]:
# Clean the user dataframe in one chained transformation:
#   1. build user_name by concatenating first_name and last_name
#   2. drop the first_name and last_name columns
#   3. cast date_joined to a timestamp
#   4. reorder the columns
df_user = (
    df_user
    .withColumn("user_name", concat(col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    .withColumn("date_joined", col("date_joined").cast("timestamp"))
    .select("ind", "user_name", "age", "date_joined")
)

# Cleaning Geo Data

In [0]:
# Clean the geo dataframe in one chained transformation:
#   1. build a coordinates array column from latitude and longitude
#   2. drop the latitude and longitude columns
#   3. cast timestamp to a timestamp data type
#   4. reorder the columns
df_geo = (
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .select("ind", "country", "coordinates", "timestamp")
)

# Cleaning Pin Data

In [0]:
# Replace entries with no relevant data with None.
# (Null category/description values are already null, so the former
# "replace null with None" step was a no-op and has been removed.)
placeholder_markers = [
    ("description", "No description"),
    ("description", "Untitled"),
    ("follower_count", "User Info Error"),
    ("image_src", "Image src error"),
    ("poster_name", "User Info Error"),
    ("title", "No Title Data Available"),
]
for column_name, marker in placeholder_markers:
    df_pin = df_pin.withColumn(
        column_name,
        when(col(column_name).contains(marker), None).otherwise(col(column_name)),
    )
df_pin = df_pin.withColumn(
    "tag_list",
    when(col("tag_list").isin(["N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"]), None).otherwise(col("tag_list")),
)

# Convert follower_count ("123", "56k", "1.5M") into an integer.
# The number and the optional k/M suffix are parsed separately and multiplied;
# plain text substitution of the suffix would turn "1.5M" into "1.5000000".
# Values that do not match the pattern become null.
follower_pattern = r"^\s*(\d+(?:\.\d+)?)([kM]?)\s*$"
follower_number = regexp_extract(col("follower_count"), follower_pattern, 1)
follower_suffix = regexp_extract(col("follower_count"), follower_pattern, 2)
follower_scale = (
    when(follower_suffix == "M", 1000000)
    .when(follower_suffix == "k", 1000)
    .otherwise(1)
)
df_pin = df_pin.withColumn(
    "follower_count",
    when(
        col("follower_count").rlike(follower_pattern),
        # Decimal arithmetic keeps e.g. 2.3 * 1000 exact before the int cast
        (follower_number.cast("decimal(18,3)") * follower_scale).cast("int"),
    ),
)

# Ensuring that each column containing numeric data has a numeric data type
df_pin = df_pin.withColumn("downloaded", col("downloaded").cast("int"))
df_pin = df_pin.withColumn("index", col("index").cast("int"))

# Cleaning the save_location column: remove the "Local save in" label together
# with the whitespace after it, so the remaining path has no leading space
df_pin = df_pin.withColumn("save_location", regexp_replace(col("save_location"), r"Local save in\s*", ""))

# Renaming index column into ind column
df_pin = df_pin.withColumnRenamed("index", "ind")

# Reorder dataframe columns (note: `downloaded` is not part of the final selection)
df_pin = df_pin.select(
    "ind", "unique_id", "title", "description", "follower_count", "poster_name",
    "tag_list", "is_image_or_video", "image_src", "save_location", "category",
)

# Streaming Data to Delta Tables

In [0]:
# Each streaming query must own its checkpoint directory. A directory shared
# with the other streams (and deleted by their cells while this query is still
# running) would corrupt this query's progress tracking.
pin_checkpoint_path = "/tmp/kinesis/pin/_checkpoints/"

# Deletes this stream's checkpoint folder, discarding any previously saved progress
dbutils.fs.rm(pin_checkpoint_path, True)

# Writes df_pin dataframe into delta tables
(
    df_pin.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", pin_checkpoint_path)
    .table("12863e427a8f_pin_table")
)

In [0]:
# Each streaming query must own its checkpoint directory. A directory shared
# with the other streams (and deleted by their cells while this query is still
# running) would corrupt this query's progress tracking.
geo_checkpoint_path = "/tmp/kinesis/geo/_checkpoints/"

# Deletes this stream's checkpoint folder, discarding any previously saved progress
dbutils.fs.rm(geo_checkpoint_path, True)
# Deletes the files under the geo table's warehouse directory as well
dbutils.fs.rm("dbfs:/user/hive/warehouse/12863e427a8f_geo_table", True)

# Writes df_geo dataframe into delta tables
(
    df_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", geo_checkpoint_path)
    .table("12863e427a8f_geo_table")
)

In [0]:
# Each streaming query must own its checkpoint directory. A directory shared
# with the other streams (and deleted by their cells while this query is still
# running) would corrupt this query's progress tracking.
user_checkpoint_path = "/tmp/kinesis/user/_checkpoints/"

# Deletes this stream's checkpoint folder, discarding any previously saved progress
dbutils.fs.rm(user_checkpoint_path, True)

# Writes df_user dataframe into delta tables
(
    df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", user_checkpoint_path)
    .table("12863e427a8f_user_table")
)