In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import * # pyspark functions
# NOTE: keep the stdlib imports below the pyspark wildcard imports; pyspark.sql.functions
# exports names such as `base64` that would otherwise shadow the stdlib modules.
import urllib # URL processing
import urllib.parse  # `import urllib` alone does not load the `parse` submodule
import base64
import json

# Load the AWS credentials from the Delta table that stores them (first row is used).
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"
keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns with a single Spark job rather than one collect() per column.
credentials_row = keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    raise ValueError(f"No AWS credentials found in the Delta table at {delta_table_path}")

# Never display these values: notebook outputs are saved alongside the code.
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# Percent-encode the secret key; safe="" means "/" is encoded as well.
# NOTE(review): not used in the visible cells — presumably needed for a URI built elsewhere.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Disable the Delta format check for the current Spark session.
-- NOTE(review): the reads in this notebook use format("delta") for the credentials
-- table and format('kinesis') for the streams; confirm this override is still needed.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Kinesis connection settings shared by the pin/geo/user streams.
KINESIS_REGION = 'us-east-1'
STREAM_NAME_PREFIX = 'streaming-126ca3664fbb'


def read_kinesis_stream(stream_name, region=KINESIS_REGION, initial_position='earliest'):
    """Open a streaming DataFrame over one Kinesis stream with its payload exposed as text.

    Args:
        stream_name: Full name of the Kinesis stream to read.
        region: AWS region hosting the stream.
        initial_position: Value for the Kinesis `initialPosition` option
            ('earliest' as in the original per-stream readers).

    Returns:
        A streaming DataFrame with a single string column named `data`.
    """
    raw_stream = (
        spark.readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )
    # The Kinesis source delivers each record's payload in the binary `data` column.
    # CAST(... AS STRING) just interprets those bytes as UTF-8 text (no base64 decoding
    # happens here); the JSON text is parsed later with from_json.
    return raw_stream.selectExpr("CAST(data as STRING)")


df_pin = read_kinesis_stream(f'{STREAM_NAME_PREFIX}-pin')
df_geo = read_kinesis_stream(f'{STREAM_NAME_PREFIX}-geo')
df_user = read_kinesis_stream(f'{STREAM_NAME_PREFIX}-user')


# Schemas of the JSON payload carried on each stream, worked out by inspecting records.
# Each schema is written as (field name, Spark type) pairs; every field is nullable.
# follower_count stays a string here because it holds values with k/M suffixes;
# it is converted to an integer in the cleaning cell.

def _nullable_schema(fields):
    """Return a StructType with one nullable StructField per (name, type) pair, in order."""
    return StructType([StructField(name, spark_type, True) for name, spark_type in fields])


