# Dataframe from JSON


In [11]:
import pyspark
import pyspark.sql.functions as F
import pandas as pd
import re
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, ArrayType
from pyspark.sql.functions import udf



The `pyarrow` library provides a considerable performance improvement. However, it does not support every column type: the `ArrayType` column used in this notebook, for example, is not supported.


In [12]:
# Create (or reuse) the session for the app named 'test' on the master at
# spark://127.0.0.1:7077, with the Arrow option for PySpark switched on.
spark = (
    SparkSession.builder
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .appName('test')
    .master("spark://127.0.0.1:7077")
    .getOrCreate()
)

The JSON file `test.json`:

```json
[
    {
        "name": "Andre",
        "id": 1,
        "doc_list":[{"docid":"DOC001", "name":"bla001.txt"}, {"docid":"DOC002", "name":"bla002.txt"}]
    },

    {
        "name": "Noé",
        "id": 1,
        "doc_list":[{"docid":"DOC003", "name":"bla003.txt"}, {"docid":"DOC004", "name":"bla004.txt"}]
    }
]


```

The easiest way to read a local file is to load it with pandas and then convert it into a Spark DataFrame.



## The problem

Besides reading the JSON file, of course, suppose that it is desirable to extract the document file names associated with each person's name. Note that for each name there is a list of docs, each with its own 'docid' and 'name'. How can we get the list of doc names for each person in a new column called 'doc_names'? Example:

```text
+-----+---------------------+
|name |doc_names            |
+-----+---------------------+
|Andre|bla001.txt,bla002.txt|
|Noé  |bla003.txt,bla004.txt|
+-----+---------------------+
```



In [13]:
# Load the local JSON file through pandas first. Not ideal: it is a workaround
# while reading local files directly is still giving problems (to be fixed).
pdf = pd.read_json('test.json')
# Turn the pandas frame into a Spark DataFrame, then discard the "id" column
sdf = spark.createDataFrame(pdf)
sdf = sdf.drop("id")
# Print the rows untruncated; the schema is the cell's displayed value
sdf.show(truncate=False)
sdf.schema

  Unsupported type in conversion to Arrow: ArrayType(StructType(List(StructField(docid,StringType,true),StructField(name,StringType,true))),true)
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  for column, series in pdf.iteritems():


+-----+------------------------------------------------------------------------------+
|name |doc_list                                                                      |
+-----+------------------------------------------------------------------------------+
|Andre|[{name -> bla001.txt, docid -> DOC001}, {name -> bla002.txt, docid -> DOC002}]|
|Noé  |[{name -> bla003.txt, docid -> DOC003}, {name -> bla004.txt, docid -> DOC004}]|
+-----+------------------------------------------------------------------------------+



StructType(List(StructField(name,StringType,true),StructField(doc_list,ArrayType(MapType(StringType,StringType,true),true),true)))

# DataFrame
Probably the best way to do this, since the DataFrame API has been enriched with optimizations over the last few years.


In [14]:
# Step 1: one row per document. 'explode' turns every item of 'doc_list' into
# its own row ('doc_ex'); from that item only the 'name' entry (the doc name)
# is kept as 'doc_name', and the temporary 'doc_ex' column is dropped.
adf = (
    sdf
    .withColumn("doc_ex", F.explode("doc_list"))
    .withColumn("doc_name", F.col("doc_ex").getItem("name"))
    .drop("doc_ex")
)
adf.show(truncate=False)
# Step 2: undo the 'explode'. Rows are grouped by person name and the doc names
# are gathered back into a list with 'collect_list', which is then joined into
# a single comma-separated string column, 'doc_names'.
# Note: grouping is by 'name' only, so two people sharing a name end up merged.
# NOTE(review): the order of the collected names may not be guaranteed after
# the groupBy; confirm against the collect_list docs if order matters.
ndf = (
    adf
    .groupBy("name")
    .agg(F.collect_list("doc_name").alias('doc_list'))
    .withColumn("doc_names", F.concat_ws(",", "doc_list"))
    .drop("doc_list")
)
ndf.show(truncate=False)

