In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib
import urllib.parse

# Reader settings for the credentials export.
file_type = "csv"              # the input file is a CSV
first_row_is_header = "true"   # the first row holds the column names
delimiter = ","                # fields are separated by commas

# Load the credentials CSV into a Spark DataFrame.
aws_keys_df = (
    spark.read
    .format(file_type)
    .options(header=first_row_is_header, sep=delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# Look up the AWS access key and secret key of 'databricks-user' in the
# credentials DataFrame. Both columns are fetched with a single query so the
# CSV is only filtered and collected once.
credential_rows = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .collect()
)
# Fail with an explicit message instead of an opaque IndexError when the
# expected user row is absent from the credentials file.
if not credential_rows:
    raise ValueError("No credentials found for user 'databricks-user'")

ACCESS_KEY = credential_rows[0]['Access key ID']
SECRET_KEY = credential_rows[0]['Secret access key']
# Percent-encode the secret key; safe="" means '/' is encoded as well.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# Source settings for the pin Kinesis stream, read from the earliest
# available record and authenticated with the keys loaded above.
pin_stream_options = {
    'streamName': 'streaming-12a3410ba3cf-pin',
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
}

# Open a streaming DataFrame over the Kinesis source.
df_pin_streams = (
    spark.readStream
    .format('kinesis')
    .options(**pin_stream_options)
    .load()
)

In [0]:
# Every pin attribute is parsed as a string (including `downloaded`, which is
# assumed to arrive as a string); type conversions happen in the cleaning step.
pin_field_names = [
    "index",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "downloaded",
    "save_location",
    "category",
]
pin_schema = StructType(
    [StructField(field_name, StringType()) for field_name in pin_field_names]
)

# Decode the Kinesis `data` payload as text, parse it as JSON against
# pin_schema, then flatten the parsed struct into top-level columns.
pin_payload = df_pin_streams.selectExpr("CAST(data as STRING)")
pin_parsed = pin_payload.select(from_json(col("data"), pin_schema).alias("pin_data"))
df_pin = pin_parsed.select("pin_data.*")


In [0]:
# Clean the parsed pin records.
#
# Every step is chained on the previous result so that no transformation is
# silently discarded (DataFrames are immutable: `withColumn` returns a new
# DataFrame and has no effect unless its result is kept).
#
# `category` and `description` need no explicit null handling: a value that is
# already null stays null, so no transformation is applied to them here.
df_pin_cleaned = (
    df_pin
    # Convert follower counts such as "12k" / "3M" to plain integers; values
    # that cannot be cast to int become null.
    .withColumn(
        "follower_count",
        when(col("follower_count").endswith("k"),
             regexp_replace(col("follower_count"), "k", "").cast("int") * 1000)
        .when(col("follower_count").endswith("M"),
              regexp_replace(col("follower_count"), "M", "").cast("int") * 1000000)
        .otherwise(col("follower_count").cast("int"))
    )
    # Keep only the path part of save_location.
    .withColumn("save_location", regexp_replace(col("save_location"), "^Local save in ", ""))
    # Rename `index` to `ind` and store it as an integer.
    # NOTE(review): this changes the sink column type of `ind` from string to
    # int; if the target table already exists with a string `ind`, its schema
    # needs migrating — confirm before deploying.
    .withColumnRenamed("index", "ind")
    .withColumn("ind", col("ind").cast("int"))
)

# Final column order. `downloaded` is not part of the output, so it is simply
# dropped by this select rather than cast first.
desired_column_order = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]
df_pin_cleaned = df_pin_cleaned.select(desired_column_order)


In [0]:
# Append the cleaned pin stream to its Delta table.
# A checkpoint directory stores the progress of exactly one streaming query
# and must not be shared between queries, so this path is specific to the pin
# stream.
# NOTE(review): with a new checkpoint path the query starts again from the
# source's initial position, which may re-append records to an existing
# table — confirm before running against a populated table.
df_pin_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/pin/_checkpoints/") \
  .table("12a3410ba3cf_pin_table")

In [0]:
# Source settings for the geolocation Kinesis stream, read from the earliest
# available record and authenticated with the keys loaded above.
geo_stream_options = {
    'streamName': 'streaming-12a3410ba3cf-geo',
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
}

