In [1]:
import pyspark
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth

#os.environ["JAVA_HOME"] = "/home/alex/.jdks/corretto-11.0.17/"

# RDD API

API почти полностью совпадает с нативным на Scala. Отсутствие типов и избыточный синтаксис для lambda-функций в Python затрудняет написание и чтение подобного кода. 

In [2]:
sc = SparkContext(master="local[*]")
# sc = SparkContext(master='spark://localhost:7077')

JAVA_HOME is not set


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

В качестве примере создадим `RDD` из чисел от 0 до 999, отфильтруем те, которые делятся на 3 и оставшиеся возведем в квадрат. 

In [None]:
rdd = sc.parallelize(range(1, 1000))

result = (
    rdd
    .filter(lambda x: x % 3 == 0)
    .map(lambda x: x**2)
    .map(lambda x: x + 1)
    .collect()
)

result[:10]

                                                                                

[10, 37, 82, 145, 226, 325, 442, 577, 730, 901]

Дальше будем работать с данными о средней дневной температуре в Санкт-Петербурге за 2008-2016 гг. Файл имеет следующий формат (`CSV` без заголовка):

In [None]:
WEATHER_FILE = "../data/weather.csv"

# !head -n 10 {WEATHER_FILE}
[line.strip() for line, _ in zip(open(WEATHER_FILE), range(10))]

['2008-01-01,0',
 '2008-01-02,-5',
 '2008-01-03,-11',
 '2008-01-04,-11',
 '2008-01-05,-12',
 '2008-01-06,-14',
 '2008-01-07,-6',
 '2008-01-08,-7',
 '2008-01-09,-6',
 '2008-01-10,0']

Реализуем функцию `parse_date`, которая будет переводить строку с датой в кортеж `(year, month, day)`. Затем:
- прочитаем содержимое файла в `RDD`
- разделим содержимое каждой строки по запятой
- первую колонку превратим в кортеж `(year, month, day)` и прибавим к нему вторую - температуру

In [None]:
def parse_date(date_str):
    """2008-02-02 -> (2008, 2, 2)"""
    return tuple(map(int, date_str.split("-")))


rdd = (
    sc.textFile(WEATHER_FILE)
    .map(lambda line: line.split(","))
    .map(lambda lst: parse_date(lst[0]) + (int(lst[1]),))
)

rdd.takeSample(num=10, withReplacement=False)

[(2014, 7, 24, 26),
 (2011, 2, 10, -11),
 (2009, 6, 21, 16),
 (2008, 4, 11, 4),
 (2009, 1, 8, -11),
 (2010, 2, 28, 2),
 (2013, 11, 18, 4),
 (2014, 12, 27, -3),
 (2016, 1, 27, 2),
 (2010, 2, 18, -13)]

Посчитаем среднюю дневную температуру летних месяцев по годам. Для этого:
- отфильтруем данные для летних месяцев
- отобразим исходные данные в набор пар ключ-значение - `(year, temp)`
- каждое значение отобразим в пару `(temp, 1)`
- сделаем свертку по ключу, считая суммарную температуру и число дней
- посчитаем среднее
- отсортируем по году

In [None]:
(rdd
  .filter(lambda lst: 6 <= lst[1] <= 8)
  .map(lambda lst: (lst[0], lst[-1]))
  .mapValues(lambda v: (v, 1))
  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
)



[(2008, (1667, 92)),
 (2010, (2035, 92)),
 (2012, (1756, 92)),
 (2014, (1905, 92)),
 (2016, (1778, 92)),
 (2009, (1723, 92)),
 (2011, (1940, 91)),
 (2013, (1982, 92)),
 (2015, (1751, 92))]

In [None]:
(
    rdd
    .filter(lambda lst: 6 <= lst[1] <= 8)
    .map(lambda lst: (lst[0], lst[-1]))
    .mapValues(lambda v: (v, 1))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .mapValues(lambda v: v[0] / v[1])
    .sortByKey()
    .take(10)
)

[(2008, 18.119565217391305),
 (2009, 18.72826086956522),
 (2010, 22.119565217391305),
 (2011, 21.318681318681318),
 (2012, 19.08695652173913),
 (2013, 21.543478260869566),
 (2014, 20.706521739130434),
 (2015, 19.032608695652176),
 (2016, 19.32608695652174)]

# Spark SQL

Точка входа - `SparkSession`. Из неё можно получить `SparkContext`.

In [None]:
spark = SparkSession.builder.master("local[*]").appName("PySpark").getOrCreate()

