# PySpark - Работа с Dataset

## Введение в Dataset
Dataset - это строго типизированный API для работы с данными в Spark. Он был внедрен в Spark 1.6 и представляет собой расширение DataFrame API, объединяющее преимущества как RDD (строгая типизация), так и DataFrame (оптимизация запросов через Catalyst).

**Важно:** В PySpark Dataset API напрямую не доступен из-за особенностей динамической типизации Python. В PySpark DataFrame является эквивалентом Dataset[Row] в Scala/Java. Однако, понимание концепции Dataset важно для тех, кто работает в смешанных средах Java/Scala/Python.

## Содержание
1. Инициализация Spark и создание SparkSession
2. Dataset в экосистеме Spark
3. Типизация в PySpark
4. Использование UDF для типизированной обработки данных
5. Pandas UDF и vectorized UDF
6. Практические примеры работы с типизированной обработкой данных
7. Интеграция с Pandas через Pandas API на PySpark



## 1. Инициализация Spark и создание SparkSession



In [1]:
# !pip install findspark pandas matplotlib

In [2]:
import findspark
findspark.init()

In [3]:
# Импорт необходимых библиотек
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Dict, Tuple
%matplotlib inline



In [4]:
# Создание SparkSession
spark = (SparkSession
    .builder
    .appName("Dataset Practice")
    .master("local[*]")
    .getOrCreate()
)

# Проверка версии Spark
print(f"Spark Version: {spark.version}")

Spark Version: 3.0.3


## 2. Dataset в экосистеме Spark

### `Dataset` vs `DataFrame` vs `RDD`


Рассмотрим различия между Dataset, DataFrame и RDD.

В Scala/Java:
- RDD: Низкоуровневый API с полным контролем над данными. Операции не проверяются компилятором.
- DataFrame: Распределенная коллекция данных, организованная в именованные столбцы. Эквивалент Dataset[Row].
- Dataset: Строго типизированный API, проверяемый компилятором.

В PySpark:
- RDD: Также доступен и работает так же, как в Scala/Java.
- DataFrame: Основной API для работы с данными.
- Dataset: Отдельного Dataset API нет из-за динамической типизации Python.




In [5]:
# Создадим простой DataFrame для демонстрации
data = [("John", 28, "Sales"), 
        ("Anna", 34, "Finance"), 
        ("Robert", 42, "IT"), 
        ("Maria", 39, "HR")]
columns = ["name", "age", "department"]

In [6]:
# Создаем DataFrame в PySpark
df = spark.createDataFrame(data, columns)
df.show()

+------+---+----------+
|  name|age|department|
+------+---+----------+
|  John| 28|     Sales|
|  Anna| 34|   Finance|
|Robert| 42|        IT|
| Maria| 39|        HR|
+------+---+----------+



In [7]:
# Создаем RDD из того же набора данных
rdd = spark.sparkContext.parallelize(data)
print("RDD content:", rdd.take(4))


RDD content: [('John', 28, 'Sales'), ('Anna', 34, 'Finance'), ('Robert', 42, 'IT'), ('Maria', 39, 'HR')]



В Scala мы могли бы создать Dataset следующим образом:

```scala
case class Employee(name: String, age: Int, department: String)
val dataset = spark.createDataset(data.map(row => Employee(row._1, row._2, row._3)))
```

В Python вместо Dataset мы используем класс, близкий к концепции типизации:



In [8]:
# Определение класса Python для хранения данных
class Employee:
    def __init__(self, name, age, department):
        self.name = name
        self.age = age
        self.department = department
    
    def __repr__(self):
        return f"Employee({self.name}, {self.age}, {self.department})"

In [9]:
# Преобразование данных в список объектов Employee
employees = [Employee(name, age, dept) for name, age, dept in data]
print("Python объекты:", employees)

# Однако мы не можем напрямую создать Dataset из этих объектов в PySpark
# Вместо этого мы создаем DataFrame

Python объекты: [Employee(John, 28, Sales), Employee(Anna, 34, Finance), Employee(Robert, 42, IT), Employee(Maria, 39, HR)]


## 3. Типизация в PySpark

### Использование схем для обеспечения типизации


Хотя PySpark не имеет полноценного Dataset API, мы можем использовать схемы для 
обеспечения некоторого уровня типизации при работе с данными.