# Open a streaming DataFrame over the Kinesis source.
df_geo_streams = (
    spark.readStream
    .format('kinesis')
    .options(**geo_stream_options)
    .load()
)


In [0]:
# (column name, Spark type) pairs describing one geolocation JSON record.
geo_fields = [
    ("ind", StringType()),
    ("timestamp", StringType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("country", StringType()),
]
geo_schema = StructType(
    [StructField(field_name, field_type) for field_name, field_type in geo_fields]
)

# Decode the Kinesis `data` payload as text, parse it as JSON against
# geo_schema, then flatten the parsed struct into top-level columns.
geo_payload = df_geo_streams.selectExpr("CAST(data as STRING)")
geo_parsed = geo_payload.select(from_json(col("data"), geo_schema).alias("geo_data"))
df_geo = geo_parsed.select("geo_data.*")

In [0]:
# Build the cleaned geolocation frame in a single projection:
#   - latitude and longitude are merged into one `coordinates` array column
#     (the two source columns are not carried over),
#   - `timestamp` is converted from string to a timestamp,
#   - columns are emitted in the order ind, country, coordinates, timestamp.
df_geo_cleaned = df_geo.select(
    "ind",
    "country",
    array(col("latitude"), col("longitude")).alias("coordinates"),
    to_timestamp(col("timestamp")).alias("timestamp"),
)

In [0]:
# Append the cleaned geolocation stream to its Delta table.
# A checkpoint directory stores the progress of exactly one streaming query
# and must not be shared between queries, so this path is specific to the geo
# stream.
# NOTE(review): with a new checkpoint path the query starts again from the
# source's initial position, which may re-append records to an existing
# table — confirm before running against a populated table.
df_geo_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/geo/_checkpoints/") \
  .table("12a3410ba3cf_geo_table")

In [0]:
# Source settings for the user Kinesis stream, read from the earliest
# available record and authenticated with the keys loaded above.
user_stream_options = {
    'streamName': 'streaming-12a3410ba3cf-user',
    'initialPosition': 'earliest',
    'region': 'us-east-1',
    'awsAccessKey': ACCESS_KEY,
    'awsSecretKey': SECRET_KEY,
}

# Open a streaming DataFrame over the Kinesis source.
df_user_streams = (
    spark.readStream
    .format('kinesis')
    .options(**user_stream_options)
    .load()
)

In [0]:
# (column name, Spark type) pairs describing one user JSON record.
user_fields = [
    ("ind", StringType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("age", IntegerType()),
    ("date_joined", TimestampType()),
]
user_schema = StructType(
    [StructField(field_name, field_type) for field_name, field_type in user_fields]
)

# Decode the Kinesis `data` payload as text, parse it as JSON against
# user_schema, then flatten the parsed struct into top-level columns.
user_payload = df_user_streams.selectExpr("CAST(data as STRING)")
user_parsed = user_payload.select(from_json(col("data"), user_schema).alias("user_data"))
df_user = user_parsed.select("user_data.*")

In [0]:
# Build the cleaned user frame in a single projection:
#   - first_name and last_name are joined with a space into `user_name`
#     (the two source columns are not carried over),
#   - `date_joined` is passed through to_timestamp with an ISO-like pattern,
#   - columns are emitted in the order ind, user_name, age, date_joined.
# NOTE(review): user_schema already declares date_joined as TimestampType, so
# this to_timestamp call may be redundant — confirm against the incoming JSON.
df_user_cleaned = df_user.select(
    "ind",
    concat_ws(" ", col("first_name"), col("last_name")).alias("user_name"),
    "age",
    to_timestamp(col("date_joined"), "yyyy-MM-dd'T'HH:mm:ss").alias("date_joined"),
)

In [0]:
# Append the cleaned user stream to its Delta table.
# A checkpoint directory stores the progress of exactly one streaming query
# and must not be shared between queries, so this path is specific to the
# user stream.
# NOTE(review): with a new checkpoint path the query starts again from the
# source's initial position, which may re-append records to an existing
# table — confirm before running against a populated table.
df_user_cleaned.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/user/_checkpoints/") \
  .table("12a3410ba3cf_user_table")