### JSON Ingestion Schema Manipulation

In [1]:
from IPython.display import Markdown, display
def printmd(string):
    """Display ``string`` in the notebook as a red, top-level Markdown heading."""
    # Wrap the text in a heading marker and a red-coloured span before rendering.
    heading_source = '# <span style="color:red">' + string + '</span>'
    rendered = Markdown(heading_source)
    display(rendered)

if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in a IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')

!pip install pyspark==2.4.5

try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')
    
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

print("Apache Spark session created.")

Apache Spark session created.


In [6]:
from pyspark.sql.functions import (lit,col,concat,split)
import os
from pyspark.sql import functions as F
import json

In [3]:
# Location of the dataset, expressed relative to the notebook's working directory.
relative_path = (
    "../03 The majestic role of the dataframe/data/"
    "Restaurants_in_Durham_County_NC.json"
)

# os.path.dirname("") evaluates to the empty string, so joining it with
# relative_path leaves the path unchanged (it stays relative despite the name).
current_dir = os.path.dirname("")
absolute_file_path = os.path.join(current_dir, relative_path)

# Echo the resulting path as the cell output.
absolute_file_path

'../03 The majestic role of the dataframe/data/Restaurants_in_Durham_County_NC.json'

In [4]:
# Reads a JSON file called Restaurants_in_Durham_County_NC.json, stores
# it in a dataframe
df = spark.read.json(relative_path)
print("*** Right after ingestion")
# Preview the first five rows and the schema of the freshly read dataframe.
df.show(5)
df.printSchema()
# count is a method and has to be called; passing the bare attribute would
# print the bound-method representation instead of the number of records.
print("We have {} records.".format(df.count()))

*** Right after ingestion
+----------------+--------------------+--------------------+--------------------+--------------------+
|       datasetid|              fields|            geometry|    record_timestamp|            recordid|
+----------------+--------------------+--------------------+--------------------+--------------------+
|restaurants-data|[, Full-Service R...|[[-78.9573299, 35...|2017-07-13T09:15:...|1644654b953d1802c...|
|restaurants-data|[, Nursing Home, ...|[[-78.8895483, 36...|2017-07-13T09:15:...|93573dbf8c9e799d8...|
|restaurants-data|[, Fast Food Rest...|[[-78.9593263, 35...|2017-07-13T09:15:...|0d274200c7cef50d0...|
|restaurants-data|[, Full-Service R...|[[-78.9060312, 36...|2017-07-13T09:15:...|cf3e0b175a6ebad2a...|
|restaurants-data|[,, [36.0556347, ...|[[-78.9135175, 36...|2017-07-13T09:15:...|e796570677f7c39cc...|
+----------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

root
 |-- datasetid: s

In [7]:
# Flatten the nested `fields` struct into top-level columns with shorter names.
# NOTE(review): the schema printed after this transformation lists `datasetId`
# where the ingested `datasetid` column used to be, so the second withColumn
# appears to replace that column rather than add a new one - presumably because
# column names are resolved case-insensitively; confirm if that is intended.
df = (
    df
    # Constant column: every record of this file gets the same county value.
    .withColumn("county", lit("Durham"))
    .withColumn("datasetId", col("fields.id"))
    # Premise details.
    .withColumn("name", col("fields.premise_name"))
    .withColumn("address1", col("fields.premise_address1"))
    .withColumn("address2", col("fields.premise_address2"))
    .withColumn("city", col("fields.premise_city"))
    .withColumn("state", col("fields.premise_state"))
    .withColumn("zip", col("fields.premise_zip"))
    .withColumn("tel", col("fields.premise_phone"))
    # Opening and closing dates.
    .withColumn("dateStart", col("fields.opening_date"))
    .withColumn("dateEnd", col("fields.closing_date"))
    # Keep the second piece of the description once it is split on " - ".
    .withColumn(
        "type",
        split(col("fields.type_description"), " - ").getItem(1),
    )
    # The geolocation array is split into its first and second element.
    .withColumn("geoX", col("fields.geolocation").getItem(0))
    .withColumn("geoY", col("fields.geolocation").getItem(1))
)

In [8]:
# Compose the record identifier as <state>_<county>_<datasetId>.
df = df.withColumn(
    "id",
    concat(
        col("state"),
        lit("_"),
        col("county"),
        lit("_"),
        col("datasetId"),
    ),
)

In [9]:
# Show the new identifier next to the columns it was built from, then print
# the full schema of the transformed dataframe.
print("*** Dataframe transformed")
id_preview = df.select("id", "state", "county", "datasetId")
id_preview.show(5)
df.printSchema()

*** Dataframe transformed
+---------------+-----+------+---------+
|             id|state|county|datasetId|
+---------------+-----+------+---------+
|NC_Durham_56060|   NC|Durham|    56060|
|NC_Durham_58123|   NC|Durham|    58123|
|NC_Durham_70266|   NC|Durham|    70266|
|NC_Durham_97837|   NC|Durham|    97837|
|NC_Durham_60690|   NC|Durham|    60690|
+---------------+-----+------+---------+
only showing top 5 rows

root
 |-- datasetId: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 | 

In [10]:
# Report how many partitions back the dataframe before it is repartitioned.
print("*** Looking at partitions")
underlying_rdd = df.rdd
partitionCount = underlying_rdd.getNumPartitions()
before_template = "Partition count before repartition: {}"
print(before_template.format(partitionCount))

*** Looking at partitions
Partition count before repartition: 1


In [11]:
# Redistribute the dataframe over four partitions and report the new count.
df = df.repartition(4)
partition_count_after = df.rdd.getNumPartitions()
print("Partition count after repartition: {}".format(partition_count_after))

Partition count after repartition: 4


In [12]:
# Good to stop SparkSession at the end of the application.
# After this call the session can no longer be used: the next cell's read
# fails with "Cannot call methods on a stopped SparkContext".
spark.stop()

In [13]:
# The session was stopped in the preceding cell, so this read raises a
# Py4JJavaError ("Cannot call methods on a stopped SparkContext"), as the
# recorded traceback below shows.
# NOTE(review): presumably kept on purpose to demonstrate that a stopped
# session cannot be reused - confirm, or remove the cell so Run All completes.
df = spark.read.json(relative_path)

Py4JJavaError: An error occurred while calling o108.json.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:745)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1486)
	at org.apache.spark.sql.execution.datasources.text.TextFileFormat.buildReader(TextFileFormat.scala:106)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:103)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:98)
	at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:64)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
