# **Programming Spark using Jupyter Notebook**
---
__Santander Consumer Bank Germany__  
__CTO & IT Architecture__  

__Version:__ 1.0  
__Date:__ 2024-04-03  
__Github:__

This document is intended to serve as a first introduction to programming Apache Spark using the Python framework PySpark. Basic knowledge of the Python programming language is required. The most important concepts and data structures of Spark, in particular Resilient Distributed Datasets (RDDs) and DataFrames, are presented using short example programs. Jupyter Notebook and Microsoft Visual Studio Code are used as development environments. Many of the examples are taken from the [official Spark documentation](https://spark.apache.org/docs/latest/index.html) and some have been slightly modified.

## What is Apache Spark?

Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

## **How to create a development environment**

### **Install Docker**
Install Docker following the instructions for your operating system.

### Download the jupyter/pyspark-notebook image

Once Docker is installed, download the jupyter/pyspark-notebook image:
```
docker pull jupyter/pyspark-notebook
```


### Create a bash file

Create a bash file (e.g. run.sh) with the following content:

```
#!/bin/bash

CONTAINER=$(docker run -d --rm --name my-pyspark -p 8888:8888 -v /home/peter/projects:/home/jovyan/work jupyter/pyspark-notebook)
docker cp /home/peter/projects/postgres/lib/postgresql-42.7.0.jar $CONTAINER:/usr/local/spark/jars
docker cp /home/peter/projects/iceberg/lib/iceberg-spark-runtime-3.5_2.12-1.4.0.jar $CONTAINER:/usr/local/spark/jars
export CONTAINER
sleep 5
docker exec $CONTAINER jupyter server list
```

For Windows, create a corresponding PowerShell file and adapt the syntax shown above.

The docker run command creates a container named "my-pyspark" from the downloaded image, publishes the Jupyter port 8888 under the same port number outside the container, and mounts a folder in the file system of your operating system (here: /home/peter/projects) at the pre-configured working directory inside the container (/home/jovyan/work). Any file stored there appears inside the container as if it were local. The two docker cp commands show how to copy libraries such as database drivers into Spark's library folder inside the container (e.g. to read from a Postgres database within your Spark program).

### Open the Jupyter Notebook in your browser 

Open your preferred browser and enter the following as URL:

```
localhost:8888/tree?token=0f9541f307a73fcd220474bfd24d2476ea145d58d165ad1b
```
The token (here: 0f9541f307a73fcd220474bfd24d2476ea145d58d165ad1b) will be different each time you start Jupyter Notebook. The token you need for the current Jupyter session is shown on the screen when the run script terminates. Look for "token=".

```
Currently running servers:
http://cc03a1a1513f:8888/?token=2ea951ec0dc87115a4f40a7b21f1a7b823ce3379a14f94fb
```
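
The server list reports the container's own hostname (here: cc03a1a1513f), which the browser on your machine cannot resolve, so the host part has to be replaced by localhost. The following is a minimal sketch of that rewrite, assuming the output format shown above; the helper name `browser_url` is hypothetical and not part of Jupyter.

```python
from urllib.parse import parse_qs, urlsplit


def browser_url(server_line: str, host: str = "localhost") -> str:
    """Turn one line of `jupyter server list` output into a URL for the host browser."""
    # Keep only the URL in case the line carries extra fields after it
    url = server_line.split()[0]
    parts = urlsplit(url)
    # The token is passed as a query parameter
    token = parse_qs(parts.query)["token"][0]
    return f"http://{host}:{parts.port}/tree?token={token}"


line = "http://cc03a1a1513f:8888/?token=2ea951ec0dc87115a4f40a7b21f1a7b823ce3379a14f94fb"
print(browser_url(line))
```

The port is taken from the reported URL, so the sketch only works as long as the container port is published under the same number, as in the run script.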

## How to run Jupyter Notebook and PySpark in Anaconda

### Install Anaconda or Miniconda

Install Anaconda or Miniconda following the instructions for your operating system.

### Create a virtual environment

Create a file env.yml with the following content in your working directory. Replace the name "onboarding" with the name of your project.

```
name: onboarding
channels:
  - conda-forge
dependencies:
  - python=3.10
  - pyspark=3.5
  - pypandoc=1.12
  - pytest=7.4.3
  - pylint=3.0.3
  - findspark=2.0.1
  - jupyter=1.0.0
  - pandas=2.2.1
  - numpy=1.26.4
  - openpyxl=3.1.2
```

Then run

```
conda env create -f env.yml
```

Conda will download all the required packages and take care of all dependencies. This may take a couple of minutes. Once it is ready, activate the new environment with

```
conda activate onboarding
```

To start Jupyter Notebook, enter

```
jupyter notebook
```

In [1]:
# SparkSession and SparkContext

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Introduction to Spark").getOrCreate()

sc = spark.sparkContext

print(spark)
print(sc)

<pyspark.sql.session.SparkSession object at 0x7b836cc65350>
<SparkContext master=local[*] appName=Introduction to Spark>


# RDDs and DataFrames

[RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)  
[SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)  
[Getting Started - PySpark](https://spark.apache.org/docs/latest/api/python/getting_started/index.html)

## What are RDDs?
An __RDD__, or Resilient Distributed Dataset, is a fundamental data structure in Apache Spark. RDDs are designed to provide an efficient and fault-tolerant way to distribute and process data across a cluster of computers. They have several key characteristics:

* __Resilient:__ RDDs are "resilient" because they can recover from node failures. If a node in the cluster fails, the data and the operations applied to it can be reconstructed on another node, ensuring fault tolerance.
* __Distributed:__ RDDs are distributed across multiple nodes in a cluster, allowing for parallel processing of data. This distribution is a fundamental feature of Spark, making it suitable for handling large datasets.
* __Immutable:__ Once created, RDDs are immutable, meaning their data cannot be changed. Instead, transformations applied to an RDD result in the creation of new RDDs, which helps maintain data consistency and enables lineage information for fault recovery.
* __In-Memory:__ RDDs are processed in memory and can be cached there (for example with cache() or persist()), which makes repeated access much faster than re-reading the data from disk. This in-memory processing is one of the key performance benefits of Spark.
* __Lazy execution:__ When Spark transforms data, it does not immediately compute the transformation but plans how to compute later. When actions such as collect() are explicitly called, the computation starts.

Common operations on RDDs include mapping, filtering, reducing, and joining data.
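
Lazy execution and immutability can be illustrated without a cluster. The following sketch is a plain-Python analogy, not Spark code: Python's built-in `map` and `filter` return lazy iterators, which here stand in for transformations, and `list()` plays the role of an action such as collect(). The `log` list is only there to show when the function really runs.

```python
log = []


def square(n):
    log.append(n)  # record every real invocation
    return n * n


numbers = range(1, 11)

# "Transformations": both calls only describe the pipeline, nothing is computed yet
squares = map(square, numbers)
even_squares = filter(lambda x: x % 2 == 0, squares)
calls_before_action = len(log)

# "Action": materializing the result triggers the whole pipeline
result = list(even_squares)
calls_after_action = len(log)

print(calls_before_action, calls_after_action)
print(result)
print(list(numbers))  # the input is unchanged; each step produced a new object
```

In Spark the same idea additionally lets the engine see the whole chain of transformations before running it, which is what makes recomputing lost partitions from lineage possible.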

While RDDs were the primary data abstraction in earlier versions of Spark, more recent versions introduced __DataFrames__ and Datasets, which provide higher-level abstractions and optimizations for structured data processing. Nevertheless, RDDs still play a crucial role in Spark, particularly when you need fine-grained control over data and operations or when working with unstructured data.

## What are DataFrames?
Spark __DataFrames__ are a distributed collection of data organized into named columns, much like a table in a relational database or a data frame in R or Python (pandas). They are a higher-level abstraction built on top of Resilient Distributed Datasets (RDDs) in Apache Spark, designed to provide a more structured and optimized way to work with data. DataFrames were introduced in Spark 1.3 and unified with the Dataset API in Spark 2.0 to address the limitations of RDDs, particularly when dealing with structured and semi-structured data.

Key features and characteristics of Spark DataFrames include:
* __Schema:__ DataFrames have a well-defined schema that describes the structure of the data, including column names and data types. This schema information helps Spark optimize query execution and lets it reject references to unknown columns or mismatched types when a query is analyzed, before any data is processed.
* __Performance:__ DataFrames are designed for optimized performance. Spark can leverage the schema information to perform predicate pushdown, column pruning, and other query optimizations to speed up data processing.
* __Language support:__ DataFrames are available in multiple programming languages, including Scala, Java, Python, and R. This means you can work with DataFrames in a language you're comfortable with.
* __Interoperability:__ DataFrames can seamlessly interoperate with RDDs, allowing you to use the right data abstraction for your specific use case.
* __Catalyst query optimizer:__ Spark DataFrames use the Catalyst query optimizer, which is a powerful tool for optimizing query plans. Catalyst can apply a wide range of optimizations, making queries more efficient.
* __Tungsten execution engine:__ DataFrames also use the Tungsten execution engine, which is designed for code generation and in-memory processing. This engine further enhances performance.
* __Data sources:__ DataFrames can read and write data from various sources, including Parquet, Avro, ORC, JSON, CSV, and more, making it easy to work with a variety of data formats.
* __SQL support:__ Spark DataFrames support SQL queries, enabling you to write SQL-like statements for data manipulation and analysis.

In [None]:
# RDDs

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDDs").getOrCreate()

sc = spark.sparkContext

rdd_numbers = sc.parallelize(range(1, 11))
# alternatively: rdd_numbers = spark.sparkContext.parallelize(range(1, 11))

list_numbers = rdd_numbers.collect()
# print(list_numbers[5])

for n in list_numbers:
   print(n)

In [None]:
# Map and Reduce

from pyspark.sql import SparkSession
from operator import add

spark = SparkSession.builder.appName("Map and Reduce").getOrCreate()

sc = spark.sparkContext

rdd_squared_numbers = sc.parallelize(range(1, 11)).map(lambda n: n**2)
# alternatively: rdd_squared_numbers = spark.sparkContext.parallelize(range(1, 11)).map(lambda n: n**2)

list_numbers = rdd_squared_numbers.collect()

print("The first 10 square numbers are:")
print()

for n in list_numbers:
   print(n)
print()    

# use the name "total" so that Python's built-in sum() is not shadowed
total = sc.parallelize(range(1, 11)).map(lambda n: n**2).reduce(add)
print("The sum of the first 10 square numbers is {total}".format(total = total))
print()

rdd = sc.parallelize([("Peter", 10), ("Thomas", 10), ("Peter", 20), ("Thomas", 30)])
sorted(rdd.reduceByKey(add).collect())
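
What reduceByKey(add) computes for the pairs above can be reproduced in a few lines of plain Python. This is only an analogy for the result, not Spark's implementation (Spark combines values within each partition before exchanging data between nodes); the helper name `reduce_by_key` is hypothetical.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Combine all values that share a key with func, like RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        # First value for a key is kept as is, later ones are folded in
        result[key] = func(result[key], value) if key in result else value
    return sorted(result.items())


pairs = [("Peter", 10), ("Thomas", 10), ("Peter", 20), ("Thomas", 30)]
totals = reduce_by_key(pairs, add)
print(totals)  # [('Peter', 30), ('Thomas', 40)]
```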

In [None]:
# Filter

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Filter").getOrCreate() 

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd.filter(lambda x: x % 2 == 0).collect()


In [None]:
# Passing functions as parameters

# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder.appName("Passing functions as parameters").getOrCreate() 

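def count_words(line):
    # split() without arguments ignores repeated whitespace and empty lines
    return len(line.split())

# Map every line to its word count, then add up the counts of all lines
# (collect()[0] would only return the count of the first line)
rdd_word_counts = spark.sparkContext.textFile("/home/jovyan/work/spark/data/words.txt").map(count_words)
number_of_words_in_file = rdd_word_counts.sum()

print("The file contains {count} words.".format(count = number_of_words_in_file))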


In [None]:
import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pi").getOrCreate()

sc = spark.sparkContext

num_samples = 100000000

def inside(p):
    # p is the sample index and is not used; every call draws a fresh random point
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()
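
The estimate works because the points are drawn uniformly from the unit square, so the fraction that falls inside the quarter circle of radius 1 approaches π/4. The same calculation can be checked without Spark; the following is a single-process sketch with a much smaller sample, where the function name `estimate_pi` and the fixed seed are assumptions made for reproducibility.

```python
import random


def estimate_pi(num_samples: int, seed: int = 42) -> float:
    """Monte Carlo estimate of pi from random points in the unit square."""
    rng = random.Random(seed)  # private generator so the run is repeatable
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # point lies inside the quarter circle
            inside += 1
    return 4 * inside / num_samples


approx = estimate_pi(100_000)
print(approx)
```

The Spark version distributes exactly this loop: filter(inside) tests the points in parallel and count() adds up the hits.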

In [None]:
# Creating an RDD from a file

# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder.appName("RDD from a file").getOrCreate() 

rdd_data = spark.sparkContext.textFile("/home/jovyan/work/spark/data/data.txt")
print(rdd_data.collect())
lineLengths = rdd_data.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
print(totalLength)

In [None]:
# Small text files

# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder.appName("Small text files").getOrCreate() 

# Read every file in the directory as one (file path, file content) pair
rdd_textfiles = spark.sparkContext.wholeTextFiles("/home/jovyan/work/spark/data/small-text-files")

list_textfiles = rdd_textfiles.sortByKey().collect()

for tf in list_textfiles:
    print(tf[1])

In [None]:
# RDD finger exercises

# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder.appName("RDDs").getOrCreate() 

# Create RDD from parallelize    
list_person = [("Peter", 60), ("Thomas", 59), ("Michael", 54), ("Anabel", 50)]
rdd_person = spark.sparkContext.parallelize(list_person)
rdd_person.setName('Persons')

print("The RDD {rdd_name} contains {count} records.".format(rdd_name = rdd_person.name(), count = rdd_person.count()))
print()

for p in rdd_person.collect():
    # Print name only
    print(p[0])
print()

print("First record before sorting: {record}".format(record = rdd_person.first()))
rdd_person_sorted = rdd_person.sortByKey()
print("First record after sorting:  {record}".format(record = rdd_person_sorted.first()))
print()

# Filter 
rdd_numbers = spark.sparkContext.parallelize(range(1, 10 + 1))
print(rdd_numbers.collect())
print(rdd_numbers.filter(lambda x: x % 2 == 0).collect())
print()

# Cartesian product 
rdd1 = spark.sparkContext.parallelize(range(1, 10 + 1))
rdd2 = spark.sparkContext.parallelize(range(1, 10 + 1))
print(rdd1.cartesian(rdd2).collect())



In [None]:
# DataFrame from a list of tuples

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes").getOrCreate()

list_of_tuples = [('Peter', 60, 'M'), ('Thomas', 59, 'M'), ('Michael', 54, 'M'), ('Anabel', 50, 'F')]
spark.createDataFrame(list_of_tuples).show()

In [None]:
# DataFrame from a list of tuples specifying column names

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes").getOrCreate()

list_of_tuples = [('Peter', 60, 'M'), ('Thomas', 59, 'M'), ('Michael', 54, 'M'), ('Anabel', 50, 'F')]
spark.createDataFrame(list_of_tuples, ['Name', 'Age', 'Gender']).show()

In [None]:
# DataFrame from a list of dictionaries (key-value pairs)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes").getOrCreate()

list_of_dictionaries = [{'Name': 'Peter', 'Age':60, 'Gender': 'M'}, {'Name': 'Thomas', 'Age':59, 'Gender': 'M'}, {'Name': 'Michael', 'Age':54, 'Gender': 'M'}, {'Name': 'Anabel', 'Age':50, 'Gender': 'F'}]
df = spark.createDataFrame(list_of_dictionaries).select('Name', 'Age', 'Gender')
# df.show(2)
# df.show(vertical = True)
# df.show(2, vertical = True)
df.filter(df.Gender != 'F').show()

In [None]:
# Create a DataFrame with a schema inferred from the data

from datetime import datetime, date
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("Dataframes 2").getOrCreate()

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

print(df) # Schema is inferred
print()

for r in df.collect():
   print(r[0], r[1], r[2], r[3], r[4] )

In [None]:
# Create a DataFrame with an explicit schema

from datetime import datetime, date
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("Dataframes 3").getOrCreate()

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

# df.show()
# df.show(1, vertical = True)

# df.printSchema()
# df.columns
# df.select("a", "b", "c").describe().show()
# print(type(df.a))
# df.select(df.c).show()
# df.collect()
# df.take(2)
# df.filter(df.a == 1).show()

In [None]:
# DataFrame aggregations

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Dataframes 4").getOrCreate()

df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

df.show()
df.groupby('color').avg().show()

In [None]:
# DataFrames: Processing data in CSV files 

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV").getOrCreate()

df_customers = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", "|") \
    .load("/home/jovyan/work/spark/data/customers.csv")

# number of records in a dataframe
# print(df_customers.count())

# show first 10 records of a dataframe
# df_customers.limit(10).show()

# show selected columns only
# df_customers.select('last_name', 'first_name', 'gender_code').limit(10).show()

# filter by gender code
# df_customers.select('last_name', 'first_name', 'gender_code').filter("gender_code == 'F'").limit(10).show()

# write file with female entries only
df_female_customers = df_customers.filter("gender_code == 'F'")
df_female_customers.limit(10).show()
df_female_customers.write.format("csv").mode("overwrite").option("sep", "|").save("/home/jovyan/work/spark/data/female_customers.csv")
df2 = spark.read.format("csv").option("inferSchema", "false").option("header", "false").option("sep", "|").load("/home/jovyan/work/spark/data/female_customers.csv")
df2.limit(10).show()


In [None]:
# DataFrames: Playing around with JSON files

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read a JSON File").getOrCreate()

df = spark.read.option("multiline","true").json("/home/jovyan/work/spark/data/customers.json")
df.printSchema()

# Display some columns of the first 10 DataFrame records
df.select('last_name', 'first_name').show(10)

# Display females only
df.select('last_name', 'first_name', 'gender_code').filter(df['gender_code'] == 'F').show(10)

# Group by gender
df.groupBy("gender_code").count().show()

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("customers")

sqlDF = spark.sql("SELECT * FROM customers WHERE first_name = 'Peter'")
sqlDF.select('first_name', 'last_name', 'birth_date').show()

In [None]:
# Aggregations - grouping, average, min, max

# import the session class and the aggregate function used below
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("Dataframes and aggregations").getOrCreate()

# create a dataframe with named columns from an array of tuples (corresponds to a table with 2 columns and 6 rows) 
person_df = spark.createDataFrame([("Peter", 20), ("Thomas", 31), ("Michael", 30), ("Anabel", 35), ("Elke", 25), ("Peter", 58)], ["name", "age"])

# group persons with the same name and calculate the average age per name
avg_age_df = person_df.groupBy("name").agg(avg("age"))

# show results
person_df.show()
avg_age_df.show()

In [None]:
# Playing around with RDDs and DataFrames

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("RDDs and DataFrames").getOrCreate()

sc = spark.sparkContext

# Load a text file and convert each line to a Row
rdd_customers_raw = sc.textFile("/home/jovyan/work/spark/data/customers.csv")
rdd_balances_raw = sc.textFile("/home/jovyan/work/spark/data/balances.csv")

rdd_customer_attributes = rdd_customers_raw.map(lambda l: l.split("|"))
rdd_balances_attributes = rdd_balances_raw.map(lambda l: l.split("|"))

rdd_customers = rdd_customer_attributes.map(lambda a: Row(record_number=a[0], \
                                         entity_id=a[1], \
                                         customer_number=a[2], \
                                         valid_from_date=a[3], \
                                         valid_to_date=a[4], \
                                         gender_code=a[5], \
                                         last_name=a[6], \
                                         first_name=a[7], \
                                         birth_date=a[8], \
                                         country_code=a[9], \
                                         postal_code=a[10], \
                                         city=a[11], \
                                         street=a[12], \
                                         data_date_part=a[13]))

rdd_balances = rdd_balances_attributes.map(lambda a: Row(record_number=a[0], \
                                         entity_id=a[1], \
                                         customer_number=a[2], \
                                         instalment_amount=a[3], \
                                         term=a[4], \
                                         debt_amount=a[5], \
                                         data_date_part=a[6]))

# Infer the schemas and register the DataFrames as tables
df_customers = spark.createDataFrame(rdd_customers)
df_customers.createOrReplaceTempView("customers")
df_balances = spark.createDataFrame(rdd_balances)
df_balances.createOrReplaceTempView("balances")

# Find balance records that have no customer
df_balances_without_customer = spark.sql("SELECT balances.* FROM balances LEFT JOIN customers ON balances.customer_number = customers.customer_number WHERE customers.customer_number IS NULL;")
df_balances_without_customer.show()

# females = spark.sql("SELECT last_name, first_name, gender_code FROM customers WHERE gender_code='F' LIMIT 10")
# female_names = females.rdd.map(lambda f: "Name: " + f.first_name).collect()

# for name in female_names:
#     print(name)
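
The query above uses a LEFT JOIN combined with an IS NULL check, which keeps only the balance records that found no matching customer (an anti-join). The logic can be sketched in plain Python as follows; the sample records are made up for illustration and do not come from customers.csv or balances.csv.

```python
# Hypothetical sample data, reduced to the columns that matter for the join
customers = [
    {"customer_number": "C1", "last_name": "Example"},
    {"customer_number": "C2", "last_name": "Sample"},
]
balances = [
    {"customer_number": "C1", "debt_amount": "100"},
    {"customer_number": "C9", "debt_amount": "250"},
]

# Collect the join keys of one side, then keep the rows of the other side without a match
known_customers = {c["customer_number"] for c in customers}
balances_without_customer = [
    b for b in balances if b["customer_number"] not in known_customers
]

print(balances_without_customer)
```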

In [None]:
# Apache Iceberg

import pyspark

spark = pyspark.sql.SparkSession.builder.appName("Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/home/jovyan/work/iceberg/warehouse") \
    .getOrCreate()

df_customers = spark.read.format("csv") \
    .option("delimiter", "|") \
    .option("header", "true") \
    .load("/home/jovyan/work/iceberg/data/customers.csv")

# df_customers.select('last_name', 'first_name', 'gender_code', 'data_date_part').filter("gender_code = 'F'").createOrReplaceTempView("temp_view_customers")
df_customers.select('*').createOrReplaceTempView("temp_view_customers")
spark.sql("CREATE or REPLACE TABLE local.db.customers USING iceberg AS SELECT * FROM temp_view_customers")
df = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers")
df.show()
# df = spark.sql("UPDATE local.db.customers SET data_date_part = '2024-01-01' WHERE data_date_part = '2024-02-01' AND first_name = 'Re'")
# df = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers WHERE first_name = 'Rebecca'")
# df.show()


In [None]:
# Apache Iceberg Update

import pyspark

spark = pyspark.sql.SparkSession.builder.appName("Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/home/jovyan/work/iceberg/warehouse") \
    .getOrCreate()

df_customers = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers")
df_customers.show()
df = spark.sql("UPDATE local.db.customers SET first_name = 'Stefanie' WHERE first_name = 'Stefani'")
df_customers = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers")
df_customers.show()


In [None]:
# Apache Iceberg Delete

import pyspark

spark = pyspark.sql.SparkSession.builder.appName("Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/home/jovyan/work/iceberg/warehouse") \
    .getOrCreate()

df_customers = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers")
df_customers.show()
df = spark.sql("DELETE FROM local.db.customers WHERE first_name = 'Stefanie'")
df_customers = spark.sql("SELECT last_name, first_name, data_date_part FROM local.db.customers")
df_customers.show()

In [None]:
# Use a function to read from a MariaDB or a PostgreSQL database table

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Databases") \
    .getOrCreate()

# spark = SparkSession \
#    .builder \
#    .appName("Databases") \
# location of the drivers in non-Docker envs
# in Docker driver must be copied into container to /usr/local/spark/jars with docker cp
#    .config("spark.jars", "/home/peter/projects/spark/postgresql-42.7.0.jar") \ 
#    .getOrCreate()

def show_customers(spark: SparkSession, database) -> None:
    if (database.lower() == "mariadb"):
        df_customers = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:mysql://localhost:3306/shop?permitMysqlScheme") \
            .option("driver", "org.mariadb.jdbc.Driver") \
            .option("dbtable", "customers") \
            .option("user", "postgres") \
            .option("password", "guiltyspark") \
            .load()
        df_customers.select('last_name', 'first_name', 'birth_date').show(10)
    elif (database.lower() == "postgresql"):
        df_customers = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://172.17.0.2:5432/postgres") \
            .option("driver", "org.postgresql.Driver") \
            .option("dbtable", "customers") \
            .option("user", "postgres") \
            .option("password", "guiltyspark") \
            .load()
        df_customers.select('last_name', 'first_name', 'birth_date').show(100)
    else:
        print("Unknown database type")

show_customers(spark, "postgresql")

In [None]:
# Write and read Parquet files

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Write and read a Parquet file").getOrCreate()

customers_df = spark.read.format("csv").option("header", "true").option("sep", "|").load("/home/jovyan/work/spark/data/customers.csv")
customers_df.write.mode("overwrite").parquet("/home/jovyan/work/spark/data/customers.parquet")
parquet_df = spark.read.parquet("/home/jovyan/work/spark/data/customers.parquet")
parquet_df.createOrReplaceTempView("parquet_file")
males_df = spark.sql("SELECT last_name, first_name, gender_code FROM parquet_file WHERE gender_code = 'M'")
males_df.show(10)

In [None]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 12345) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

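# Generate running word count
wordCounts = words.groupBy("word").count()

# Start the query and print the running counts to the console;
# without start() the streaming DataFrame above is only a plan and nothing runs
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()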

In [None]:
# Write and read to/from Redis

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession \
    .builder \
    .appName("Databases") \
    .config("spark.jars", "/opt/spark-3.5.0/jars/mariadb-java-client-3.2.0.jar,/opt/spark-3.5.0/jars/spark-redis_2.12-3.1.0.jar") \
    .config("spark.redis.host", "localhost") \
    .config("spark.redis.port", "6379") \
    .getOrCreate()

sc = spark.sparkContext

rdd_customers_raw = sc.textFile("/home/peter/Projects/spark/data/customers.csv")
# one way to skip a header...
rdd_customers_headers = rdd_customers_raw.first()
rdd_customers = rdd_customers_raw.filter(lambda x: x != rdd_customers_headers) 
rdd_customers.collect()
rdd_customer_attributes = rdd_customers.map(lambda l: l.split("|"))

schema = StructType([StructField("record_number", StringType(), False),          
                     StructField("entity_id", StringType(), False),
                     StructField("customer_number", StringType(), False),
                     StructField("valid_from_date", StringType(), False),
                     StructField("valid_to_date", StringType(), False),
                     StructField("gender_code", StringType(), False),
                     StructField("last_name", StringType(), False),
                     StructField("first_name", StringType(), False),
                     StructField("birth_date", StringType(), False),
                     StructField("country_code", StringType(), False),
                     StructField("postal_code", StringType(), False),
                     StructField("city", StringType(), False),
                     StructField("street", StringType(), False),
                     StructField("data_date_part", StringType(), False)])

df_customers = spark.createDataFrame(rdd_customer_attributes, schema)

df_customers.write\
  .format("org.apache.spark.sql.redis")\
  .option("table", "customers")\
  .option("key.column", "record_number")\
  .mode("Overwrite")\
  .save()

# use the same key column that was used when writing the table
df_redis_customers = spark.read.format("org.apache.spark.sql.redis").option("table", "customers").option("key.column", "record_number").load()
df_redis_customers.select('last_name', 'first_name').filter(df_redis_customers.gender_code == 'M').show(20)

In [1]:
# Read a CSV file and convert it to Parquet

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV-to-Parquet converter").getOrCreate()

states_csv_df = spark.read.format("csv").option("header", "true").option("sep", "|").load("/home/peter/projects/onboarding/data/csv/states.csv")
states_csv_df.write.mode("overwrite").parquet("/home/peter/projects/onboarding/data/parquet/states.parquet")
states_parquet_df = spark.read.parquet("/home/peter/projects/onboarding/data/parquet/states.parquet")
states_parquet_df.createOrReplaceTempView("states")
states_sql_df = spark.sql("SELECT * FROM states")
states_sql_df.show()

24/04/03 09:11:38 WARN Utils: Your hostname, lenovo-xubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface ens33)
24/04/03 09:11:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/03 09:11:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+------+--------------------+------------+--------+-----------+--------+--------+------------------+-------+----+
|Nummer|          Bundesland|Länderkürzel|  Fläche|Bevölkerung|Männlich|Weiblich|Bevölkerungsdichte|    BIP|Jahr|
+------+--------------------+------------+--------+-----------+--------+--------+------------------+-------+----+
|    01|  Schleswig-Holstein|          SH|15804.30|    2953270| 1443269| 1510001|               187|118.680|2023|
|    02|             Hamburg|          HH|  755.09|    1892122|  925616|  966506|              2506|150.575|2023|
|    03|       Niedersachsen|          NI|47709.90|    8140242| 4009822| 4130420|               171|363.109|2023|
|    04|              Bremen|          HB|  419.61|     684864|  338233|  346631|              1632| 39.252|2023|
|    05| Nordrhein-Westfalen|          NW|34112.72|   18139116| 8890200| 9248916|               532|839.084|2023|
|    06|              Hessen|          HE|21115.62|    6391360| 3151158| 3240202|       