# Apache Spark – шпаргалка

In [1]:
import findspark

# Locate the Spark 3 installation so that `pyspark` (imported in the next cell)
# can be imported from this kernel.
findspark.init(spark_home="/usr/lib/spark3")

In [2]:
from pyspark.sql import SparkSession

# Entry point for the DataFrame API. getOrCreate() reuses an already running
# session if there is one, otherwise it starts a new one.
spark = (
    SparkSession.builder
    .appName("Apache Spark Lecture")
    .getOrCreate()
)
spark

Picked up _JAVA_OPTIONS: 
Picked up _JAVA_OPTIONS: 
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/24 12:19:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/03/24 12:19:09 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/03/24 12:19:09 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/03/24 12:19:09 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/03/24 12:19:15 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [3]:
# Show the current session object as the cell output.
spark

In [3]:
# Stopping a session shuts down its SparkContext. The cells below keep using
# `spark`, so a fresh session is built right after the stop; otherwise every
# following cell fails when the notebook is run top to bottom.
spark.stop()
spark = SparkSession.builder.appName("Apache Spark Lecture").getOrCreate()

### Создание датафрейма из коллекции

In [3]:
import datetime

# Raw rows for the example DataFrame; the column order matches the schema in
# the next cell: user, date, product_id, quantity, payment.
data = [
    ["user1", datetime.datetime(year=2022, month=6, day=7), 1234556, 1, 567.8],
    ["user2", datetime.datetime(year=2022, month=6, day=8), 2345633, 2, 1276.0],
    ["user3", datetime.datetime(year=2022, month=10, day=11), 3687665, 10, 1053.0],
]

In [5]:
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Explicit schema: name, Spark type and nullability of every column.
# Passing it to createDataFrame fixes the column types instead of leaving them
# to inference from the Python values.
schema = StructType([
    StructField("user", StringType(), nullable=False),
    StructField("date", TimestampType(), nullable=False),
    StructField("product_id", LongType(), nullable=False),
    StructField("quantity", IntegerType(), nullable=False),
    StructField("payment", DoubleType(), nullable=False),
])

In [6]:
# Build a DataFrame from the in-memory rows using the explicit schema.
df = spark.createDataFrame(data=data, schema=schema)

In [7]:
# Print the schema as a tree: column name, type and nullability.
df.printSchema() 

root
 |-- user: string (nullable = false)
 |-- date: timestamp (nullable = false)
 |-- product_id: long (nullable = false)
 |-- quantity: integer (nullable = false)
 |-- payment: double (nullable = false)



In [8]:
# Print the rows as a text table (n=20 is also the default row limit).
df.show(n=20)

[Stage 0:>                                                          (0 + 1) / 1]

+-----+-------------------+----------+--------+-------+
| user|               date|product_id|quantity|payment|
+-----+-------------------+----------+--------+-------+
|user1|2022-06-07 00:00:00|   1234556|       1|  567.8|
|user2|2022-06-08 00:00:00|   2345633|       2| 1276.0|
|user3|2022-10-11 00:00:00|   3687665|      10| 1053.0|
+-----+-------------------+----------+--------+-------+



                                                                                

### Подключение к PostgreSQL

In [None]:
# Restart the session with the PostgreSQL JDBC driver pulled in as a Maven
# package (spark.jars.packages), so the "jdbc" source in the next cell can use
# org.postgresql.Driver.
# NOTE(review): spark.jars.packages is usually resolved when the JVM is
# launched; after stop() the existing py4j JVM may be reused and the package not
# picked up. If the driver class is not found, restart the kernel and set this
# config on the very first session -- TODO confirm for this Spark version.
spark.stop()

spark = (
    SparkSession.builder
    .appName("Apache Spark Lecture")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.4")
    .getOrCreate()
)

In [None]:
import os
from getpass import getpass

# Read a PostgreSQL table through Spark's "jdbc" source (the driver comes from
# the spark.jars.packages setting of the session).
# Connection settings are read from environment variables instead of being
# hard-coded in the notebook; the placeholders apply only when a variable is
# unset. The password is never written into the notebook: it comes from
# PG_PASSWORD or is prompted for interactively.
df_pg = (
    spark.read
    .format("jdbc")
    .option("url", os.environ.get("PG_URL", "jdbc:postgresql://host:port/db"))
    .option("dbtable", os.environ.get("PG_TABLE", "table"))
    .option("user", os.environ.get("PG_USER", "user"))
    .option("password", os.environ.get("PG_PASSWORD") or getpass("PostgreSQL password: "))
    .option("driver", "org.postgresql.Driver")
    .load()
)

