# ETL I

## Task I - Define correct schema for json data

* load json dataset
* look at the infered schema (is it inferred correctly or is it wrong?)
* define the schema explicitly
* see what happens if the schema is defined wrong

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, count, explode, split, regexp_replace, collect_list

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

import os

In [26]:
spark = (
    SparkSession
    .builder
    .appName('ETL I')
    .getOrCreate()
)

In [27]:
print(spark.version)

3.0.0


The input dataset is in the json format and is in the `data/questios-json` folder. Below is the path definition:

In [28]:
base_path = os.getcwd()

project_path = ('/').join(base_path.split('/')[0:-3]) 

data_input_path = os.path.join(project_path, 'data/questions-json')

output_path = os.path.join(project_path, 'output/questions-transformed')

#### First let Spark infer the schema:

In [29]:
questionsDF = (
    spark
    .read
    .format('json')
    .option('path', data_input_path)
    .load()
)

Note: Here we have only 8 json files. In case where you have lots of json files and you know that each file has the same schema, consider loading only one file to check the schema. Inferring the schema from many files can be expensive.

In [30]:
questionsDF.printSchema()

root
 |-- accepted_answer_id: long (nullable = true)
 |-- answers: long (nullable = true)
 |-- body: string (nullable = true)
 |-- comments: long (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- question_id: long (nullable = true)
 |-- score: long (nullable = true)
 |-- tags: string (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- views: long (nullable = true)



#### Define the schema:

As you can see the `creation_date` was inferred as `StringType` however in reality it is a Timestamp. Define the schema by hand and provide it to create the DataFrame

In [31]:
json_schema = StructType(
    [
        StructField('question_id', LongType(), True),
        StructField('creation_date', TimestampType(), True),
        StructField('title', StringType(), True),
        StructField('body', StringType(), True),
        StructField('tags', StringType(), True),
        StructField('accepted_answer_id', LongType(), True),
        StructField('answers', LongType(), True),
        StructField('comments', LongType(), True),
        StructField('user_id', LongType(), True),
        StructField('views', LongType(), True),
    ]
)

In [32]:
questionsDF = (
    spark
    .read
    .schema(json_schema)
    .format('json')
    .option('path', data_input_path)    
    .load()
)

In [33]:
questionsDF.show(truncate=10, n=5)

+-----------+-------------+----------+----------+----------+------------------+-------+--------+-------+-----+
|question_id|creation_date|     title|      body|      tags|accepted_answer_id|answers|comments|user_id|views|
+-----------+-------------+----------+----------+----------+------------------+-------+--------+-------+-----+
|   61416257|   2020-04...|Ag-Grid...|<p>Ag-g...| <ag-grid>|          61420331|      1|       1|4292512|   21|
|   61482176|   2020-04...|Optiona...|<p>My c...|<c#><fu...|              null|      0|       4|3603502|   28|
|   61919808|   2020-05...|Matchin...|<p>Ther...|<python...|              null|      0|       3|4453105|   40|
|   60340057|   2020-02...|Knockou...|<p>I'm ...|<knocko...|          60340749|      1|       0|3157885|   35|
|   62001217|   2020-05...|Python ...|<p>I ru...|<python...|              null|      1|       0|4220475|   17|
+-----------+-------------+----------+----------+----------+------------------+-------+--------+-------+-----+
o

In [34]:
questionsDF.count()

195179

#### What happens if we make a mistake:

* set `title` as `LongType` in the defined schema

Hint
* Different things will happen depending on the `mode` option, where `mode` is one of the following:
    * FAILFAST
    * DROPMALFORMED
    * PERMISSIVE (default)


In [35]:
# Define the schema with a mistake in the title column:

wrong_schema = StructType(
    [
        StructField('question_id', LongType(), True),
        StructField('creation_date', TimestampType(), True),
        StructField('title', LongType(), True),
        StructField('body', StringType(), True),
        StructField('tags', StringType(), True),
        StructField('accepted_answer_id', LongType(), True),
        StructField('answers', LongType(), True),
        StructField('comments', LongType(), True),
        StructField('user_id', LongType(), True),
        StructField('views', LongType(), True),
    ]
)

In [36]:
(
    spark
    .read
    .schema(wrong_schema)
    .format('json')
    .option('mode', 'PERMISSIVE') # this is the default
    .option('path', data_input_path)    
    .load()
).show(truncate=15, n=5)

+-----------+---------------+-----+---------------+---------------+------------------+-------+--------+-------+-----+
|question_id|  creation_date|title|           body|           tags|accepted_answer_id|answers|comments|user_id|views|
+-----------+---------------+-----+---------------+---------------+------------------+-------+--------+-------+-----+
|   61416257|2020-04-24 2...| null|<p>Ag-grid's...|      <ag-grid>|          61420331|      1|       1|4292512|   21|
|   61482176|2020-04-28 1...| null|<p>My curren...|<c#><functio...|              null|      0|       4|3603502|   28|
|   61919808|2020-05-20 2...| null|<p>There are...|<python><reg...|              null|      0|       3|4453105|   40|
|   60340057|2020-02-21 1...| null|<p>I'm hopin...|<knockout.js...|          60340749|      1|       0|3157885|   35|
|   62001217|2020-05-25 1...| null|<p>I run a p...|<python><mys...|              null|      1|       0|4220475|   17|
+-----------+---------------+-----+---------------+-----

In [37]:
# shows no records
(
    spark
    .read
    .schema(wrong_schema)
    .format('json')
    .option('mode', 'DROPMALFORMED')
    .option('path', data_input_path)    
    .load()
).show(truncate=15, n=5)

+-----------+-------------+-----+----+----+------------------+-------+--------+-------+-----+
|question_id|creation_date|title|body|tags|accepted_answer_id|answers|comments|user_id|views|
+-----------+-------------+-----+----+----+------------------+-------+--------+-------+-----+
+-----------+-------------+-----+----+----+------------------+-------+--------+-------+-----+



In [38]:
# throws an error
(
    spark
    .read
    .schema(wrong_schema)
    .format('json')
    .option('mode', 'FAILFAST')
    .option('path', data_input_path)    
    .load()
).show(truncate=15, n=5)

Py4JJavaError: An error occurred while calling o170.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 27, 192.168.0.12, executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$9(JsonDataSource.scala:144)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Failed to parse a value for data type bigint (current token: VALUE_STRING).
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:478)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$readFile$7(JsonDataSource.scala:140)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 25 more
Caused by: java.lang.RuntimeException: Failed to parse a value for data type bigint (current token: VALUE_STRING).
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$failedConversion$1.applyOrElse(JacksonParser.scala:368)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$failedConversion$1.applyOrElse(JacksonParser.scala:348)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeConverter$5$1.applyOrElse(JacksonParser.scala:182)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeConverter$5$1.applyOrElse(JacksonParser.scala:182)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:336)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeConverter$5(JacksonParser.scala:182)
	at org.apache.spark.sql.catalyst.json.JacksonParser.org$apache$spark$sql$catalyst$json$JacksonParser$$convertObject(JacksonParser.scala:386)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:89)
	at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:88)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:336)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeStructRootConverter$3(JacksonParser.scala:88)
	at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$parse$2(JacksonParser.scala:453)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2539)
	at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:448)
	... 27 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


