In [1]:
import yaml
import json

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType, FloatType, BooleanType
import pyspark.sql.functions as F

In [2]:
# Read the pipeline settings once; every constant below is derived from `config`.
with open('streaming_config.yml', mode='r') as config_file:
    config = yaml.safe_load(config_file)

TOPIC = 'auth_events'
BOOTSTRAP_SERVERS = config['kafka']['bootstrap_servers']
SPARK_JARS_PACKAGES = config['spark']['spark_jars_packages']
# Raw schema definition for TOPIC as stored in the YAML file.
SCHEMA = config['kafka']['topics'][TOPIC]['schema']

In [3]:
import os

# Must run before the SparkSession (and therefore its JVM) is created.
# When PySpark launches the JVM itself, PYSPARK_SUBMIT_ARGS is handed to spark-submit
# verbatim and has to end with the `pyspark-shell` token; without it the gateway
# fails to start ("Java gateway process exited before sending its port number").
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {SPARK_JARS_PACKAGES} pyspark-shell'

In [None]:
# Build the Spark session (getOrCreate reuses one that is already running) with the
# extra packages listed in the config, presumably the Kafka connector; then set the
# driver log level to INFO.
spark = (
    SparkSession.builder
    .appName(f"song-streaming-data-pipeline-{TOPIC}")
    .config("spark.jars.packages", SPARK_JARS_PACKAGES)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("INFO")

In [5]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    """Start a streaming query that prints every micro-batch of ``df`` to stdout.

    Args:
        df: Streaming DataFrame to display.
        output_mode: Spark output mode. NOTE: Spark only accepts 'complete' for
            streams with an aggregation; pass 'append' for plain event streams.
        processing_time: Micro-batch trigger interval, e.g. '5 seconds'.

    Returns:
        The started streaming query handle.
    """
    console_writer = (
        df.writeStream
        .format("console")
        .outputMode(output_mode)
        .trigger(processingTime=processing_time)
        .option("truncate", False)  # show full cell contents
    )
    return console_writer.start()

def sink_memory(df, query_name, query_template):
    """Stream ``df`` into Spark's in-memory sink and build a SQL query over it.

    Args:
        df: Streaming DataFrame to capture.
        query_name: Name given to the streaming query; the memory sink exposes the
            captured rows as a table with this name.
        query_template: SQL text containing a ``{table_name}`` placeholder.

    Returns:
        Tuple ``(streaming_query, results_df)``, where ``results_df`` comes from running
        the formatted SQL on the notebook-level ``spark`` session.
    """
    memory_query = df.writeStream.queryName(query_name).format('memory').start()
    sql_text = query_template.format(table_name=query_name)
    return memory_query, spark.sql(sql_text)

def prepare_dataframe_to_kafka_sink(dataframe: DataFrame, key_columns: list = None, value_columns: list = None) -> DataFrame:
    """Reshape ``dataframe`` into the two-column (key, value) layout written to Kafka.

    Args:
        dataframe: Source DataFrame.
        key_columns: Columns forming the message key. A single column is used as-is
            (renamed to ``key``); several columns are cast to string and joined with
            ``'|'``. Defaults to ``['key']``.
        value_columns: Columns serialized, in the given order, into a JSON object in
            the ``value`` column. Defaults to every column not listed in ``key_columns``.

    Returns:
        DataFrame with exactly the columns ``key`` and ``value``.
    """
    key_columns = list(key_columns) if key_columns else ['key']
    if not value_columns:
        value_columns = [c for c in dataframe.columns if c not in key_columns]

    if len(key_columns) == 1:
        key_expr = F.col(key_columns[0])
    else:
        # Composite key. concat_ws skips nulls, so a row whose key parts are all null gets ''.
        key_expr = F.concat_ws('|', *[F.col(c).cast('string') for c in key_columns])

    # to_json(struct(...)) uses the column names as JSON field names.
    value_expr = F.to_json(F.struct(*value_columns))
    return dataframe.select(key_expr.alias('key'), value_expr.alias('value'))

def sink_kafka(dataframe: DataFrame, topic: str, output_mode: str = 'append', kafka_bootstrap_servers: str = 'broker:29092', kafka_checkpoint_location: str = 'checkpoint'):
    """Start a streaming query that publishes ``dataframe`` to a Kafka topic.

    Args:
        dataframe: Streaming DataFrame in the Kafka sink layout (a ``value`` column,
            optionally a ``key`` column).
        topic: Destination Kafka topic.
        output_mode: Spark output mode for the query.
        kafka_bootstrap_servers: Broker address list passed as ``kafka.bootstrap.servers``.
        kafka_checkpoint_location: Directory for the query's checkpoint.
            NOTE(review): the default is shared by every call that relies on it;
            concurrent queries should each get their own location.

    Returns:
        The started streaming query handle.
    """
    kafka_options = {
        "kafka.bootstrap.servers": kafka_bootstrap_servers,
        "topic": topic,
        "checkpointLocation": kafka_checkpoint_location,
    }
    kafka_writer = dataframe.writeStream.format("kafka").outputMode(output_mode)
    for option_name, option_value in kafka_options.items():
        kafka_writer = kafka_writer.option(option_name, option_value)
    return kafka_writer.start()


In [None]:
# Subscribe to TOPIC, starting from the earliest available offsets.
# NOTE(review): BOOTSTRAP_SERVERS is read from streaming_config.yml, but the broker
# address below is hard-coded; confirm which of the two should be used.
df_kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)
    