### Чтение с HDFS и запись на HDFS

In [None]:
# Load the airport codes dataset stored as Parquet on HDFS.
codes = spark.read.format("parquet").load("/user/suncelesta/data/airport_codes")

In [None]:
# Write the DataFrame as Parquet. "..." is a placeholder for the target
# directory. With the default save mode the write fails if the target already
# exists; add .mode("overwrite") to replace it.
codes.write.format("parquet").save("...")

### Простые методы DataFrame

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame

In [None]:
# Project two columns and print the first 3 rows.
df_pg.select(["id", "click_id"]).show(n=3)

In [None]:
# Sort by `id` in ascending order (the default direction) and print 3 rows.
df_pg.orderBy(df_pg["id"].asc()).show(n=3)

In [None]:
# Rename a column inside a projection; explain() prints the query plan.
df_pg.select(df_pg["id"].alias("newname")).explain()

In [None]:
# Filter rows with a SQL expression string (where() is an alias of filter()).
df_pg.filter("id > 100").show(n=3)

In [None]:
# Count unique click_id values; distinct() is the same as dropDuplicates()
# called without a subset.
df_pg.select("click_id").dropDuplicates().count()

In [None]:
# Keep one row per (type, iso_country) pair and count what remains.
# drop_duplicates is an alias of dropDuplicates.
codes.drop_duplicates(["type", "iso_country"]).count()

In [None]:
# Drop rows in which all columns are NULL (how="all") and return a new
# DataFrame; df.na.drop(...) delegates to df.dropna(...).
codes.dropna(how="all")

In [None]:
# Union that matches columns by name rather than by position.
# allowMissingColumns=False is the default: both sides must have the same
# set of columns.
# NOTE(review): `ab1` and `ac1` are not defined in the visible cells -- create
# them before running this cell.
ab1.unionByName(ac1, allowMissingColumns=False).show()

In [None]:
# `F` is otherwise imported only in a later cell, so import it here as well.
import pyspark.sql.functions as F

# Join airports with country reference data on iso_country == code_2.
# No `how` argument is given, so Spark's default (inner) join is used.
# NOTE(review): `country_codes` is not defined in the visible cells.
codes.join(country_codes, F.col("iso_country") == F.col("code_2"))

### Функции: обычные, агрегатные, оконные

In [None]:
import pyspark.sql.functions as F

https://spark.apache.org/docs/3.3.0/api/python/reference/pyspark.sql/functions.html

In [None]:
# Add a struct column `coords_rad` with fields `lat` and `lon`, taken from
# elements 0 and 1 of `coords` and converted with F.radians.
# NOTE(review): `name_coords` is not defined in the visible cells; `coords` is
# presumably [lat, lon] in degrees -- confirm.
name_rads = name_coords.withColumn(
    "coords_rad",
    F.struct(
        F.radians(F.col("coords").getItem(0)).alias("lat"),
        F.radians(F.col("coords").getItem(1)).alias("lon"),
    ),
)

In [None]:
# Wrap the Python function `rads` into a UDF returning `output_type`.
# Passing the function itself (instead of `lambda c: rads(c)`) keeps its name,
# so query plans and result columns show `rads(...)` instead of `<lambda>(...)`.
# NOTE(review): `rads` and `output_type` are not defined in the visible cells.
rads_udf = F.udf(rads, output_type)

In [None]:
# Number of rows per iso_country; the alias names the result column `count`.
(
    codes
    .groupBy("iso_country")
    .agg(F.count("*").alias("count"))
    .show()
)

In [None]:
from pyspark.sql import Window

# Window over all rows sharing the same iso_country. It has no ordering, so
# every row gets the maximum elevation_ft of its whole partition.
w = Window.partitionBy("iso_country")
(
    codes
    .withColumn("max_elevation", F.max("elevation_ft").over(w))
    .select("name", "iso_country", "iso_region", "max_elevation")
    .show()
)

### Кэш и чекпоинт

In [None]:
from pyspark import StorageLevel

# persist() marks the DataFrame for caching with an explicit storage level
# (DISK_ONLY_2: on disk only, two replicas). Nothing is stored until an action
# runs.
airports.persist(storageLevel=StorageLevel.DISK_ONLY_2)

# A DataFrame that is already cached ignores a second persist()/cache(); Spark
# only logs "Asked to cache already cached data". To switch to cache()'s
# default level, unpersist first.
airports.unpersist()
airports.cache()

In [None]:
# checkpoint() saves the data to the checkpoint directory and returns a NEW
# DataFrame whose lineage is truncated. The original object keeps its full
# plan, so the result must be assigned back.
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
airports = airports.checkpoint()
airports