Будем решать ту же задачу, но с помощью `Spark SQL`. В этом случае данные логически представимы в виде таблицы с типизированными колонками. Для того чтобы прочитать `CSV`-файл нужно указать типы колонок или попросить вывести их самостоятельно. Ниже три способа:

In [None]:
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    TimestampType,
    FloatType,
)

# 1. Вручную описываем схему
schema = StructType(
    [
        StructField("date", TimestampType(), False),
        StructField("temp", IntegerType(), False),
    ]
)
df = spark.read.csv(WEATHER_FILE, schema=schema)

# 2. Для простоты можно передать её в виде строки
df = spark.read.csv(WEATHER_FILE, schema="date Timestamp, temp INT")

# 3. Можно вывести схему автоматически
df = spark.read.csv(WEATHER_FILE, inferSchema=True).toDF("date", "temp")

df.show(10)

+----------+----+
|      date|temp|
+----------+----+
|2008-01-01|   0|
|2008-01-02|  -5|
|2008-01-03| -11|
|2008-01-04| -11|
|2008-01-05| -12|
|2008-01-06| -14|
|2008-01-07|  -6|
|2008-01-08|  -7|
|2008-01-09|  -6|
|2008-01-10|   0|
+----------+----+
only showing top 10 rows



In [None]:
from pyspark.sql.functions import udf, pandas_udf
from typing import Iterator
import pyspark.sql.functions as F

import pandas as pd


@pandas_udf(FloatType())
def our_agg_udf(x: pd.Series) -> float:
    return x.mean() + 2


@udf(IntegerType())
def our_udf_bad(x: int) -> int:
    return 5 * x + 1