pin_schema = _nullable_schema([
    ("index", IntegerType()),
    ("unique_id", StringType()),
    ("title", StringType()),
    ("description", StringType()),
    ("poster_name", StringType()),
    ("follower_count", StringType()),
    ("tag_list", StringType()),
    ("is_image_or_video", StringType()),
    ("image_src", StringType()),
    ("downloaded", IntegerType()),
    ("save_location", StringType()),
    ("category", StringType()),
])
geo_schema = _nullable_schema([
    ("ind", IntegerType()),
    ("timestamp", StringType()),
    ("latitude", FloatType()),
    ("longitude", FloatType()),
    ("country", StringType()),
])
user_schema = _nullable_schema([
    ("ind", IntegerType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("age", IntegerType()),
    ("date_joined", StringType()),
])


# Parse each stream's JSON text (the string `data` column) against its schema into a
# `parsed_data` struct, then flatten that struct so the schema fields become columns.
parsed_pin, parsed_geo, parsed_user = (
    stream.withColumn("parsed_data", from_json(col("data"), schema))
    for stream, schema in (
        (df_pin, pin_schema),
        (df_geo, geo_schema),
        (df_user, user_schema),
    )
)
df_pin, df_geo, df_user = (
    parsed.select("parsed_data.*") for parsed in (parsed_pin, parsed_geo, parsed_user)
)


# display(df_pin)
# display(df_geo)
# display(df_user)


In [0]:
# function to convert string numbers to ints, including those with k/M units
def convert_to_int(value):
    """Convert a count such as "42", " 1.5k " or "2M" to an int.

    A trailing "k" multiplies by 1,000 and a trailing "M" by 1,000,000.

    Args:
        value: The raw value. ints are returned unchanged; strings are parsed.

    Returns:
        The integer value, or None when `value` is None / not a str or int, or when the
        string cannot be parsed (e.g. "", "k", "User Info Error").
    """
    # The notebook's `from pyspark.sql.functions import *` shadows the builtin round().
    import builtins

    # bool is a subclass of int; the original exact type check excluded it, so keep that.
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    # Spark does not guarantee evaluation order of subexpressions, so this UDF can be
    # called with None even behind a when(isNotNull) guard; never crash on non-strings.
    if not isinstance(value, str):
        return None

    text = value.strip()
    multipliers = {'k': 1000, 'M': 1000000}
    try:
        if text and text[-1] in multipliers:
            # Round rather than truncate: e.g. 1.005 * 1000 evaluates to 1004.999...,
            # and int() would drop a whole unit.
            return int(builtins.round(float(text[:-1]) * multipliers[text[-1]]))
        return int(text)
    except (ValueError, OverflowError):
        # OverflowError covers inputs such as "infk".
        return None

# Spark UDF around convert_to_int, returning IntegerType. The function is passed directly;
# a lambda wrapper adds nothing.
convert_to_int_udf = udf(convert_to_int, IntegerType())

############# PIN DATA ##############

# Normalise blank / whitespace-only values to null in every column, so later steps
# see a single representation of "missing".
df_pin = df_pin.select(
    [when(trim(col(c)) == "", lit(None)).otherwise(col(c)).alias(c) for c in df_pin.columns]
)

# follower_count is the only numeric field that arrives as text (e.g. with k/M suffixes);
# convert it to an integer. Null inputs stay null (when() without otherwise() yields null).
df_pin = df_pin.withColumn(
    "follower_count",
    when(col("follower_count").isNotNull(), convert_to_int_udf(col("follower_count"))),
)

# Rename the key to "ind" to match the geo and user schemas, then fix the column order.
df_pin = df_pin.withColumnRenamed("index", "ind").select(
    "ind", "unique_id", "title",
    "description", "follower_count",
    "poster_name", "tag_list",
    "is_image_or_video", "image_src",
    "save_location", "category",
    "downloaded",  # not listed in the specified column order; kept at the end
)

############# GEO DATA ##############

# Pack latitude/longitude into one `coordinates` array column (latitude first) and drop
# the originals, parse the timestamp string, and fix the output column order.
# NOTE(review): to_timestamp yields null for strings that do not match the pattern
# (with ANSI mode off) — confirm the producer emits "yyyy-MM-dd'T'HH:mm:ss".
df_geo = (
    df_geo
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss"))
    .select("ind", "country", "coordinates", "timestamp")
)

############# USER DATA ##############

# Merge first and last name into a single space-separated `user_name` (dropping the
# source columns), parse `date_joined` into a timestamp, and fix the column order.
df_user = (
    df_user
    .withColumn('user_name', concat_ws(' ', col('first_name'), col('last_name')))
    .drop('first_name', 'last_name')
    .withColumn("date_joined", to_timestamp("date_joined", "yyyy-MM-dd'T'HH:mm:ss"))
    .select("ind", "user_name", "age", "date_joined")
)

# Last expression of the cell, so the notebook echoes the DataFrame.
df_user

In [0]:

# Root directory for streaming checkpoints; each query gets its own subdirectory.
CHECKPOINT_ROOT = "/tmp/kinesis/_checkpoints"


def write_stream_to_delta(stream_df, table_name, checkpoint_root=CHECKPOINT_ROOT):
    """Start an append-mode streaming write of `stream_df` into the Delta table `table_name`.

    A checkpoint records one query's progress (source offsets, commit log), so every
    streaming query needs its own checkpoint location; queries sharing a directory
    interfere with each other.

    Args:
        stream_df: Streaming DataFrame to write.
        table_name: Target table name.
        checkpoint_root: Parent directory; the checkpoint goes in `<root>/<table_name>/`.

    Returns:
        The StreamingQuery returned by DataStreamWriter.table().
    """
    checkpoint_location = f"{checkpoint_root.rstrip('/')}/{table_name}/"
    return (
        stream_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(table_name)
    )


# NOTE(review): these checkpoint paths differ from the single shared path used before,
# so each query starts without saved offsets and, with initialPosition='earliest',
# re-reads everything still retained in its stream. If the tables already contain
# data, expect duplicate rows unless the tables are cleared first.
pin_query = write_stream_to_delta(df_pin, "126ca3664fbb_pin_table")
geo_query = write_stream_to_delta(df_geo, "126ca3664fbb_geo_table")
user_query = write_stream_to_delta(df_user, "126ca3664fbb_user_table")