In [10]:
# Определение схемы с явными типами
employee_schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("department", StringType(), True)
])

In [11]:
# Создание DataFrame с заданной схемой
typed_df = spark.createDataFrame(data, employee_schema)
typed_df.printSchema()
typed_df.show()

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- department: string (nullable = true)

+------+---+----------+
|  name|age|department|
+------+---+----------+
|  John| 28|     Sales|
|  Anna| 34|   Finance|
|Robert| 42|        IT|
| Maria| 39|        HR|
+------+---+----------+



In [12]:
# Попытка вставить данные неправильного типа
try:
    wrong_data = [("Alex", "thirty", "Marketing")]  # Возраст должен быть числом
    wrong_df = spark.createDataFrame(wrong_data, employee_schema)
    wrong_df.show()
except Exception as e:
    print("Ошибка при попытке вставить данные неправильного типа:")
    print(str(e))

Ошибка при попытке вставить данные неправильного типа:
field age: IntegerType can not accept object 'thirty' in type <class 'str'>


### Использование `pandas.DataFrame` с типизацией

Python имеет собственные механизмы для обеспечения типизации в pandas, 
которые можно использовать перед преобразованием в PySpark DataFrame.

In [13]:
# Создание pandas DataFrame с явными типами
pandas_df = pd.DataFrame(data, columns=columns)
pandas_df['age'] = pandas_df['age'].astype('int32')
pandas_df['name'] = pandas_df['name'].astype('string')
pandas_df['department'] = pandas_df['department'].astype('string')

print("Типы данных в pandas DataFrame:")
print(pandas_df.dtypes)

Типы данных в pandas DataFrame:
name          string
age            int32
department    string
dtype: object


In [14]:
# Преобразование в PySpark DataFrame
spark_df_from_pandas = spark.createDataFrame(pandas_df)
spark_df_from_pandas.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- department: string (nullable = true)



## 4. Использование UDF для типизированной обработки данных

### Определение обычных UDF


User Defined Functions (UDF) в PySpark позволяют нам применять пользовательскую 
логику к данным с определенной типизацией.


In [15]:
# Определение UDF с указанием возвращаемого типа
@udf(returnType=IntegerType())
def calculate_bonus(age, salary=1000):
    # Допустим, что бонус рассчитывается на основе возраста
    return int(salary * (age / 100))

In [16]:
# Использование UDF в DataFrame
df_with_bonus = typed_df.withColumn("bonus", calculate_bonus(col("age")))
df_with_bonus.show()


+------+---+----------+-----+
|  name|age|department|bonus|
+------+---+----------+-----+
|  John| 28|     Sales|  280|
|  Anna| 34|   Finance|  340|
|Robert| 42|        IT|  420|
| Maria| 39|        HR|  390|
+------+---+----------+-----+



In [None]:
# UDF для категоризации сотрудников по возрасту
@udf(returnType=StringType())
def age_category(age):
    if age < 30:
        return "Young"
    elif age < 40:
        return "Mid-career"
    else:
        return "Experienced"

In [18]:
df_with_category = df_with_bonus.withColumn("category", age_category(col("age")))
df_with_category.show()

+------+---+----------+-----+-----------+
|  name|age|department|bonus|   category|
+------+---+----------+-----+-----------+
|  John| 28|     Sales|  280|      Young|
|  Anna| 34|   Finance|  340| Mid-career|
|Robert| 42|        IT|  420|Experienced|
| Maria| 39|        HR|  390| Mid-career|
+------+---+----------+-----+-----------+



## 5. Pandas UDF и vectorized UDF

### Векторизованные UDF

Pandas UDF (векторизованные UDF) позволяют более эффективно обрабатывать данные,  
используя векторизацию и оптимизированные библиотеки Python, такие как NumPy и Pandas.  
Они предлагают значительное повышение производительности по сравнению с обычными UDF.



In [19]:
# Импорт необходимых библиотек для Pandas UDF
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Создание DataFrame с большим количеством строк для демонстрации преимуществ
large_data = [
    (f"Person-{i}", 20 + i % 40, ["Skill-1", "Skill-2"][i % 2]) 
    for i in range(10000)
]
large_df = spark.createDataFrame(large_data, ["name", "age", "skill"])



In [None]:
# Скалярный Pandas UDF
@pandas_udf(DoubleType())
def pandas_calculate_bonus(age_series):
    # Эта функция применяется к целым сериям pandas, а не к отдельным значениям
    return age_series * 10.5

