
# Getting started with Spark: Spark SQL in Python

This tutorial is based on [Spark SQL Guide - Getting started](https://spark.apache.org/docs/latest/sql-getting-started.html).

For this demo we use the City of Vienna tree dataset ("Baumkataster"), made available by [Open Data Österreich](https://www.data.gv.at) and downloadable from [here](https://www.data.gv.at/katalog/dataset/c91a4635-8b7d-43fe-9b27-d95dec8392a7).

# Table of contents
1. [Spark session](#sparkSession)
2. [Spark configuration](#sparkConfiguration)

## Spark session <a name="sparkSession"></a>

We're going to start by creating a Spark _session_. Our Spark job will be named "Python Spark SQL basic example". `spark` is the variable holding our Spark session.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

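**Note:** if this cell fails with `ModuleNotFoundError: No module named 'pyspark'`, PySpark is not installed in the current Python environment. Install it first, for example with `pip install pyspark`, and run the cell again.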

Read the file into a Spark [_dataframe_](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes).

In [None]:
df = spark.read \
          .load("FME_BaumdatenBearbeitet_OGD_20190205.csv",
           format="csv", sep=";", header="true", encoding="iso-8859-1")

**Note:** we assume that the file `FME_BaumdatenBearbeitet_OGD_20190205.csv` is in your current working directory. If at this point you get an error message that looks like `AnalysisException: Path does not exist`, see [Spark configuration](#sparkConfiguration) for how the master setting and the default filesystem determine where Spark looks for the file.
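One way to rule out a path problem before calling `spark.read` is to resolve the file name yourself. The sketch below uses only the standard library. `local_file_uri` is a hypothetical helper name, not part of PySpark, and it assumes the file lives on the local filesystem of the machine that runs the notebook.

```python
from pathlib import Path


def local_file_uri(filename: str) -> str:
    """Return an absolute file:// URI for a file on the local filesystem.

    Hypothetical helper (not a PySpark function). It raises
    FileNotFoundError early, with the resolved path in the message,
    instead of waiting for Spark to report that the path does not exist.
    """
    path = Path(filename).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(f"No such file: {path}")
    return path.as_uri()
```

The returned string can be passed as the first argument of `.load(...)` in place of the bare file name. The explicit `file://` scheme asks Spark to read from the local filesystem even when the default filesystem is something else, such as HDFS.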

Show the first three rows of the Spark dataframe.

In [None]:
df.show(3)

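For pretty-printing you can use `toPandas()`. Limit the dataframe first, so that only three rows are collected to the driver instead of the whole dataset.

In [None]:
df.limit(3).toPandas()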

Show the number of trees per species: group `df` by German name (`NameDeutsch`), count the rows in each group, and sort by count in descending order.

In [None]:
df.groupBy("NameDeutsch").count().orderBy('count', ascending=False).show()
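To make explicit what the `groupBy(...).count().orderBy(...)` chain computes, here is a plain-Python analogue on a tiny made-up sample. The rows are invented for illustration and are not taken from the dataset; only the column names mirror the tutorial. The sketch parses semicolon-separated text with a header row, much like the `sep=";"` and `header="true"` options used when loading the file, and counts rows per `NameDeutsch`.

```python
import csv
import io
from collections import Counter

# Invented sample rows; only the column names mirror the tutorial.
SAMPLE = """BaumNr;NameDeutsch;Hoehe
1;Spitzahorn;3
2;Winterlinde;4
3;Spitzahorn;2
4;Rosskastanie;5
5;Spitzahorn;3
6;Winterlinde;1
"""


def count_by_name(text: str) -> list[tuple[str, int]]:
    """Count rows per NameDeutsch, most frequent first."""
    reader = csv.DictReader(io.StringIO(text), delimiter=";")
    counts = Counter(row["NameDeutsch"] for row in reader)
    # most_common() sorts by count in descending order,
    # like orderBy('count', ascending=False).
    return counts.most_common()


for name, n in count_by_name(SAMPLE):
    print(f"{name}: {n}")
```

Spark performs the same grouping and counting, but distributes the work across partitions instead of a single in-memory `Counter`.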

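An example of an SQL query (see [Running SQL Queries Programmatically](https://spark.apache.org/docs/latest/sql-getting-started.html#running-sql-queries-programmatically)): let's sort the trees by height (`Hoehe`) in descending order. The dataframe first has to be registered as a temporary view so that it can be referenced by name in SQL.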

In [None]:
df.createOrReplaceTempView("baeume")

In [None]:
spark.sql("SELECT BaumNr, NameDeutsch, Hoehe, lat, lon FROM baeume ORDER BY Hoehe DESC").show()

The height data doesn't seem to be up-to-date.
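One thing worth checking before drawing conclusions from this ordering: the file was loaded without a schema or `inferSchema`, so Spark reads every column, including `Hoehe`, as a string, and strings sort character by character rather than by numeric value. The plain-Python sketch below, using made-up height values, shows how the two orderings differ. In Spark SQL, a numeric ordering could be obtained with something like `ORDER BY CAST(Hoehe AS INT) DESC`, assuming the column contains only integer-like values.

```python
# Made-up height values, stored as strings the way a schema-less CSV read
# would deliver them.
heights = ["9", "10", "2", "35"]

# Descending string sort compares character by character, so "9" beats "35".
as_strings = sorted(heights, reverse=True)

# Descending numeric sort converts each value to int before comparing.
as_numbers = sorted(heights, key=int, reverse=True)

print(as_strings)  # ['9', '35', '2', '10']
print(as_numbers)  # ['35', '10', '9', '2']
```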

## Spark configuration <a name="sparkConfiguration"></a>

Spark properties control most application settings and are configured separately for each application. These properties can be set directly on a `SparkConf` passed to your `SparkContext` (from [Apache Spark documentation](https://spark.apache.org/docs/latest/configuration.html#spark-properties)).  

We've already seen how to modify the `SparkConf` when we created our Spark application session with the command:
<pre>
    spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()
</pre>

Let us look at the rest of the Spark configuration.

In [None]:
# getConf() is the public accessor for the SparkConf of the running context
spark.sparkContext.getConf().getAll()

The property `spark.app.name` is the name of our app that we just defined.

Another important property is `spark.master`. It defines the _master URL_ for the Spark application. A list of all admissible values for `spark.master` is given here: [master-urls](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls).

In this example the Spark master URL is `local[*]`. This means that our Spark application runs locally, with as many worker threads as there are logical cores on our local machine.
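As a rough illustration of how the local master URLs map to worker threads, the sketch below interprets the `local`, `local[K]` and `local[*]` forms (plus the variants with a max-failures suffix, such as `local[K,F]`) listed in the Spark documentation. `local_worker_threads` is a hypothetical helper written for this tutorial; Spark does its own parsing internally, and this is not how you would configure a real application.

```python
import os
import re
from typing import Optional

# Matches "local", "local[4]", "local[*]" and the variants with a
# max-failures suffix such as "local[4,2]" or "local[*,2]".
_LOCAL_MASTER = re.compile(r"^local(?:\[(\*|\d+)(?:,\s*\d+)?\])?$")


def local_worker_threads(master: str) -> Optional[int]:
    """Return the worker-thread count implied by a local master URL.

    Hypothetical helper for illustration only. Returns None for
    non-local masters such as "yarn".
    """
    match = _LOCAL_MASTER.match(master.strip())
    if match is None:
        return None
    threads = match.group(1)
    if threads is None:      # plain "local": a single worker thread
        return 1
    if threads == "*":       # one thread per logical core
        return os.cpu_count() or 1
    return int(threads)


for url in ["local", "local[4]", "local[*]", "yarn"]:
    print(url, "->", local_worker_threads(url))
```

The value printed for `local[*]` depends on the machine the code runs on.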

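If you have a Hadoop cluster available, you can deploy your Spark application on YARN by setting the option `spark.master = yarn`. Because `getOrCreate()` returns the already running session if there is one, the current session has to be stopped before the new master can take effect. Let's do that and then check the Spark configuration once again.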

In [None]:
# A running session keeps its master, so stop it before switching to YARN.
spark.stop()

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .master('yarn') \
    .getOrCreate()

spark.sparkContext.getConf().getAll()

With this configuration our Spark application runs on the Hadoop cluster and its resources are managed by YARN.

**Note:** If the Hadoop cluster is configured with HDFS as its default filesystem, then you need to upload your CSV file to Hadoop in order to be able to read it:
<code>
    hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv FME_BaumdatenBearbeitet_OGD_20190205.csv
</code>
and then you can just use `.load(...)` again.

In [None]:
%%bash
hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv
hdfs dfs -ls FME_BaumdatenBearbeitet_OGD_20190205.csv

In [None]:
df = spark.read \
          .load("FME_BaumdatenBearbeitet_OGD_20190205.csv",
           format="csv", sep=";", header="true", encoding="iso-8859-1")

Let's now re-run the previous commands. This time the application is going to be deployed on the cluster.

In [None]:
df.createOrReplaceTempView("baeume")
spark.sql("SELECT BaumNr, NameDeutsch, Hoehe, lat, lon FROM baeume ORDER BY Hoehe DESC").show()

**Note:** After you're done, it's important to close the Spark session in order to release cluster resources.

In [None]:
spark.stop()