In [7]:
# SCHEMA is Python expression text taken from streaming_config.yml (presumably something
# like "StructType([StructField(...), ...])"; verify against the config).
# SECURITY: eval() executes whatever the config file contains. Builtins are stripped and
# only the pyspark type constructors imported at the top are exposed. This narrows, but
# does not eliminate, what a tampered config can do. Prefer storing the schema as a DDL
# string or as StructType JSON (StructType.fromJson).
auth_events_schema = eval(
    SCHEMA,
    {'__builtins__': {}},
    {
        'StructType': StructType,
        'StructField': StructField,
        'LongType': LongType,
        'IntegerType': IntegerType,
        'StringType': StringType,
        'FloatType': FloatType,
        'BooleanType': BooleanType,
    },
)

In [8]:
# Parse the JSON payload, render `ts` as a formatted timestamp string and derive a
# message key from (userId, sessionId, ts). `ts` is assumed to be epoch milliseconds,
# since it is divided by 1000 before conversion.
df_kafka_encoded = (
    df_kafka_raw
    .select(F.from_json(F.col('value').cast(StringType()), auth_events_schema).alias('json_value'))
    .select('json_value.*')
    # from_unixtime() truncates its input to whole seconds, so the '.SSSSSS' part would
    # always read '.000000'. Casting fractional seconds to a timestamp keeps the millis.
    .withColumn('ts', F.date_format((F.col('ts') / 1000).cast('timestamp'), 'yyyy-MM-dd HH:mm:ss.SSSSSS'))
    # The hash-based key becomes the first column, followed by all payload columns.
    .select(F.hash('userId', 'sessionId', 'ts').cast(StringType()).alias('key'), '*')
)

In [9]:
# Shape the parsed events into Kafka's (key, value) layout; the listed columns become
# the JSON value, in this order.
df_sink_kafka = prepare_dataframe_to_kafka_sink(
    df_kafka_encoded,
    key_columns=['key'],
    value_columns=[
        'ts', 'sessionId', 'level', 'itemInSession',
        'city', 'zip', 'state', 'userAgent', 'lon', 'lat',
        'userId', 'lastName', 'firstName', 'gender',
        'registration', 'tag', 'success',
    ],
)

In [None]:
# Publish the transformed events. The query handle is kept so a later cell can stop it.
write_query = sink_kafka(
    df_sink_kafka,
    topic='auth_events_transformed',
    output_mode='append',
)

In [None]:
# write_query, query_results = sink_memory(df_sink_kafka, query_name='auth_events_transformed', query_template='select * from {table_name}')
# query_results.show()

In [None]:
# Stop the streaming query currently bound to `write_query`. This is the Kafka sink
# query started earlier, unless a later cell rebound the name.
write_query.stop()