# Data preparation

In [None]:
import pyspark


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [None]:
 spark = (
    SparkSession.builder.master("local[1]")
    .appName("Exam data preparation")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1")
    .getOrCreate()
)

In [None]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

In [None]:
# Explicit schema for the Titanic records. It is used both when reading the
# CSV file and when decoding the JSON payloads read back from Kafka.
# Fields keep the default nullability (nullable).
titanic_schema = T.StructType(
    [
        T.StructField("PassengerId", T.IntegerType()),
        T.StructField("Survived", T.IntegerType()),
        T.StructField("Pclass", T.IntegerType()),
        T.StructField("Name", T.StringType()),
        T.StructField("Sex", T.StringType()),
        T.StructField("Age", T.IntegerType()),
        T.StructField("SibSp", T.IntegerType()),
        T.StructField("Parch", T.IntegerType()),
        T.StructField("Ticket", T.StringType()),
        T.StructField("Fare", T.FloatType()),
        T.StructField("Cabin", T.StringType()),
        T.StructField("Embarked", T.StringType()),
        T.StructField("Timestamp", T.TimestampType()),
    ]
)


In [None]:

# Load the raw CSV with the explicit schema. No header option is passed, so
# the reader's default header handling applies.
df_source_batch = spark.read.csv("./data/titanic.csv", schema=titanic_schema)

# Serialise every row into one JSON document stored in a "value" column,
# which is the column selected for the Kafka writer below.
df_source_batch = df_source_batch.withColumn(
    "value",
    F.to_json(F.struct(*df_source_batch.columns)).cast(T.StringType()),
)

# Only configure the writer here; nothing is sent until .save() is called.
dataframe_source_batch_writer = (
    df_source_batch.select("value")
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "titanic_topic")
)


In [None]:
# Count the rows staged for publication (one JSON "value" per source row).
df_source_batch.select("value").count()

In [None]:
# Publish the same batch twice on purpose, so the topic ends up containing
# duplicate messages that can be dropped later.
for _publish_round in range(2):
    dataframe_source_batch_writer.save()

In [None]:
# Batch-read (spark.read, not readStream) the titanic topic back into a
# DataFrame, failing rather than skipping if data loss is detected.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "titanic_topic")
    .option("failOnDataLoss", "true")
    .load()
)

# Show the columns exposed by the Kafka source.
df.printSchema()

In [None]:
# Count the messages read back from the topic.
# NOTE(review): since the batch is saved twice earlier, this is presumably
# twice the source row count if the topic started empty — confirm.
df.count()

In [None]:
# Peek at the last row returned by the Kafka read.
df.tail(1)

In [None]:
# Cast the Kafka "value" column to a string, then parse the JSON into a
# struct using the same schema the records were produced with.
decoded_payload = F.from_json(F.col("value").cast("string"), titanic_schema)
df = df.withColumn("message_content", decoded_payload)

# Flatten the parsed struct back into one top-level column per field.
df_minimal = df.select("message_content.*")

In [None]:
# Show the flattened schema obtained from the parsed payload.
df_minimal.printSchema()

In [None]:
# Re-type Fare and Age as strings (in that order), replacing each column in
# place so the column order is unchanged.
for _column_name in ("Fare", "Age"):
    df_minimal = df_minimal.withColumn(
        _column_name, F.col(_column_name).cast(T.StringType())
    )

In [None]:
df_to_kafka = df_minimal

# Bucket the column names by their Spark type name. Columns of any other
# type (e.g. the timestamp) end up in neither bucket.
string_column_names = [
    name for name, type_name in df_to_kafka.dtypes if type_name == "string"
]
numeric_column_names = [
    name for name, type_name in df_to_kafka.dtypes if type_name in ("int", "float")
]

# Nest each bucket into its own struct column next to the flat columns.
df_to_kafka = df_to_kafka.withColumn("string_columns", F.struct(string_column_names))
df_to_kafka = df_to_kafka.withColumn("numeric_columns", F.struct(numeric_column_names))

In [None]:
# Inspect the schema after adding the two struct columns.
df_to_kafka.printSchema()

In [None]:
# Keep only the timestamp and the two nested structs; the flat source
# columns are dropped from the frame.
df_to_kafka = df_to_kafka.select(
    F.col("Timestamp"),
    F.col("string_columns"),
    F.col("numeric_columns"),
)

In [None]:
# Check that only Timestamp, string_columns and numeric_columns remain.
df_to_kafka.printSchema()

In [None]:
# Pack every remaining column into a single JSON string per row.
df_to_kafka = df_to_kafka.withColumn(
    "data_packed_for_kafka",
    F.to_json(F.struct(*df_to_kafka.columns)),
)


In [None]:
# Inspect the schema after adding the data_packed_for_kafka JSON column.
df_to_kafka.printSchema()

In [None]:
# Publish the packed JSON to the nested topic, aliased to "value" for the
# Kafka sink.
# NOTE: this is a batch write; DataFrameWriter.save() returns None, so
# `query` is None afterwards rather than a streaming query handle.
query = (
    df_to_kafka.select(F.col("data_packed_for_kafka").alias("value"))
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topic_nested")
    .save()
)


In [None]:
# Pull the last row's packed JSON string back to the driver for inspection.
meant_to_be_json = (
    df_to_kafka.select("data_packed_for_kafka")
    .tail(1)[0]["data_packed_for_kafka"]
)

In [None]:
import json

# Sanity check: json.loads raises on malformed input, so a successful parse
# shows that the packed payload is valid JSON.
json.loads(meant_to_be_json)

In [None]:
# Shut down the Spark session now that the data preparation is finished.
spark.stop()