In [182]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import *

In [183]:
# Start (or reuse) a local Spark session that can talk to Kafka. The Kafka
# source is supplied through spark.jars.packages rather than bundled here.
# NOTE(review): the connector's Scala suffix (_2.12) and version (3.3.1) should
# match the installed PySpark/Spark build — confirm when upgrading either.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1"

session_builder = SparkSession.builder.master("local[*]").appName("Tutorial App")
session_builder = session_builder.config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
spark = session_builder.getOrCreate()

In [184]:
# Configure a *batch* Kafka reader (spark.read, not spark.readStream) that
# subscribes to one topic on the local broker. The reader is only configured
# here; the next cell calls .load() on it.
# NOTE(review): no startingOffsets/endingOffsets options are set, so Spark's
# batch defaults decide which offsets are read — confirm that is intended.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "topic_nested"

read = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
)

In [185]:
# Turn the Kafka reader configured in the previous cell into a DataFrame.
# Guarded so that re-running this cell is harmless: once `read` is already a
# DataFrame it has no .load() method, and the unguarded version raised
# AttributeError on a second execution.
if isinstance(read, pyspark.sql.DataFrameReader):
    read = read.load()

In [186]:
# Expected layout of each Kafka message payload (JSON):
#   {"Timestamp": ..., "string_columns": {...}, "numeric_columns": {...}}
# Every field is nullable. Age and Fare are declared as strings here; a later
# cell casts them to double.
STRING_FIELD_NAMES = ["Name", "Sex", "Age", "Ticket", "Fare", "Cabin", "Embarked"]
NUMERIC_FIELD_NAMES = ["PassengerId", "Survived", "Pclass", "SibSp", "Parch"]

string_columns_type = StructType(
    [StructField(field_name, StringType(), True) for field_name in STRING_FIELD_NAMES]
)
numeric_columns_type = StructType(
    [StructField(field_name, IntegerType(), True) for field_name in NUMERIC_FIELD_NAMES]
)

schema = StructType(
    [
        StructField("Timestamp", StringType(), True),
        StructField("string_columns", string_columns_type, True),
        StructField("numeric_columns", numeric_columns_type, True),
    ]
)

In [187]:
# Keep only the message payload, cast to STRING and still named "value".
read = read.selectExpr("CAST(value AS STRING) AS value")

In [188]:
# Parse the JSON payload into the nested struct described by `schema`,
# alongside the raw "value" column.
# NOTE(review): from_json's default mode is expected to produce null for
# payloads that don't match the schema rather than raise — confirm for the
# Spark version in use.
json_payload = F.col("value").cast("string")
read = read.withColumn("message_content", F.from_json(json_payload, schema))

In [189]:
# Inspect the parsed structure: message_content should mirror `schema`.
read.printSchema()

root
 |-- value: string (nullable = true)
 |-- message_content: struct (nullable = true)
 |    |-- Timestamp: string (nullable = true)
 |    |-- string_columns: struct (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Sex: string (nullable = true)
 |    |    |-- Age: string (nullable = true)
 |    |    |-- Ticket: string (nullable = true)
 |    |    |-- Fare: string (nullable = true)
 |    |    |-- Cabin: string (nullable = true)
 |    |    |-- Embarked: string (nullable = true)
 |    |-- numeric_columns: struct (nullable = true)
 |    |    |-- PassengerId: integer (nullable = true)
 |    |    |-- Survived: integer (nullable = true)
 |    |    |-- Pclass: integer (nullable = true)
 |    |    |-- SibSp: integer (nullable = true)
 |    |    |-- Parch: integer (nullable = true)



In [190]:
# Flatten the nested message into one top-level column per leaf field, named
# after that field (e.g. message_content.string_columns.Name -> Name).
# Column order: Timestamp, then the string fields, then the numeric fields.
denest_string_fields = ["Name", "Sex", "Age", "Ticket", "Fare", "Cabin", "Embarked"]
denest_numeric_fields = ["PassengerId", "Survived", "Pclass", "SibSp", "Parch"]

flat_columns = [col("message_content.Timestamp").alias("Timestamp")]
flat_columns += [
    col(f"message_content.string_columns.{field}").alias(field)
    for field in denest_string_fields
]
flat_columns += [
    col(f"message_content.numeric_columns.{field}").alias(field)
    for field in denest_numeric_fields
]

df_denest = read.select(*flat_columns)

In [191]:
# Confirm the frame is now flat: one top-level column per leaf field, keeping
# the types declared in the schema.
df_denest.printSchema()

root
 |-- Timestamp: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)



In [192]:
# Preview the flattened rows using show()'s defaults, spelled out explicitly:
# 20 rows, long cell values truncated.
df_denest.show(n=20, truncate=True)

+--------------------+--------------------+------+----+----------------+-------+-----+--------+-----------+--------+------+-----+-----+
|           Timestamp|                Name|   Sex| Age|          Ticket|   Fare|Cabin|Embarked|PassengerId|Survived|Pclass|SibSp|Parch|
+--------------------+--------------------+------+----+----------------+-------+-----+--------+-----------+--------+------+-----+-----+
|2020-01-01T13:45:...|Braund, Mr. Owen ...|  male|  22|       A/5 21171|   7.25| null|       S|          1|       0|     3|    1|    0|
|2020-01-01T13:44:...|Cumings, Mrs. Joh...|female|  38|        PC 17599|71.2833|  C85|       C|          2|       1|     1|    1|    0|
|2020-01-01T13:38:...|Heikkinen, Miss. ...|female|  26|STON/O2. 3101282|  7.925| null|       S|          3|       1|     3|    0|    0|
|2020-01-01T13:32:...|Futrelle, Mrs. Ja...|female|  35|          113803|   53.1| C123|       S|          4|       1|     1|    1|    0|
|2020-01-01T13:36:...|Allen, Mr. Willia...|  mal