@pandas_udf(IntegerType())
def our_udf(chunks: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for chunk in chunks:
        yield 5 * chunk + 1


df_res = (
    df.repartition(2)
    .select(F.year(df.date).alias("year"), "temp")
    .groupby("year")
    .agg(our_agg_udf("temp").alias("year_avg"))
)

Любые операции с вручную написанные на `Python`  будут неэффективны. Потом все действия над данными лучше описывать декларативно с помощью "функций" из пакета `pyspark.sql.functions`

In [None]:
import pyspark.sql.functions as F

yt = (
    df.filter((F.month("date") >= 6) & (F.month("date") <= 8))
      .select(F.year("date").alias("year"), "temp")
)

# Можно использовать другой синтаксис
# yt = df.filter((F.month(df['date']) >= 6) & (F.month(df['date']) <= 8))
# yt = df.filter((F.month(F.col('date')) >= 6) & (F.month(F.col('date')) <= 8))

yt.show(10)

+----+----+
|year|temp|
+----+----+
|2008|  15|
|2008|  17|
|2008|  10|
|2008|  16|
|2008|  20|
|2008|  20|
|2008|  11|
|2008|  17|
|2008|  17|
|2008|  13|
+----+----+
only showing top 10 rows



In [None]:
from typing import Iterable
import pandas as pd


def pandas_mapper(iterator: Iterable[pd.DataFrame]):
    for pdf in iterator:
        pdf["temp"] -= 10
        yield pdf


yt.mapInPandas(pandas_mapper, yt.schema).show(10)

[Stage 19:>                                                         (0 + 1) / 1]

+----+----+
|year|temp|
+----+----+
|2008|   5|
|2008|   7|
|2008|   0|
|2008|   6|
|2008|  10|
|2008|  10|
|2008|   1|
|2008|   7|
|2008|   7|
|2008|   3|
+----+----+
only showing top 10 rows



                                                                                

In [None]:
psdf = yt.pandas_api()
psdf[psdf["year"] == 2013].head()



Unnamed: 0,year,temp
459,2013,25
460,2013,28
461,2013,28
462,2013,28
463,2013,25


In [None]:
yt = yt.repartition(10)
yt.cache()
yt.show()

+----+----+
|year|temp|
+----+----+
|2010|  15|
|2010|  31|
|2013|  18|
|2013|  24|
|2015|  19|
|2015|  19|
|2014|  16|
|2013|  21|
|2016|  24|
|2016|  18|
|2014|  15|
|2009|  21|
|2010|  32|
|2016|  14|
|2011|  27|
|2011|  16|
|2016|  23|
|2012|  21|
|2010|  16|
|2013|  20|
+----+----+
only showing top 20 rows



In [None]:
yt.groupBy("year").agg(F.avg("temp").alias("avg_temp")).sort("year").show(10)

+----+------------------+
|year|          avg_temp|
+----+------------------+
|2008|18.119565217391305|
|2009| 18.72826086956522|
|2010|22.119565217391305|
|2011|21.318681318681318|
|2012| 19.08695652173913|
|2013|21.543478260869566|
|2014|20.706521739130434|
|2015|19.032608695652176|
|2016| 19.32608695652174|
+----+------------------+



`DataFrame` можно зарегестрировать в виде именованной сущности, в таком случае все эти действия мы можем выполнить с помощью `SQL` запроса:

In [None]:
df.createOrReplaceTempView("weather")

yt = spark.sql("""SELECT year(date) as y, avg(temp) FROM weather 
                    WHERE month(date) > 5  and month(date) < 9 
                    GROUP BY y 
                    ORDER BY y""")
yt.show()

+----+------------------+
|   y|         avg(temp)|
+----+------------------+
|2008|18.119565217391305|
|2009| 18.72826086956522|
|2010|22.119565217391305|
|2011|21.318681318681318|
|2012| 19.08695652173913|
|2013|21.543478260869566|
|2014|20.706521739130434|
|2015|19.032608695652176|
|2016| 19.32608695652174|
+----+------------------+



Привидем ещё один пример - посчитаем разницу между температурой за конкретный день и средней температурой в этой месяц. 

Cоздадим `DataFrame` с полями `(year, month, avg_temp)`. Метод `select` открывает новый `DataFrame`, но с иными полями, вычисленными на основе существующих.

In [None]:
new_df = df.select(
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    "temp",
)

ms = new_df.groupby("year", "month").agg(F.avg("temp").alias("month_avg"))

ms.show(10)

+----+-----+--------------------+
|year|month|           month_avg|
+----+-----+--------------------+
|2012|   10|    7.67741935483871|
|2010|    7|   27.06451612903226|
|2010|   12|  -7.935483870967742|
|2015|    2|-0.21428571428571427|
|2008|    8|  17.419354838709676|
|2009|   11|  2.8333333333333335|
|2014|    4|   8.766666666666667|
|2015|   12|  2.3225806451612905|
|2016|    7|  20.451612903225808|
|2016|   11| -1.4333333333333333|
+----+-----+--------------------+
only showing top 10 rows



Дальше воспользуемся `join`, который похож на аналогичную операцию в `SQL`

In [None]:
m_avg = new_df.join(ms, ["year", "month"])

m_avg.show(10)

+----+-----+---+----+-------------------+
|year|month|day|temp|          month_avg|
+----+-----+---+----+-------------------+
|2008|    1|  1|   0|-1.6129032258064515|
|2008|    1|  2|  -5|-1.6129032258064515|
|2008|    1|  3| -11|-1.6129032258064515|
|2008|    1|  4| -11|-1.6129032258064515|
|2008|    1|  5| -12|-1.6129032258064515|
|2008|    1|  6| -14|-1.6129032258064515|
|2008|    1|  7|  -6|-1.6129032258064515|
|2008|    1|  8|  -7|-1.6129032258064515|
|2008|    1|  9|  -6|-1.6129032258064515|
|2008|    1| 10|   0|-1.6129032258064515|
+----+-----+---+----+-------------------+
only showing top 10 rows



В итоге посчитаем разницу:

In [None]:
m_avg.withColumn("diff", F.abs(m_avg["temp"] - m_avg["month_avg"])).show(10)

+----+-----+---+----+-------------------+------------------+
|year|month|day|temp|          month_avg|              diff|
+----+-----+---+----+-------------------+------------------+
|2008|    1|  1|   0|-1.6129032258064515|1.6129032258064515|
|2008|    1|  2|  -5|-1.6129032258064515|3.3870967741935485|
|2008|    1|  3| -11|-1.6129032258064515| 9.387096774193548|
|2008|    1|  4| -11|-1.6129032258064515| 9.387096774193548|
|2008|    1|  5| -12|-1.6129032258064515|10.387096774193548|
|2008|    1|  6| -14|-1.6129032258064515|12.387096774193548|
|2008|    1|  7|  -6|-1.6129032258064515| 4.387096774193548|
|2008|    1|  8|  -7|-1.6129032258064515| 5.387096774193548|
|2008|    1|  9|  -6|-1.6129032258064515| 4.387096774193548|
|2008|    1| 10|   0|-1.6129032258064515|1.6129032258064515|
+----+-----+---+----+-------------------+------------------+
only showing top 10 rows

