# Introduction to Spark

## Basic initialization

`SparkSession` is used to connect to the Spark Cluster.

In [None]:
from pyspark.sql import SparkSession

We will use Pandas to operate on the reduced data in the *driver program*.

In [None]:
import pandas as pd

NumPy will also be useful, here for generating test data.

In [None]:
import numpy as np

Create a new session (or reuse an existing one).

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
spark

We can see that the session is established.

## Creating Spark Data Frames from Pandas

We can list the tables in our Spark session. The list is currently empty.

In [None]:
print(spark.catalog.listTables())

We can create a Pandas `DataFrame` with random values.

In [None]:
pd_temp = pd.DataFrame(np.random.random(100))

The plot shows that the values look random:

In [None]:
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')

In [None]:
pd_temp.plot()

Now we can convert it into a Spark DataFrame:

In [None]:
spark_temp = spark.createDataFrame(pd_temp)

`createOrReplaceTempView` creates (or replaces if that view name already exists) a lazily evaluated "view" that you can then use like a table in Spark SQL. 

It does not persist to memory unless you cache (persist) the dataset that underpins the view.

In [None]:
spark_temp.createOrReplaceTempView("temp")

The created view is `TEMPORARY`, which means it is not persistent: it exists only for the lifetime of this Spark session.

In [None]:
print(spark.catalog.listTables())

In [None]:
spark_temp.show()

We can now use transformations on this DataFrame. The transformations are translated (compiled) to RDD transformations.

In [None]:
from pyspark.sql.functions import col, asc

In [None]:
spark_temp.filter((col('0') > 0.9)).show()

## Creating Spark Data Frames from input files

In [None]:
file_path = "airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)

# Show the data (show() prints the table itself and returns None)
airports.show()

It may be useful to convert them to Pandas for quick browsing. 

**Warning!** This is not efficient for large datasets: `toPandas()` is an action that collects every row into the memory of the driver program.

In [None]:
airports.toPandas()

### Running SQL queries on dataframes

In [None]:
airports.createOrReplaceTempView("airports")

In [None]:
# Get the first 10 rows of airports
query = "SELECT * FROM airports LIMIT 10"

airports10 = spark.sql(query)

# Show the results
airports10.show()

### More complex examples 

Read data from CSV file:
 * `inferSchema` - to detect which columns are numbers (not strings!) - useful e.g. for sorting.
 * `header` - to read the first line as column names
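
Why the column type matters for sorting can be seen without Spark at all. The following plain-Python sketch uses made-up values: strings are ordered character by character, numbers by magnitude.

```python
# Made-up figures as they would look if every column were read as a string.
as_strings = ["9", "10", "100", "25"]

# Strings are compared character by character, so "10" sorts before "9".
sorted_as_text = sorted(as_strings)

# After conversion to numbers the order is numeric.
sorted_as_numbers = sorted(int(s) for s in as_strings)

print(sorted_as_text)
print(sorted_as_numbers)
```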

In [None]:
countries = spark.read.csv("countries of the world.csv", inferSchema=True, header=True)

In [None]:
countries.toPandas()

We can inspect the schema of the DataFrame.

In [None]:
countries.printSchema()

### Examples of SQL Queries

In [None]:
countries.createOrReplaceTempView("countries")

In [None]:
spark.sql("SELECT * FROM countries WHERE Region LIKE '%OCEANIA%'").toPandas()

### Queries using PySpark DSL

DSL = Domain Specific Language: an API that reads like a specialised language (here, one similar to SQL) but is implemented as a library in a host language (here, Python).

List all the countries with a population above 38 million, ordered by population

In [None]:
countries.filter((col("Population") > 38000000)).orderBy("Population").toPandas()

Select all the countries from Europe

In [None]:
countries.select("Country", "Population").where(col("Region").like("%EUROPE%")).show()

Conditions in a `where` clause can combine several logical expressions, here with `&` (AND).

In [None]:
countries.select("Country", "Population")\
.where((col("Region").like("%EUROPE%")) & (col("Population")> 10000000)).show()

### Aggregation

We can run aggregations with Spark's predefined functions, which are faster than Python UDFs because they run inside the Spark engine:

In [None]:
# Note: this import shadows Python's built-in sum() for the rest of the notebook
from pyspark.sql.functions import sum

In [None]:
pd_countries = countries.select("Region", "Population").groupBy("Region").agg(sum("Population")).toPandas()

In [None]:
pd_countries

We can make the column name look better, by using `alias`:

In [None]:
pd_countries = countries.select("Region", "Population").groupBy("Region").agg(sum("Population").alias('Total')).toPandas()

In [None]:
pd_countries

### Plot examples
Pandas DataFrames are useful for plotting with Matplotlib:

In [None]:
pd_countries.plot(x='Region', y='Total',kind='bar', figsize=(10, 6))

## User defined functions for data manipulation
Our `countries` DataFrame has some problems:
 * missing values
 * some numbers use a comma instead of a point as the decimal separator (e.g. Literacy = 99,4)

We can clean the data using User Defined Functions (UDFs).

In [None]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

Define a Python function which converts numbers with commas to `float`:

In [None]:
def to_float(s):
    return float(s.replace(',', '.'))

Test that it works:

In [None]:
to_float('0,99')

Now define a Spark UDF:

In [None]:
float_udf = udf(to_float, FloatType())

Test it on a DataFrame:

In [None]:
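countries = countries.withColumn("Literacy", float_udf("Literacy (%)"))
countries.printSchema()

OK, the schema shows that `Literacy` is now `float`. Note that `withColumn` returns a new DataFrame, so the result has to be assigned before later cells can use the new column.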

In [None]:
countries.show(50)

In [None]:
countries.where((col("Literacy") < 50) & (col("GDP ($ per capita)") > 700)).show()

Oops, what does this error mean?

- Some rows have empty values, and `to_float` cannot call `replace` on `None`.

Before we can use the table, we need to remove the rows with empty values. Otherwise our UDF will fail.
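
An alternative to dropping rows, sketched here under the assumption that missing values should simply stay missing, is to make the converter tolerate them. The name `to_float_or_none` is hypothetical and not used elsewhere in this notebook.

```python
def to_float_or_none(s):
    """Convert '99,4' to 99.4; return None for missing or empty input."""
    if s is None or s.strip() == "":
        return None
    return float(s.replace(",", "."))


print(to_float_or_none("99,4"))
print(to_float_or_none(None))
```

Such a function could be wrapped with `udf(..., FloatType())` in the same way as `to_float`; the rows with missing values would then carry nulls instead of raising an error.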

In [None]:
full_countries = countries.select('Country', 'Population', 'Literacy (%)', 'GDP ($ per capita)').na.drop()

We can now apply the new UDF to the Data Frame:

In [None]:
full_countries = full_countries.withColumn("Literacy", float_udf("Literacy (%)"))

In [None]:
full_countries.show(50)

In [None]:
full_countries.where((col("Literacy") < 50) & (col("GDP ($ per capita)") > 700)).show()

In [None]:
full_countries.toPandas().plot(x="Literacy",y="GDP ($ per capita)",kind="scatter",figsize=(10, 6))

# Useful information

 * https://spark.apache.org/docs/latest/quick-start.html
 * https://spark.apache.org/docs/latest/sql-programming-guide.html
 * https://pandas.pydata.org/pandas-docs/stable/visualization.html