In [1]:
import os
# Ask PySpark to pull the PostgreSQL JDBC driver package when it launches;
# the JDBC cells at the end of the notebook use org.postgresql.Driver.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.2.19 pyspark-shell'

from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.utils import AnalysisException

# Build (or reuse) the notebook's SparkSession: local master, driver bound to
# localhost, Hive support enabled.
spark = (
    SparkSession.builder
    .config("spark.driver.host", "localhost")
    .appName("Spark SQL")
    .master("local")
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# Read the cars dataset from JSON and mark it for MEMORY_ONLY caching.
cars_df = (
    spark.read
    .json("data/cars")
    .persist(StorageLevel.MEMORY_ONLY)
)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/jovyan/notebooks/data/cars.

In [3]:
# Show 10 rows, truncating long strings at 30 characters.
cars_df.show(n=10, truncate=30)

NameError: name 'cars_df' is not defined

In [5]:
# Spark DSL (DataFrame API): names of the cars whose Origin is "Japan".
# NOTE(review): the variable is called "american" but the filter selects
# Japan — confirm which one is intended.
american_cars_df = (
    cars_df
    .where(col("Origin") == "Japan")
    .select("Name")
)

In [5]:
# Print the physical plan of the DSL query.
american_cars_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Name#13]
   +- Filter (isnotnull(Origin#14) AND (Origin#14 = Japan))
      +- InMemoryTableScan [Name#13, Origin#14], [isnotnull(Origin#14), (Origin#14 = Japan)]
            +- InMemoryRelation [Acceleration#8, Cylinders#9L, Displacement#10, Horsepower#11L, Miles_per_Gallon#12, Name#13, Origin#14, Weight_in_lbs#15L, Year#16], StorageLevel(memory, 1 replicas)
                  +- FileScan json [Acceleration#8,Cylinders#9L,Displacement#10,Horsepower#11L,Miles_per_Gallon#12,Name#13,Origin#14,Weight_in_lbs#15L,Year#16] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/data/cars], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Acceleration:double,Cylinders:bigint,Displacement:double,Horsepower:bigint,Miles_per_Gallo...




In [6]:
# Show 10 rows without truncating long strings.
american_cars_df.show(n=10, truncate=False)

+---------------------------+
|Name                       |
+---------------------------+
|toyota corona mark ii      |
|datsun pl510               |
|datsun pl510               |
|toyota corona              |
|toyota corolla 1200        |
|datsun 1200                |
|toyota corona hardtop      |
|mazda rx2 coupe            |
|datsun 510 (sw)            |
|toyouta corona mark ii (sw)|
+---------------------------+
only showing top 10 rows



In [3]:
# Register the DataFrame under the name "cars" so it can be queried with SQL.
# A temp view is session-scoped: no data is written to disk, and the catalog
# reports it as TEMPORARY rather than as an external or managed table.
print("DataFrame => session-scoped TEMPORARY VIEW (no data written to disk)")
cars_df.createOrReplaceTempView("cars")
spark.catalog.listTables()

DataFrame => SQL metastore = EXTERNAL TABLE


[Table(name='cars', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [4]:
# Spark SQL != Spark DSL
# Run a SQL query against a DataFrame that Spark knows by its registered name.
american_cars_df_v2 = spark.sql(
    "SELECT Name FROM cars WHERE Origin = 'Japan'"
)
american_cars_df_v2.show(n=10, truncate=False)

+---------------------------+
|Name                       |
+---------------------------+
|toyota corona mark ii      |
|datsun pl510               |
|datsun pl510               |
|toyota corona              |
|toyota corolla 1200        |
|datsun 1200                |
|toyota corona hardtop      |
|mazda rx2 coupe            |
|datsun 510 (sw)            |
|toyouta corona mark ii (sw)|
+---------------------------+
only showing top 10 rows



In [6]:
# Print the physical plan of the SQL query, for comparison with the DSL plan.
american_cars_df_v2.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Name#13]
   +- Filter (isnotnull(Origin#14) AND (Origin#14 = Japan))
      +- InMemoryTableScan [Name#13, Origin#14], [isnotnull(Origin#14), (Origin#14 = Japan)]
            +- InMemoryRelation [Acceleration#8, Cylinders#9L, Displacement#10, Horsepower#11L, Miles_per_Gallon#12, Name#13, Origin#14, Weight_in_lbs#15L, Year#16], StorageLevel(memory, 1 replicas)
                  +- FileScan json [Acceleration#8,Cylinders#9L,Displacement#10,Horsepower#11L,Miles_per_Gallon#12,Name#13,Origin#14,Weight_in_lbs#15L,Year#16] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/data/cars], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Acceleration:double,Cylinders:bigint,Displacement:double,Horsepower:bigint,Miles_per_Gallo...




In [7]:
# Temp views exist only inside the session that created them, so a brand-new
# session cannot resolve `cars`. Catch only the expected analysis error so
# that any other failure still surfaces as a real error.
try:
  spark.newSession().sql("SELECT Name FROM cars WHERE Origin = 'Japan'")
except AnalysisException as e:
  print(e)

[TABLE_OR_VIEW_NOT_FOUND] The table or view `cars` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 17;
'Project ['Name]
+- 'Filter ('Origin = Japan)
   +- 'UnresolvedRelation [cars], [], false



In [8]:
# `cars` was registered with createOrReplaceTempView, so it is a temporary
# view and is dropped with DROP VIEW. Only the name registered in the session
# goes away; the source files on disk are not touched.
spark.sql("DROP VIEW cars")

DataFrame[]

In [14]:
# List what the catalog currently exposes.
spark.catalog.listTables()

[Table(name='cars_managed_table', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False)]

In [12]:
# After the drop, `cars` can no longer be resolved. Catch only the expected
# analysis error so that an unrelated failure is not reported as "expected".
print("Ожидаемо падает:")
try:
  spark.sql("SELECT Name FROM cars WHERE Origin = 'Japan'")
except AnalysisException as e:
  print(e)

Ожидаемо падает:
[TABLE_OR_VIEW_NOT_FOUND] The table or view `cars` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 17;
'Project ['Name]
+- 'Filter ('Origin = Japan)
   +- 'UnresolvedRelation [cars], [], false



In [14]:
# The data is still readable from disk.
cars_df_again = (
    spark.read
    .json("data/cars")
)
cars_df_again.count()

407

Сохранить датафрейм как Spark таблицу: `DataFrame => SQL metastore + Spark storage`

In [15]:
# Count directories named cars_managed_table under the home directory.
!find ~ -type d -name cars_managed_table | wc -l

0


In [10]:
# Save the DataFrame as the table "cars_managed_table", replacing any existing one.
(
    cars_df.write
    .mode("overwrite")
    .saveAsTable("cars_managed_table")
)

In [11]:
# Locate the cars_managed_table directory under the home directory and list its contents.
!find ~ -type d -name cars_managed_table -exec find {} \;

/home/jovyan/notebooks/spark-warehouse/cars_managed_table
/home/jovyan/notebooks/spark-warehouse/cars_managed_table/.part-00000-a9752fb9-7b8a-4959-9a50-e4806ab5715f-c000.snappy.parquet.crc
/home/jovyan/notebooks/spark-warehouse/cars_managed_table/._SUCCESS.crc
/home/jovyan/notebooks/spark-warehouse/cars_managed_table/part-00000-a9752fb9-7b8a-4959-9a50-e4806ab5715f-c000.snappy.parquet
/home/jovyan/notebooks/spark-warehouse/cars_managed_table/_SUCCESS


`saveAsTable` выполняет действия, отличные от `save()`, `orc(...)` или `parquet(...)`: помимо записи файлов, таблица регистрируется в каталоге Spark.

В случае, например, `parquet()` указывается место для хранения файлов на диске:
```python
df.write \
  .parquet("data/parquet")
```

In [19]:
# Read the managed table back with a plain SQL query.
print("Чтение из управляемой таблицы (Managed Table) при помощи SQL запроса:")
american_cars_df_v2 = spark.sql(
    "SELECT * FROM cars_managed_table"
)
american_cars_df_v2.show(n=10, truncate=False)

Чтение из управляемой таблицы (Managed Table) при помощи SQL запроса:
+------------+---------+------------+----------+----------------+-------------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|Name                     |Origin|Weight_in_lbs|Year      |
+------------+---------+------------+----------+----------------+-------------------------+------+-------------+----------+
|12.0        |8        |307.0       |130       |18.0            |chevrolet chevelle malibu|USA   |3504         |NULL      |
|12.0        |8        |307.0       |130       |18.0            |chevrolet chevelle malibu|USA   |3504         |1970-01-01|
|11.5        |8        |350.0       |165       |15.0            |buick skylark 320        |USA   |3693         |1970-01-01|
|11.0        |8        |318.0       |150       |18.0            |plymouth satellite       |USA   |3436         |1970-01-01|
|12.0        |8        |304.0       |150       |16.0          

In [20]:
# The managed table must hold exactly as many rows as the source DataFrame.
assert cars_df.count() == american_cars_df_v2.count()

In [17]:
# spark.table(...) is the same as spark.read.table(...):
# cars_managed_df = spark.read.table("cars_managed_table")

print("Чтение из управляемой таблицы (Managed Table) при помощи Spark DSL:")

cars_managed_df = spark.table("cars_managed_table")
assert cars_managed_df.count() != 0
cars_managed_df.show(n=10, truncate=False)

In [15]:
# Query the managed table from a brand-new session (the temp view `cars`
# could not be resolved this way).
spark.newSession().sql("SELECT * FROM cars_managed_table").show()


+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|                Name|Origin|Weight_in_lbs|      Year|
+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|      NULL|
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|1970-01-01|
|        11.5|        8|       350.0|       165|            15.0|   buick skylark 320|   USA|         3693|1970-01-01|
|        11.0|        8|       318.0|       150|            18.0|  plymouth satellite|   USA|         3436|1970-01-01|
|        12.0|        8|       304.0|       150|            16.0|       amc rebel sst|   USA|         3433|1970-01-01|
|        10.5|        8|       302.0|       140|

In [16]:
# Drop the managed table; the next cell checks whether its directory is still on disk.
print("Удалить управляемую таблицу (Managed Table)")
spark.sql("DROP TABLE cars_managed_table")

Удалить управляемую таблицу (Managed Table)


DataFrame[]

In [17]:
# Count directories named cars_managed_table under the home directory again.
!find ~ -type d -name cars_managed_table | wc -l

0


In [19]:
# List the databases (schemas) known to the catalog.
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/home/jovyan/notebooks/spark-warehouse'),
 Database(name='test', catalog='spark_catalog', description='', locationUri='file:/home/jovyan/notebooks/spark-warehouse/test.db')]

In [20]:
# List the tables of the `test` schema.
# NOTE(review): the `test` schema is created by a cell that appears further
# down; on a fresh "Run All" this cell would run before the schema exists —
# consider moving it below the CREATE SCHEMA cell.
spark.catalog.listTables("test")

[Table(name='students', catalog='spark_catalog', namespace=['test'], description=None, tableType='MANAGED', isTemporary=False)]

In [18]:
# Tables can also be created and populated through Spark SQL.
# Start from a clean slate so the cell can be re-run: the schema is stored
# under spark-warehouse and may be left over from an earlier run, in which
# case a plain CREATE SCHEMA would fail.
spark.sql("DROP SCHEMA IF EXISTS test CASCADE")
spark.sql("CREATE SCHEMA test")
spark.sql("CREATE TABLE test.students (name VARCHAR(64), address VARCHAR(64)) USING PARQUET PARTITIONED BY (student_id INT)")
spark.sql("""
INSERT INTO test.students
VALUES ('Bob Brown', '456 Taylor St, Cupertino', 222222)
     , ('Cathy Johnson', '789 Race Ave, Palo Alto', 333333)
""")

DataFrame[]

In [21]:
# Read back the rows inserted through SQL.
ddl_demo_df = spark.sql(
    "SELECT * FROM test.students"
)
ddl_demo_df.show(n=10, truncate=False)

+-------------+------------------------+----------+
|name         |address                 |student_id|
+-------------+------------------------+----------+
|Cathy Johnson|789 Race Ave, Palo Alto |333333    |
|Bob Brown    |456 Taylor St, Cupertino|222222    |
+-------------+------------------------+----------+



За дополнительными сведениями об INSERT можно обратиться к [документации](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)

In [22]:
# Drop the demo schema together with its tables. IF EXISTS keeps the cell
# re-runnable when the schema has already been removed.
print("Удалить схему test")
spark.sql("DROP SCHEMA IF EXISTS test CASCADE")

## Delta Lake

[Delta](https://delta.io/learn/getting-started/) - современный формат эффективного хранения часто меняющихся данных

In [23]:
# Save the first version: write five rows in Delta format.
# NOTE(review): PYSPARK_SUBMIT_ARGS at the top of the notebook only requests
# the PostgreSQL JDBC package; the "delta" format needs the Delta Lake
# libraries to be available to Spark — confirm the environment provides them.
(
    cars_df
    .limit(5)
    .write
    .format("delta")
    .mode("overwrite")
    .save("../out/my_delta_cars")
)

In [24]:
# Read the current version of the Delta table.
delta_df = (
    spark.read
    .format("delta")
    .load("../out/my_delta_cars")
)

In [25]:
# Expose the Delta DataFrame to SQL under the name "my_delta_cars".
delta_df.createOrReplaceTempView("my_delta_cars")

In [26]:
# Show the rows visible through the view.
spark.sql("select * from my_delta_cars").show()

In [27]:
# Append one row, i.e. write a new version of the Delta table.
(
    cars_df
    .limit(1)
    .write
    .format("delta")
    .mode("append")
    .save("../out/my_delta_cars")
)

In [28]:
# Count the rows visible through the view after the append.
spark.sql("select count(*) from my_delta_cars").show()

In [29]:
# Time travel: read the Delta table as of version 0 and count its rows.
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("../out/my_delta_cars")
)

df.count()

# Задания

1. Получить список всех сотрудников и их максимальные зарплаты
1. Получить список всех сотрудников, которые никогда не были менеджерами
1. Для каждого сотрудника найти разницу между его зарплатой (текущей/последней) и максимальной зарплатой в его отделе

In [30]:
# JDBC connection settings for the PostgreSQL database used in the exercises.
# The address and credentials can be overridden through environment variables;
# the fallbacks are the values this notebook used originally — presumably the
# defaults of the local docker setup (TODO confirm) — and must not be reused
# for any real database.
driver = "org.postgresql.Driver"
url = os.environ.get("SPARK_JDBC_URL", "jdbc:postgresql://postgres:5432/spark")
user = os.environ.get("SPARK_JDBC_USER", "docker")
password = os.environ.get("SPARK_JDBC_PASSWORD", "docker")

In [31]:
def read_table(table_name: str):
    """Load one table of the PostgreSQL ``public`` schema over JDBC.

    Connection settings are taken from the notebook-level ``driver``, ``url``,
    ``user`` and ``password`` variables.

    Args:
        table_name: Unqualified table name; ``public.`` is prepended to it.

    Returns:
        Whatever the JDBC reader's ``load()`` returns for that table.
    """
    jdbc_reader = spark.read.format("jdbc").options(
        driver=driver,
        url=url,
        user=user,
        password=password,
        dbtable="public." + table_name,
    )
    return jdbc_reader.load()

# Load the five tables used by the exercises; each call goes through read_table.
employees_df = read_table("employees")
salaries_df = read_table("salaries")
dept_managers_df = read_table("dept_manager")
dept_emp_df = read_table("dept_emp")
departments_df = read_table("departments")

In [32]:
# Register every loaded DataFrame as a temp view named after its source table,
# so the exercises can be solved with SQL.
for view_name, table_df in {
    "employees": employees_df,
    "salaries": salaries_df,
    "dept_manager": dept_managers_df,
    "dept_emp": dept_emp_df,
    "departments": departments_df,
}.items():
    table_df.createOrReplaceTempView(view_name)