In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Создаём SparkSession. Добавляем путь к драйверу JDBC.

In [None]:
spark = (
    SparkSession
        .builder
        .master("local")
        .appName("JDBC Data Source")
        .config("spark.jars", "jars/postgresql-42.7.2.jar")
        .config("spark.driver.memory", "8g")
        .getOrCreate()
)

Задаём свойства подключения.

In [None]:
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/spark"
user = "postgres"
password = "postgres"

## Чтение таблицы целиком

### Вариант 1

In [None]:
employees_df = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.employees") \
    .load()

employees_df.count()

In [None]:
employees_df.printSchema()

In [None]:
employees_df.show(10)

### Вариант 2

In [None]:
DBPARAMS = {
    "user": user,
    "password": password,
    "driver": driver
}

In [None]:
df = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS)
df.count()

In [None]:
df.printSchema()

In [None]:
df.show(10)

Проверим количество партиций.

In [None]:
df.rdd.getNumPartitions()

## Как распараллелить чтение?

### Партиционирование по столбцам

Добавим количество партиций к параметрам чтения таблицы.

In [None]:
df101 = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, numPartitions=10)

print("count = ", df101.count())
print("num partitions = ", df101.rdd.getNumPartitions())

Количество партиций не изменилось.

Узнаем минимальное и максимальное значения столбца *emp_no*

In [None]:
df.agg(min(col("emp_no")), max(col("emp_no"))).show()

In [None]:
min_emp_no = df.agg(min(col("emp_no"))).collect()[0][0]
max_emp_no = df.agg(max(col("emp_no"))).collect()[0][0]

print("min = ", min_emp_no, "\nmax = ", max_emp_no)

In [None]:
df102 = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS,
                        column="emp_no", lowerBound = min_emp_no, upperBound = max_emp_no, numPartitions=10)

print("count = ", df102.count())
print("num partitions = ", df102.rdd.getNumPartitions())

In [None]:
df102.show(10)

Посмотрим сколько записей попало в каждую партицию

In [None]:
df102.rdd.foreachPartition(lambda p: print("Partition count = ", len(list(p))))

Это также можно получить другим способом

In [None]:
pl = df102.rdd.mapPartitionsWithIndex(lambda p, i: (p, len(list(i)))).collect()
list(zip(pl[::2], pl[1::2]))

Зададим в качестве *lowerBound* и *upperBound* произвольные значения (не min и max)

In [None]:
df103 = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.employees") \
    .option("partitionColumn", "emp_no") \
    .option("lowerBound", "20000") \
    .option("upperBound", "50000") \
    .option("numPartitions", "10") \
    .load()

print("count = ", df103.count())
print("num partitions = ", df103.rdd.getNumPartitions())

Посмотрим сколько теперь записей попало в каждую партицию

In [None]:
pl2 = df103.rdd.mapPartitionsWithIndex(lambda p, i: (p, len(list(i)))).collect()
list(zip(pl2[::2], pl2[1::2]))

In [None]:
df103.show(10)

### Партиционирование по предикатам

### Пример 1

Опредилим **два** предиката по значению столбца *gender*

In [None]:
pred = ["gender = 'M'", "gender = 'F'"]

df_pred = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, predicates=pred)

print("count = ", df_pred.count())
print("num partitions = ", df_pred.rdd.getNumPartitions())

Опредилим **один** предиката по одному значению столбца *gender*

In [None]:
pred1 = ["gender = 'F'"]

df_pred1 = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, predicates=pred1)

print("count = ", df_pred1.count())
print("num partitions = ", df_pred1.rdd.getNumPartitions())

Опредилим **три** предиката по значению столбца *gender*

In [None]:
pred3 = ["gender = 'F'", "gender = 'M'", "gender = 'M'"]

df_pred3 = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, predicates=pred3)

print("count = ", df_pred3.count())
print("num partitions = ", df_pred3.rdd.getNumPartitions())

Опредилим **четыре** предиката по значению столбца *gender*

In [None]:
pred4 = ["gender = 'F'", "gender = 'F'", "gender = 'M'", "gender = 'M'"]

df_pred4 = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, predicates=pred4)

print("count = ", df_pred4.count())
print("num partitions = ", df_pred4.rdd.getNumPartitions())

Посмотрим сколько записей для каждого значения столбца *gender* было в исходной таблице

In [None]:
df.groupBy(col("gender")).agg(count(col("emp_no"))).show()

Сравним с количеством записей при применении трёх и четырёх предикатов

In [None]:
df_pred3.groupBy(col("gender")).agg(count(col("emp_no"))).show()

In [None]:
df_pred4.groupBy(col("gender")).agg(count(col("emp_no"))).show()

### Пример 2

Определим **два** предиката по условиям на значения столбца *emp_no* 

In [None]:
pred2 = ["emp_no > 20000 and emp_no <= 50000", "emp_no >= 50000 and emp_no <= 100000"]

df_pred2 = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, predicates=pred2)

print("count = ", df_pred2.count())
print("num partitions = ", df_pred2.rdd.getNumPartitions())

In [None]:
df_pred2.show(10)

Определим **один** предикат по условию на значения столбца *emp_no* 

In [None]:
pred22 = ["emp_no > 20000 and emp_no <= 50000"]

df_pred22 = spark.read.jdbc(url=url, table="public.employees", properties=DBPARAMS, predicates=pred22)

print("count = ", df_pred22.count())
print("num partitions = ", df_pred22.rdd.getNumPartitions())

## Фильтрация

Выполним запрос к базе на выборку значений из таблицы с условием

In [None]:
q = """select * from public.employees where emp_no > 20000 and emp_no <= 50000"""

dfq = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("query", q) \
    .load()

dfq.count()

In [None]:
dfq.show(10)

## Соединения в базе

Выполним запрос к базе на выборку значений из соединения таблиц

In [None]:
qj = """select e.emp_no, birth_date, first_name, last_name, gender, hire_date, salary, from_date, to_date
from employees e join salaries s on e.emp_no = s.emp_no"""

dfj = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("query", qj) \
    .load()

dfj.count()

In [None]:
dfj.show()

## Запись в таблицу

Посмотрим на таблицу *employees*

In [None]:
employees_df.show(10)

Загрузим таблицу *salaries*

In [None]:
salaries_df = spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.salaries") \
    .load()

salaries_df.count()

In [None]:
salaries_df.show(10)

Сделаем группировку по колонке *emp_no* и найдём максимальное значение колонки *salary*

In [None]:
employees_salaries_df = salaries_df.groupBy(col("emp_no")).agg(max(col("salary")).alias("max_salary"))

employees_salaries_df.show(10)

Создадим новый Dataframe как результат соединения *employees_df* и агрегированного *salaries_df*

In [None]:
employees_salaries_df = employees_df.join(employees_salaries_df, "emp_no")

employees_salaries_df.show()

Сохраним новый Dataframe в таблицу в базе. Таблицы с таким именем в базе не было. Она будет создана.

In [None]:
employees_salaries_df.write \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.employees_salaries") \
    .save()

Если таблица с таким именем существовала в базе, то при сохранении надо использовать режим *overwrite*

In [None]:
employees_salaries_df.write \
    .mode("overwrite") \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.employees_salaries") \
    .option("truncate", "true") \
    .save()

In [None]:
spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.employees_salaries") \
    .load() \
    .count()

Если использовать режим *append* содержимое Dataframe будет добавлено в таблицу

In [None]:
employees_salaries_df.write \
    .mode("append") \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.employees_salaries") \
    .save()

In [None]:
spark.read \
    .format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("dbtable", "public.employees_salaries") \
    .load() \
    .count()