# Databricks Stream Processing Notebook
This notebook was originally written in Databricks to perform the following tasks:
    1. Connect to AWS and retrieve the data
    2. Clean the data
    3. Build the tables
    

## Retrieve Access Keys

In [0]:
# import statements
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import col,when
from pyspark.sql.functions import regexp_extract
import urllib

# Retrieve the AWS credentials from the CSV uploaded to the Databricks FileStore.
# NOTE(review): a key file in DBFS is readable by anyone with workspace access;
# consider a Databricks secret scope (dbutils.secrets.get) instead.

# `import urllib` alone does not guarantee the `parse` submodule is loaded,
# so import it explicitly before calling urllib.parse.quote below.
import urllib.parse

# specify file type to be csv
file_type = "csv"

# indicates file has first row as the header
first_row_is_header = "true"

# indicates file has comma as the delimiter
delimiter = ","

# read the CSV file to spark dataframe
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Fetch the first credentials row once (a single Spark job) instead of
# collecting the whole frame separately for each column.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    raise ValueError(
        "/FileStore/tables/authentication_credentials.csv contains no credential rows"
    )

# get the AWS access key and secret key from the first row
ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]

# URL-encode the secret key; safe="" makes "/" get percent-encoded as well.
# NOTE(review): the streaming cells in this notebook pass SECRET_KEY (unencoded)
# to the Kinesis source, so this value appears unused here.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

## Build the Pin Dataframe and Table

This section creates and cleans the dataframe containing the pin data and loads it into a table.

In [0]:
# Create the streaming dataframe from the pin Kinesis stream, reading from the
# oldest record still retained in the stream.
df_pin = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-0a25072a5e0f-pin")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# schema of the JSON payload carried in each record's `data` column
pin_schema = StructType([
    StructField("index", IntegerType()),
    StructField("unique_id", StringType()),
    StructField("title", StringType()),
    StructField("description", StringType()),
    StructField("poster_name", StringType()),
    StructField("follower_count", StringType()),
    StructField("tag_list", StringType()),
    StructField("is_image_or_video", StringType()),
    StructField("image_src", StringType()),
    StructField("downloaded", IntegerType()),
    StructField("save_location", StringType()),
    StructField("category", StringType())
])

# deserialize: cast the record payload to a string, parse it as JSON with
# pin_schema, then flatten the parsed struct into top-level columns
deserialize_df_pin = (
    df_pin
    .selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), pin_schema))
    .selectExpr("data.*")
)

# remove rows that are duplicated across all columns
# NOTE(review): streaming dropDuplicates without a watermark keeps every row seen
# so far in state, so state grows for the lifetime of the query.
df_pin = deserialize_df_pin.dropDuplicates()

# Replace placeholder strings that carry no real data with null. Each step builds
# on df_pin_clean so every replacement is kept, not just the last one.
placeholder_values = {
    "description": "No description",
    "follower_count": "User Info Error",
    "image_src": "Image src error",
    "poster_name": "User Info Error",
}
df_pin_clean = df_pin
for column_name, placeholder in placeholder_values.items():
    df_pin_clean = df_pin_clean.withColumn(
        column_name,
        when(col(column_name).contains(placeholder), None).otherwise(col(column_name)),
    )

# Convert follower counts such as "850", "12k" or "1.2M" to integers: take the
# leading number and scale it by its suffix. Decimal arithmetic keeps fractional
# counts exact ("1.2M" -> 1200000) before the integer cast. Values with no
# leading number (or null) become null.
follower_digits = regexp_extract(col("follower_count"), r"^\s*(\d+(?:\.\d+)?)", 1)
follower_number = when(follower_digits != "", follower_digits.cast("decimal(18,3)"))
follower_scale = (
    when(col("follower_count").rlike(r"[Mm]\s*$"), 1000000)
    .when(col("follower_count").rlike(r"[kK]\s*$"), 1000)
    .otherwise(1)
)
df_pin_clean = df_pin_clean.withColumn(
    "follower_count", (follower_number * follower_scale).cast("int")
)

# `index` and `downloaded` are already IntegerType via pin_schema, so no cast is needed.

# strip the "Local save in" prefix (and the whitespace after it), leaving only the path
df_pin_clean = df_pin_clean.withColumn(
    "save_location", regexp_replace(col("save_location"), r"^Local save in\s*", "")
)

# rename index to ind and reorder the output columns
df_pin_clean = (
    df_pin_clean
    .withColumnRenamed("index", "ind")
    .select(["ind", "unique_id", "title", "description", "follower_count", "poster_name",
             "tag_list", "is_image_or_video", "image_src", "save_location", "category"])
)

