In [1]:
import json
from pyspark.sql import SparkSession

In [2]:
# Reuse the active Spark session if one exists, otherwise start one named for the ticket.
spark = (
    SparkSession.builder
    .appName("DE-348")
    .getOrCreate()
)

Sample input (`test_input1.json`, newline-delimited JSON with one object per line):

```
{"msg": {"id": 0, "name": "test 0"}, "attr": {"timestamp": 1555705314, "tf": true, "details": [{"a": 0, "b":100}, {"a": 10, "b": 1000}]}}
{"msg": {"id": 1, "name": "test 1"}, "attr": {"timestamp": 1555715314, "tf": true, "details": [{"a": 1, "b":101}, {"a": 11, "b": 1001}]}}
{"msg": {"id": 2, "name": "test 2"}, "attr": {"timestamp": 1555725314, "tf": true, "details": [{"a": 2, "b":102}, {"a": 12, "b": 1002}]}}
{"msg": {"id": 3, "name": "test 3"}, "attr": {"timestamp": 1555735314, "tf": false, "details": []}}
{"msg": {"id": 4, "name": "test 4"}, "attr": {"timestamp": 1555745314, "tf": true, "details": [{"a": 4, "b":104}, {"a": 14, "b": 1004}]}}
{"msg": {"id": 5, "name": "test 5"}, "attr": {"timestamp": 1555755314, "tf": true, "details": [{"a": 5, "b":105}, {"a": 15, "b": 1005}]}}
{"msg": {"id": 6, "name": "test 6"}, "attr": {"timestamp": 1555765314, "tf": false, "details": []}}
{"msg": {"id": 7, "name": "test 7"}, "attr": {"timestamp": 1555775314, "tf": true, "details": [{"a": 7, "b":107}, {"a": 17, "b": 1007}]}}
{"msg": {"id": 8, "name": "test 8"}, "attr": {"timestamp": 1555785314, "tf": true, "details": [{"a": 8, "b":108}, {"a": 18, "b": 1008}]}}
{"msg": {"id": 9, "name": "test 9"}, "attr": {"timestamp": 1555795314, "tf": false, "details": []}}
```

In [3]:
# Load the newline-delimited sample file; with no schema given, Spark infers the nested structure.
testDF = spark.read.format("json").load("test_input1.json")
testDF.show()

+--------------------+-----------+
|                attr|        msg|
+--------------------+-----------+
|[[[0, 100], [10, ...|[0, test 0]|
|[[[1, 101], [11, ...|[1, test 1]|
|[[[2, 102], [12, ...|[2, test 2]|
|[[], false, 15557...|[3, test 3]|
|[[[4, 104], [14, ...|[4, test 4]|
|[[[5, 105], [15, ...|[5, test 5]|
|[[], false, 15557...|[6, test 6]|
|[[[7, 107], [17, ...|[7, test 7]|
|[[[8, 108], [18, ...|[8, test 8]|
|[[], false, 15557...|[9, test 9]|
+--------------------+-----------+



In [4]:
import pyspark.sql.functions as pysql_fn
import pyspark.sql.types as pysql_types

def flatten_structs(df):
    """
    Recursively flatten every StructType column of a Spark dataframe.

    Each struct column ``parent`` with fields ``f1 .. fn`` is replaced by top-level
    columns named ``parent-f1 .. parent-fn``. This repeats until no StructType
    columns remain, so a nested struct ends up as ``parent-child-grandchild``.
    For example, if each record looks like {message: {...}, data: {...}}, the result is:
    {message-field1, message-field2, .... message-fieldx, data-field1, ... data-fieldx}

    Non-struct columns are kept unchanged and placed before the flattened columns.
    More complex types such as arrays are not descended into, so an array of structs
    remains a single array column.

    Names are backtick-quoted (and fields extracted with ``getField``) so that a
    column or field name containing "." is treated literally instead of being
    parsed as nested field access.

    NOTE: the "parent-field" alias can collide with an existing top-level column
    (e.g. a column "msg-id" next to a struct "msg" with field "id"), which would
    produce duplicate column names.

    :param df: a Spark dataframe
    :return: a flattened Spark dataframe with no top-level StructType columns
    """
    def _quoted(name):
        # Backticks make "." literal; an embedded backtick is escaped by doubling it.
        return "`{}`".format(name.replace("`", "``"))

    # df.schema.fields returns [StructField(name, dataType, nullable)]
    struct_fields = [
        field for field in df.schema.fields
        if isinstance(field.dataType, pysql_types.StructType)
    ]

    if not struct_fields:
        return df

    # Built once rather than once per column.
    struct_names = {field.name for field in struct_fields}

    non_struct_cols = [
        pysql_fn.col(_quoted(col_name))
        for col_name in df.schema.names
        if col_name not in struct_names
    ]

    # getField takes the field name verbatim, so dots in it are not re-parsed.
    select_cols = [
        pysql_fn.col(_quoted(field.name))
        .getField(inner_field.name)
        .alias("{}-{}".format(field.name, inner_field.name))
        for field in struct_fields
        for inner_field in field.dataType.fields
    ]

    unnested_df = df.select(non_struct_cols + select_cols)

    # A flattened field may itself be a struct; recurse until none are left.
    return flatten_structs(unnested_df)

In [5]:
# Flatten the nested msg/attr structs into hyphen-joined top-level columns.
flatTestDF = flatten_structs(df=testDF)
flatTestDF.show()

+--------------------+-------+--------------+------+--------+
|        attr-details|attr-tf|attr-timestamp|msg-id|msg-name|
+--------------------+-------+--------------+------+--------+
|[[0, 100], [10, 1...|   true|    1555705314|     0|  test 0|
|[[1, 101], [11, 1...|   true|    1555715314|     1|  test 1|
|[[2, 102], [12, 1...|   true|    1555725314|     2|  test 2|
|                  []|  false|    1555735314|     3|  test 3|
|[[4, 104], [14, 1...|   true|    1555745314|     4|  test 4|
|[[5, 105], [15, 1...|   true|    1555755314|     5|  test 5|
|                  []|  false|    1555765314|     6|  test 6|
|[[7, 107], [17, 1...|   true|    1555775314|     7|  test 7|
|[[8, 108], [18, 1...|   true|    1555785314|     8|  test 8|
|                  []|  false|    1555795314|     9|  test 9|
+--------------------+-------+--------------+------+--------+



In [6]:
# Encode the array-of-structs column as a JSON string, mimicking the interim bucket format.
jsTestDF = flatTestDF.withColumn(
    "js_ad",
    pysql_fn.to_json("attr-details"),
)
jsTestDF.show()
jsTestDF.printSchema()

+--------------------+-------+--------------+------+--------+--------------------+
|        attr-details|attr-tf|attr-timestamp|msg-id|msg-name|               js_ad|
+--------------------+-------+--------------+------+--------+--------------------+
|[[0, 100], [10, 1...|   true|    1555705314|     0|  test 0|[{"a":0,"b":100},...|
|[[1, 101], [11, 1...|   true|    1555715314|     1|  test 1|[{"a":1,"b":101},...|
|[[2, 102], [12, 1...|   true|    1555725314|     2|  test 2|[{"a":2,"b":102},...|
|                  []|  false|    1555735314|     3|  test 3|                  []|
|[[4, 104], [14, 1...|   true|    1555745314|     4|  test 4|[{"a":4,"b":104},...|
|[[5, 105], [15, 1...|   true|    1555755314|     5|  test 5|[{"a":5,"b":105},...|
|                  []|  false|    1555765314|     6|  test 6|                  []|
|[[7, 107], [17, 1...|   true|    1555775314|     7|  test 7|[{"a":7,"b":107},...|
|[[8, 108], [18, 1...|   true|    1555785314|     8|  test 8|[{"a":8,"b":108},...|
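|                  []|  false|    1555795314|     9|  test 9|                  []|
+--------------------+-------+--------------+------+--------+--------------------+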

The `interimDF` below approximates the data as it will arrive from the interim data bucket. Instead of the structured `attr-details` column, it carries only that column's JSON-string encoding, shown here as `js_ad`. In the pipeline, this encoding is produced with the PySpark function `to_json`.

Ideally we would parse this column back into its structured form with `from_json`. However, `from_json` requires an explicit schema, which we will not have. Instead, I map each row of `interimDF` to a record containing `msg-id` and the decoded `js_ad` array, and read that RDD into a new dataframe with `spark.read.json`, which infers the schema. I then `explode` the `details` array to get one row per detail, and join the result back to the columns I need from `interimDF` to produce the `finalDF`. Because `explode` drops rows whose `details` array is empty (ids 3, 6 and 9 here), the join is a left outer join, so those messages are kept with null `a`/`b`.

In [7]:
# Simulate the interim bucket: only the JSON-string form (js_ad) of the details survives.
interimDF = (
    jsTestDF
    .drop("attr-details")
)
interimDF.show()

+-------+--------------+------+--------+--------------------+
|attr-tf|attr-timestamp|msg-id|msg-name|               js_ad|
+-------+--------------+------+--------+--------------------+
|   true|    1555705314|     0|  test 0|[{"a":0,"b":100},...|
|   true|    1555715314|     1|  test 1|[{"a":1,"b":101},...|
|   true|    1555725314|     2|  test 2|[{"a":2,"b":102},...|
|  false|    1555735314|     3|  test 3|                  []|
|   true|    1555745314|     4|  test 4|[{"a":4,"b":104},...|
|   true|    1555755314|     5|  test 5|[{"a":5,"b":105},...|
|  false|    1555765314|     6|  test 6|                  []|
|   true|    1555775314|     7|  test 7|[{"a":7,"b":107},...|
|   true|    1555785314|     8|  test 8|[{"a":8,"b":108},...|
|  false|    1555795314|     9|  test 9|                  []|
+-------+--------------+------+--------+--------------------+



In [20]:
def _details_row_to_json(row):
    """Serialise one interimDF row as a JSON document for ``spark.read.json``.

    ``spark.read.json`` takes an RDD of JSON *strings*. Handing it Python dicts
    makes PySpark fall back to ``str(dict)``, i.e. the Python repr (single quotes,
    ``True``/``False``/``None``), which is not valid JSON once a value is a
    boolean or null. ``json.dumps`` emits real JSON instead.

    :param row: a Row with ``msg-id`` and ``js_ad`` (JSON-encoded details array)
    :return: a JSON string ``{"msg-id": ..., "details": [...]}``
    """
    js_ad = row['js_ad']
    # Guard a null js_ad: json.loads(None) would raise TypeError.
    details = json.loads(js_ad) if js_ad is not None else None
    return json.dumps({"msg-id": row['msg-id'], "details": details})


# Round-trip through spark.read.json so the details schema is inferred (from_json needs one up front).
jsIntDF = spark.read.json(interimDF.rdd.map(_details_row_to_json))
jsIntDF.show()
jsIntDF.printSchema()

+--------------------+------+
|             details|msg-id|
+--------------------+------+
|[[0, 100], [10, 1...|     0|
|[[1, 101], [11, 1...|     1|
|[[2, 102], [12, 1...|     2|
|                  []|     3|
|[[4, 104], [14, 1...|     4|
|[[5, 105], [15, 1...|     5|
|                  []|     6|
|[[7, 107], [17, 1...|     7|
|[[8, 108], [18, 1...|     8|
|                  []|     9|
+--------------------+------+

root
 |-- details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: long (nullable = true)
 |-- msg-id: long (nullable = true)



In [16]:
# One output row per element of `details`. explode emits nothing for an empty
# array, so messages 3, 6 and 9 disappear here.
expIntDF = jsIntDF.select(
    pysql_fn.col("msg-id"),
    pysql_fn.explode(pysql_fn.col("details")).alias("detail"),
)
expIntDF.show()

+------+----------+
|msg-id|    detail|
+------+----------+
|     0|  [0, 100]|
|     0|[10, 1000]|
|     1|  [1, 101]|
|     1|[11, 1001]|
|     2|  [2, 102]|
|     2|[12, 1002]|
|     4|  [4, 104]|
|     4|[14, 1004]|
|     5|  [5, 105]|
|     5|[15, 1005]|
|     7|  [7, 107]|
|     7|[17, 1007]|
|     8|  [8, 108]|
|     8|[18, 1008]|
+------+----------+



In [13]:
# Lift the struct fields of each exploded detail to top-level `a` / `b` columns.
expFlatIntDF = expIntDF.select(
    pysql_fn.col("msg-id"),
    pysql_fn.col("detail.a"),
    pysql_fn.col("detail.b"),
)
expFlatIntDF.show()

+------+---+----+
|msg-id|  a|   b|
+------+---+----+
|     0|  0| 100|
|     0| 10|1000|
|     1|  1| 101|
|     1| 11|1001|
|     2|  2| 102|
|     2| 12|1002|
|     4|  4| 104|
|     4| 14|1004|
|     5|  5| 105|
|     5| 15|1005|
|     7|  7| 107|
|     7| 17|1007|
|     8|  8| 108|
|     8| 18|1008|
+------+---+----+



In [14]:
# Left outer join keeps messages whose details were dropped by explode (they get null a/b).
# NOTE(review): this assumes msg-id is unique per message; duplicate ids would multiply rows.
finalDF = (
    interimDF
    .select("msg-id", "msg-name", "attr-timestamp", "attr-tf")
    .join(expFlatIntDF, on=["msg-id"], how="left_outer")
)
finalDF.orderBy("msg-id").show()
finalDF.printSchema()

+------+--------+--------------+-------+----+----+
|msg-id|msg-name|attr-timestamp|attr-tf|   a|   b|
+------+--------+--------------+-------+----+----+
|     0|  test 0|    1555705314|   true|   0| 100|
|     0|  test 0|    1555705314|   true|  10|1000|
|     1|  test 1|    1555715314|   true|   1| 101|
|     1|  test 1|    1555715314|   true|  11|1001|
|     2|  test 2|    1555725314|   true|   2| 102|
|     2|  test 2|    1555725314|   true|  12|1002|
|     3|  test 3|    1555735314|  false|null|null|
|     4|  test 4|    1555745314|   true|   4| 104|
|     4|  test 4|    1555745314|   true|  14|1004|
|     5|  test 5|    1555755314|   true|   5| 105|
|     5|  test 5|    1555755314|   true|  15|1005|
|     6|  test 6|    1555765314|  false|null|null|
|     7|  test 7|    1555775314|   true|   7| 107|
|     7|  test 7|    1555775314|   true|  17|1007|
|     8|  test 8|    1555785314|   true|   8| 108|
|     8|  test 8|    1555785314|   true|  18|1008|
|     9|  test 9|    1555795314