#### Note

* To read more about schema inferrence and schema evolution of json files in Spark SQL, read my article: https://medium.com/swlh/notes-about-json-schema-handling-in-spark-sql-be1e7f13839d


## Task II - Transfrom json to parquet and convert String column to an array

* convert column tags to array of tags 

#### Convert tags to an array

Hint
* use split to get an array
* explode the array
* use regexp_replace
* groupBy + collect_list
* join with original questions DataFrame

In [39]:
resultDF = (
    questionsDF
    .withColumn('tags_arr', split('tags', '><'))
    .withColumn('tag', explode('tags_arr'))
    .withColumn('tag', regexp_replace('tag', '(<|>)', ''))
    .groupBy('question_id')
    .agg(collect_list('tag').alias('tags'))
    .join(questionsDF.drop('tags'), 'question_id')
)

#### Save the data

Hint:

* repartition the data to 8 partitions before saving
 * this will create 8 files

In [40]:
(
    resultDF
    .repartition(8)
    .write
    .mode('overwrite')
    .option('path', output_path)
    .save()
)

<b>Check if we saved the data correctly:</b>

In [42]:
checkDF = (
    spark
    .read
    .parquet(output_path)
)

In [43]:
checkDF.count()

195179

In [None]:
checkDF.show(truncate=10, n=5)

In [None]:
checkDF.select('tags').show(truncate=False, n=10)

In [None]:
checkDF.printSchema()

In [None]:
spark.stop()