In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

# Load the AWS credentials stored in a Delta table and expose them as
# ACCESS_KEY / SECRET_KEY (raw) and ENCODED_SECRET_KEY (URL-encoded).
# `import urllib` alone does not load the `urllib.parse` submodule, so import it explicitly.
import urllib.parse

# Define the path to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch the first credentials row once (a single Spark job) instead of
# collecting the whole table once per column.
credentials_row = aws_keys_df.select("Access key ID", "Secret access key").first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in Delta table at {delta_table_path}")

ACCESS_KEY = credentials_row["Access key ID"]
SECRET_KEY = credentials_row["Secret access key"]
# URL-encode the secret key; safe="" means '/' is encoded too.
# NOTE(review): not used in this section — presumably meant for embedding in a URI (e.g. an S3 mount).
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")



In [0]:
%sql
-- Disable format checks during the reading of Delta tables.
-- NOTE(review): SET changes the config for the whole Spark session, not just one read,
-- so every later read in this notebook runs with the check off — confirm that is intended.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Open a streaming source over the shared Kinesis stream (initialPosition 'earliest').
df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "Kinesis-Prod-Stream")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# Schema of the JSON payload carried by geolocation records.
geo_struct = StructType(
    [
        StructField("ind", IntegerType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("country", StringType(), True),
    ]
)

# Keep only geolocation records, decode the binary payload as JSON and flatten it,
# then publish it as a temp view for the SQL cleaning cells.
df_geo_stream = (
    df.where(df["partitionKey"] == "geolocation_data")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", geo_struct).alias("data"))
    .select("data.*")
)
df_geo_stream.createOrReplaceTempView("df_geo_stream")

In [0]:
%sql
-- Reshape the parsed geolocation stream: pack latitude/longitude into a single
-- `coordinates` array and keep only the columns the geo table needs.
CREATE OR REPLACE TEMP VIEW df_geo_stream_clean AS
SELECT
  g.ind,
  g.country,
  array(g.latitude, g.longitude) AS coordinates,
  g.`timestamp`
FROM df_geo_stream AS g

In [0]:
# Sink the cleaned geolocation stream into a Delta table.
GEO_TABLE_NAME = "262542bdae36_geo_table"
# Every streaming query needs its own checkpoint directory. Sharing one directory
# between the geo/user/pin queries, and deleting it from each write cell, corrupts
# or wipes the progress of queries that are already running.
GEO_CHECKPOINT_PATH = f"/tmp/kinesis/{GEO_TABLE_NAME}/_checkpoints/"

# Remove the existing data (and only this query's checkpoint) so the stream restarts cleanly
dbutils.fs.rm(f"/user/hive/warehouse/{GEO_TABLE_NAME}", True)
dbutils.fs.rm(GEO_CHECKPOINT_PATH, True)

spark.sql("""
CREATE OR REPLACE TEMP VIEW df_geo_stream_cleaned AS
SELECT
  ind,
  country,
  ARRAY(latitude, longitude) AS coordinates,
  `timestamp`
FROM
  df_geo_stream
""")

df_geo_stream_cleaned = spark.table("df_geo_stream_cleaned")

# Keep the handle so the query can be inspected or stopped later (geo_query.stop()).
geo_query = (
    df_geo_stream_cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", GEO_CHECKPOINT_PATH)
    .toTable(GEO_TABLE_NAME)
)
geo_query


<pyspark.sql.streaming.query.StreamingQuery at 0x7f289c6b3fa0>

In [0]:
# Open a streaming source over the shared Kinesis stream (initialPosition 'earliest').
df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "Kinesis-Prod-Stream")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# Schema of the JSON payload carried by user records.
user_struct = StructType(
    [
        StructField("age", IntegerType(), True),
        StructField("date_joined", TimestampType(), True),
        StructField("first_name", StringType(), True),
        StructField("ind", IntegerType(), True),
        StructField("last_name", StringType(), True),
    ]
)

# Keep only user records, decode the binary payload as JSON and flatten it,
# then publish it as a temp view for the SQL cleaning cells.
df_user_stream = (
    df.where(df["partitionKey"] == "user_data")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", user_struct).alias("data"))
    .select("data.*")
)
df_user_stream.createOrReplaceTempView("df_user_stream")

In [0]:
%sql
-- Combine first and last name into `user_name` and keep the user table's columns.
-- NOTE(review): CONCAT returns NULL if either name part is NULL.
CREATE OR REPLACE TEMP VIEW df_user_stream_clean AS
SELECT
  u.ind,
  concat(u.first_name, ' ', u.last_name) AS user_name,
  u.age,
  u.date_joined
FROM df_user_stream AS u;