In [21]:
# Применение Pandas UDF
large_df_with_bonus = large_df.withColumn("bonus", pandas_calculate_bonus(col("age")))
large_df_with_bonus.show(5)

+--------+---+-------+-----+
|    name|age|  skill|bonus|
+--------+---+-------+-----+
|Person-0| 20|Skill-1|210.0|
|Person-1| 21|Skill-2|220.5|
|Person-2| 22|Skill-1|231.0|
|Person-3| 23|Skill-2|241.5|
|Person-4| 24|Skill-1|252.0|
+--------+---+-------+-----+
only showing top 5 rows



## 6. Практические примеры работы с типизированной обработкой данных

### Пример: Анализ данных о сотрудниках

Рассмотрим более комплексный пример обработки данных о сотрудниках 
с использованием типизированных конструкций.


In [24]:
# Создание файла с данными о сотрудниках
employees_data = """
id,name,age,department,salary,hire_date,performance_scores
1,John Doe,32,IT,65000,2019-05-15,"{'technical': 85, 'communication': 78, 'teamwork': 90}"
2,Jane Smith,28,Marketing,58000,2020-03-10,"{'creativity': 92, 'communication': 88, 'organization': 85}"
3,Michael Brown,41,Finance,75000,2015-11-20,"{'analytical': 95, 'attention': 89, 'reporting': 92}"
4,Emily Davis,35,HR,62000,2018-07-05,"{'interpersonal': 94, 'organization': 91, 'communication': 87}"
5,Robert Wilson,39,IT,70000,2017-01-15,"{'technical': 90, 'problem_solving': 88, 'innovation': 85}"
6,Sarah Lee,31,Marketing,59500,2020-02-20,"{'creativity': 89, 'market_research': 86, 'presentation': 92}"
7,David Martinez,45,Finance,78000,2014-09-12,"{'analytical': 92, 'financial_modeling': 95, 'risk_assessment': 88}"
8,Lisa Anderson,33,HR,63500,2018-04-25,"{'recruitment': 91, 'policy': 87, 'mediation': 93}"
9,Thomas Taylor,37,IT,68000,2016-08-30,"{'technical': 87, 'architecture': 92, 'security': 89}"
10,Jennifer White,29,Marketing,57000,2020-05-18,"{'social_media': 94, 'content': 90, 'analytics': 85}"
"""

with open("employees_detailed.csv", "w") as f:
    f.write(employees_data)

In [26]:
!hadoop fs -put -f employees_detailed.csv /user/ubuntu/employees_detailed.csv

In [27]:
# Функция для парсинга строки JSON с оценками производительности
import json
def parse_performance_scores(scores_str):
    try:
        return json.loads(scores_str.replace("'", "\""))
    except:
        return {}


In [28]:
# Регистрация UDF для парсинга JSON
parse_scores_udf = udf(parse_performance_scores, MapType(StringType(), IntegerType()))

In [29]:
# Чтение данных с использованием схемы
employee_detailed_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("department", StringType(), False),
    StructField("salary", IntegerType(), False),
    StructField("hire_date", DateType(), False),
    StructField("performance_scores", StringType(), True)
])

employees_detailed_df = spark.read \
    .option("header", "true") \
    .schema(employee_detailed_schema) \
    .csv("employees_detailed.csv")


In [30]:
# Парсинг JSON столбца
employees_detailed_df = employees_detailed_df \
    .withColumn("parsed_scores", parse_scores_udf(col("performance_scores")))

employees_detailed_df.printSchema()
employees_detailed_df.show(truncate=False)


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- performance_scores: string (nullable = true)
 |-- parsed_scores: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)

+---+--------------+---+----------+------+----------+-------------------------------------------------------------------+-------------------------------------------------------------------+
|id |name          |age|department|salary|hire_date |performance_scores                                                 |parsed_scores                                                      |
+---+--------------+---+----------+------+----------+-------------------------------------------------------------------+-------------------------------------------------------------------+
|1  |John Doe      |32 |IT     

In [32]:
# Определение Pandas UDF для расчета средней оценки производительности
@pandas_udf(DoubleType())
def avg_performance(scores_series):
    def calc_avg(scores_str):
        scores = parse_performance_scores(scores_str)
        if scores and len(scores) > 0:
            # Простое математическое выражение вместо вызова sum()
            total = 0
            for value in scores.values():
                total += value
            return total / len(scores)
        return 0.0
    
    return scores_series.apply(calc_avg)


