In [None]:
# pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType

# URL processing
import urllib
import urllib.parse

In [None]:
# Location of the Delta table that is read into `aws_keys_df`
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the table as a (batch) Spark DataFrame, naming the Delta format explicitly
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [None]:
# Get the AWS access key and secret key from the Spark DataFrame.
# Both columns are fetched from one row in a single Spark job, so the key ID
# and the secret are guaranteed to belong to the same credential pair.
_credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if _credentials_row is None:
    # Fail with an explicit message instead of an opaque IndexError on an empty table
    raise ValueError("The credentials table is empty: no AWS access key / secret key row found")

ACCESS_KEY = _credentials_row['Access key ID']
SECRET_KEY = _credentials_row['Secret access key']

# Percent-encode the secret key; safe="" means "/" is encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
# TASK 2: READ DATA

# --- Read pin data ---------------------------------------------------------
# Options handed to the Kinesis streaming source for the pin stream
pin_stream_options = {
    'streamName': 'streaming-12c0d092d679-pin',
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
}
pin_raw = spark.readStream.format('kinesis').options(**pin_stream_options).load()

# Keep only the `data` column, cast to a string so it can be parsed as JSON
pin_json = pin_raw.selectExpr("CAST(data as STRING) jsonData")

# Every field is first parsed as a nullable string; numeric casts happen later
pin_field_names = [
    "index",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
    "downloaded",
]
schema = StructType([StructField(field_name, StringType(), True) for field_name in pin_field_names])

# Parse the JSON string and flatten the resulting struct into top-level columns
pin_parsed = pin_json.select(from_json("jsonData", schema).alias("data")).select("data.*")

# --- Cleaning --------------------------------------------------------------
# Placeholder text -> columns in which that placeholder is replaced by None
replace_values = {
    'User Info Error': ['follower_count', 'poster_name'],
    'No description available Story format': ['description'],
    'No Title Data Available': ['title'],
    'Image src error.': ['image_src'],
    'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e': ['tag_list']
}

pin_nulled = pin_parsed
for placeholder, affected_columns in replace_values.items():
    pin_nulled = pin_nulled.replace({placeholder: None}, subset=affected_columns)

# Expand the "k" / "M" suffixes of follower_count into zeros so the value can be cast to an integer
follower_count_digits = regexp_replace(
    regexp_replace("follower_count", "k", "000"),
    "M",
    "000000",
)

df_pin = (
    pin_nulled
    .withColumn("follower_count", follower_count_digits.cast("integer"))
    # Numeric columns get a numeric data type
    .withColumn("downloaded", col("downloaded").cast("integer"))
    .withColumn("index", col("index").cast("integer"))
    # Strip the "Local save in " prefix so only the path remains
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    # Rename index -> ind
    .withColumnRenamed("index", "ind")
    # Final column order (the `downloaded` column is not part of the output)
    .select(
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category",
    )
    .dropDuplicates()
)

In [None]:
# Stream the cleaned pin DataFrame into a Delta table.
#   format("delta")     -> Delta Lake sink
#   queryName(...)      -> name given to the streaming query
#   outputMode("append")-> append mode: only new rows are written to the sink
#   checkpointLocation  -> where the query keeps its checkpoint data
#   table(...)          -> name of the target Delta table
query = df_pin.writeStream \
    .format("delta") \
    .queryName("df_pin_streaming_query") \
    .outputMode("append") \
    .option("checkpointLocation", "tmp/checkpoints/test") \
    .table("12c0d092d679_pin_table")

In [None]:
# --- Read geo data ---------------------------------------------------------
# Options handed to the Kinesis streaming source for the geo stream
geo_stream_options = {
    'streamName': 'streaming-12c0d092d679-geo',
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
}
geo_raw = spark.readStream.format('kinesis').options(**geo_stream_options).load()

# Keep only the `data` column, cast to a string so it can be parsed as JSON
geo_json = geo_raw.selectExpr("CAST(data as STRING) geoJsonData")

# All fields are parsed as nullable strings
geo_field_names = ["ind", "country", "latitude", "longitude", "timestamp"]
geo_schema = StructType([StructField(field_name, StringType(), True) for field_name in geo_field_names])

# Parse the JSON string and flatten the resulting struct into top-level columns
geo_parsed = geo_json.select(from_json("geoJsonData", geo_schema).alias("geo_data")).select("geo_data.*")

# --- Cleaning --------------------------------------------------------------
df_geo = (
    geo_parsed
    # Combine latitude and longitude into a single array column, then drop the originals
    .withColumn("coordinates", array("latitude", "longitude"))
    .drop("latitude", "longitude")
    # Convert the timestamp column from a string to a timestamp data type
    .withColumn("timestamp", to_timestamp("timestamp"))
    # Reorder the columns
    .select('ind', 'country', 'coordinates', 'timestamp')
    # Remove duplicate rows
    .dropDuplicates()
    .selectExpr("ind", "country", "coordinates", "timestamp")
)


In [None]:
# Stream the cleaned geo DataFrame into a Delta table.
#   format("delta")     -> Delta Lake sink
#   queryName(...)      -> name given to the streaming query
#   outputMode("append")-> append mode: only new rows are written to the sink
#   checkpointLocation  -> where the query keeps its checkpoint data
#   table(...)          -> name of the target Delta table
query = df_geo.writeStream \
    .format("delta") \
    .queryName("df_geo_streaming_query") \
    .outputMode("append") \
    .option("checkpointLocation", "tmp/checkpoints/geo_test") \
    .table("12c0d092d679_geo_table")

In [None]:
# --- Read user data --------------------------------------------------------
# Options handed to the Kinesis streaming source for the user stream
user_stream_options = {
    'streamName': 'streaming-12c0d092d679-user',
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
}
user_raw = spark.readStream.format('kinesis').options(**user_stream_options).load()

# Keep only the `data` column, cast to a string so it can be parsed as JSON
user_json = user_raw.selectExpr("CAST(data as STRING) userJsonData")

# All fields are parsed as nullable strings
user_field_names = ["age", "date_joined", "first_name", "ind", "last_name"]
user_schema = StructType([StructField(field_name, StringType(), True) for field_name in user_field_names])

# Parse the JSON string and flatten the resulting struct into top-level columns
user_parsed = user_json.select(from_json("userJsonData", user_schema).alias("user_data")).select("user_data.*")

# --- Cleaning --------------------------------------------------------------
df_user = (
    user_parsed
    # Build user_name from first_name and last_name (space separated), then drop the originals
    .withColumn("user_name", concat_ws(" ", "first_name", "last_name"))
    .drop("first_name", "last_name")
    # Convert the date_joined column from a string to a timestamp data type
    .withColumn("date_joined", to_timestamp("date_joined"))
    # Reorder the columns
    .select('ind', 'user_name', 'age', 'date_joined')
    # Remove duplicate rows
    .dropDuplicates()
    .selectExpr("ind", "user_name", "age", "date_joined")
)


In [None]:
# Stream the cleaned user DataFrame into a Delta table.
#   format("delta")     -> Delta Lake sink
#   queryName(...)      -> name given to the streaming query
#   outputMode("append")-> append mode: only new rows are written to the sink
#   checkpointLocation  -> where the query keeps its checkpoint data
#   table(...)          -> name of the target Delta table
query = df_user.writeStream \
    .format("delta") \
    .queryName("df_user_streaming_query") \
    .outputMode("append") \
    .option("checkpointLocation", "tmp/checkpoints/user_test") \
    .table("12c0d092d679_user_table")