In [None]:
# Устанавливаем OpenJDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Закачиваем Spark
!wget -q http://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz -O spark.tgz
# Распаковываем архив со Spark
!tar xf spark.tgz
# Устанавливаем пакет findspark для работы со Spark из Python
!pip install -q findspark
# Настраиваем переменные окружения для работы с Apache Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"
# Находим установку Spark
import findspark
findspark.init()
# Подключаем необходимые модули для работы со Spark из Python
from pyspark.sql import SparkSession
# Создаем сессию Spark на локальном компьютере
spark = SparkSession.builder.master("local[*]").getOrCreate()
!mkdir sample_data/accounts

In [None]:
# Write the sample account change events, one JSON document per file.
#
# The events are kept as Python data and serialized with json.dumps instead of
# being echoed through the shell: `!` lines are also scanned by IPython for
# `{expr}` expansion, so literal JSON braces in a shell line are fragile, and
# nine near-identical commands are easy to get out of sync.
import json
import os

ACCOUNTS_DIR = "/content/sample_data/accounts"

# "c" events carry the full record under "data"; "u" events carry only the
# changed fields under "set".
account_events = [
    {"id": "a1globalid", "op": "c", "ts": 1577863800000,
     "data": {"account_id": "a1", "name": "Anton", "address": "Moscow",
              "phone_number": "12345678", "email": "anthony@somebank.com"}},
    {"id": "a2globalid", "op": "c", "ts": 1577873800000,
     "data": {"account_id": "a2", "name": "Alex", "address": "Atlanta",
              "phone_number": "98765432", "email": "Alex@somebank.com"}},
    {"id": "a1globalid", "op": "u", "ts": 1577865600000,
     "set": {"phone_number": "87654321"}},
    {"id": "a1globalid", "op": "u", "ts": 1577890800000,
     "set": {"savings_account_id": "sa1"}},
    {"id": "a2globalid", "op": "u", "ts": 1577890900000,
     "set": {"savings_account_id": "sa2"}},
    {"id": "a1globalid", "op": "u", "ts": 1577894400000,
     "set": {"address": "Yekaterinburg", "email": "anthony@anotherbank.com"}},
    {"id": "a1globalid", "op": "u", "ts": 1577926800000,
     "set": {"card_id": "c1"}},
    {"id": "a1globalid", "op": "u", "ts": 1579078860000,
     "set": {"card_id": ""}},
    {"id": "a1globalid", "op": "u", "ts": 1579163400000,
     "set": {"card_id": "c2"}},
]

# Make the cell runnable on its own and safe to re-run.
os.makedirs(ACCOUNTS_DIR, exist_ok=True)

for event in account_events:
    # Each file is named after the event timestamp, as in the original layout.
    event_path = os.path.join(ACCOUNTS_DIR, f"{event['ts']}.json")
    with open(event_path, "w", encoding="utf-8") as event_file:
        # Keep each document on a single line, terminated by a newline.
        event_file.write(json.dumps(event) + "\n")

Задача: выведите в стандартный вывод полное историческое представление данных в табличном формате.

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import coalesce, last

# Load every JSON event file from the accounts directory into one DataFrame;
# the schema is inferred from the files.
df = spark.read.format("json").load("/content/sample_data/accounts/")

In [None]:
# Print the inferred schema: the create payload is under the `data` struct and
# the update payload under the `set` struct, next to id / op / ts.
df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- account_id: string (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone_number: string (nullable = true)
 |-- id: string (nullable = true)
 |-- op: string (nullable = true)
 |-- set: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- card_id: string (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- phone_number: string (nullable = true)
 |    |-- savings_account_id: string (nullable = true)
 |-- ts: long (nullable = true)



In [None]:
# Timeline of events per entity: one partition per `id`, ordered by event time.
# The frame is spelled out explicitly; it is the frame Spark applies by default
# to an ordered window, i.e. everything from the first event up to the current one.
window = (
    Window
    .partitionBy("id")
    .orderBy("ts")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
# Flatten the nested event into one row of plain columns. Attributes that can
# appear in both the `data` and the `set` payload are merged by taking the
# first non-null of the two; the rest are read from the only struct that has them.
df = df.select(
    "id",
    "op",
    "ts",
    "data.account_id",
    coalesce("data.address", "set.address").alias("address"),
    coalesce("data.email", "set.email").alias("email"),
    "data.name",
    coalesce("data.phone_number", "set.phone_number").alias("phone_number"),
    "set.card_id",
    "set.savings_account_id",
)

In [None]:
# Build the historical view: for every event keep its timestamp and, for each
# attribute, the most recent non-null value seen so far within the same `id`
# (last(..., ignorenulls=True) over the ordered window).
df = df.select(
    "ts",
    *[
        last(attribute, ignorenulls=True).over(window).alias(attribute)
        for attribute in (
            "account_id",
            "address",
            "email",
            "name",
            "phone_number",
            "card_id",
            "savings_account_id",
        )
    ],
)

In [None]:
# Print the historical view to standard output. truncate=False keeps long
# values (e.g. e-mail addresses) intact; the default cuts every cell to 20
# characters. Note that show() still prints at most 20 rows unless `n` is given.
df.show(truncate=False)

+-------------+----------+-------------+--------------------+-----+------------+-------+------------------+
|           ts|account_id|      address|               email| name|phone_number|card_id|savings_account_id|
+-------------+----------+-------------+--------------------+-----+------------+-------+------------------+
|1577863800000|        a1|       Moscow|anthony@somebank.com|Anton|    12345678|   null|              null|
|1577865600000|        a1|       Moscow|anthony@somebank.com|Anton|    87654321|   null|              null|
|1577890800000|        a1|       Moscow|anthony@somebank.com|Anton|    87654321|   null|               sa1|
|1577894400000|        a1|Yekaterinburg|anthony@anotherba...|Anton|    87654321|   null|               sa1|
|1577926800000|        a1|Yekaterinburg|anthony@anotherba...|Anton|    87654321|     c1|               sa1|
|1579078860000|        a1|Yekaterinburg|anthony@anotherba...|Anton|    87654321|       |               sa1|
|1579163400000|        a1|Ye