# Spark SQL with pySpark

Answer the following questions

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, DateType
from pyspark.sql.functions import *
from pyspark.errors import AnalysisException
from glob import glob

# Reading data from the cloud

You can use Spark to read data from a cloud location just as if it were a local file on your computer: you only need to specify its address, add [the appropriate JARs](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage#clusters) for it, and set the credentials file.

In [2]:
credentials_location = glob("/secrets/*json")[0] # this is the location of the JSON file for GCS authentication

spark = (
    SparkSession
    .builder
    .master("local[*]")
    .config("spark.executor.memory", "3g")
    .config("spark.jars", "/usr/local/spark/jars/gcs-connector-hadoop3-latest.jar") # here we set up the Spark JAR to read and write Google Cloud Storage
    .config("google.cloud.auth.service.account.json.keyfile", credentials_location) # here we tell Spark where to look for credentials
    .appName("GCS_DataLake")
    .getOrCreate()
)

Now we can read directly from GCS:

In [3]:
cloud_json_location = "gs://solutions-public-assets/bqetl/artist.json"

df = spark.read.json(cloud_json_location)
df.printSchema()

root
 |-- area: long (nullable = true)
 |-- begin_area: long (nullable = true)
 |-- begin_date_day: long (nullable = true)
 |-- begin_date_month: long (nullable = true)
 |-- begin_date_year: long (nullable = true)
 |-- comment: string (nullable = true)
 |-- edits_pending: long (nullable = true)
 |-- end_area: long (nullable = true)
 |-- end_date_day: long (nullable = true)
 |-- end_date_month: long (nullable = true)
 |-- end_date_year: long (nullable = true)
 |-- ended: boolean (nullable = true)
 |-- gender: long (nullable = true)
 |-- gid: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_updated: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sort_name: string (nullable = true)
 |-- type: long (nullable = true)



# Writing data

To write, we use a [DataFrameWriter](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.html#pyspark.sql.DataFrameWriter), which we get by accessing the `write` property of our `DataFrame` object:

In [4]:
df.write.__class__

pyspark.sql.readwriter.DataFrameWriter

There are [options](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.option.html) you can set, as well as a [writing mode](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html). The writing mode works the same for every format, but the available options depend on the format you write to.
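The save modes (`append`, `overwrite`, `ignore` and `errorifexists`, the default, also spelled `error`) decide what happens when the target folder already exists. As a rough mental model only, the plain-Python sketch below imitates those semantics on a local folder. `save_with_mode` and `write_part` are hypothetical helpers, not part of PySpark, and real Spark commits its part files through its own output committer rather than like this.

```python
import os
import shutil
import tempfile
import uuid


def save_with_mode(path, write_fn, mode="errorifexists"):
    """Hypothetical helper imitating Spark's save modes for a local folder.

    Returns True if write_fn was called, False if the write was skipped.
    """
    exists = os.path.exists(path)
    if mode in ("errorifexists", "error"):
        if exists:
            raise FileExistsError(f"Path {path} already exists.")
    elif mode == "ignore":
        if exists:
            return False  # leave the existing data untouched
    elif mode == "overwrite":
        if exists:
            shutil.rmtree(path)  # replace the old contents entirely
    elif mode != "append":
        raise ValueError(f"Unknown save mode: {mode}")
    os.makedirs(path, exist_ok=True)
    # write new part files; with append, the older files are kept
    write_fn(path)
    return True


def write_part(folder):
    """Hypothetical writer: one uniquely named part file per call."""
    name = f"part-{uuid.uuid4().hex}.csv"
    with open(os.path.join(folder, name), "w", encoding="utf-8") as f:
        f.write("name\nDrumcell\n")


target = os.path.join(tempfile.mkdtemp(), "write_demo")
save_with_mode(target, write_part)               # first write succeeds
save_with_mode(target, write_part, "append")     # adds a second part file
n_after_append = len(os.listdir(target))
skipped = not save_with_mode(target, write_part, "ignore")
save_with_mode(target, write_part, "overwrite")  # back to a single part file
n_parts = len(os.listdir(target))
try:
    save_with_mode(target, write_part)           # default mode now fails
    failed = False
except FileExistsError:
    failed = True
```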

When writing to CSV, we can set [CSV-specific options](https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option). For instance, here we write the first 50 rows, with a header, to a folder called `data/write_0`. If the folder already exists, its files will be overwritten.

Remember, Spark always writes to a folder.

In [5]:
df.limit(50).write.mode("overwrite").option("header","true").csv("data/write_0")

We can see the result with the `ls` command:

In [6]:
! ls -lh data/write_0/

total 8.0K
-rw-r--r-- 1 jovyan users 5.7K May 28 11:39 part-00000-0ddcd7dd-3f4a-4061-8eec-68e332e3781c-c000.csv
-rw-r--r-- 1 jovyan users    0 May 28 11:39 _SUCCESS
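As the listing shows, the output folder holds one or more `part-*` files plus an empty `_SUCCESS` marker. Because the part files are ordinary CSV, other tools can read them directly. A minimal standard-library sketch, assuming uncompressed CSV written with `header=true` (so every part file starts with its own header line); `read_spark_csv_folder` is a hypothetical helper, and the demo folder is built by hand with made-up rows to mimic the layout above:

```python
import csv
import glob
import os
import tempfile


def read_spark_csv_folder(folder, sep=",", header=True):
    """Hypothetical helper: read all part-*.csv files of a Spark output folder."""
    columns, rows = None, []
    # sorted() keeps part-00000, part-00001, ... in order; _SUCCESS is skipped
    for part in sorted(glob.glob(os.path.join(folder, "part-*.csv"))):
        with open(part, newline="", encoding="utf-8") as f:
            reader = csv.reader(f, delimiter=sep)
            if header:
                # with header=true each part file repeats the header line
                columns = next(reader, columns)
            rows.extend(reader)
    return columns, rows


# Build a small folder that mimics Spark's layout (illustrative data only)
demo = tempfile.mkdtemp()
for i, name in enumerate(["Drumcell", "Bill Leeb"]):
    path = os.path.join(demo, f"part-0000{i}-demo.csv")
    with open(path, "w", newline="", encoding="utf-8") as f:
        f.write(f"name,gender\n{name},1\n")
open(os.path.join(demo, "_SUCCESS"), "w").close()  # empty marker file

columns, rows = read_spark_csv_folder(demo)
```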


Use [partitionBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.partitionBy.html) to partition the output. Spark creates one subfolder per distinct value of the partition column, named `column=value`, and writes the CSV files for that value inside it.
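The subfolders follow the Hive-style `column=value` convention, and the partition column itself is not stored inside the files. A rough standard-library sketch of how rows are routed to subfolders, assuming that null values go to a `__HIVE_DEFAULT_PARTITION__` folder (as the listing below shows) and ignoring the escaping Spark applies to special characters; `partition_subfolder` and `split_by_partition` are hypothetical helpers and the rows are made up:

```python
HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_subfolder(column, value):
    """Hypothetical helper: Hive-style folder name for one partition value."""
    label = HIVE_DEFAULT_PARTITION if value is None else str(value)
    return f"{column}={label}"


def split_by_partition(rows, column):
    """Group row dicts by target subfolder, dropping the partition column."""
    groups = {}
    for row in rows:
        folder = partition_subfolder(column, row[column])
        # the partition value lives in the folder name, not in the file
        payload = {k: v for k, v in row.items() if k != column}
        groups.setdefault(folder, []).append(payload)
    return groups


rows = [  # made-up rows for illustration
    {"name": "artist_a", "gender": 1},
    {"name": "artist_b", "gender": 2},
    {"name": "artist_c", "gender": None},
    {"name": "artist_d", "gender": 1},
]
groups = split_by_partition(rows, "gender")
for folder, payload in sorted(groups.items()):
    print(folder, payload)
```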

In [7]:
df.limit(10000).write.mode("overwrite").option("header","true").partitionBy("gender").csv("data/write_1")

In [8]:
! ls -lh data/write_1/*

-rw-r--r-- 1 jovyan users    0 May 28 11:39  data/write_1/_SUCCESS

'data/write_1/gender=1':
total 152K
-rw-r--r-- 1 jovyan users 150K May 28 11:39 part-00000-f3952c4e-1221-4fb9-ba51-56ab2d8a05cc.c000.csv

'data/write_1/gender=2':
total 52K
-rw-r--r-- 1 jovyan users 49K May 28 11:39 part-00000-f3952c4e-1221-4fb9-ba51-56ab2d8a05cc.c000.csv

'data/write_1/gender=__HIVE_DEFAULT_PARTITION__':
total 1004K
-rw-r--r-- 1 jovyan users 1001K May 28 11:39 part-00000-f3952c4e-1221-4fb9-ba51-56ab2d8a05cc.c000.csv


What do you think `__HIVE_DEFAULT_PARTITION__` means?

## Writing to csv: exercises

Take the first 100 rows and write them to a CSV folder called `data/write_2`, using `;` as the separator and without a header. If the folder already exists, the write should fail. Catch the exception and print it; it should say:

`[PATH_ALREADY_EXISTS] Path file:/home/jovyan/work/data/write_2 already exists. Set mode as "overwrite" to overwrite the existing path.`
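Without a header, a reader has no column names to work with, which is why Spark later reads this folder back with positional names `_c0`, `_c1`, and so on. A small standard-library sketch of that naming on `;`-separated text; `parse_headerless` is a hypothetical helper, and the sample lines are shortened rows, not the real file contents:

```python
import csv
import io


def parse_headerless(text, sep=";"):
    """Hypothetical helper: name columns _c0, _c1, ... as Spark does without a header."""
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    width = max((len(row) for row in rows), default=0)
    columns = [f"_c{i}" for i in range(width)]
    return [dict(zip(columns, row)) for row in rows]


sample = "565412;João Anicento;1\n565413;Cainã Cavalcante;1\n"
records = parse_headerless(sample)
print(records[0])
```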

In [9]:
try:
    df.limit(100).write.mode("errorifexists").option("header","false").option("sep",";").csv("data/write_2")
except AnalysisException as e:
    print(e)

[PATH_ALREADY_EXISTS] Path file:/home/jovyan/work/data/write_2 already exists. Set mode as "overwrite" to overwrite the existing path.


In [10]:
! ls -lh data/write_2/

total 12K
-rw-r--r-- 1 jovyan users 11K May 28 09:45 part-00000-03853414-ff14-43d5-93b0-b5c33987d165-c000.csv
-rw-r--r-- 1 jovyan users   0 May 28 09:45 _SUCCESS


Take the first 10 rows and write them to a folder called `data/write_3`, with a header, using `,` as the separator and writing all null (`None`) values as `NULL`. Partition by the first letter of the artist's name.
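By default Spark's CSV writer leaves nulls as empty fields; the `nullValue` option swaps in a token such as `NULL`, and the same option is needed on read so that the token turns back into null. A standard-library sketch of that round trip, using hypothetical helpers `write_csv_with_token` and `read_csv_with_token`:

```python
import csv
import io

NULL_TOKEN = "NULL"


def write_csv_with_token(header, rows, token=NULL_TOKEN):
    """Hypothetical helper: write rows, replacing None with the null token."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(header)
    for row in rows:
        writer.writerow([token if v is None else v for v in row])
    return buf.getvalue()


def read_csv_with_token(text, token=NULL_TOKEN):
    """Hypothetical helper: read rows, turning the null token back into None."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    return header, [[None if v == token else v for v in row] for row in reader]


text = write_csv_with_token(["name", "area", "type"], [["João Anicento", None, "1"]])
header, rows = read_csv_with_token(text)
```

One trade-off of this design: a genuine string `"NULL"` in the data becomes indistinguishable from a null, so the token should be something that cannot appear as a real value.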

In [11]:
(
    df
    .limit(10)
    .withColumn("first_letter",col("name").substr(1,1)) # Spark string positions are 1-based
    .write
    .mode("overwrite")
    .option("header","true")
    .option("nullValue","NULL")
    .partitionBy("first_letter")
    .csv("data/write_3")
)

In [12]:
! ls -lh data/write_3/*

-rw-r--r-- 1 jovyan users    0 May 28 11:39  data/write_3/_SUCCESS

'data/write_3/first_letter=C':
total 4.0K
-rw-r--r-- 1 jovyan users 338 May 28 11:39 part-00000-fbf0bacb-5b85-4d4a-8145-7f9444d8dbae.c000.csv

'data/write_3/first_letter=D':
total 4.0K
-rw-r--r-- 1 jovyan users 329 May 28 11:39 part-00000-fbf0bacb-5b85-4d4a-8145-7f9444d8dbae.c000.csv

'data/write_3/first_letter=G':
total 4.0K
-rw-r--r-- 1 jovyan users 360 May 28 11:39 part-00000-fbf0bacb-5b85-4d4a-8145-7f9444d8dbae.c000.csv

'data/write_3/first_letter=H':
total 4.0K
-rw-r--r-- 1 jovyan users 332 May 28 11:39 part-00000-fbf0bacb-5b85-4d4a-8145-7f9444d8dbae.c000.csv

'data/write_3/first_letter=J':
total 4.0K
-rw-r--r-- 1 jovyan users 649 May 28 11:39 part-00000-fbf0bacb-5b85-4d4a-8145-7f9444d8dbae.c000.csv

'data/write_3/first_letter=N':
total 4.0K
-rw-r--r-- 1 jovyan users 333 May 28 11:39 part-00000-fbf0bacb-5b85-4d4a-8145-7f9444d8dbae.c000.csv

'data/write_3/first_letter=T':
total 4.0K
-rw-r--r-- 1 jovyan users 460 Ma

## Writing to JSON: exercises

Writing to JSON works much the same as writing to CSV: set the mode, the options and the partitions, and then call the `.json()` method. You can read more about it in [the documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.json.html), including the [options](https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option).


Write the first 50 rows as JSON to a folder called `data/write_4`.

In [13]:
df.limit(50).write.mode("overwrite").json("data/write_4")

In [14]:
! ls -lh data/write_4/*

-rw-r--r-- 1 jovyan users 9.3K May 28 11:39 data/write_4/part-00000-0b23d268-4659-42f9-9c51-7b101db939d0-c000.json
-rw-r--r-- 1 jovyan users    0 May 28 11:39 data/write_4/_SUCCESS


#### JSON lines

[JSON Lines](https://jsonlines.org/) is an alternative JSON format that you may come across. Instead of wrapping all the objects in a single list, `[{...}, {...}]`, it puts each object on its own line of the file. Take the columns `name` and `gender`, remove the nulls, take the first 5 rows of the resulting DataFrame, and write them in JSON Lines format to a folder called `data/write_5`.
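To see the difference between the two layouts, here is a standard-library sketch using two of the artists from this dataset; `read_json_lines` is a hypothetical helper. A parser that expects a single JSON document per input, such as `json.loads`, rejects a JSON Lines file as a whole, so each line has to be decoded on its own:

```python
import json

records = [
    {"name": "Roberto Fioretti", "gender": 1},
    {"name": "Drumcell", "gender": 1},
]

# Classic JSON: one array holding every object
array_text = json.dumps(records, separators=(",", ":"))

# JSON Lines: one complete object per line
lines_text = "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in records)


def read_json_lines(text):
    """Hypothetical helper: decode each non-blank line as its own JSON value."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


try:
    json.loads(lines_text)  # the whole file is not a single JSON document
    whole_file_ok = True
except json.JSONDecodeError:
    whole_file_ok = False

print(lines_text, end="")
```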

In [15]:
# JSON Lines is Spark's default JSON output; "multiline" is a read-only option
df.select(["name","gender"]).dropna().limit(5).write.mode("overwrite").json("data/write_5")

In [16]:
! ls -lh data/write_5/*

-rw-r--r-- 1 jovyan users 181 May 28 11:39 data/write_5/part-00000-7970f2af-fad5-4c84-9a40-2884d53ad857-c000.json
-rw-r--r-- 1 jovyan users   0 May 28 11:39 data/write_5/_SUCCESS


In [17]:
! cat data/write_5/*

{"name":"Roberto Fioretti","gender":1}
{"name":"Drumcell","gender":1}
{"name":"Jelly Roll Morton","gender":1}
{"name":"Henri Wojtkowiak","gender":1}
{"name":"Bill Leeb","gender":1}


## Writing to parquet: exercises

[Writing to parquet](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.parquet.html) is, again, much the same as writing to CSV, although in this case there are fewer `option`s to set.

Write the first 10000 rows to parquet in a folder called `data/write_6`, partitioned by gender. This time I'm not giving you any hints; you can do it using the documentation and what you have done so far.

In [18]:
df.limit(10000).write.mode("overwrite").partitionBy("gender").parquet("data/write_6")

In [19]:
! ls -lh data/write_6/*

-rw-r--r-- 1 jovyan users    0 May 28 11:39  data/write_6/_SUCCESS

'data/write_6/gender=1':
total 108K
-rw-r--r-- 1 jovyan users 108K May 28 11:39 part-00000-55ed6a91-9502-47ab-8542-a5baa9f40d3f.c000.snappy.parquet

'data/write_6/gender=2':
total 40K
-rw-r--r-- 1 jovyan users 40K May 28 11:39 part-00000-55ed6a91-9502-47ab-8542-a5baa9f40d3f.c000.snappy.parquet

'data/write_6/gender=__HIVE_DEFAULT_PARTITION__':
total 700K
-rw-r--r-- 1 jovyan users 697K May 28 11:39 part-00000-55ed6a91-9502-47ab-8542-a5baa9f40d3f.c000.snappy.parquet


# Reading data: exercises

Now, read all the folders that you have written so far. How can you do it? Take hints from [the documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html).
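For the partitioned folders (`data/write_1`, `data/write_6`), Spark rebuilds the partition column from the `column=value` folder names, which is why `gender` shows up as the last column when they are read back. A standard-library sketch of that parsing step; `partition_values` is a hypothetical helper that keeps values as strings (Spark additionally infers their types) and maps `__HIVE_DEFAULT_PARTITION__` back to `None`:

```python
HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_values(relative_path):
    """Hypothetical helper: recover partition columns from a Hive-style path."""
    values = {}
    # every folder segment except the final file name may encode column=value
    for segment in relative_path.split("/")[:-1]:
        column, sep, raw = segment.partition("=")
        if sep:
            values[column] = None if raw == HIVE_DEFAULT_PARTITION else raw
    return values


print(partition_values("gender=1/part-00000.c000.snappy.parquet"))
print(partition_values("gender=__HIVE_DEFAULT_PARTITION__/part-00000.c000.csv"))
```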

In [20]:
# read the folder data/write_0

spark.read.option("header","true").csv("data/write_0").show()

+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+------+--------------------+------+--------------------+--------------------+--------------------+----+
|area|begin_area|begin_date_day|begin_date_month|begin_date_year|comment|edits_pending|end_area|end_date_day|end_date_month|end_date_year|ended|gender|                 gid|    id|        last_updated|                name|           sort_name|type|
+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+------+--------------------+------+--------------------+--------------------+--------------------+----+
|NULL|      NULL|          NULL|            NULL|           NULL|   NULL|            0|    NULL|        NULL|          NULL|         NULL|false|  NULL|9bcb6a6c-6085-438...|565412|                NULL|       João Anicento|       João Anicento|   1|
|NULL|  

In [21]:
# read the folder data/write_1

spark.read.option("header","true").csv("data/write_1").show()

+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+--------------------+------+--------------------+--------------------+--------------------+----+------+
|area|begin_area|begin_date_day|begin_date_month|begin_date_year|comment|edits_pending|end_area|end_date_day|end_date_month|end_date_year|ended|                 gid|    id|        last_updated|                name|           sort_name|type|gender|
+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+--------------------+------+--------------------+--------------------+--------------------+----+------+
|NULL|      NULL|          NULL|            NULL|           NULL|   NULL|            0|    NULL|        NULL|          NULL|         NULL|false|9bcb6a6c-6085-438...|565412|                NULL|       João Anicento|       João Anicento|   1|  NULL|
|NULL|  

In [22]:
# read the folder data/write_2

spark.read.option("header","false").option("sep",";").csv("data/write_2").show()

+----+----+----+----+----+----+---+----+----+----+----+-----+----+--------------------+------+--------------------+--------------------+--------------------+----+
| _c0| _c1| _c2| _c3| _c4| _c5|_c6| _c7| _c8| _c9|_c10| _c11|_c12|                _c13|  _c14|                _c15|                _c16|                _c17|_c18|
+----+----+----+----+----+----+---+----+----+----+----+-----+----+--------------------+------+--------------------+--------------------+--------------------+----+
|NULL|NULL|NULL|NULL|NULL|NULL|  0|NULL|NULL|NULL|NULL|false|NULL|9bcb6a6c-6085-438...|565412|                NULL|       João Anicento|       João Anicento|   1|
|NULL|NULL|NULL|NULL|NULL|NULL|  0|NULL|NULL|NULL|NULL|false|NULL|3b807a0d-e5c3-45a...|565413|                NULL|    Cainã Cavalcante|    Cainã Cavalcante|   1|
|NULL|NULL|NULL|NULL|NULL|NULL|  0|NULL|NULL|NULL|NULL|false|NULL|9da965f3-e7fa-438...|565414|                NULL|      Humberto Pinho|      Humberto Pinho|   1|
|NULL|NULL|NULL|NULL|N

In [23]:
# read the folder data/write_3

spark.read.option("header","true").option("nullValue","NULL").csv("data/write_3").show()

+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+------+--------------------+------+------------+--------------------+--------------------+----+------------+
|area|begin_area|begin_date_day|begin_date_month|begin_date_year|comment|edits_pending|end_area|end_date_day|end_date_month|end_date_year|ended|gender|                 gid|    id|last_updated|                name|           sort_name|type|first_letter|
+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+------+--------------------+------+------------+--------------------+--------------------+----+------------+
|NULL|      NULL|          NULL|            NULL|           NULL|       |            0|    NULL|        NULL|          NULL|         NULL|false|  NULL|9bcb6a6c-6085-438...|565412|        NULL|       João Anicento|       João Anicento|   1|  

In [24]:
# read the folder data/write_4

spark.read.json("data/write_4").show()

+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+------+--------------------+------+--------------------+--------------------+--------------------+----+
|area|begin_area|begin_date_day|begin_date_month|begin_date_year|comment|edits_pending|end_area|end_date_day|end_date_month|end_date_year|ended|gender|                 gid|    id|        last_updated|                name|           sort_name|type|
+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+------+--------------------+------+--------------------+--------------------+--------------------+----+
|NULL|      NULL|          NULL|            NULL|           NULL|       |            0|    NULL|        NULL|          NULL|         NULL|false|  NULL|9bcb6a6c-6085-438...|565412|                NULL|       João Anicento|       João Anicento|   1|
|NULL|  

In [25]:
# read the folder data/write_5

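# multiline=true parses one JSON record per file, so only the first JSON Lines
# record is returned below; without the option all 5 rows are read
spark.read.option("multiline","true").json("data/write_5").show()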

+------+----------------+
|gender|            name|
+------+----------------+
|     1|Roberto Fioretti|
+------+----------------+



In [26]:
# read the folder data/write_6

spark.read.parquet("data/write_6").show()

+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+--------------------+------+--------------------+--------------------+--------------------+----+------+
|area|begin_area|begin_date_day|begin_date_month|begin_date_year|comment|edits_pending|end_area|end_date_day|end_date_month|end_date_year|ended|                 gid|    id|        last_updated|                name|           sort_name|type|gender|
+----+----------+--------------+----------------+---------------+-------+-------------+--------+------------+--------------+-------------+-----+--------------------+------+--------------------+--------------------+--------------------+----+------+
|NULL|      NULL|          NULL|            NULL|           NULL|       |            0|    NULL|        NULL|          NULL|         NULL|false|9bcb6a6c-6085-438...|565412|                NULL|       João Anicento|       João Anicento|   1|  NULL|
|NULL|  