<a href="https://colab.research.google.com/github/Fuenfgeld/2022TeamADataEngineeringBC/blob/save-to-database/PySparkTutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
!pip install pyspark pandas



In [None]:
!wget -cq https://raw.githubusercontent.com/Fuenfgeld/2022TeamADataEngineeringBC/ca4b2ecc9e9ee242037d11c27edd4f4ad770e7ee/iris.json

In [None]:
!wget -cq https://raw.githubusercontent.com/Fuenfgeld/2022TeamADataEngineeringBC/PySpark/iris2.json

## 1. Loading Data

Before we can analyze data, we have to load it into our working environment. PySpark provides readers for many common file formats, including `.csv` and `.json`. The basic unit of data storage in PySpark is the `DataFrame` class.

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [None]:
df1 = spark.read.option("multiline",True).json('iris.json')
print(f"Object Type: {type(df1)}\n")
print("Column Info:")
df1.printSchema()
print("Summary Statistics of columns:")
df1.describe().show()
print("Overview Dataframe:")
df1.show(10)

Object Type: <class 'pyspark.sql.dataframe.DataFrame'>

Column Info:
root
 |-- petalLength: double (nullable = true)
 |-- petalWidth: double (nullable = true)
 |-- sepalLength: double (nullable = true)
 |-- sepalWidth: double (nullable = true)
 |-- species: string (nullable = true)

Summary Statistics of columns:
+-------+------------------+------------------+------------------+-------------------+---------+
|summary|       petalLength|        petalWidth|       sepalLength|         sepalWidth|  species|
+-------+------------------+------------------+------------------+-------------------+---------+
|  count|               150|               150|               150|                150|      150|
|   mean|3.7580000000000027| 1.199333333333334| 5.843333333333335|  3.057333333333334|     null|
| stddev|1.7652982332594662|0.7622376689603467|0.8280661279778637|0.43586628493669793|     null|
|    min|               1.0|               0.1|               4.3|                2.0|   setosa|
|    m

## 2. Basic Transformations
Two of the most basic operations on a table are accessing specific rows and columns and creating new ones.

### 2.1. Accessing Rows

Since Spark was conceived to work with distributed data, there is no simple way to access rows by position.

If you want to do so anyway, you can pull the data onto your local node.

`DataFrame.collect()` collects the distributed data to the driver side as local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side because it collects all the data from executors to the driver side.

In [None]:
# Returns list of Row objects
local_df1 = df1.collect()
print(f"Type of entries: {type(local_df1[0])}\n")
print(f"Entries: {local_df1[:5]}")

Type of entries: <class 'pyspark.sql.types.Row'>

Entries: [Row(petalLength=1.4, petalWidth=0.2, sepalLength=5.1, sepalWidth=3.5, species='setosa'), Row(petalLength=1.4, petalWidth=0.2, sepalLength=4.9, sepalWidth=3.0, species='setosa'), Row(petalLength=1.3, petalWidth=0.2, sepalLength=4.7, sepalWidth=3.2, species='setosa'), Row(petalLength=1.5, petalWidth=0.2, sepalLength=4.6, sepalWidth=3.1, species='setosa'), Row(petalLength=1.4, petalWidth=0.2, sepalLength=5.0, sepalWidth=3.6, species='setosa')]


### 2.2. Accessing Columns

Accessing columns doesn't come with the difficulties associated with handling rows. If we want to get specific columns we can simply do so through the `.select()` method. 

In [None]:
df1.select("petalLength").show(5)

+-----------+
|petalLength|
+-----------+
|        1.4|
|        1.4|
|        1.3|
|        1.5|
|        1.4|
+-----------+
only showing top 5 rows



It is also possible to select multiple columns. Notice that we can address our columns with `DataFrame.NameOfColumn` instead of `"NameOfColumn"`.

In [None]:
petalLength = df1.petalLength
petalWidth = df1.petalWidth
df1.select(petalLength, petalWidth).show(5)

+-----------+----------+
|petalLength|petalWidth|
+-----------+----------+
|        1.4|       0.2|
|        1.4|       0.2|
|        1.3|       0.2|
|        1.5|       0.2|
|        1.4|       0.2|
+-----------+----------+
only showing top 5 rows



### 2.3. Concatenating DataFrames

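Suppose we have a dataset that is split into multiple DataFrames. Wouldn't it be practical to combine them into one table? `pyspark` provides this functionality via the `.union()` method. Note that `.union()` matches columns by position rather than by name, so both DataFrames need the same column order.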

In [None]:
df2 = spark.read.json('iris2.json')
df2.show()
df1.union(df2)

+-----------+----------+-----------+----------+---------+
|petalLength|petalWidth|sepalLength|sepalWidth|  species|
+-----------+----------+-----------+----------+---------+
|        5.1|       1.8|        5.9|       3.0|virginica|
+-----------+----------+-----------+----------+---------+



DataFrame[petalLength: double, petalWidth: double, sepalLength: double, sepalWidth: double, species: string]

### 2.4. Adding Columns

In case we want to add a column, we can do so via the `.withColumn()` method. Note that we have to specify the name of the new column, which in this case is `newColumn`. Usually the new column is a function of one or more of the existing columns.

In [None]:
df_extraCol = df1.withColumn('newColumn', df1.petalWidth + df1.petalLength)
df_extraCol.show(5)

+-----------+----------+-----------+----------+-------+------------------+
|petalLength|petalWidth|sepalLength|sepalWidth|species|         newColumn|
+-----------+----------+-----------+----------+-------+------------------+
|        1.4|       0.2|        5.1|       3.5| setosa|1.5999999999999999|
|        1.4|       0.2|        4.9|       3.0| setosa|1.5999999999999999|
|        1.3|       0.2|        4.7|       3.2| setosa|               1.5|
|        1.5|       0.2|        4.6|       3.1| setosa|               1.7|
|        1.4|       0.2|        5.0|       3.6| setosa|1.5999999999999999|
+-----------+----------+-----------+----------+-------+------------------+
only showing top 5 rows
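The value `1.5999999999999999` in the output above is not specific to Spark. Spark doubles, like Python floats, are 64-bit binary floating-point numbers, and neither `0.2` nor `1.4` can be represented exactly in that format. The same arithmetic in plain Python illustrates the effect:

```python
import math

# Same addition as petalWidth + petalLength in the first row.
total = 0.2 + 1.4
print(total)            # 1.5999999999999999

# Rounding for display hides the representation error.
print(round(total, 2))  # 1.6

# For comparisons, a tolerance is more robust than ==.
print(math.isclose(total, 1.6))  # True
```

Within a Spark pipeline, the `round` function from `pyspark.sql.functions` can be applied to the new column in the same spirit.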



The name `'newColumn'` isn't really informative. It is therefore hard for the user to deduce that it is the sum of `'petalWidth'` and `'petalLength'`. So why not rename it to something more descriptive? We can do this via the `.withColumnRenamed()` method.

In [None]:
df_extraCol = df_extraCol.withColumnRenamed('newColumn','petalSum')
df_extraCol.show(5)

+-----------+----------+-----------+----------+-------+------------------+
|petalLength|petalWidth|sepalLength|sepalWidth|species|          petalSum|
+-----------+----------+-----------+----------+-------+------------------+
|        1.4|       0.2|        5.1|       3.5| setosa|1.5999999999999999|
|        1.4|       0.2|        4.9|       3.0| setosa|1.5999999999999999|
|        1.3|       0.2|        4.7|       3.2| setosa|               1.5|
|        1.5|       0.2|        4.6|       3.1| setosa|               1.7|
|        1.4|       0.2|        5.0|       3.6| setosa|1.5999999999999999|
+-----------+----------+-----------+----------+-------+------------------+
only showing top 5 rows



### 2.5. Removing Columns

To get rid of our new column, we can use `.drop()`. In contrast to `.select()`, which returns only the columns we name, `.drop()` returns a DataFrame with all columns except the ones we name.




In [None]:
df1 = df_extraCol.drop(df_extraCol.petalSum)
df1.show(5)

+-----------+----------+-----------+----------+-------+
|petalLength|petalWidth|sepalLength|sepalWidth|species|
+-----------+----------+-----------+----------+-------+
|        1.4|       0.2|        5.1|       3.5| setosa|
|        1.4|       0.2|        4.9|       3.0| setosa|
|        1.3|       0.2|        4.7|       3.2| setosa|
|        1.5|       0.2|        4.6|       3.1| setosa|
|        1.4|       0.2|        5.0|       3.6| setosa|
+-----------+----------+-----------+----------+-------+
only showing top 5 rows



### 2.6. Basic Data Cleaning

Just as in a hospital, hygiene is of great importance when working with data. Sometimes rows contain entries that make the data harder to work with or lower its quality (information pollution). Two examples come to mind. The first is duplicate entries, which could introduce bias into our data and negatively impact the performance of many machine learning algorithms.

The second is null entries, which might render some rows useless because many algorithms can't handle them. Luckily, PySpark provides two methods, `.dropna()` and `.dropDuplicates()`, to get rid of such problematic rows.



In [None]:
df1 = df1.dropna()

In [None]:
df1 = df1.dropDuplicates() 

Although our dataframe is now free of unwanted entries we might still want to put further restrictions on the data we want to keep. 

### 2.7. Conditional Selection of Rows

In 2.1. we explained that directly accessing rows of a DataFrame comes with some caveats. It is, however, possible to access rows indirectly without pulling all the data onto your local node. This is done via conditional selection, where the `.filter()` method selects rows based on user-given conditions. Because we don't know in advance which rows will match, we speak of indirect access.

Let's say we want to get only the flowers of the species `"virginica"`. We then have to write the following:

In [None]:
df_virginica = df1.filter(df1.species == "virginica")
df_virginica.show(5)

+-----------+----------+-----------+----------+---------+
|petalLength|petalWidth|sepalLength|sepalWidth|  species|
+-----------+----------+-----------+----------+---------+
|        6.0|       1.8|        7.2|       3.2|virginica|
|        5.6|       2.1|        6.4|       2.8|virginica|
|        5.1|       2.3|        6.9|       3.1|virginica|
|        6.1|       2.5|        7.2|       3.6|virginica|
|        5.7|       2.3|        6.9|       3.2|virginica|
+-----------+----------+-----------+----------+---------+
only showing top 5 rows



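### 2.8. Altering Data Using Lambdas

Individual columns can be altered with `.withColumn()` and the helpers in the `functions` module. When that is not enough, the `map` function of the underlying RDD applies a lambda to every row and can change the full structure.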

In [None]:
from pyspark.sql import types, functions

data = [
        ('Max', 'Mustermann', 'm', '10', '1954', '2020'),
        ('Erika', 'Mustermann', 'w', '12', '1994', None)
        ]
schema = ['firstname', 'lastname', 'gender', 'salary', 'birthyear', 'deathyear']

frame = spark.createDataFrame(data = data, schema = schema)
frame.show()

# To cast or alter a column, just override it
parsed = frame.withColumn('salary', functions.col('salary').cast(types.IntegerType()))

# Single column transformations
doubled = parsed.withColumn('salary', functions.col('salary') * 2)

# Conditional replacements are possible using functions
replaceNullValue = doubled.withColumn(
    'deathyear', 
    functions.when(functions.col('deathyear').isNull(), '2022')
    .otherwise(functions.col('deathyear'))
)

# Column expressions can combine several columns inside withColumn
withAge = replaceNullValue.withColumn('age', functions.col('deathyear') - functions.col('birthyear'))

# To replace a value with a value in a dictionary, you replace the value on the whole
# dataset and restrict the changes to the columns in which it should be replaced
genders = { 'm': 'male', 'w': 'female' }
withGender = withAge.replace(genders, subset='gender')
withGender.show()

# Lambdas can also be used. They are slower but more powerful and can alter the schema
converted = (withGender.rdd
  .map(lambda row: (row[0] + ' ' + row[1], row[2], row[3], row[6]))
  .toDF(['name', 'gender', 'salary', 'age'])
)
converted.show()

# The transformations can also be chained into a single expression
functional = (spark.createDataFrame(data = data, schema = schema)
    .withColumn('salary', functions.col('salary').cast(types.IntegerType()))
    .withColumn('salary', functions.col('salary') * 2)
    .withColumn(
        'deathyear', 
        functions.when(functions.col('deathyear').isNull(), '2022')
        .otherwise(functions.col('deathyear'))
    )
    .withColumn('age', functions.col('deathyear') - functions.col('birthyear'))
    .replace(genders, subset='gender')
    .rdd
    .map(lambda row: (row[0] + ' ' + row[1], row[2], row[3], row[6]))
    .toDF(['name', 'gender', 'salary', 'age'])
)
functional.show()

+---------+----------+------+------+---------+---------+
|firstname|  lastname|gender|salary|birthyear|deathyear|
+---------+----------+------+------+---------+---------+
|      Max|Mustermann|     m|    10|     1954|     2020|
|    Erika|Mustermann|     w|    12|     1994|     null|
+---------+----------+------+------+---------+---------+

+---------+----------+------+------+---------+---------+----+
|firstname|  lastname|gender|salary|birthyear|deathyear| age|
+---------+----------+------+------+---------+---------+----+
|      Max|Mustermann|  male|    20|     1954|     2020|66.0|
|    Erika|Mustermann|female|    24|     1994|     2022|28.0|
+---------+----------+------+------+---------+---------+----+

+----------------+------+------+----+
|            name|gender|salary| age|
+----------------+------+------+----+
|  Max Mustermann|  male|    20|66.0|
|Erika Mustermann|female|    24|28.0|
+----------------+------+------+----+

+----------------+------+------+----+
|            name

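### 2.9. Joining Data on a Key

The `.join()` method combines two DataFrames on a shared key column. Here `cityId` links each person to a city.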

In [None]:
people = spark.createDataFrame(data = [( 'Max', 1 ), ( 'Erika', 0 )],
                               schema = ['name', 'cityId'])
people.show()
cities = spark.createDataFrame(data=[(0, 'Mannheim'), (1, 'Frankfurt')],
                               schema=['cityId', 'city'])
cities.show()

combined = people.join(cities, ['cityId'], "inner").drop('cityId')
combined.show()

+-----+------+
| name|cityId|
+-----+------+
|  Max|     1|
|Erika|     0|
+-----+------+

+------+---------+
|cityId|     city|
+------+---------+
|     0| Mannheim|
|     1|Frankfurt|
+------+---------+

+-----+---------+
| name|     city|
+-----+---------+
|Erika| Mannheim|
|  Max|Frankfurt|
+-----+---------+



### 2.10. Conclusion

You have learned how to perform some basic transformations of a table. You may also want to apply more complex functions, such as summary statistics, to a DataFrame's rows or columns. In the next chapter we take a look at advanced transformations.

## 3. Advanced Transformations
Advanced transformations are where PySpark really shines: they let us express very complex queries in simple syntax and extract valuable insights from our data. Methods such as `.groupBy()` and `.join()` are especially powerful in combination with the more complex functions provided by the `functions` module.

### 3.1. Why use Spark functions?
In general, it is possible to use functions from other libraries such as `numpy` on Spark `DataFrame` objects, for example by wrapping them in user-defined functions. However, this defeats a main purpose of Spark: because execution is lazy, Spark can optimize a whole transformation pipeline before running it, but only for operations it understands.

This is why the `functions` module exists. It provides us with a large number of built-in functions for all kinds of purposes.

Suppose we want to take the mean petal length of the virginica species. We can reuse the DataFrame `df_virginica` that we created before.


In [None]:
from pyspark.sql.functions import mean
virginica_mean_petalLength = df_virginica.select(mean("petalLength"))
# Execute pipeline.
virginica_mean_petalLength = virginica_mean_petalLength.collect()
print(f"Type of virginica_mean_petalLength: {type(virginica_mean_petalLength[0])}\n")
print(f"Mean petal length of virginica species: {virginica_mean_petalLength[0]}")

Type of virginica_mean_petalLength: <class 'pyspark.sql.types.Row'>

Mean petal length of virginica species: Row(avg(petalLength)=5.561224489795917)


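## 4. Save results to database

The easiest way to save the data is to convert the DataFrame to a pandas DataFrame and have pandas generate all SQL statements by itself. Note that `.toPandas()`, like `.collect()`, pulls all the data onto the driver, so this approach only suits results that fit into local memory.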

In [13]:
import sqlite3

connection = sqlite3.connect('my-database.sqlite')

# Please note that tuples with a single value always end with a comma.
# e.g. ('Max', ). If that comma does not exist, the value will be seen as a string
# instead of a tuple.
frame = spark.createDataFrame(data=[('Max', ), ('Erika', )], schema=['name'])
frame.toPandas().to_sql('table_name', connection, if_exists='replace', index=True)

print(connection.execute('SELECT * FROM table_name;').fetchall())

connection.close()

[(0, 'Max'), (1, 'Erika')]
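If pandas is not wanted as a dependency, collected rows can also be written with the standard `sqlite3` module alone. The following is a minimal sketch: the list `rows` stands in for the result of `frame.collect()`, and an in-memory database is used so that nothing is written to disk.

```python
import sqlite3

# Stand-in for frame.collect(); each row is a one-element tuple,
# hence the trailing comma.
rows = [("Max",), ("Erika",)]

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE table_name (name TEXT)")

# The ? placeholder lets sqlite3 handle quoting of the values.
connection.executemany("INSERT INTO table_name (name) VALUES (?)", rows)
connection.commit()

stored = connection.execute(
    "SELECT name FROM table_name ORDER BY rowid"
).fetchall()
print(stored)  # [('Max',), ('Erika',)]

connection.close()
```

Unlike the pandas variant, the table has to be created by hand here, so the column names and types must be kept in sync with the DataFrame's schema.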
