In [1]:
# Bootstrap: findspark.init() must run before `import pyspark` so that the
# local Spark installation's Python bindings are importable.
import findspark
findspark.init()
import pyspark
# The return value of find() is discarded here.
# NOTE(review): looks like a leftover sanity check — confirm it is still needed.
findspark.find()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# NOTE(review): the wildcard import is what brings `col` and `explode_outer`
# into scope for the later cells; explicit imports would make that visible.
from pyspark.sql.functions import *

In [2]:
# Spark entry points for the notebook: a local-mode context named 'appName'
# and the SQL session that wraps it.
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
# getOrCreate() reuses a context that is already running in this kernel, so
# re-executing the cell does not raise "Cannot run multiple SparkContexts at
# once" the way a direct SparkContext(conf=conf) call does.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# The builder is the documented way to obtain a session; it attaches to the
# context created above instead of constructing SparkSession by hand.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
# Display the URL of the Spark web UI served by this context.
sc.uiWebUrl

'http://host.docker.internal:4041'

In [4]:
# Read every JSON file found at any depth under "bulk-s6". multiLine=True lets
# a single JSON document span several lines of a file.
recursive_reader = spark.read.option("recursiveFileLookup", "true")
s6_raw = recursive_reader.json("bulk-s6", multiLine=True)

# Redistribute the loaded rows across 10 partitions.
s6_df = s6_raw.repartition(10)

In [5]:
# Show the schema Spark inferred from the raw JSON documents.
s6_df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- awards: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amendment: struct (nullable = true)
 |    |    |    |-- amendsReleaseID: string (nullable = true)
 |    |    |    |-- changes: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- date: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- rationale: string (nullable = true)
 |    |    |    |-- releaseID: string (nullable = true)
 |    |    |-- amendments: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- contractPeriod: struct (nullable = true)
 |    |    |    |-- durationInDays: string (nullable = true)
 |    |    |    |-- endDate: string (nullable = true)
 |    |    |    |-- maxExtentDate: string (nullable = t

In [6]:
# Turn the `awards` and `parties` arrays into one row per element.
# explode_outer (unlike explode) still emits a row, with a null value, when
# the array is null or empty, so such documents are not dropped.
# NOTE(review): exploding two independent arrays gives one row for every
# (award, party) combination of a document — confirm that is intended.
s6_df = (
    s6_df
    .withColumn("explode_awards", explode_outer("awards"))
    .withColumn("explode_parties", explode_outer("parties"))
)

In [7]:
# Field names nested inside the two exploded structs.
contract_fields = s6_df.select("explode_awards.contractPeriod.*").columns
party_fields = s6_df.select("explode_parties.*").columns

# One "<struct path>.<field> as <prefix>_<field>" SQL fragment per nested
# field, joined into a comma-separated select list.
new_cols_select_contract = ", ".join(
    f"explode_awards.contractPeriod.{field} as contractPeriod_{field}"
    for field in contract_fields
)
new_cols_select_parties = ", ".join(
    f"explode_parties.{field} as parties_{field}"
    for field in party_fields
)

# Expose the exploded frame to Spark SQL under the view name "s6_df".
s6_df.createOrReplaceTempView("s6_df")

In [8]:
# Inspect the generated select list for the contractPeriod fields.
new_cols_select_contract

'explode_awards.contractPeriod.durationInDays as contractPeriod_durationInDays, explode_awards.contractPeriod.endDate as contractPeriod_endDate, explode_awards.contractPeriod.maxExtentDate as contractPeriod_maxExtentDate, explode_awards.contractPeriod.startDate as contractPeriod_startDate'

In [9]:
# Flatten via SQL against the "s6_df" temp view: keep the document identifiers
# and lift every contractPeriod / parties field to a prefixed top-level column.
flatten_sql = (
    "select ocid, id, _id, "
    f"{new_cols_select_contract}, {new_cols_select_parties}"
    " from s6_df"
)
s6_df = spark.sql(flatten_sql)

In [10]:
# Select list for the final flat output (despite the name, these are column
# references passed to select(), not a StructType).
# NOTE(review): the alias "_id.$oid" contains a literal dot, unlike the
# underscore-flattened names used for the other columns; presumably it needs
# backtick quoting when referenced by name later — confirm this is intended.
schema = (
    "ocid",
    "id",
    col("_id.$oid").alias("_id.$oid"),
    "contractPeriod_durationInDays",
    "contractPeriod_endDate",
    "contractPeriod_maxExtentDate",
    "contractPeriod_startDate",
    "parties_name",
    col("parties_contactPoint.name").alias("parties_contactPoint_name"),
)

In [11]:
# Keep only the columns listed in `schema`; s6_df is rebound to the result.
s6_df = s6_df.select(*schema)

In [12]:
# Show the schema of the flattened frame.
s6_df.printSchema()

root
 |-- ocid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- _id.$oid: string (nullable = true)
 |-- contractPeriod_durationInDays: string (nullable = true)
 |-- contractPeriod_endDate: string (nullable = true)
 |-- contractPeriod_maxExtentDate: string (nullable = true)
 |-- contractPeriod_startDate: string (nullable = true)
 |-- parties_name: string (nullable = true)
 |-- parties_contactPoint_name: string (nullable = true)



In [13]:
# Persist the flattened frame as Parquet.
# The writer's default mode raises an error when the target path already
# exists, which breaks every re-run of the notebook; "overwrite" replaces the
# previous extract instead.
s6_df.write.mode("overwrite").parquet("s6_extracted_repartition")