#### Once the stream is running on the client machine:
Enter your AWS username in the `username` variable below, then run the notebook.

In [0]:
# Set this to your AWS username before running the notebook; it is embedded in
# the Kinesis stream names and the Delta table names used below.
username = "<enter_aws_user_string>"
if username == "<enter_aws_user_string>":
    # Fail early with a clear message instead of a SQL parse error on the
    # placeholder table name.
    raise ValueError("Set `username` to your AWS username before running the notebook.")

# Start from a clean slate: drop tables written by a previous run and remove
# every stream checkpoint under /tmp/kinesis/_checkpoints/.
for table_kind in ("pin", "geo", "user"):
    spark.sql(f"DROP TABLE IF EXISTS {username}_{table_kind}_table")
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

The following transformations are required to process the data effectively:
#### Pin data:
- Replace missing or non-applicable values with None
- Adjust the numerical series so they only contain numbers

  _e.g. in follower_count '100k' should read '100000'_
- Update data types where they are inaccurate
- Remove the unnecessary "Local save in" prefix from values in the save_location series
- Rename the 'index' series to 'ind' to match the other two dataframes

In [0]:
def clean_pin_df(df):
  """
  Clean the raw pin dataframe.

  Transformations, in order:
  - Replace the placeholder values listed in ``to_replace_with_none`` with None
    in the description, image_src, poster_name, tag_list and title columns.
  - Replace 'User Info Error' in follower_count with '0', expand the k, M and B
    suffixes to 000, 000000 and 000000000 respectively, and cast the column to int.
  - Remove the "Local save in " prefix from save_location.
  - Rename the 'index' column to 'ind' to match the geo and user dataframes.
  - Reorder the columns into a more logical order.

  Args:
    df: Spark DataFrame containing the pin columns referenced below
        (NOTE(review): presumably parsed with pin_streaming_schema — confirm).

  Returns:
    The cleaned Spark DataFrame.
  """
  # Each column maps to a single placeholder string or a list of them.
  to_replace_with_none = {
    'description': ['No description available Story format', 'Untitled'],
    'image_src': 'Image src error.',
    'poster_name': 'User Info Error',
    'tag_list': 'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e',
    'title': 'No Title Data Available',
    }

  # Chain every replacement onto cleaned_df so earlier replacements are kept,
  # regardless of dictionary order or whether a value is a list or a string.
  cleaned_df = df
  for column, values in to_replace_with_none.items():
    placeholders = values if isinstance(values, list) else [values]
    for placeholder in placeholders:
      cleaned_df = cleaned_df.replace(placeholder, None, subset=[column])

  cleaned_df = cleaned_df.replace({'User Info Error':'0'}, subset=['follower_count'])
  # Expand magnitude suffixes (e.g. '100k' -> '100000') so the value casts to int.
  cleaned_df = cleaned_df.withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
  cleaned_df = cleaned_df.withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
  cleaned_df = cleaned_df.withColumn("follower_count", regexp_replace("follower_count", "B", "000000000"))
  cleaned_df = cleaned_df.withColumn("follower_count", cleaned_df["follower_count"].cast("int"))
  cleaned_df = cleaned_df.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
  cleaned_df = cleaned_df.withColumnRenamed("index", "ind")
  cleaned_df = cleaned_df.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")
  return cleaned_df

#### Geo data:
- Create a series named 'coordinates' by combining the 'latitude' and 'longitude' columns into a two-element array, then drop the original columns
- Convert the 'timestamp' column to timestamp type

In [0]:
def clean_geo_df(df):
  """
  Clean the raw geo dataframe.

  - Combine 'latitude' and 'longitude' into an array column 'coordinates'
    and drop the two source columns.
  - Convert 'timestamp' to timestamp type.
  - Keep only ind, country, coordinates and timestamp, in that order.

  Args:
    df: Spark DataFrame with ind, country, latitude, longitude and timestamp columns.

  Returns:
    The cleaned Spark DataFrame.
  """
  lat_lon = array("latitude", "longitude")
  return (
    df.withColumn("coordinates", lat_lon)
      .drop("latitude", "longitude")
      .withColumn("timestamp", to_timestamp("timestamp"))
      .select("ind", "country", "coordinates", "timestamp")
  )

#### User data
- Create a 'user_name' series by joining the 'first_name' and 'last_name' series with a space, then drop the two original series
- Convert 'date_joined' series to timestamp type

In [0]:
def clean_user_df(df):
  """
  Clean the raw user dataframe.

  - Build 'user_name' as first_name + " " + last_name, then drop both source columns.
  - Convert 'date_joined' to timestamp type.
  - Keep only ind, user_name, age and date_joined, in that order.

  Args:
    df: Spark DataFrame with ind, first_name, last_name, age and date_joined columns.

  Returns:
    The cleaned Spark DataFrame.
  """
  # NOTE(review): concat yields null if either name part is null; consider
  # concat_ws if partial names should be kept.
  full_name = concat("first_name", lit(" "), "last_name")
  return (
    df.withColumn("user_name", full_name)
      .drop("first_name", "last_name")
      .withColumn("date_joined", to_timestamp("date_joined"))
      .select("ind", "user_name", "age", "date_joined")
  )

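The following disables the Delta format check (`spark.databricks.delta.formatCheck.enabled`) to bypass format-related errors raised when the tables are written.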

In [0]:
%sql
-- Turn off Delta's format check for this session to bypass format-related errors.
-- NOTE(review): this disables a safety check; confirm it is still required.
SET spark.databricks.delta.formatCheck.enabled=false

First, this notebook needs access to the stream. This is done by reading a _separate_ delta table containing the credentials:

`delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"`

The `Access key ID` and `Secret access key` are assigned to variables, and the URL-encoded secret key is created.

Since ELT is a schema-on-read approach, the schema is defined in the notebook rather than stored with the data. It is assigned to a Python `StructType` object, which is passed as the `schema` parameter of `from_json` when the stream's `data` payload is parsed.

At this stage, it should be possible to read the stream in using the standard syntax. Note that for this particular example, the format is `kinesis` and the `initialPosition` parameter is set to `earliest` to maximise the amount of data that is captured.
As soon as the data has been read in, it is immediately written into its corresponding delta table with the `writeStream` method.

Once the data has been extracted, it is necessary to run the following to ensure it is in the correct format:
```
df = df.selectExpr("CAST(data as STRING)")
df = df.withColumn("data", from_json(col("data"), schema=<custom_defined_schema>))
df = df.select("data.*")
```
This is repeated for the `pin`, `geo` and `user` dataframes.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.parse` is loaded.
import urllib.parse

# The AWS credentials live in a separate Delta table.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch the credentials row once rather than running one collect() per column.
credentials_row = aws_keys_df.first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in {delta_table_path}")
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key (every character that is not URL-safe is escaped).
# NOTE(review): not referenced by the Kinesis readers below, which use the raw
# SECRET_KEY; confirm whether it is needed before removing.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# JSON layout of a pin record on the Kinesis stream; every field is nullable.
pin_streaming_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("category", StringType()),
        ("description", StringType()),
        ("downloaded", IntegerType()),
        ("follower_count", StringType()),
        ("image_src", StringType()),
        ("index", IntegerType()),
        ("is_image_or_video", StringType()),
        ("poster_name", StringType()),
        ("save_location", StringType()),
        ("tag_list", StringType()),
        ("title", StringType()),
        ("unique_id", StringType()),
    )
])

# Read the pin stream from Kinesis, starting at the earliest available record
# to capture as much data as possible.
pin_df = (
    spark
    .readStream
    .format("kinesis")
    .option("streamName", f"streaming-{username}-pin")
    .option("region", "us-east-1")
    .option("initialPosition", "earliest")
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Cast the raw `data` payload to a string, parse it as JSON, and flatten the fields.
pin_df = (
    pin_df
    .selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), schema=pin_streaming_schema))
    .select("data.*")
)

cleaned_pin_df = clean_pin_df(pin_df)

# Append into the pin Delta table. Keep the query handle so the stream can be
# stopped later with pin_query.stop().
pin_query = (
    cleaned_pin_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin")
    .table(f"{username}_pin_table")
)


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:730)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:448)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:448)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio

In [0]:

# JSON layout of a geo record on the Kinesis stream; every field is nullable.
geo_streaming_schema = StructType([
  StructField(field_name, field_type, True)
  for field_name, field_type in (
    ("country", StringType()),
    ("ind", LongType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("timestamp", StringType()),
  )
])

# Read the geo stream from Kinesis, starting at the earliest available record
# to capture as much data as possible.
geo_df = (
    spark
    .readStream
    .format("kinesis")
    .option("streamName", f"streaming-{username}-geo")
    .option("region", "us-east-1")
    .option("initialPosition", "earliest")
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Cast the raw `data` payload to a string, parse it as JSON, and flatten the fields.
geo_df = (
    geo_df
    .selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), schema=geo_streaming_schema))
    .select("data.*")
)

cleaned_geo_df = clean_geo_df(geo_df)

# Append into the geo Delta table using a checkpoint directory dedicated to
# this stream. Keep the query handle so it can be stopped with geo_query.stop().
geo_query = (
    cleaned_geo_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/geo")
    .table(f"{username}_geo_table")
)


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1721684535882719>, line 32[0m
[1;32m     25[0m cleaned_geo_df [38;5;241m=[39m clean_geo_df(geo_df)
[1;32m     27[0m display(cleaned_geo_df)
[1;32m     28[0m cleaned_geo_df[38;5;241m.[39mwriteStream \
[1;32m     29[0m   [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m) \
[1;32m     30[0m   [38;5;241m.[39moutputMode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m) \
[1;32m     31[0m   [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mcheckpointLocation[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m/tmp/kinesis/_checkpoints/[39m[38;5;124m"[39m) \
[0;32m---> 32[0m   [38;5;241m.[39mtable([38;5;124m"[39m[38;5;124m12885f560a0b_geo_table[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/sql/streaming/readwrit

In [0]:
# JSON layout of a user record on the Kinesis stream; every field is nullable.
user_streaming_schema = StructType([
  StructField(field_name, field_type, True)
  for field_name, field_type in (
    ("age", LongType()),
    ("date_joined", StringType()),
    ("first_name", StringType()),
    ("ind", LongType()),
    ("last_name", StringType()),
  )
])

# Read the user stream from Kinesis, starting at the earliest available record
# to capture as much data as possible.
user_df = (
    spark
    .readStream
    .format("kinesis")
    .option("streamName", f"streaming-{username}-user")
    .option("region", "us-east-1")
    .option("initialPosition", "earliest")
    .option('awsAccessKey', ACCESS_KEY)
    .option('awsSecretKey', SECRET_KEY)
    .load()
)

# Cast the raw `data` payload to a string, parse it as JSON, and flatten the fields.
user_df = (
    user_df
    .selectExpr("CAST(data as STRING)")
    .withColumn("data", from_json(col("data"), schema=user_streaming_schema))
    .select("data.*")
)

cleaned_user_df = clean_user_df(user_df)

# Append into the user Delta table using a checkpoint directory dedicated to
# this stream. Keep the query handle so it can be stopped with user_query.stop().
user_query = (
    cleaned_user_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/user")
    .table(f"{username}_user_table")
)


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-294045431771656>, line 31[0m
[1;32m     25[0m cleaned_user_df [38;5;241m=[39m clean_user_df(user_df)
[1;32m     26[0m display(cleaned_user_df)
[1;32m     27[0m cleaned_user_df[38;5;241m.[39mwriteStream \
[1;32m     28[0m   [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m) \
[1;32m     29[0m   [38;5;241m.[39moutputMode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m) \
[1;32m     30[0m   [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mcheckpointLocation[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m/tmp/kinesis/_checkpoints/[39m[38;5;124m"[39m) \
[0;32m---> 31[0m   [38;5;241m.[39mtable([38;5;124m"[39m[38;5;124m12885f560a0b_user_table[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/sql/streaming/rea

Finally, any running streams are stopped and their checkpoints are cleared to reset the notebook to its initial state.

In [0]:
# Stop every streaming query still active in this Spark session before deleting
# checkpoints; a running query keeps writing to its checkpoint directory.
for active_query in spark.streams.active:
    active_query.stop()

# Remove the per-stream checkpoint directories so the next run starts fresh.
for stream_name in ("pin", "geo", "user"):
    dbutils.fs.rm(f"/tmp/kinesis/_checkpoints/{stream_name}", True)