In [5]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType

# Suffix appended to the checkpoint and destination paths below.
GROUP_ID = "so_v2"

# ADLS Gen2 base locations used by this notebook.
source_base_path = "abfss://demo@dvtrainingadls.dfs.core.windows.net/stackoverflow/"
raw_base_path = "abfss://raw@datakickstartadls.dfs.core.windows.net/stackoverflow/"
refined_base_path = "abfss://refined@datakickstartadls.dfs.core.windows.net/"

# Streaming checkpoint and Delta output both live under the raw base path
# and are keyed by GROUP_ID.
ckpt_path = raw_base_path + "checkpoints/stackoverflow_streaming_" + GROUP_ID
dest_path = raw_base_path + "stack_overflow_streaming/posts_" + GROUP_ID

In [6]:
%run utils/logging_utils

In [7]:
# Identifiers passed to the logging helper for this run.
# `job_name` is reused by the stop_logging call at the end of the notebook.
job_name = "stackoverflow_stream"
source_table_name = "stackoverflow_kafka_stream"
destination_database_name = "raw"
destination_table_name = "stackoverflow_streaming"

# start_logging is not defined in this notebook; presumably it is provided by
# the `%run utils/logging_utils` cell above — verify there.
# Arguments are positional: job, destination database, destination table, source table.
start_logging(
    job_name,
    destination_database_name,
    destination_table_name,
    source_table_name,
)

In [8]:
# Schema passed to from_json when parsing the Kafka message value.
# Fields are added in exactly the order listed here.
post_schema = StructType()
for field_name, field_type in (
    ("_Id", "long"),
    ("_ParentId", "long"),
    ("_PostTypeId", "long"),
    ("_Score", "long"),
    ("_Tags", "string"),
    ("_Title", "string"),
    ("_ViewCount", "long"),
    ("_LastActivityDate", "timestamp"),
    ("_LastEditDate", "timestamp"),
    ("_LastEditorDisplayName", "string"),
    ("_LastEditorUserId", "long"),
    ("_OwnerDisplayName", "string"),
    ("_OwnerUserId", "long"),
    ("_AcceptedAnswerId", "long"),
    ("_AnswerCount", "long"),
    ("_Body", "string"),
    ("_ClosedDate", "timestamp"),
    ("_CommentCount", "long"),
    ("_CommunityOwnedDate", "timestamp"),
    ("_ContentLicense", "string"),
    ("_CreationDate", "timestamp"),
    ("_FavoriteCount", "long"),
):
    # StructType.add returns the struct it was called on, so rebinding keeps
    # the same accumulating schema.
    post_schema = post_schema.add(field_name, field_type)

In [9]:
# --- Confluent Cloud connection settings ---
# The API key and secret are read through mssparkutils.credentials rather than
# being hard-coded in the notebook.
confluentBootstrapServers = "pkc-41973.westus2.azure.confluent.cloud:9092"
confluentTopicName = "stackoverflow_post"
confluentApiKey = mssparkutils.credentials.getSecretWithLS('demokv', 'confluent-cloud-user')
confluentSecret = mssparkutils.credentials.getSecretWithLS('demokv', 'confluent-cloud-password')

# --- Streaming reader over the Kafka topic ---
# All reader options are supplied in one mapping; the keys and values are the
# same ones the Kafka source expects as individual options.
post_raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": confluentBootstrapServers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f"username='{confluentApiKey}' password='{confluentSecret}';"
        ),
        "subscribe": confluentTopicName,
        # Read the topic from the beginning when no checkpoint offsets exist.
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    })
    .load()
)

# Kafka delivers key/value as binary: cast both to string, then parse the
# value as JSON using post_schema into a single struct column named "json".
post_cast_df = post_raw_df.select(
    col("key").cast("string").alias("key"),
    from_json(col("value").cast("string"), post_schema).alias("json"),
)

# Expand the parsed struct into top-level columns (the Kafka key is not kept).
post_df = post_cast_df.select("json.*")

In [10]:
# Users stored as a Delta table under the refined path; the columns already
# carry the target names.
new_users = (
    spark.read
    .format("delta")
    .load(f"{refined_base_path}stackoverflow_new_users")
    .select("user_id", "account_id", "display_name", "website_url")
)

# Users stored as parquet under the raw path; rename the underscore-prefixed
# columns to the same names (and order) as new_users.
old_users = (
    spark.read
    .parquet(f"{raw_base_path}users")
    .select(
        col("_ID").alias("user_id"),
        col("_AccountId").alias("account_id"),
        col("_DisplayName").alias("display_name"),
        col("_WebsiteUrl").alias("website_url"),
    )
)

# union() matches columns by position, which is why both selects above list
# the four columns in the same order.
all_users = old_users.union(new_users)

In [11]:
# Enrich each streamed post with its owner's user record. The left join keeps
# posts whose _OwnerUserId has no matching user_id in all_users.
combined_df = post_df.join(
    all_users,
    post_df["_OwnerUserId"] == all_users["user_id"],
    how="left",
)
log_informational_message("Starting stream for combined so posts to delta file.")

# Append the joined stream to the Delta location, tracking progress in the
# checkpoint directory so a rerun resumes from the recorded offsets.
q = (
    combined_df.writeStream
    .format("delta")
    .option("checkpointLocation", ckpt_path)
    .trigger(processingTime='30 seconds')
    .outputMode("append")
    .start(dest_path)
)

# Drain whatever is currently available, then shut the query down.
# The try/finally guarantees the query is stopped even when processing raises,
# so a failed run does not leave a streaming query running in the session.
# NOTE(review): the PySpark docs describe processAllAvailable() as intended for
# testing and note it may block indefinitely if data keeps arriving — confirm
# that is acceptable for this topic.
try:
    q.processAllAvailable()
finally:
    q.stop()

In [12]:
# Read back the Delta output written by the stream and render it for a visual check.
test_df = spark.read.load(dest_path, format="delta")
display(test_df)

In [None]:
# Close out logging for the job name that was passed to start_logging earlier.
# stop_logging is not defined in this notebook; presumably it comes from the
# `%run utils/logging_utils` cell — verify there.
stop_logging(job_name)