## Imports

In [0]:
import urllib
import urllib.parse

from pyspark.sql.types import *
from pyspark.sql.functions import *

## Credentials

In [0]:
# List the contents of the DBFS directory /FileStore/tables, which is where the
# credentials CSV loaded in the next cell is stored.
dbutils.fs.ls("/FileStore/tables")

In [0]:
# Load the AWS credentials CSV (header row, comma-separated) into a Spark DataFrame.
aws_keys_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("sep", ",")
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Fetch both credential columns from the first row in a single Spark job
# instead of running one collect() per column.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    # first() returns None for an empty DataFrame; fail with a clear message
    # rather than an opaque IndexError.
    raise ValueError(
        "No credential rows found in /FileStore/tables/authentication_credentials.csv"
    )

ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# URL-encode the secret key; safe="" means "/" is percent-encoded as well.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

## Schemas

In [0]:
# Schema used to parse the JSON payload of the pin stream.
# follower_count is read as a string because the cleaning step rewrites its
# "k"/"M" suffixes before casting it to an integer.
schema_pin = (
    StructType()
    .add("index", IntegerType(), True)
    .add("unique_id", StringType())
    .add("title", StringType())
    .add("description", StringType())
    .add("poster_name", StringType())
    .add("follower_count", StringType())
    .add("tag_list", StringType())
    .add("is_image_or_video", StringType())
    .add("image_src", StringType())
    .add("downloaded", IntegerType())
    .add("save_location", StringType())
    .add("category", StringType())
)

# Schema used to parse the JSON payload of the geo stream.
schema_geo = (
    StructType()
    .add("ind", IntegerType())
    .add("timestamp", TimestampType())
    .add("latitude", FloatType())
    .add("longitude", FloatType())
    .add("country", StringType())
)

# Schema used to parse the JSON payload of the user stream.
schema_user = (
    StructType()
    .add("ind", IntegerType())
    .add("first_name", StringType())
    .add("last_name", StringType())
    .add("age", StringType())
    .add("date_joined", TimestampType())
)

## Reading from stream

In [0]:
def read_kinesis_stream(stream_name, schema, region="us-east-1"):
    """Read a Kinesis stream and parse each record's payload with `schema`.

    Args:
        stream_name: Name of the Kinesis stream to read.
        schema: StructType describing the JSON document held in each
            record's `data` column.
        region: AWS region passed to the Kinesis source.

    Returns:
        A streaming DataFrame with one column per field of `schema`.
    """
    raw_df = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', 'earliest')
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )
    return (
        raw_df
        # keep only the payload, cast to text so it can be parsed as JSON
        .selectExpr("CAST(data as STRING)")
        # parse the JSON text into a struct that follows the schema
        .withColumn("data", from_json("data", schema))
        # promote the struct's fields to top-level columns
        .selectExpr("data.*")
    )


user_df = read_kinesis_stream('streaming-0a9b5b8a2ae5-user', schema_user)
geo_df = read_kinesis_stream('streaming-0a9b5b8a2ae5-geo', schema_geo)
pin_df = read_kinesis_stream('streaming-0a9b5b8a2ae5-pin', schema_pin)

display(user_df)

## Clean data

#### Cleaning pin_df data

In [0]:
# Placeholder text that stands for "no data", keyed by the column it appears in.
# Applied in this order, each placeholder is replaced with None in its column.
placeholder_by_column = {
    "description": "No description available Story format",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available",
}

# Drop exact duplicate rows before any other transformation.
pin_df = pin_df.drop_duplicates()

for column_name, placeholder in placeholder_by_column.items():
    pin_df = pin_df.replace({placeholder: None}, subset=[column_name])

pin_df = (
    pin_df
    # expand the "k" / "M" suffixes so follower_count contains digits only
    .withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
    .withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
    # follower_count is now numeric text, so cast it to an integer
    .withColumn("follower_count", col("follower_count").cast("int"))
    # keep only the path part of save_location
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    # rename index to ind
    .withColumnRenamed("index", "ind")
    # final column order
    .select(
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category",
    )
)

display(pin_df)


#### Cleaning geo_df data

In [0]:
geo_df = (
    geo_df
    # drop exact duplicate rows
    .drop_duplicates()
    # combine latitude and longitude into a single array column
    .withColumn("coordinates", array("latitude", "longitude"))
    # the separate latitude and longitude columns are no longer needed
    .drop("latitude", "longitude")
    # make sure timestamp is a timestamp (schema_geo already declares it as TimestampType)
    .withColumn("timestamp", to_timestamp("timestamp"))
    # final column order
    .select("ind", "country", "coordinates", "timestamp")
)

display(geo_df)

#### Cleaning user_df data

In [0]:
user_df = (
    user_df
    # drop exact duplicate rows
    .drop_duplicates()
    # build user_name as "<first_name> <last_name>"
    .withColumn("user_name", concat("first_name", lit(" "), "last_name"))
    # the separate name columns are no longer needed
    .drop("first_name", "last_name")
    # make sure date_joined is a timestamp (schema_user already declares it as TimestampType)
    .withColumn("date_joined", to_timestamp("date_joined"))
    # final column order
    .select("ind", "user_name", "age", "date_joined")
)

display(user_df)


## Writing data to Delta Tables

In [0]:
def write_stream_to_delta(stream_df, table_name):
    """Append a streaming DataFrame to a Delta table.

    The checkpoint directory is derived from the table name:
    /tmp/kinesis/<table_name>_checkpoints/

    Args:
        stream_df: Streaming DataFrame to write.
        table_name: Name of the target Delta table.

    Returns:
        Whatever `.table()` returns for the started write
        (presumably the streaming query handle - TODO confirm).
    """
    return (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"/tmp/kinesis/{table_name}_checkpoints/")
        .table(table_name)
    )


write_stream_to_delta(pin_df, "0a9b5b8a2ae5_pin_table")

write_stream_to_delta(user_df, "0a9b5b8a2ae5_user_table")

write_stream_to_delta(geo_df, "0a9b5b8a2ae5_geo_table")

In [0]:
# Remove the checkpoint folders if the block above needs to be run again
# dbutils.fs.rm("/tmp/kinesis/0a9b5b8a2ae5_pin_table_checkpoints/", True)
# dbutils.fs.rm("/tmp/kinesis/0a9b5b8a2ae5_user_table_checkpoints/", True)
# dbutils.fs.rm("/tmp/kinesis/0a9b5b8a2ae5_geo_table_checkpoints/", True)