In [1]:
# findspark.init() locates the local Spark installation and makes pyspark
# importable, so it has to run BEFORE any `pyspark` import; otherwise the
# import below fails on machines where pyspark is not already on sys.path.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# getOrCreate() reuses a session already running in this kernel, so
# re-running this cell is safe.
spark = (
    SparkSession.builder
    .appName('Testing')
    .getOrCreate()
)
spark

In [12]:
from pyspark.sql.types import (
    ArrayType,
    FloatType,
    IntegerType,
    LongType,
    MapType,
    StringType,
    StructField,
    StructType,
)

# One line item of an invoice; every field is declared non-nullable.
item_schema = StructType([
    StructField('SKU', StringType(), False),
    StructField('title', StringType(), False),
    StructField('unit_price', FloatType(), False),
    StructField('quantity', IntegerType(), False),
])

# A whole invoice record as it appears in the JSON payload.
# NOTE(review): `timestamp` is kept as a plain string, not TimestampType.
schema = StructType([
    StructField('invoice_no', LongType()),
    StructField('country', StringType()),
    StructField('timestamp', StringType()),
    StructField('type', StringType()),
    StructField('items', ArrayType(item_schema)),
])

In [13]:
# One sample invoice record as a raw JSON string. It is kept under its own name
# so later cells (e.g. the stdlib json.loads check) can reuse it.
json_str = '{"invoice_no": 154132541653705,"country": "United Kingdom","timestamp": "2020-09-18 10:55:23","type": "ORDER","items": [{"SKU": "21485","title": "RETROSPOT HEART HOT WATER BOTTLE","unit_price": 4.95,"quantity": 6},{"SKU": "23499","title": "SET 12 VINTAGE DOILY CHALK","unit_price": 0.42,"quantity": 2}]}'

# Each RDD element is a one-column row holding the raw string, matching the
# single 'value' column passed to toDF() in the next cell.
json_list = [[json_str]]
rdd = spark.sparkContext.parallelize(json_list)

In [14]:
# Wrap the RDD of one-element rows into a DataFrame with a single `value`
# column holding the raw JSON text, then show it untruncated.
df = rdd.toDF(schema=['value'])
df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                          |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"invoice_no": 154132541653705,"country": "United Kingdom","timestamp": "2020-09-

In [15]:
from pyspark.sql.functions import col, from_json

# Parse the JSON text in `value` with the invoice schema into a struct named
# `data`, then expand that struct so each top-level field becomes a column.
df1 = (
    df
    .select(from_json(col('value').cast('string'), schema).alias('data'))
    .select('data.*')
)
df1.show(truncate=False)

+---------------+--------------+-------------------+-----+--------------------------------------------------------------------------------------------------+
|invoice_no     |country       |timestamp          |type |items                                                                                             |
+---------------+--------------+-------------------+-----+--------------------------------------------------------------------------------------------------+
|154132541653705|United Kingdom|2020-09-18 10:55:23|ORDER|[{21485, RETROSPOT HEART HOT WATER BOTTLE, 4.95, 6}, {23499, SET 12 VINTAGE DOILY CHALK, 0.42, 2}]|
+---------------+--------------+-------------------+-----+--------------------------------------------------------------------------------------------------+



In [16]:
from pyspark.sql.functions import udf


@udf(returnType=FloatType())
def totalCost(arr1, arr2):
    """Invoice total: sum of quantity * unit_price over all line items.

    arr1 -- per-item quantities (``items.quantity``)
    arr2 -- per-item unit prices (``items.unit_price``), paired with arr1 by position
    Returns None (SQL null) if either array is null.
    """
    if arr1 is None or arr2 is None:
        return None
    # Always return a Python float: a FloatType UDF that returns an int (e.g. 0
    # for an empty items array) produces null instead of 0.0.
    return float(sum(qty * price for qty, price in zip(arr1, arr2)))


@udf(returnType=IntegerType())
def totalItems(arr1):
    """Total number of units on the invoice: sum of the per-item quantities.

    Returns None (SQL null) if the quantity array itself is null.
    """
    if arr1 is None:
        return None
    return sum(arr1)


# Explicit IntegerType: without it udf() defaults to StringType and these flags
# would come back as the strings '1' / '0'.
isOrder = udf(lambda x: 1 if x == 'ORDER' else 0, returnType=IntegerType())
isReturn = udf(lambda x: 1 if x == 'RETURN' else 0, returnType=IntegerType())

# Parenthesised chain: the previous backslash chain ended at `.drop()` without a
# continuation, leaving `.show()` on its own line (a SyntaxError). The argument-less
# `.drop()` dropped nothing and has been removed.
orders_summary = (
    df1
    .withColumn('total_cost', totalCost(df1.items.quantity, df1.items.unit_price))
    .withColumn('total_items', totalItems(df1.items.quantity))
    .withColumn('is_order', isOrder(df1.type))
    .withColumn('is_return', isReturn(df1.type))
)
orders_summary.show()

+---------------+--------------+-------------------+-----+--------------------+----------+-----------+--------+---------+
|     invoice_no|       country|          timestamp| type|               items|total_cost|total_items|is_order|is_return|
+---------------+--------------+-------------------+-----+--------------------+----------+-----------+--------+---------+
|154132541653705|United Kingdom|2020-09-18 10:55:23|ORDER|[{21485, RETROSPO...| 30.539999|          8|       1|        0|
+---------------+--------------+-------------------+-----+--------------------+----------+-----------+--------+---------+



In [106]:
# Peek at the per-item arrays that feed the total_cost / total_items UDFs.
df1.select(
    df1['items']['quantity'],
    df1['items']['unit_price'],
).show(truncate=False)

+--------------+----------------+
|items.quantity|items.unit_price|
+--------------+----------------+
|[6, 2]        |[4.95, 0.42]    |
+--------------+----------------+



In [4]:
import json

# Sanity-check the raw payload with the stdlib parser. The string is taken
# straight from json_list (the sample-data cell), so this cell works on a
# fresh kernel instead of depending on a leftover `json_str` variable.
json_str = json_list[0][0]
print(json.loads(json_str))

{'invoice_no': 154132541653705, 'country': 'United Kingdom', 'timestamp': '2020-09-18 10:55:23', 'type': 'ORDER', 'items': [{'SKU': '21485', 'title': 'RETROSPOT HEART HOT WATER BOTTLE', 'unit_price': 4.95, 'quantity': 6}, {'SKU': '23499', 'title': 'SET 12 VINTAGE DOILY CHALK', 'unit_price': 0.42, 'quantity': 2}]}


In [None]:
from pyspark.sql.functions import udf
@udf(returnType=StringType())
def jsonToDf(data_str:str):
    key_list = ['SKU','title','unit_price','quantity']
    data_dict = json.loads(data_str)
    dataItem_list,data_dict['items'] = data_dict['items'],{
        
    }
    for x in dataItem_list:
        