In [0]:
# Sink the cleaned user stream into a Delta table.
USER_TABLE_NAME = "262542bdae36_user_table"
# Every streaming query needs its own checkpoint directory. Sharing one directory
# between the geo/user/pin queries, and deleting it from each write cell, corrupts
# or wipes the progress of queries that are already running.
USER_CHECKPOINT_PATH = f"/tmp/kinesis/{USER_TABLE_NAME}/_checkpoints/"

# Remove the existing data (and only this query's checkpoint) so the stream restarts cleanly
dbutils.fs.rm(f"/user/hive/warehouse/{USER_TABLE_NAME}", True)
dbutils.fs.rm(USER_CHECKPOINT_PATH, True)

spark.sql("""
CREATE OR REPLACE TEMP VIEW df_user_stream_cleaned AS
SELECT
  ind,
  CONCAT(first_name, ' ', last_name) AS user_name,
  age,
  date_joined
FROM
  df_user_stream
""")

df_user_stream_cleaned = spark.table("df_user_stream_cleaned")

# Keep the handle so the query can be inspected or stopped later (user_query.stop()).
user_query = (
    df_user_stream_cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", USER_CHECKPOINT_PATH)
    .toTable(USER_TABLE_NAME)
)
user_query


<pyspark.sql.streaming.query.StreamingQuery at 0x7f289c454fa0>

In [0]:
# Open a streaming source over the shared Kinesis stream (initialPosition 'earliest').
df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "Kinesis-Prod-Stream")
    .option("initialPosition", "earliest")
    .option("region", "us-east-1")
    .option("awsAccessKey", ACCESS_KEY)
    .option("awsSecretKey", SECRET_KEY)
    .load()
)

# Schema of the JSON payload carried by pin records; follower_count arrives as text
# (e.g. with a k/M suffix) and is converted to a number by the SQL cleaning cells.
pin_struct = StructType(
    [
        StructField("category", StringType(), True),
        StructField("description", StringType(), True),
        StructField("downloaded", IntegerType(), True),
        StructField("follower_count", StringType(), True),
        StructField("image_src", StringType(), True),
        StructField("index", IntegerType(), True),
        StructField("is_image_or_video", StringType(), True),
        StructField("poster_name", StringType(), True),
        StructField("save_location", StringType(), True),
        StructField("tag_list", StringType(), True),
        StructField("title", StringType(), True),
        StructField("unique_id", StringType(), True),
    ]
)

# Keep only pin records, decode the binary payload as JSON and flatten it,
# then publish it as a temp view for the SQL cleaning cells.
df_pin_stream = (
    df.where(df["partitionKey"] == "pinterest_data")
    .selectExpr("CAST(data as STRING) jsonData")
    .select(from_json("jsonData", pin_struct).alias("data"))
    .select("data.*")
)
df_pin_stream.createOrReplaceTempView("df_pin_stream")

In [0]:
%sql
-- Treat the two "No description available..." strings as missing descriptions (NULL).
CREATE OR REPLACE TEMP VIEW df_pin_stream_clean AS
SELECT
    p.category,
    IF(
        p.description IN ('No description available Story format', 'No description available'),
        NULL,
        p.description
    ) AS description,
    p.downloaded,
    p.follower_count,
    p.image_src,
    p.`index`,
    p.is_image_or_video,
    p.poster_name,
    p.save_location,
    p.tag_list,
    p.title,
    p.unique_id
FROM df_pin_stream AS p;

In [0]:
%sql
-- Drop rows whose follower_count is the 'User Info Error' marker.
-- NOTE(review): the comparison is NULL when follower_count is NULL, so rows with a
-- NULL follower_count are dropped as well — confirm that is intended.
CREATE OR REPLACE TEMP VIEW df_pin_stream_clean2 AS
SELECT c.*
FROM df_pin_stream_clean AS c
WHERE c.follower_count <> 'User Info Error';

In [0]:
%sql
-- Replace the 'Image src error.' marker in image_src with NULL.
CREATE OR REPLACE TEMP VIEW df_pin_stream_clean3 AS
SELECT
    c.category,
    c.description,
    c.downloaded,
    c.follower_count,
    NULLIF(c.image_src, 'Image src error.') AS image_src,
    c.`index`,
    c.is_image_or_video,
    c.poster_name,
    c.save_location,
    c.tag_list,
    c.title,
    c.unique_id
FROM df_pin_stream_clean2 AS c;

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW df_pin_stream_clean4 AS
  SELECT 
      category,
      description,
      downloaded,
      follower_count,
      image_src,
      `index`,
      is_image_or_video,
      poster_name,
      save_location,
      CASE 
        WHEN tag_list IN ('N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e') 
        THEN NULL
        ELSE tag_list
      END AS tag_list,
      title,
      unique_id
  FROM df_pin_stream_clean3;