pin_checkpoint_path = "/tmp/kinesis/_checkpoints/pin/"

# Delete the checkpoint folder so the query starts fresh.
# NOTE(review): with initialPosition='earliest', a fresh checkpoint re-reads every
# retained Kinesis record and appends it to the table again on each run.
try:
    dbutils.fs.rm(pin_checkpoint_path, True)
except Exception as err:
    print(f"Could not remove checkpoint folder {pin_checkpoint_path}: {err}")

# write df_pin_clean into the Delta table, appending new rows as they arrive
(
    df_pin_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", pin_checkpoint_path)
    .table("0a25072a5e0f_pin_table")
)

## Build the Geo Dataframe and Table

This section creates and cleans the dataframe containing the geo data and loads it into a table.

In [0]:
# Create the streaming dataframe from the geo Kinesis stream, reading from the
# oldest record still retained in the stream.
df_geo = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-0a25072a5e0f-geo")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# schema of the JSON payload carried in each record's `data` column
geo_schema = StructType([
    StructField("ind", IntegerType()),
    StructField("timestamp", TimestampType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("country", StringType())
])

# deserialize: cast the record payload to a string, parse it as JSON with
# geo_schema, then flatten the parsed struct into top-level columns
# NOTE(review): if timestamps arrive in a non-default format they parse to null;
# pass {"timestampFormat": ...} as the third argument of from_json if so.
deserialize_df_geo = (
    df_geo
    .selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), geo_schema))
    .selectExpr("data.*")
)

# remove rows that are duplicated across all columns
# NOTE(review): streaming dropDuplicates without a watermark keeps every row seen
# so far in state, so state grows for the lifetime of the query.
df_geo = deserialize_df_geo.dropDuplicates()

# Merge latitude/longitude into one [latitude, longitude] array column, drop the
# originals and reorder. `timestamp` is already TimestampType via geo_schema.
df_geo_clean = (
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .select(["ind", "country", "coordinates", "timestamp"])
)

geo_checkpoint_path = "/tmp/kinesis/_checkpoints/geo/"

# Delete the checkpoint folder so the query starts fresh.
# NOTE(review): with initialPosition='earliest', a fresh checkpoint re-reads every
# retained Kinesis record and appends it to the table again on each run.
try:
    dbutils.fs.rm(geo_checkpoint_path, True)
except Exception as err:
    print(f"Could not remove checkpoint folder {geo_checkpoint_path}: {err}")

# write df_geo_clean into the Delta table, appending new rows as they arrive
(
    df_geo_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", geo_checkpoint_path)
    .table("0a25072a5e0f_geo_table")
)

## Build the User Dataframe and Table

This section creates and cleans the dataframe containing the user data and loads it into a table.

In [0]:
# Create the streaming dataframe from the user Kinesis stream, reading from the
# oldest record still retained in the stream.
df_user = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "streaming-0a25072a5e0f-user")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# schema of the JSON payload carried in each record's `data` column
# NOTE(review): age is kept as a string; casting to int would change the
# existing Delta table's schema, so confirm before changing.
user_schema = StructType([
    StructField("ind", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", StringType()),
    StructField("date_joined", TimestampType())
])

# deserialize: cast the record payload to a string, parse it as JSON with
# user_schema, then flatten the parsed struct into top-level columns
deserialize_df_user = (
    df_user
    .selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), user_schema))
    .selectExpr("data.*")
)

# remove rows that are duplicated across all columns
# NOTE(review): streaming dropDuplicates without a watermark keeps every row seen
# so far in state, so state grows for the lifetime of the query.
df_user = deserialize_df_user.dropDuplicates()

# Build user_name as "first last". concat_ws inserts the separator and skips a
# null part instead of turning the whole name null, as concat would.
# Then drop the name parts and reorder; date_joined is already TimestampType
# via user_schema.
df_user_clean = (
    df_user
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    .select(["ind", "user_name", "age", "date_joined"])
)

user_checkpoint_path = "/tmp/kinesis/_checkpoints/user/"

# Delete the checkpoint folder so the query starts fresh.
# NOTE(review): with initialPosition='earliest', a fresh checkpoint re-reads every
# retained Kinesis record and appends it to the table again on each run.
try:
    dbutils.fs.rm(user_checkpoint_path, True)
except Exception as err:
    print(f"Could not remove checkpoint folder {user_checkpoint_path}: {err}")

# write df_user_clean into the Delta table, appending new rows as they arrive
(
    df_user_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", user_checkpoint_path)
    .table("0a25072a5e0f_user_table")
)