+-----+------------------------------------------------------------------------------+----------+
|name |doc_list                                                                      |doc_name  |
+-----+------------------------------------------------------------------------------+----------+
|Andre|[{name -> bla001.txt, docid -> DOC001}, {name -> bla002.txt, docid -> DOC002}]|bla001.txt|
|Andre|[{name -> bla001.txt, docid -> DOC001}, {name -> bla002.txt, docid -> DOC002}]|bla002.txt|
|Noé  |[{name -> bla003.txt, docid -> DOC003}, {name -> bla004.txt, docid -> DOC004}]|bla003.txt|
|Noé  |[{name -> bla003.txt, docid -> DOC003}, {name -> bla004.txt, docid -> DOC004}]|bla004.txt|
+-----+------------------------------------------------------------------------------+----------+

+-----+---------------------+
|name |doc_names            |
+-----+---------------------+
|Andre|bla001.txt,bla002.txt|
|Noé  |bla003.txt,bla004.txt|
+-----+---------------------+



# UDF(Not recommended)
A User Defined Function (UDF) is a way to parse information from a column. In this case, the docs inside the JSON file are available as a list of objects, which PySpark parses and conveniently converts into Python data structures: here, a list of dictionaries, which is easier to manipulate. Brilliant!

In [15]:
@udf
def extract_doc(data_list):
    """Join the 'name' entry of every dictionary in ``data_list`` with commas.

    Args:
        data_list: the value of one 'doc_list' cell, i.e. a list of
            dictionaries, or None when the cell is null.

    Returns:
        The comma-separated doc names, or None when ``data_list`` is None.
        Dictionaries without a 'name' key are skipped, as are null list
        elements and null 'name' values.
    """
    # A null cell used to raise TypeError in the loop and fail the whole job.
    if data_list is None:
        return None

    names = []
    for doc in data_list:
        if doc is None:
            continue
        doc_name = doc.get('name')
        # A null value would make ','.join() raise, so leave it out.
        if doc_name is not None:
            names.append(doc_name)

    return ','.join(names)


In [16]:
# Apply the 'extract_doc' UDF to 'doc_list' and keep its result, next to
# 'name', in a column called 'doc_names'
dfu = sdf.select('name', extract_doc(F.col('doc_list')).alias('doc_names'))
# Display the outcome
dfu.show(truncate=False)

+-----+---------------------+
|name |doc_names            |
+-----+---------------------+
|Andre|bla001.txt,bla002.txt|
|Noé  |bla003.txt,bla004.txt|
+-----+---------------------+



# "UDF" using RDD(less recommended)
Alternatively, it's possible to use a plain Python function that receives the DataFrame row as a parameter. To do that, it is necessary to use the RDD API instead of a UDF and then convert the result back into a DataFrame. This approach is useful when you need to parse several fields of a row iteratively. Note, however, that performance will drop considerably depending on the amount of data.

In [17]:
def extract_doc_rdd(row):
    """Return a copy of ``row`` with an extra 'doc_names' field.

    Args:
        row: a Row whose 'doc_list' field, when present, holds a list of
            dictionaries (or None).

    Returns:
        A new Row built from the row's fields. When 'doc_list' is present,
        'doc_names' is added: the comma-separated 'name' entries of the list,
        or None when 'doc_list' itself is None. Dictionaries without a 'name'
        key, null list elements and null 'name' values are skipped. When the
        row has no 'doc_list' field at all, it is returned unchanged.
    """
    d = row.asDict()
    if 'doc_list' in d:
        docs = d['doc_list']
        if docs is None:
            # A null list used to raise TypeError in the loop; propagate null.
            d['doc_names'] = None
        else:
            names = []
            for doc in docs:
                if doc is None:
                    continue
                doc_name = doc.get('name')
                # A null value would make ','.join() raise, so leave it out.
                if doc_name is not None:
                    names.append(doc_name)
            d['doc_names'] = ','.join(names)

    return Row(**d)
    


In [18]:
# Send every row of the DataFrame's RDD through 'extract_doc_rdd'
rdd = sdf.rdd.map(extract_doc_rdd)
# Back to a DataFrame, keeping only the two columns of interest
edf = rdd.toDF()
edf = edf.select('name', 'doc_names')
# Display the outcome
edf.show(truncate=False)