In [193]:
# Remove rows that are identical in every column, Timestamp included.
# NOTE(review): the same passenger re-sent with a different Timestamp is NOT
# treated as a duplicate here; if it should be, dedupe on a key such as
# PassengerId instead.
df = df_denest.distinct()

In [194]:
# Preview the de-duplicated rows (20 rows, truncated values — show()'s defaults).
df.show(n=20, truncate=True)

+--------------------+--------------------+------+----+------------------+-------+-------+--------+-----------+--------+------+-----+-----+
|           Timestamp|                Name|   Sex| Age|            Ticket|   Fare|  Cabin|Embarked|PassengerId|Survived|Pclass|SibSp|Parch|
+--------------------+--------------------+------+----+------------------+-------+-------+--------+-----------+--------+------+-----+-----+
|2020-01-01T13:30:...|Moubarek, Master....|  male|null|              2661|15.2458|   null|       C|         66|       1|     3|    1|    1|
|2020-01-01T13:45:...|Caldwell, Master....|  male|   0|            248738|   29.0|   null|       S|         79|       1|     2|    0|    2|
|2020-01-01T13:35:...|      Ali, Mr. Ahmed|  male|  24|SOTON/O.Q. 3101311|   7.05|   null|       S|        211|       0|     3|    0|    0|
|2020-01-01T13:43:...|Stead, Mr. Willia...|  male|  62|            113514|  26.55|    C87|       S|        253|       0|     1|    0|    0|
|2020-01-01T13:30:..

In [195]:
# Discard any row where Cabin, Age or Embarked is null. This is a null check
# only, so empty strings are kept. Then preview the result.
required_columns = ["Cabin", "Age", "Embarked"]
df = df.dropna(how="any", subset=required_columns)
df.show(n=20, truncate=True)

+--------------------+--------------------+------+---+----------+--------+---------------+--------+-----------+--------+------+-----+-----+
|           Timestamp|                Name|   Sex|Age|    Ticket|    Fare|          Cabin|Embarked|PassengerId|Survived|Pclass|SibSp|Parch|
+--------------------+--------------------+------+---+----------+--------+---------------+--------+-----------+--------+------+-----+-----+
|2020-01-01T13:43:...|Stead, Mr. Willia...|  male| 62|    113514|   26.55|            C87|       S|        253|       0|     1|    0|    0|
|2020-01-01T13:30:...|Allison, Master. ...|  male|  0|    113781|  151.55|        C22 C26|       S|        306|       1|     1|    1|    2|
|2020-01-01T13:36:...|White, Mr. Richar...|  male| 21|     35281| 77.2875|            D26|       S|        103|       0|     1|    0|    1|
|2020-01-01T13:39:...|Lines, Miss. Mary...|female| 16|  PC 17592|    39.4|            D28|       S|        854|       1|     1|    0|    1|
|2020-01-01T13:32:..

In [196]:
# Drop the family/class columns, which the rest of this notebook does not use,
# then preview the remaining columns.
unused_columns = ["Pclass", "SibSp", "Parch"]
df = df.drop(*unused_columns)
df.show(n=20, truncate=True)

+--------------------+--------------------+------+---+----------+--------+---------------+--------+-----------+--------+
|           Timestamp|                Name|   Sex|Age|    Ticket|    Fare|          Cabin|Embarked|PassengerId|Survived|
+--------------------+--------------------+------+---+----------+--------+---------------+--------+-----------+--------+
|2020-01-01T13:43:...|Stead, Mr. Willia...|  male| 62|    113514|   26.55|            C87|       S|        253|       0|
|2020-01-01T13:30:...|Allison, Master. ...|  male|  0|    113781|  151.55|        C22 C26|       S|        306|       1|
|2020-01-01T13:36:...|White, Mr. Richar...|  male| 21|     35281| 77.2875|            D26|       S|        103|       0|
|2020-01-01T13:39:...|Lines, Miss. Mary...|female| 16|  PC 17592|    39.4|            D28|       S|        854|       1|
|2020-01-01T13:32:...|Davidson, Mr. Tho...|  male| 31|F.C. 12750|    52.0|            B71|       S|        672|       0|
|2020-01-01T13:33:...|Maioni, Mi

In [197]:
# Age and Fare were parsed as strings (see `schema`); convert them to double.
# NOTE(review): with ANSI mode off, strings that don't parse become null. That
# happens AFTER the earlier null-drop on Age, so re-check nulls here if it matters.
for numeric_name in ("Age", "Fare"):
    df = df.withColumn(numeric_name, col(numeric_name).cast(DoubleType()))

In [198]:
# Echo the DataFrame: Spark's repr lists only column names and types, which
# confirms Age and Fare are now double.
df

DataFrame[Timestamp: string, Name: string, Sex: string, Age: double, Ticket: string, Fare: double, Cabin: string, Embarked: string, PassengerId: int, Survived: int]

In [199]:
# Persist the cleaned data as JSON. Spark writes a *directory* named
# "titanic.json" containing part files, not a single JSON file.
# mode("overwrite") makes the notebook re-runnable: the default save mode
# ("errorifexists") fails as soon as the output path already exists.
df.write.mode("overwrite").json("titanic.json")