# Lab: Use Spark SQL for ETL

This lab extends the work done in the Spark DataFrames lab, and so assumes you have the data in the relevant places and
have completed those exercises.

--> 1. Create a view from the webpages DataFrame you created in the previous lab.

In [0]:
# Parse each whitespace-separated line into [id, webpage, assoc_files]
webRDD = (
    sc.textFile("dbfs:/FileStore/tables/extracts/webpage/*")
    .map(lambda line: line.split())
    .map(lambda values: [int(values[0]), values[1], values[2]])
)

# Name the columns while converting the RDD to a DataFrame
webDF = webRDD.toDF(["id", "webpage", "assoc_files"])

webDF.show(5)

In [0]:
# Alternative: reload the DataFrame from Parquet, if that is where the previous lab saved it
#webDF = spark.read.parquet("/FileStore/webpage_files/")
webDF.show(5)

In [0]:
webDF.createOrReplaceTempView("webview")

In [0]:
# List the tables and temporary views registered in the Spark catalog

spark.catalog.listTables()

--> 2. We can create a new view from an existing view using SQL's "CREATE OR REPLACE TEMP VIEW <view name> AS",
followed by a SELECT statement that picks out the items to be extracted from the existing view. For example, to
create a new view from the webpages view which contains only the rows whose webpage starts with s, we could run

In [0]:
spark.sql("CREATE OR REPLACE TEMP VIEW s_webview AS SELECT * FROM webview WHERE webpage LIKE 's%'")
spark.sql("SELECT * FROM s_webview").show(5)

In [0]:
spark.catalog.listTables()
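
The view-on-view pattern can also be tried without a cluster. The following is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for Spark SQL; the table name webpages and the three sample rows are made up for illustration. SQLite does not accept CREATE OR REPLACE for views, so the sketch drops the view first. Its LIKE also ignores ASCII case by default, whereas Spark's LIKE is case-sensitive, which is why the sample data is all lower case.

```python
import sqlite3

# Hypothetical sample rows standing in for the webpages data (id, webpage, assoc_files).
rows = [
    (1, "sorrento_f00l_sales.html", "theme.css,code.js,sorrento_f00l.jpg"),
    (2, "titanic_2100_sales.html", "theme.css,code.js,titanic_2100.jpg"),
    (3, "sorrento_f01l_sales.html", "theme.css,code.js,sorrento_f01l.jpg"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE webpages (id INTEGER, webpage TEXT, assoc_files TEXT)")
conn.executemany("INSERT INTO webpages VALUES (?, ?, ?)", rows)

# Base view, playing the role of webview.
conn.execute("CREATE TEMP VIEW webview AS SELECT * FROM webpages")

# Derived view defined by a SELECT over the base view.
conn.execute("DROP VIEW IF EXISTS s_webview")
conn.execute("CREATE TEMP VIEW s_webview AS SELECT * FROM webview WHERE webpage LIKE 's%'")

s_pages = [r[1] for r in conn.execute("SELECT * FROM s_webview ORDER BY id")]
print(s_pages)

# Rough analogue of spark.catalog.listTables(): list the temporary views.
views = sorted(
    r[0] for r in conn.execute("SELECT name FROM sqlite_temp_master WHERE type = 'view'")
)
print(views)
```

Only the shape of the SQL carries over; catalog handling and view lifetime in Spark are not modelled here.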

--> 3. Check that this view contains the expected items by converting it to a DataFrame and showing the contents.

In [0]:
webDF_2 = spark.sql("SELECT * FROM s_webview")
webDF_2.show(5)

--> 4. Create a new view which contains the webpage and associated files columns from the view you created in
Point 1.

In [0]:
spark.sql("CREATE OR REPLACE TEMP VIEW webfiles_tmp AS SELECT webpage, assoc_files FROM webview")

spark.sql("SELECT * FROM webfiles_tmp").show(5)

--> 5. Check the view was created using listTables.

In [0]:
spark.catalog.listTables()

--> 6. This time, we'll manipulate the data using SQL rather than converting to RDDs. We're aiming to use the SQL function
explode, which takes an array and separates its elements into multiple rows (like flatMap). So first we need to turn the
associated files column into an array. SQL also has a split function, which splits a string into an array on a given
delimiter. For example, check the contents of the views created by

In [0]:
spark.sql("CREATE OR REPLACE TEMP VIEW spliteg AS SELECT SPLIT(webpage,'_') as split_wp, assoc_files FROM webfiles_tmp")

In [0]:
spark.sql("SELECT * FROM spliteg").show(5)

In [0]:
spark.sql("CREATE OR REPLACE TEMP VIEW explodeeg AS SELECT EXPLODE(SPLIT(webpage,'_')) as explode_wp, assoc_files FROM webfiles_tmp")
spark.sql("SELECT * FROM explodeeg").show(5)
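
To see why EXPLODE(SPLIT(...)) behaves like flatMap, here is a minimal pure-Python sketch of the same two steps. The sample row is a made-up assumption, and nothing here calls Spark; it only models how one input row becomes several output rows. Spark's SPLIT treats its second argument as a regular expression, but "_" has no special meaning there, so str.split is a fair stand-in.

```python
# Hypothetical row; real values in webfiles_tmp may differ.
rows = [{"webpage": "sorrento_f00l_sales.html", "assoc_files": "theme.css,code.js"}]

# SPLIT step: turn the webpage string into an array, keeping assoc_files alongside it.
split_rows = [
    {"split_wp": row["webpage"].split("_"), "assoc_files": row["assoc_files"]}
    for row in rows
]

# EXPLODE step: one output row per array element, with the other column repeated.
# The nested loop is exactly what flatMap does on an RDD.
exploded = [
    {"explode_wp": part, "assoc_files": row["assoc_files"]}
    for row in split_rows
    for part in row["split_wp"]
]

for row in exploded:
    print(row)
```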

--> 7. Create a view which converts the associated files column in the view from Point 4 into an array, splitting on the
commas.

In [0]:
spark.sql("CREATE OR REPLACE TEMP VIEW assoc_split AS SELECT webpage, SPLIT(assoc_files, ',') as split_assfiles FROM webfiles_tmp")
spark.sql("SELECT * FROM assoc_split").show(5)

In [0]:
# A bit more advanced: EXPLODE only accepts arrays or maps, so SPLIT first converts the string into an array
spark.sql("CREATE OR REPLACE TEMP VIEW assoc_explode AS SELECT webpage, EXPLODE(SPLIT(assoc_files, ',')) as explode_assfiles FROM webfiles_tmp")
spark.sql("SELECT * FROM assoc_explode").show(5)

--> 8. Now we move on to using explode. The definition of SQL explode is:

explode(expr) - Separates the elements of array expr into multiple rows, or the elements of map expr into
multiple rows and columns. Unless specified otherwise, uses the default column name col for elements of the
array or key and value for the elements of the map.

Take a look at the DataFrame created by

In [0]:
from pyspark.sql.functions import explode

spliteg_DF = spark.sql("SELECT * FROM spliteg")

spliteg_DF.withColumn("indiv_webfiles", explode("split_wp")).show(5)

In [0]:
# The same result as a single chained expression
spark.sql("SELECT * FROM spliteg").withColumn("indiv_webfiles", explode("split_wp")).show(5)

--> 9. Create a new view from the view you created in Point 7 which explodes the associated files array as was wanted.

In [0]:
spark.sql("CREATE OR REPLACE TEMP VIEW assoc_exploded AS SELECT *, EXPLODE(split_assfiles) AS Indiv_files FROM assoc_split")
spark.sql("SELECT * FROM assoc_exploded").show(5)

In [0]:
# explode_outer --> also keeps rows whose array is null or empty, emitting a single row with a null element.
# posexplode    --> returns two columns: the element's position (pos) and the element itself (col).
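
The difference between the three variants is easiest to see on rows with an empty or missing array. Below is a minimal pure-Python sketch of their behaviour; the function names mirror the Spark ones but are local stand-ins written for this illustration, and the sample rows are made up.

```python
# Hypothetical rows: one normal array, one empty array, one missing (null) array.
rows = [
    {"webpage": "a.html", "files": ["theme.css", "code.js"]},
    {"webpage": "b.html", "files": []},
    {"webpage": "c.html", "files": None},
]

def explode(rows, column):
    """Model of explode: rows whose array is null or empty produce no output."""
    return [(row["webpage"], item) for row in rows for item in (row[column] or [])]

def explode_outer(rows, column):
    """Model of explode_outer: null or empty arrays still yield one row, with None as the element."""
    return [(row["webpage"], item) for row in rows for item in (row[column] or [None])]

def posexplode(rows, column):
    """Model of posexplode: like explode, plus the zero-based position of each element."""
    return [
        (row["webpage"], pos, item)
        for row in rows
        for pos, item in enumerate(row[column] or [])
    ]

print(explode(rows, "files"))
print(explode_outer(rows, "files"))
print(posexplode(rows, "files"))
```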

In [0]:
# Explode both columns, then drop the original string columns to make the output easier to read
from pyspark.sql.functions import explode, split

spark.sql("SELECT * FROM webfiles_tmp") \
  .withColumn("Indiv_files", explode(split("assoc_files", ","))) \
  .withColumn("Indiv_webs", explode(split("webpage", "_"))) \
  .drop("webpage", "assoc_files") \
  .show(30)
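
Note that chaining two explodes produces every combination of associated file and webpage fragment, so each input row yields as many output rows as the product of the two array lengths. A minimal pure-Python sketch of that effect, using a made-up row rather than the lab data:

```python
from itertools import product

# Hypothetical row; real values in webfiles_tmp may differ.
row = {"webpage": "sorrento_f00l_sales.html", "assoc_files": "theme.css,code.js"}

files = row["assoc_files"].split(",")  # 2 elements
webs = row["webpage"].split("_")       # 3 elements

# The first explode yields one row per file; the second then yields one row per
# webpage fragment for each of those rows, which is the Cartesian product.
combined = [{"Indiv_files": f, "Indiv_webs": w} for f, w in product(files, webs)]

print(len(combined))
for out_row in combined:
    print(out_row)
```

If only one value per original row is wanted in each column, the two explodes need to be done in separate views or DataFrames rather than chained.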
    

## Challenge: perform the ETL solely in DataFrames

The split and explode functions are also available in the DataFrame API. This optional challenge section guides you through
implementing the solution purely with DataFrames.

--> 10. Again, we can use a split function to turn the contents of a DataFrame column into an array. Check the contents of
the DataFrame created by

In [0]:
from pyspark.sql.functions import split

splitDF = webDF.withColumn('splitWebpage', split(webDF["webpage"], "_"))
splitDF.show(10)

--> 11. Create a DataFrame with an additional column holding an array of the comma-separated elements of the
original associated files column of the webpages DataFrame.

In [0]:
from pyspark.sql.functions import split

splitDF_ass = webDF.withColumn('splitAssFiles', split(webDF["assoc_files"], ","))
splitDF_ass.show(10)

--> 12. Now consider the contents of the following DataFrame:

In [0]:
from pyspark.sql.functions import explode
explodedDF = splitDF.withColumn('explodedWebpage', explode(splitDF.splitWebpage))
explodedDF.show(10)

--> 13. Enhance your DataFrame from Point 11 with the exploded array column.

In [0]:
splitDF_ass.withColumn("Exploded_Ass_Files", explode("splitAssFiles")).show(10)