+-----+---------------------+
|name |doc_names            |
+-----+---------------------+
|Andre|bla001.txt,bla002.txt|
|Noé  |bla003.txt,bla004.txt|
+-----+---------------------+



# JSON from string

In [19]:
# Inline JSON payload: an array of two objects, each with a "name" and a
# "doc_list" whose entries carry a "docid" and a "name".
json_str = """[
    {
        "name":"Andre",
        "doc_list":[{"docid":"DOC001", "name":"bla001.txt"}, {"docid":"DOC002", "name":"bla002.txt"}]
    },
    {
        "name": "Noe",
        "doc_list":[{"docid":"DOC002", "name":"bla002.txt"}, {"docid":"DOC003", "name":"bla003.txt"}]
    }
]"""


In [20]:
# Strip the line breaks from the payload (this rebinds 'json_str' in place)
json_str = json_str.replace("\n", "")
sc = spark.sparkContext
# Explicit schema: a string 'name' and 'doc_list' as an array of
# string -> string maps, both nullable
schema = (
    StructType()
    .add("name", StringType(), True)
    .add("doc_list", ArrayType(MapType(StringType(), StringType())), True)
)
# Parse the one-element RDD holding the JSON text with that schema
df = spark.read.schema(schema).json(sc.parallelize([json_str]))
df.show(truncate=False)

+-----+------------------------------------------------------------------------------+
|name |doc_list                                                                      |
+-----+------------------------------------------------------------------------------+
|Andre|[{docid -> DOC001, name -> bla001.txt}, {docid -> DOC002, name -> bla002.txt}]|
|Noe  |[{docid -> DOC002, name -> bla002.txt}, {docid -> DOC003, name -> bla003.txt}]|
+-----+------------------------------------------------------------------------------+



In [21]:
# Explode 'doc_list' into one row per document, keep only each document's
# 'name' as 'doc', then regroup by person and join the collected names into a
# comma-separated 'doc_names' column.
dfe = (
    df
    .withColumn("item", F.explode("doc_list"))
    .withColumn("doc", F.col("item").getItem('name'))
    .drop("item")
    .groupBy('name')
    .agg(F.collect_list("doc").alias("doclist"))
    .withColumn('doc_names', F.concat_ws(',', 'doclist'))
    .drop("doclist")
)
dfe.show(truncate=False)

+-----+---------------------+
|name |doc_names            |
+-----+---------------------+
|Noe  |bla002.txt,bla003.txt|
|Andre|bla001.txt,bla002.txt|
+-----+---------------------+



In [22]:
# Second inline JSON payload: an array of two objects with a "name", a numeric
# "id" and a list "l1" of single-entry objects with numeric values.
json_str2="""[
        {
        "name": "Andre",
        "id": 1,
        "l1":[{"a":1},{"b":2}]
        },
        {
        "name": "Noé",
        "id": 2,
        "l1":[{"c":3},{"d":4}]
        }
]"""

In [24]:
# Strip the line breaks from the second payload.
# Note: this rebinds 'json_str' (and, below, 'schema') used by the previous
# example, so that example's cells must be re-run from its payload definition
# before they are executed again.
json_str = json_str2.replace("\n", "")
sc = spark.sparkContext
# Explicit schema: 'id' is declared as a string although the payload carries
# numbers; 'l1' is an array of string -> string maps. All fields are nullable.
schema = (
    StructType()
    .add("id", StringType(), True)
    .add("name", StringType(), True)
    .add("l1", ArrayType(MapType(StringType(), StringType())), True)
)
# Parse the one-element RDD holding the JSON text with that schema
df2 = spark.read.schema(schema).json(sc.parallelize([json_str]))
df2.show(truncate=False)

+---+-----+--------------------+
|id |name |l1                  |
+---+-----+--------------------+
|1  |Andre|[{a -> 1}, {b -> 2}]|
|2  |Noé  |[{c -> 3}, {d -> 4}]|
+---+-----+--------------------+