In [0]:
%sql
-- Convert follower_count text such as '950', '1.5k' or '2M' into an INT.
-- The numeric part is cast to an explicit DECIMAL(10, 2): a bare DECIMAL is
-- DECIMAL(10, 0), which rounds '1.5' to 2 and would turn '1.5k' into 2000 instead of 1500.
-- LIKE is case-sensitive: only a lowercase 'k' and an uppercase 'M' suffix are recognised.
CREATE OR REPLACE TEMP VIEW df_pin_stream_clean5 AS
SELECT
    category,
    description,
    downloaded,
    CASE
      WHEN follower_count LIKE '%k' THEN CAST(CAST(SUBSTRING(follower_count, 1, LENGTH(follower_count) - 1) AS DECIMAL(10, 2)) * 1000 AS INT) -- removes 'k' from value, multiplies by 1000 and converts to INT
      WHEN follower_count LIKE '%M' THEN CAST(CAST(SUBSTRING(follower_count, 1, LENGTH(follower_count) - 1) AS DECIMAL(10, 2)) * 1000000 AS INT) -- removes 'M' from value, multiplies by 1000000 and converts to INT
      ELSE CAST(follower_count AS INT)
    END AS follower_count,
    image_src,
    `index`,
    is_image_or_video,
    poster_name,
    save_location,
    tag_list,
    title,
    unique_id
FROM df_pin_stream_clean4;


In [0]:
%sql
-- Strip the 'Local save in' prefix from save_location, leaving just the path.
-- The previous SUBSTRING(save_location, 16) was off by one: 'Local save in ' is
-- 14 characters, so position 16 also dropped the first character of the path
-- (e.g. the leading '/'). An anchored regex removes exactly the prefix, and
-- leaves values without it unchanged apart from whitespace trimming.
CREATE OR REPLACE TEMP VIEW df_pin_stream_clean6 AS
SELECT
    category,
    description,
    downloaded,
    follower_count,
    image_src,
    `index`,
    is_image_or_video,
    poster_name,
    TRIM(REGEXP_REPLACE(save_location, '^Local save in', '')) AS save_location,
    tag_list,
    title,
    unique_id
FROM df_pin_stream_clean5;

In [0]:
%sql
-- Rename `index` to `ind`, the key column name used by the geo and user streams.
CREATE OR REPLACE TEMP VIEW df_pin_stream_clean7 AS
SELECT
    c.category,
    c.description,
    c.downloaded,
    c.follower_count,
    c.image_src,
    c.`index` AS ind,
    c.is_image_or_video,
    c.poster_name,
    c.save_location,
    c.tag_list,
    c.title,
    c.unique_id
FROM df_pin_stream_clean6 AS c;

In [0]:
%sql
-- Final column selection and order for the pin table.
-- NOTE(review): `downloaded` is not carried over into this view.
CREATE OR REPLACE TEMP VIEW df_pin_stream_cleaned AS
SELECT
    c.ind,
    c.unique_id,
    c.title,
    c.description,
    c.follower_count,
    c.poster_name,
    c.tag_list,
    c.is_image_or_video,
    c.image_src,
    c.save_location,
    c.category
FROM df_pin_stream_clean7 AS c;

In [0]:
# Sink the cleaned pin stream into a Delta table.
PIN_TABLE_NAME = "262542bdae36_pin_table"
# Every streaming query needs its own checkpoint directory. Sharing one directory
# between the geo/user/pin queries, and deleting it from each write cell, corrupts
# or wipes the progress of queries that are already running.
PIN_CHECKPOINT_PATH = f"/tmp/kinesis/{PIN_TABLE_NAME}/_checkpoints/"

# Remove the existing data (and only this query's checkpoint) so the stream restarts cleanly
dbutils.fs.rm(f"/user/hive/warehouse/{PIN_TABLE_NAME}", True)
dbutils.fs.rm(PIN_CHECKPOINT_PATH, True)

# Final column selection/order for the pin table (`downloaded` is not carried over).
spark.sql("""
CREATE OR REPLACE TEMP VIEW df_pin_stream_cleaned AS
SELECT
    ind,
    unique_id,
    title,
    description,
    follower_count,
    poster_name,
    tag_list,
    is_image_or_video,
    image_src,
    save_location,
    category
FROM df_pin_stream_clean7
""")

df_pin_stream_cleaned = spark.table("df_pin_stream_cleaned")

# Keep the handle so the query can be inspected or stopped later (pin_query.stop()).
pin_query = (
    df_pin_stream_cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", PIN_CHECKPOINT_PATH)
    .toTable(PIN_TABLE_NAME)
)
pin_query


<pyspark.sql.streaming.query.StreamingQuery at 0x7f289c5a9ed0>