# Introduction to Apache Spark Programming
---
__Santander Consumer Bank Germany__  
__CTO/Architecture__  

__Version:__ 1.0  
__Date:__ 2023-10-19  
__Github:__

This document is a first introduction to programming Apache Spark with PySpark, Spark's Python API. Basic knowledge of the Python programming language is required. The most important concepts and data structures of Spark, in particular Resilient Distributed Datasets (RDDs) and DataFrames, are presented using short example programs. Jupyter Notebook and Visual Studio Code are used as development environments. Many of the examples are taken from the [official Spark documentation](https://spark.apache.org/docs/latest/index.html), and some have been slightly modified.

## What is Apache Spark?

Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

## Installation

There are three main options for running Apache Spark and Jupyter: use a ready-to-go Docker image, set up a Linux environment in a virtual machine and install Spark and Jupyter there, or install Spark and Jupyter directly on your Windows system.

### Docker
The easiest and quickest way to use Spark and Jupyter is Docker. A Docker image containing Jupyter and the Spark stack is available at [https://hub.docker.com/r/jupyter/pyspark-notebook](https://hub.docker.com/r/jupyter/pyspark-notebook) and can be pulled from Docker Hub. With Docker installed on your system, execute the following command:

```bash
docker pull jupyter/pyspark-notebook
```
The image is about 1.7 GB, so the download might take some time depending on your Internet connection. Once the image has been downloaded to your local Docker image store, you can start the container with

```bash
docker run -it --rm -p 8888:8888 -v /home/peter/Projects/spark:/home/jovyan/work jupyter/pyspark-notebook

```
I have used the `-v` switch to persist any data generated in the Jupyter Notebook inside the container to my local hard disk (data written inside a container is lost when the container is removed, which `--rm` does on exit). To do so, I mapped my local host folder (`/home/peter/Projects/spark`) to the default Jupyter Notebook folder in the container, `/home/jovyan/work`. If you don't need to persist data, execute the _run_ command without `-v`:

```bash
docker run -it --rm -p 8888:8888 jupyter/pyspark-notebook

```

![URL](jupyter_in_docker.jpg)

When the startup process has finished, copy the URL marked with a red line in the picture above and paste it into your preferred browser. After a couple of seconds the Jupyter Notebook should appear and you are ready to start programming Spark.

### Linux Virtual Machine

The installation of Apache Spark and Jupyter described below was carried out on the Debian-based Linux distribution Xubuntu 22.04.3 LTS. If you want to play around with Spark on Linux, you can run a virtual machine using the free [VMware Player](https://www.vmware.com/products/workstation-player.html) and use a [Xubuntu ISO image](https://xubuntu.org/download/) for installation. The setup procedure is only slightly more complex than the Docker option.

As a Linux user with sudo permissions execute the following commands:

```bash
sudo apt-get update
sudo apt install openjdk-19-jdk openjdk-19-jre
wget https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -zxvf spark-3.5.0-bin-hadoop3.tgz
sudo mv spark-3.5.0-bin-hadoop3 /opt
```

Add the following lines to the file `/etc/profile` to set the required environment variables:

```bash
export SPARK_HOME=/opt/spark-3.5.0-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
```

Save the file. Back in the shell, source the profile file:

```bash
source /etc/profile
```

Change into your project folder, create and activate a virtual environment, and install PySpark and Jupyter:

```bash
cd /path/to/my/project/folder
python3 -m venv ./venv
. ./venv/bin/activate
pip install pyspark
pip install jupyter
```
Now start Jupyter Notebook. Because of the `PYSPARK_DRIVER_PYTHON` settings above, the `pyspark` command launches it:

```bash
pyspark
```

### PySpark and Jupyter on Windows

Installing PySpark on Windows and using Visual Studio Code for programming is described in the following YouTube video: [How to run PySpark in Visual Studio Code (on Windows)](https://www.youtube.com/results?search_query=how+to+run+pyspark+in+visual+studio+code). Download and install Visual Studio Code and follow the instructions shown in the video. To install Jupyter and integrate it with Spark, see [Get Started with PySpark and Jupyter Notebook in 3 Minutes](https://medium.com/sicara/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f).

## Important concepts
In Apache Spark, both __SparkSession__ and __SparkContext__ are important components, but they serve different purposes and are used in different contexts.

[SparkSession](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala) source code (Scala) at Github  
[SparkContext](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala) source code (Scala) at Github

### SparkContext
* _SparkContext_ is the entry point to Spark and represents the connection to a Spark cluster. It is the core component of Spark and is responsible for coordinating the execution of tasks in a cluster.
* It is primarily used for low-level operations such as creating Resilient Distributed Datasets (RDDs), which are the fundamental data structure in Spark.
* _SparkContext_ is not aware of structured data like DataFrames, and it does not provide a high-level API for structured data processing.

### SparkSession
* _SparkSession_ was introduced in Spark 2.0 and is an abstraction built on top of the _SparkContext_. It's designed for higher-level, more user-friendly structured data processing, including working with DataFrames and Datasets.
* _SparkSession_ is a unified entry point for working with structured data in Spark. It provides a convenient API for creating, manipulating, and querying structured data.
* It includes functionalities for working with structured data sources like Parquet, Avro, ORC, JSON, and more, and it also allows you to interact with structured data using SQL queries via the Spark SQL module.

In summary, the key difference between _SparkContext_ and _SparkSession_ is their purpose and level of abstraction: _SparkContext_ is the fundamental entry point for Spark and is primarily used for low-level operations and RDD-based data processing whereas _SparkSession_ is a higher-level, more user-friendly entry point for structured data processing, including DataFrames and Spark SQL.


In practice, if you are working with structured data, it is recommended to use SparkSession for its ease of use and more powerful capabilities. If you are working with older Spark code or low-level operations, you may still encounter SparkContext, but modern Spark applications often use SparkSession for structured data processing.

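```python
# SparkSession and SparkContext

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession, the entry point for DataFrames and Spark SQL
spark = SparkSession.builder.appName("Introduction to Spark").getOrCreate()

# The underlying SparkContext, the entry point for RDDs
sc = spark.sparkContext

print(spark)
print(sc)
```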

# RDDs and DataFrames

[RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)  
[SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)  
[Getting Started - PySpark](https://spark.apache.org/docs/latest/api/python/getting_started/index.html)

## What are RDDs?
An __RDD__, or Resilient Distributed Dataset, is a fundamental data structure in Apache Spark. RDDs are designed to provide an efficient and fault-tolerant way to distribute and process data across a cluster of computers. They have several key characteristics:

* __Resilient:__ RDDs are "resilient" because they can recover from node failures. If a node in the cluster fails, the data and the operations applied to it can be reconstructed on another node, ensuring fault tolerance.
* __Distributed:__ RDDs are distributed across multiple nodes in a cluster, allowing for parallel processing of data. This distribution is a fundamental feature of Spark, making it suitable for handling large datasets.
* __Immutable:__ Once created, RDDs are immutable, meaning their data cannot be changed. Instead, transformations applied to an RDD result in the creation of new RDDs, which helps maintain data consistency and enables lineage information for fault recovery.
* __In-Memory:__ RDDs are processed in memory where possible and can be cached explicitly (for example with `cache()` or `persist()`), which makes repeated access faster than re-reading the data from disk. This in-memory processing is one of the key performance benefits of Spark.
* __Lazy execution:__ When Spark transforms data, it does not immediately compute the transformation but plans how to compute later. When actions such as collect() are explicitly called, the computation starts.

Common operations on RDDs include mapping, filtering, reducing, and joining data.
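The immutability and lazy-execution characteristics listed above can be illustrated without Spark. The following is a minimal plain-Python analogy, not Spark's implementation: the class `LazyPipeline` and the helper `square` are hypothetical names introduced only for this sketch. Transformations merely record a step and return a new object; the work happens when the action `collect()` is called.

```python
class LazyPipeline:
    """Toy stand-in for an RDD (hypothetical, plain Python): records steps, runs them on collect()."""

    def __init__(self, data, steps=()):
        self._data = data            # source data, never modified
        self._steps = tuple(steps)   # recorded transformations (the "lineage")

    def map(self, fn):
        # Transformation: return a new pipeline, compute nothing yet
        return LazyPipeline(self._data, self._steps + (("map", fn),))

    def filter(self, predicate):
        # Transformation: return a new pipeline, compute nothing yet
        return LazyPipeline(self._data, self._steps + (("filter", predicate),))

    def collect(self):
        # Action: replay all recorded steps over the source data
        items = iter(self._data)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)


calls = []

def square(n):
    calls.append(n)  # remember when the function really runs
    return n * n

numbers = LazyPipeline(range(1, 6))
even_squares = numbers.map(square).filter(lambda n: n % 2 == 0)

print(len(calls))              # 0: no transformation has run yet
print(even_squares.collect())  # [4, 16]
print(len(calls))              # 5: square ran once per element during collect()
print(numbers.collect())       # [1, 2, 3, 4, 5]: the source pipeline is unchanged
```

Because every pipeline keeps its recorded steps, a lost result can be recomputed from the source data. Spark's lineage-based fault recovery relies on the same idea, at cluster scale.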

While RDDs were the primary data abstraction in earlier versions of Spark, more recent versions introduced __DataFrames__ and Datasets, which provide higher-level abstractions and optimizations for structured data processing. Nevertheless, RDDs still play a crucial role in Spark, particularly when you need fine-grained control over data and operations or when working with unstructured data.

## What are DataFrames?
A Spark __DataFrame__ is a distributed collection of data organized into named columns, much like a table in a relational database or a data frame in R or Python (pandas). DataFrames are a higher-level abstraction built on top of Resilient Distributed Datasets (RDDs) in Apache Spark, designed to provide a more structured and optimized way to work with data. They were introduced in Spark 1.3 to address the limitations of RDDs, particularly when dealing with structured and semi-structured data, and Spark 2.0 unified them with the Dataset API.

Key features and characteristics of Spark DataFrames include:
* __Schema:__ DataFrames have a well-defined schema that describes the structure of the data, including column names and data types. This schema information helps Spark optimize query execution and provides type safety.
* __Performance:__ DataFrames are designed for optimized performance. Spark can leverage the schema information to perform predicate pushdown, column pruning, and other query optimizations to speed up data processing.
* __Language support:__ DataFrames are available in multiple programming languages, including Scala, Java, Python, and R. This means you can work with DataFrames in a language you're comfortable with.
* __Interoperability:__ DataFrames can seamlessly interoperate with RDDs, allowing you to use the right data abstraction for your specific use case.
* __Catalyst query optimizer:__ Spark DataFrames use the Catalyst query optimizer, which is a powerful tool for optimizing query plans. Catalyst can apply a wide range of optimizations, making queries more efficient.
* __Tungsten execution engine:__ DataFrames also use the Tungsten execution engine, which is designed for code generation and in-memory processing. This engine further enhances performance.
* __Data sources:__ DataFrames can read and write data from various sources, including Parquet, Avro, ORC, JSON, CSV, and more, making it easy to work with a variety of data formats.
* __SQL support:__ Spark DataFrames support SQL queries, enabling you to write SQL-like statements for data manipulation and analysis.

```python
# RDDs

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDDs").getOrCreate()

sc = spark.sparkContext

# Distribute the numbers 1 to 10 as an RDD
rdd_numbers = sc.parallelize(range(1, 11))
# alternatively: rdd_numbers = spark.sparkContext.parallelize(range(1, 11))

# collect() is an action: it triggers the computation and returns a Python list
list_numbers = rdd_numbers.collect()
# print(list_numbers[5])

for n in list_numbers:
    print(n)
```

```python
# Map and Reduce

from operator import add

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Map and Reduce").getOrCreate()

sc = spark.sparkContext

# map() applies the lambda to every element and returns a new RDD
rdd_squared_numbers = sc.parallelize(range(1, 11)).map(lambda n: n**2)

list_numbers = rdd_squared_numbers.collect()

print("The first 10 square numbers are:")
print()

for n in list_numbers:
    print(n)
print()

# reduce() combines all elements into a single value
# (named "total" to avoid shadowing the built-in sum())
total = rdd_squared_numbers.reduce(add)
print("The sum of the first 10 square numbers is {total}".format(total=total))
print()

# reduceByKey() combines the values of each key separately
rdd = sc.parallelize([("Peter", 10), ("Thomas", 10), ("Peter", 20), ("Thomas", 30)])
print(sorted(rdd.reduceByKey(add).collect()))
```
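To make the semantics of `reduce` and `reduceByKey` in the cell above concrete, here is a plain-Python sketch of what they compute. It is an analogy that runs on a single machine without Spark; the helper name `reduce_by_key` is hypothetical and only mirrors the behaviour, not Spark's distributed implementation.

```python
from functools import reduce
from operator import add

def reduce_by_key(pairs, fn):
    """Group the values by key, then fold each group with fn (plain Python, no Spark)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return {key: reduce(fn, values) for key, values in groups.items()}

pairs = [("Peter", 10), ("Thomas", 10), ("Peter", 20), ("Thomas", 30)]
totals = reduce_by_key(pairs, add)
print(sorted(totals.items()))  # [('Peter', 30), ('Thomas', 40)]

# The sum of the first 10 square numbers, computed with map and reduce
squares_total = reduce(add, map(lambda n: n**2, range(1, 11)))
print(squares_total)  # 385
```

In Spark the function passed to `reduceByKey` is applied within and across partitions in no guaranteed order, so it should be associative and commutative, as `add` is.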

```python
# Filter

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Filter").getOrCreate()

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Keep only the even numbers
print(rdd.filter(lambda x: x % 2 == 0).collect())
```


```python
# Passing functions as parameters

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Passing functions as parameters").getOrCreate()

def count_words(line):
    # split() without arguments ignores repeated whitespace and empty lines
    return len(line.split())

# map() yields one word count per line; sum() adds them up for the whole file
rdd_word_counts = spark.sparkContext.textFile("/home/peter/Projects/spark/data/words.txt").map(count_words)
number_of_words_in_file = rdd_word_counts.sum()

print("The file contains {count} words.".format(count=number_of_words_in_file))
```


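```python
# Estimate pi with a Monte Carlo simulation

import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pi").getOrCreate()

sc = spark.sparkContext

num_samples = 100000000

def inside(_):
    # Draw a random point in the unit square and test whether it lies inside the quarter circle
    x, y = random.random(), random.random()
    return x*x + y*y < 1

# The fraction of points inside the quarter circle approximates pi / 4
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)

# Stop the SparkContext and release its resources
sc.stop()
```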
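The cell above estimates pi by sampling: a random point in the unit square falls inside the quarter circle of radius 1 with probability pi/4, so four times the observed fraction approximates pi. The following single-machine sketch shows the same arithmetic without Spark. The function name `estimate_pi`, the fixed seed, and the smaller sample size are assumptions made for this illustration.

```python
import random

def estimate_pi(num_samples, seed=42):
    """Monte Carlo estimate of pi in plain Python (no Spark, no parallelism)."""
    rng = random.Random(seed)  # seeded so the run is repeatable
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # the point lies inside the quarter circle
            hits += 1
    return 4 * hits / num_samples

print(estimate_pi(100_000))
```

The Spark version distributes exactly this loop: `filter(inside)` plays the role of the `if`, and `count()` plays the role of `hits`.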

```python
# Creating an RDD from a file

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDD from a file").getOrCreate()

# Each element of the RDD is one line of the file
rdd_data = spark.sparkContext.textFile("/home/peter/Projects/spark/data/data.txt")
print(rdd_data.collect())

# Compute the length of each line, then add up all line lengths
line_lengths = rdd_data.map(len)
total_length = line_lengths.reduce(lambda a, b: a + b)
print(total_length)
```

```python
# Small text files

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Small text files").getOrCreate()

# Read every file in the directory as one (path, content) pair
rdd_textfiles = spark.sparkContext.wholeTextFiles("/home/peter/Projects/spark/data/small-text-files")

# Sort by file path and fetch the results
list_textfiles = rdd_textfiles.sortByKey().collect()

for tf in list_textfiles:
    # tf[0] is the file path, tf[1] the file content
    print(tf[1])
```

```python
# RDD finger exercises

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDDs").getOrCreate()

# Create an RDD from a Python list with parallelize()
list_person = [("Peter", 60), ("Thomas", 59), ("Michael", 54), ("Anabel", 50)]
rdd_person = spark.sparkContext.parallelize(list_person)
rdd_person.setName('Persons')

print("The RDD {rdd_name} contains {count} records.".format(rdd_name=rdd_person.name(), count=rdd_person.count()))
print()

for p in rdd_person.collect():
    # Print name only
    print(p[0])
print()

# sortByKey() sorts the (name, age) pairs by name
print("First record before sorting: {ds}".format(ds=rdd_person.first()))
rdd_person_sorted = rdd_person.sortByKey()
print("First record after sorting:  {ds}".format(ds=rdd_person_sorted.first()))
print()

# Filter
rdd_numbers = spark.sparkContext.parallelize(range(1, 10 + 1))
print(rdd_numbers.collect())
print(rdd_numbers.filter(lambda x: x % 2 == 0).collect())
print()

# Cartesian product
rdd1 = spark.sparkContext.parallelize(range(1, 10 + 1))
rdd2 = spark.sparkContext.parallelize(range(1, 10 + 1))
print(rdd1.cartesian(rdd2).collect())
```



```python
# DataFrame from a list of tuples

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes").getOrCreate()

# Without column names, Spark assigns the default names _1, _2, _3
list_of_tuples = [('Peter', 60, 'M'), ('Thomas', 59, 'M'), ('Michael', 54, 'M'), ('Anabel', 50, 'F')]
spark.createDataFrame(list_of_tuples).show()
```

```python
# DataFrame from a list of tuples specifying column names

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes").getOrCreate()

list_of_tuples = [('Peter', 60, 'M'), ('Thomas', 59, 'M'), ('Michael', 54, 'M'), ('Anabel', 50, 'F')]
spark.createDataFrame(list_of_tuples, ['Name', 'Age', 'Gender']).show()
```

```python
# DataFrame from a list of dictionaries (key-value pairs)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes").getOrCreate()

list_of_dictionaries = [
    {'Name': 'Peter', 'Age': 60, 'Gender': 'M'},
    {'Name': 'Thomas', 'Age': 59, 'Gender': 'M'},
    {'Name': 'Michael', 'Age': 54, 'Gender': 'M'},
    {'Name': 'Anabel', 'Age': 50, 'Gender': 'F'},
]

# select() fixes the column order
df = spark.createDataFrame(list_of_dictionaries).select('Name', 'Age', 'Gender')
# df.show(2)
# df.show(vertical = True)
# df.show(2, vertical = True)

# Show all records whose gender is not 'F'
df.filter(df.Gender != 'F').show()
```

```python
# Create a DataFrame with a schema inferred from the data

from datetime import datetime, date
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("Dataframes 2").getOrCreate()

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

print(df)  # Schema is inferred from the Python types of the values
print()

# Row fields can be accessed by position
for r in df.collect():
    print(r[0], r[1], r[2], r[3], r[4])
```
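Schema inference in the cell above maps the Python type of each value to a Spark SQL type. The sketch below imitates that mapping in plain Python for the types used in the example. The lookup table `SPARK_TYPE_NAMES` and the function `infer_schema` are hypothetical helpers written for this illustration, and they cover only a handful of types.

```python
from datetime import date, datetime

# Python type -> Spark SQL type name, limited to the types used in this example
SPARK_TYPE_NAMES = {
    bool: "boolean",
    int: "bigint",
    float: "double",
    str: "string",
    date: "date",
    datetime: "timestamp",
}

def infer_schema(row):
    """Describe a row (dict of column name -> value) in the style of print(df)."""
    # type(value) is used instead of isinstance() because datetime is a subclass of date
    return ", ".join(
        "{name}: {type_name}".format(name=name, type_name=SPARK_TYPE_NAMES[type(value)])
        for name, value in row.items()
    )

row = {"a": 1, "b": 2.0, "c": "string1", "d": date(2000, 1, 1), "e": datetime(2000, 1, 1, 12, 0)}
print("DataFrame[{schema}]".format(schema=infer_schema(row)))
# DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
```

An explicit schema avoids this guessing step and is the safer choice when a column may contain `None` or mixed types.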

```python
# Create a DataFrame with an explicit schema

from datetime import datetime, date
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes 3").getOrCreate()

# The schema is given as a DDL string: column name followed by its type
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

# Uncomment individual lines to explore the DataFrame API
# df.show()
# df.show(1, vertical = True)

# df.printSchema()
# df.columns
# df.select("a", "b", "c").describe().show()
# print(type(df.a))
# df.select(df.c).show()
# df.collect()
# df.take(2)
# df.filter(df.a == 1).show()
```

```python
# DataFrame aggregations

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes 4").getOrCreate()

df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

df.show()

# Average of all numeric columns (v1, v2) per color
df.groupby('color').avg().show()
```
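As a cross-check of what `groupby('color').avg()` computes for the data above, the same averages can be derived in plain Python. This is only a single-machine sketch of the aggregation; the variable names are chosen for this illustration, and Spark evaluates the grouping in a distributed way.

```python
from collections import defaultdict

rows = [
    ('red', 'banana', 1, 10), ('blue', 'banana', 2, 20), ('red', 'carrot', 3, 30),
    ('blue', 'grape', 4, 40), ('red', 'carrot', 5, 50), ('black', 'carrot', 6, 60),
    ('red', 'banana', 7, 70), ('red', 'grape', 8, 80),
]

# Collect the numeric columns per color
groups = defaultdict(list)
for color, _fruit, v1, v2 in rows:
    groups[color].append((v1, v2))

# Average v1 and v2 within each group
averages = {
    color: (
        sum(v1 for v1, _ in values) / len(values),
        sum(v2 for _, v2 in values) / len(values),
    )
    for color, values in groups.items()
}

for color in sorted(averages):
    print(color, averages[color])
# black (6.0, 60.0)
# blue (3.0, 30.0)
# red (4.8, 48.0)
```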

```python
# DataFrames: Processing data in CSV files

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV").getOrCreate()

# Read a pipe-separated file whose first line contains the column names
df_customers = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "|") \
    .load("/home/peter/Projects/spark/data/customers.csv")

# number of records in a dataframe
# print(df_customers.count())

# show first 10 records of a dataframe
# df_customers.limit(10).show()

# show selected columns only
# df_customers.select('last_name', 'first_name', 'gender_code').limit(10).show()

# filter by gender code
# df_customers.select('last_name', 'first_name', 'gender_code').filter("gender_code == 'F'").limit(10).show()

# write file with female entries only
# df_female_customers = df_customers.filter("gender_code == 'F'")
# df_female_customers.limit(10).show()
# df_female_customers.write.format("csv").mode("overwrite").option("sep", "|").save("/home/peter/Projects/spark/data/female_customers.csv")

# read the file written by the commented-out step above (it must have been run once before)
df2 = spark.read.format("csv") \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("sep", "|") \
    .load("/home/peter/Projects/spark/data/female_customers.csv")
df2.limit(10).show()
```


```python
# DataFrames: Playing around with JSON files

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read a JSON File").getOrCreate()

# multiline is required when a JSON record spans several lines
df = spark.read.option("multiline", "true").json("/home/peter/Projects/spark/data/customers.json")
df.printSchema()

# Display some columns of the first 10 DataFrame records
df.select('last_name', 'first_name').show(10)

# Display females only
df.select('last_name', 'first_name', 'gender_code').filter(df['gender_code'] == 'F').show(10)

# Group by gender
df.groupBy("gender_code").count().show()

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("customers")

# Query the view with SQL
sqlDF = spark.sql("SELECT * FROM customers WHERE first_name = 'Peter'")
sqlDF.select('first_name', 'last_name', 'birth_date').show()
```

```text
23/10/20 11:55:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

root
 |-- birth_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- customer_number: long (nullable = true)
 |-- data_date_part: string (nullable = true)
 |-- entity_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender_code: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- record_number: long (nullable = true)
 |-- street: string (nullable = true)
 |-- valid_from_date: string (nullable = true)
 |-- valid_to_date: string (nullable = true)

+---------+----------+
|last_name|first_name|
+---------+----------+
|Schönland| Willfried|
|   Johann|Friedemann|
|Christoph|      Hugo|
|    Barth|    Moritz|
|   Keudel|    Rouven|
| Schuster|    Justus|
|     Rose|    Rosita|
|   Flantz|    Marian|
|    Junck| Kornelius|
|     Junk|    Hatice|
+---------+----------+
only showing top 10 rows

+-----------+----------+-----------+
|  last_name|firs
```

```python
# Aggregations: grouping, average, min, max

from pyspark.sql import SparkSession
# avg is not a Python built-in; it comes from pyspark.sql.functions
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("Dataframes and aggregations").getOrCreate()

# create a dataframe with named columns from a list of tuples (corresponds to a table with 2 columns and 6 rows)
person_df = spark.createDataFrame([("Peter", 20), ("Thomas", 31), ("Michael", 30), ("Anabel", 35), ("Elke", 25), ("Peter", 58)], ["name", "age"])

# group persons with the same name and calculate the average age per name
avg_age_df = person_df.groupBy("name").agg(avg("age"))

# show results
person_df.show()
avg_age_df.show()
```

```python
# Playing around with RDDs and DataFrames

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("RDDs and DataFrames").getOrCreate()

sc = spark.sparkContext

# Load the text files as RDDs of lines
rdd_customers_raw = sc.textFile("/home/peter/Projects/spark/data/customers.csv")
rdd_balances_raw = sc.textFile("/home/peter/Projects/spark/data/balances.csv")

# Split each line into its pipe-separated attributes
rdd_customer_attributes = rdd_customers_raw.map(lambda l: l.split("|"))
rdd_balances_attributes = rdd_balances_raw.map(lambda l: l.split("|"))

# Convert each list of attributes to a Row with named fields
rdd_customers = rdd_customer_attributes.map(lambda a: Row(
    record_number=a[0],
    entity_id=a[1],
    customer_number=a[2],
    valid_from_date=a[3],
    valid_to_date=a[4],
    gender_code=a[5],
    last_name=a[6],
    first_name=a[7],
    birth_date=a[8],
    country_code=a[9],
    postal_code=a[10],
    city=a[11],
    street=a[12],
    data_date_part=a[13]))

rdd_balances = rdd_balances_attributes.map(lambda a: Row(
    record_number=a[0],
    entity_id=a[1],
    customer_number=a[2],
    instalment_amount=a[3],
    term=a[4],
    debt_amount=a[5],
    data_date_part=a[6]))

# Infer the schemas and register the DataFrames as temporary views
df_customers = spark.createDataFrame(rdd_customers)
df_customers.createOrReplaceTempView("customers")
df_balances = spark.createDataFrame(rdd_balances)
df_balances.createOrReplaceTempView("balances")

# Find balance records that have no customer
df_balances_without_customer = spark.sql("""
    SELECT balances.*
    FROM balances
    LEFT JOIN customers ON balances.customer_number = customers.customer_number
    WHERE customers.customer_number IS NULL
""")
df_balances_without_customer.show()

# females = spark.sql("SELECT last_name, first_name, gender_code FROM customers WHERE gender_code='F' LIMIT 10")
# female_names = females.rdd.map(lambda f: "Name: " + f.first_name).collect()

# for name in female_names:
#     print(name)
```

```text
23/10/20 11:58:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+-------------+---------+---------------+-----------------+----+-----------+--------------+
|record_number|entity_id|customer_number|instalment_amount|term|debt_amount|data_date_part|
+-------------+---------+---------------+-----------------+----+-----------+--------------+
|           13|     3294|      000000000|           438.41|  32|   14029.12|    2021-11-08|
|           19|     3296|      000000000|           297.86|  29|    8637.94|    2021-11-08|
|          175|     3295|      000000000|           176.82|  17|    3005.94|    2021-11-08|
|          410|     3296|      000000000|           418.59|  22|    9208.98|    2021-11-08|
|          713|     3295|      000000000|           135.01|  35|    4725.35|    2021-11-08|
|          905|     3294|      000000000|           381.41|  37|   14112.17|    2021-11-08|
|          978|     3294|      000000000|           364.22|  27|    9833.94|    2021-11-08|
|         1023|     3296|      000000000|           345.46|   2|     690.92|    
```
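The query that finds balance records without a customer is a so-called anti-join: a `LEFT JOIN` keeps every balance row, and the `IS NULL` filter then keeps only the rows for which no matching customer was found. The pattern can be tried out without Spark using Python's built-in `sqlite3` module. The two tiny tables and their rows below are made-up sample data, not the contents of the CSV files used above.

```python
import sqlite3

# In-memory database with hypothetical sample data
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (customer_number TEXT, last_name TEXT);
    CREATE TABLE balances (record_number INTEGER, customer_number TEXT, debt_amount REAL);

    INSERT INTO customers VALUES ('000000001', 'Example A'), ('000000002', 'Example B');
    INSERT INTO balances VALUES
        (1, '000000001', 100.0),
        (2, '000000000', 250.5),
        (3, '000000002', 75.0);
""")

# Same join pattern as in the Spark SQL query
orphans = conn.execute("""
    SELECT balances.*
    FROM balances
    LEFT JOIN customers ON balances.customer_number = customers.customer_number
    WHERE customers.customer_number IS NULL
""").fetchall()
conn.close()

print(orphans)  # [(2, '000000000', 250.5)]
```

Only the balance with customer number `000000000` is returned, because it is the only one without a matching row in `customers`.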

```python
# Apache Iceberg
# Requires the Iceberg Spark runtime JAR on the Spark classpath

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/home/peter/Projects/spark/warehouse") \
    .getOrCreate()

df_customers = spark.read.format("csv") \
    .option("sep", "|") \
    .option("header", "true") \
    .load("/home/peter/Projects/spark/data/customers.csv")

# Expose the female customers as a temporary view and copy them into an Iceberg table
df_customers.select('last_name', 'first_name', 'gender_code', 'data_date_part').filter("gender_code = 'F'").createOrReplaceTempView("temp_view_customers")
spark.sql("CREATE OR REPLACE TABLE local.db.customers USING iceberg AS SELECT * FROM temp_view_customers")
df = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers")
df.show()

# Iceberg tables support row-level updates
spark.sql("UPDATE local.db.customers SET data_date_part = '2023-10-20' WHERE data_date_part = '2021-11-08'")
df = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers")
df.show()
```


```python
# Use a function to read from a MariaDB and from a PostgreSQL database table

from pyspark.sql import SparkSession

# spark.jars takes one comma-separated list; a second config() call with the same key
# would overwrite the first one
spark = SparkSession \
    .builder \
    .appName("Databases") \
    .config("spark.jars", "/opt/spark-3.5.0/jars/postgresql-42.6.0.jar,/opt/spark-3.5.0/jars/mariadb-java-client-3.2.0.jar") \
    .getOrCreate()

def show_customers(spark: SparkSession, database: str) -> None:
    if database.lower() == "mariadb":
        df_customers = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:mysql://localhost:3306/shop?permitMysqlScheme") \
            .option("driver", "org.mariadb.jdbc.Driver") \
            .option("dbtable", "customers") \
            .option("user", "spark") \
            .option("password", "spark") \
            .load()
        df_customers.select('last_name', 'first_name', 'birth_date').show(10)
    elif database.lower() == "postgresql":
        df_customers = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://localhost:5432/galeria_anatomica") \
            .option("driver", "org.postgresql.Driver") \
            .option("dbtable", "shop.customers") \
            .option("user", "spark") \
            .option("password", "spark") \
            .load()
        df_customers.select('last_name', 'first_name', 'birth_date').show(10)
    else:
        print("Unknown database type")

show_customers(spark, "postgresql")
```

```text
23/10/20 12:00:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+---------+----------+----------+
|last_name|first_name|birth_date|
+---------+----------+----------+
|    Dobes|    Danuta|1998-01-23|
|   Dippel|     Maike|1945-08-09|
|     Pohl|      Anni|2002-08-10|
|     Kade|  Siegbert|1977-03-24|
|  Kühnert| Dietlinde|1971-04-09|
|    Lange|   Baptist|1975-01-28|
|   Hecker|     Elena|1965-05-30|
|   Dowerg|   Aribert|1961-05-30|
|    Heser|    Norman|1952-05-09|
|   Ladeck|Karl-Josef|1989-08-18|
+---------+----------+----------+
only showing top 10 rows
```
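The two branches of `show_customers` differ only in their connection settings. One possible way to reduce the duplication, sketched here as a suggestion rather than as the original design, is to keep the settings in a dictionary and build the reader options from it. The names `JDBC_SETTINGS` and `jdbc_options` are hypothetical; the URLs, drivers, and table names are copied from the cell above.

```python
# Connection settings per database type, taken from the example above
JDBC_SETTINGS = {
    "mariadb": {
        "url": "jdbc:mysql://localhost:3306/shop?permitMysqlScheme",
        "driver": "org.mariadb.jdbc.Driver",
        "dbtable": "customers",
    },
    "postgresql": {
        "url": "jdbc:postgresql://localhost:5432/galeria_anatomica",
        "driver": "org.postgresql.Driver",
        "dbtable": "shop.customers",
    },
}

def jdbc_options(database, user="spark", password="spark"):
    """Return the JDBC reader options for a database type (case-insensitive)."""
    try:
        settings = JDBC_SETTINGS[database.lower()]
    except KeyError:
        raise ValueError("Unknown database type: {db}".format(db=database)) from None
    return {**settings, "user": user, "password": password}

options = jdbc_options("PostgreSQL")
print(options["driver"])  # org.postgresql.Driver

# With a running SparkSession and the JDBC drivers available, the options
# could then be passed to the reader in one call:
# spark.read.format("jdbc").options(**options).load()
```

Raising an exception for an unknown type, instead of printing a message, lets the caller decide how to handle the error.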



```python
# Write and read Parquet files

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Write and read a Parquet file").getOrCreate()

# Read the CSV file and write its content in Parquet format
customers_df = spark.read.format("csv").option("header", "true").option("sep", "|").load("/home/peter/Projects/spark/data/customers.csv")
customers_df.write.mode("overwrite").parquet("/home/peter/Projects/spark/data/customers.parquet")

# Read the Parquet data back and query it with SQL
parquet_df = spark.read.parquet("/home/peter/Projects/spark/data/customers.parquet")
parquet_df.createOrReplaceTempView("parquet_file")
males_df = spark.sql("SELECT last_name, first_name, gender_code FROM parquet_file WHERE gender_code = 'M'")
males_df.show(10)
```

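```python
# Structured Streaming: running word count over a socket stream
# Before running, start a text server in a terminal, for example: nc -lk 12345

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession.builder.appName("Structured Streaming word count").getOrCreate()

# Create a streaming DataFrame of the lines received from localhost:12345
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 12345) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# Start the query and print the complete set of counts to the console after every update
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
```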
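A "running" word count means that the counts are not reset between incoming chunks of data: every new batch of lines updates the totals accumulated so far. The plain-Python sketch below imitates this behaviour with a generator. It is an analogy for the idea only and does not use the streaming engine; the function `running_word_counts` and the sample batches are made up for this illustration.

```python
from collections import Counter

def running_word_counts(batches):
    """Yield the accumulated word counts after each batch of lines."""
    totals = Counter()
    for lines in batches:
        for line in lines:
            totals.update(line.split(" "))  # same split rule as in the streaming example
        yield dict(totals)                  # snapshot of all counts so far

# Two hypothetical batches of lines, as they might arrive over the socket
batches = [["apache spark"], ["apache hadoop", "spark spark"]]

for snapshot in running_word_counts(batches):
    print(snapshot)
# {'apache': 1, 'spark': 1}
# {'apache': 2, 'spark': 3, 'hadoop': 1}
```

Each printed snapshot contains all words seen so far, which is comparable to writing the full result table after every update.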