In [33]:
# Применение Pandas UDF
employees_with_avg_score = employees_detailed_df \
    .withColumn("avg_performance", avg_performance(col("performance_scores")))

employees_with_avg_score.select("name", "department", "avg_performance").show()

+--------------+----------+-----------------+
|          name|department|  avg_performance|
+--------------+----------+-----------------+
|      John Doe|        IT|84.33333333333333|
|    Jane Smith| Marketing|88.33333333333333|
| Michael Brown|   Finance|             92.0|
|   Emily Davis|        HR|90.66666666666667|
| Robert Wilson|        IT|87.66666666666667|
|     Sarah Lee| Marketing|             89.0|
|David Martinez|   Finance|91.66666666666667|
| Lisa Anderson|        HR|90.33333333333333|
| Thomas Taylor|        IT|89.33333333333333|
|Jennifer White| Marketing|89.66666666666667|
+--------------+----------+-----------------+



In [39]:
# Анализ по отделам с использованием Pandas API

# Преобразование Spark DataFrame в Pandas-on-Spark DataFrame
psdf = employees_with_avg_score.toPandas()

# Анализ по отделам
department_analysis = psdf.groupby("department").agg({
    "salary": ["mean", "min", "max"],
    "age": "mean",
    "avg_performance": "mean"
}).reset_index()

department_analysis.columns = ["department", "avg_salary", "min_salary", "max_salary", "avg_age", "avg_performance"]
print(department_analysis)

  department    avg_salary  min_salary  max_salary    avg_age  avg_performance
0    Finance  76500.000000       75000       78000  43.000000        91.833333
1         HR  62750.000000       62000       63500  34.000000        90.500000
2         IT  67666.666667       65000       70000  36.000000        87.111111
3  Marketing  58166.666667       57000       59500  29.333333        89.000000


## 7. Интеграция с Pandas через Pandas API на PySpark

### Pandas API на PySpark

PySpark предоставляет Pandas API, который позволяет использовать знакомый интерфейс 
Pandas для работы с большими данными. Это близко подходит к концепции Dataset 
с точки зрения удобства работы и типизации.


Можно проверить локально на последней версии PySpark 3.5.5

In [43]:
# Импорт Pandas API на PySpark
import pyspark.pandas as ps

# Создание Pandas на Spark DataFrame
ps_df = ps.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Ellen'],
    'age': [25, 30, 35, 40, 45],
    'department': ['IT', 'HR', 'Finance', 'IT', 'Marketing']
})

print("Pandas на Spark DataFrame:")
print(ps_df)


Pandas DataFrame:
   id     name  age department
0   1    Alice   25         IT
1   2      Bob   30         HR
2   3  Charlie   35    Finance
3   4    David   40         IT
4   5    Ellen   45  Marketing


In [44]:
# Базовые операции, похожие на Pandas
print("\nФильтрация данных:")
print(ps_df[ps_df['age'] > 30])

print("\nГруппировка данных:")
print(ps_df.groupby('department').agg({'age': 'mean'}))


Фильтрация данных:
   id     name  age department
2   3  Charlie   35    Finance
3   4    David   40         IT
4   5    Ellen   45  Marketing

Группировка данных:
             age
department      
Finance     35.0
HR          30.0
IT          32.5
Marketing   45.0


In [None]:
# Преобразование в обычный Spark DataFrame
spark_df = ps_df.to_spark()
print("\nПреобразованный Spark DataFrame:")
spark_df.show()

In [None]:
# Преобразование обратно в Pandas (для небольших данных)
regular_pandas_df = ps_df.to_pandas()
print("\nОбычный Pandas DataFrame:")
print(regular_pandas_df)




Обычный Pandas DataFrame:
   id     name  age department
0   1    Alice   25         IT
1   2      Bob   30         HR
2   3  Charlie   35    Finance
3   4    David   40         IT
4   5    Ellen   45  Marketing


In [39]:
# Очистка ресурсов
!rm employees_detailed.csv

In [46]:
# Остановка SparkSession
spark.stop()

print("Dataset и типизированная обработка данных в PySpark практика завершена!")

Dataset и типизированная обработка данных в PySpark практика завершена!
