<a href="https://colab.research.google.com/github/PENROG21/PySpark/blob/main/SQL_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



# Пример использования DataFrame API

### **Описание задачи**

Предположим, у вас есть JSON файл с информацией о людях. Вам нужно прочитать эти данные, выполнить различные операции, такие как фильтрация, группировка и агрегация, а затем сохранить результат в формате CSV.

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Создание SparkSession
spark = SparkSession.builder.appName("DataFrameAPIExample").getOrCreate()

# Чтение данных из JSON файла
df = spark.read.json("people.json")

# Фильтрация данных
filtered_df = df.filter(col("age") > 30)

# Группировка и агрегация данных
grouped_df = df.groupBy("department").agg({"age": "avg", "name": "count"}).withColumnRenamed("avg(age)", "avg_age").withColumnRenamed("count(name)", "count")

# Сортировка данных
sorted_df = grouped_df.orderBy(col("count").desc())

# Показ результатов
filtered_df.show()
sorted_df.show()

# Сохранение результирующего DataFrame в CSV файл
sorted_df.write.csv("output.csv", header=True)

+---------------+---+----------+----+
|_corrupt_record|age|department|name|
+---------------+---+----------+----+
|           NULL| 35|        HR|Jane|
|           NULL| 40|   Finance|Mark|
+---------------+---+----------+----+

+-----------+-----+-------+
| department|count|avg_age|
+-----------+-----+-------+
|         HR|    2|   32.5|
|    Finance|    2|   32.5|
|Engineering|    1|   23.0|
|       NULL|    0|   NULL|
+-----------+-----+-------+



Таким образом, как такового SQL нету. То есть есть SQL-подобные функции, но их предоставляет сам Spark.

# Теперь давайте посмотрим на то, как работает "оригинальный SQL" в PySpark.

### Описание задачи
Предположим, у вас есть два JSON файла: один с информацией о людях, а другой с информацией о департаментах. Вам нужно прочитать эти данные, зарегистрировать их как временные таблицы, выполнить JOIN-запрос и сохранить результат в формате CSV.

In [6]:
from pyspark.sql import SparkSession

# Создание SparkSession
spark = SparkSession.builder.appName("SQLAPIExample").getOrCreate()

# Чтение данных из JSON файлов
people_df = spark.read.json("/content/sample_data/people.json")
departments_df = spark.read.json("/content/sample_data/departments.json")

# Регистрация DataFrame как временные таблицы
people_df.createOrReplaceTempView("people")
departments_df.createOrReplaceTempView("departments")

# Выполнение JOIN-запроса с использованием SQL
join_df = spark.sql("""
SELECT p.name, p.age, d.department_name
FROM people p
JOIN departments d
ON p.department_id = d.id
""")

# Показ результатов
join_df.show()

# Сохранение результирующего DataFrame в CSV файл
join_df.write.csv("output.csv", header=True)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `p`.`department_id` cannot be resolved. Did you mean one of the following? [`p`.`department`, `d`.`department_name`, `p`.`name`, `p`.`age`, `d`.`id`].; line 5 pos 3;
'Project ['p.name, 'p.age, 'd.department_name]
+- 'Join Inner, ('p.department_id = id#166L)
   :- SubqueryAlias p
   :  +- SubqueryAlias people
   :     +- View (`people`, [_corrupt_record#148,age#149L,department#150,name#151])
   :        +- Relation [_corrupt_record#148,age#149L,department#150,name#151] json
   +- SubqueryAlias d
      +- SubqueryAlias departments
         +- View (`departments`, [_corrupt_record#164,department_name#165,id#166L])
            +- Relation [_corrupt_record#164,department_name#165,id#166L] json


Думаю, по коду и комментариям все понятно.

*   Создаем объект SparkSession.
*   Читаем JSON файлы в DataFrame.
*   Регистрируем DataFrame как временные таблицы для выполнения SQL-запросов.
*   Выполняем SQL-запрос для соединения таблиц по идентификатору департамента.
*   Сохраняем результат в CSV файл.


# JOIN

In [7]:
from pyspark.sql import SparkSession

# Создание SparkSession
spark = SparkSession.builder.appName("JoinExamples").getOrCreate()

# Пример данных для DataFrame people
people_data = [
    ("John", 30, 1),
    ("Doe", 25, 2),
    ("Jane", 35, 1),
    ("Mark", 40, 2),
    ("Smith", 23, 3)
]
people_columns = ["name", "age", "department_id"]
people_df = spark.createDataFrame(data=people_data, schema=people_columns)

# Пример данных для DataFrame departments
departments_data = [
    (1, "HR"),
    (2, "Finance"),
    (3, "Engineering"),
    (4, "Marketing")
]
departments_columns = ["id", "department_name"]
departments_df = spark.createDataFrame(data=departments_data, schema=departments_columns)

# Показ данных
people_df.show()
departments_df.show()

+-----+---+-------------+
| name|age|department_id|
+-----+---+-------------+
| John| 30|            1|
|  Doe| 25|            2|
| Jane| 35|            1|
| Mark| 40|            2|
|Smith| 23|            3|
+-----+---+-------------+

+---+---------------+
| id|department_name|
+---+---------------+
|  1|             HR|
|  2|        Finance|
|  3|    Engineering|
|  4|      Marketing|
+---+---------------+



## Внутреннее соединение (Inner Join)
Внутреннее соединение возвращает только те строки, которые имеют совпадающие значения в обеих таблицах.

In [8]:
# Внутреннее соединение
inner_join_df = people_df.join(departments_df, people_df.department_id == departments_df.id, "inner")
inner_join_df.show()

+-----+---+-------------+---+---------------+
| name|age|department_id| id|department_name|
+-----+---+-------------+---+---------------+
| John| 30|            1|  1|             HR|
| Jane| 35|            1|  1|             HR|
|  Doe| 25|            2|  2|        Finance|
| Mark| 40|            2|  2|        Finance|
|Smith| 23|            3|  3|    Engineering|
+-----+---+-------------+---+---------------+



## Левое внешнее соединение (Left Outer Join)
Левое внешнее соединение возвращает все строки из левой таблицы и соответствующие строки из правой таблицы. Если соответствия нет, возвращаются NULL значения для столбцов правой таблицы.

In [None]:
# Левое внешнее соединение
left_outer_join_df = people_df.join(departments_df, people_df.department_id == departments_df.id, "left_outer")
left_outer_join_df.show()

## Правое внешнее соединение (Right Outer Join)
Правое внешнее соединение возвращает все строки из правой таблицы и соответствующие строки из левой таблицы. Если соответствия нет, возвращаются NULL значения для столбцов левой таблицы.

In [10]:
# Правое внешнее соединение
right_outer_join_df = people_df.join(departments_df, people_df.department_id == departments_df.id, "right_outer")
right_outer_join_df.show()

+-----+----+-------------+---+---------------+
| name| age|department_id| id|department_name|
+-----+----+-------------+---+---------------+
| Jane|  35|            1|  1|             HR|
| John|  30|            1|  1|             HR|
| Mark|  40|            2|  2|        Finance|
|  Doe|  25|            2|  2|        Finance|
|Smith|  23|            3|  3|    Engineering|
| NULL|NULL|         NULL|  4|      Marketing|
+-----+----+-------------+---+---------------+



## Полное внешнее соединение (Full Outer Join)
Полное внешнее соединение возвращает все строки, когда есть совпадение в одной из таблиц. Строки без совпадений в обеих таблицах будут иметь NULL значения для столбцов из другой таблицы.

In [13]:
full_outer_join_df = people_df.join(departments_df, people_df.department_id == departments_df.id, "outer") full_outer_join_df.show()

SyntaxError: invalid syntax (<ipython-input-13-c4ac1b22b3fe>, line 1)

## Полное перекрестное соединение (Cross Join)
Полное перекрестное соединение возвращает декартово произведение строк обеих таблиц.



In [11]:
cross_join_df = people_df.crossJoin(departments_df)

cross_join_df.show()

+-----+---+-------------+---+---------------+
| name|age|department_id| id|department_name|
+-----+---+-------------+---+---------------+
| John| 30|            1|  1|             HR|
| John| 30|            1|  2|        Finance|
|  Doe| 25|            2|  1|             HR|
|  Doe| 25|            2|  2|        Finance|
| John| 30|            1|  3|    Engineering|
| John| 30|            1|  4|      Marketing|
|  Doe| 25|            2|  3|    Engineering|
|  Doe| 25|            2|  4|      Marketing|
| Jane| 35|            1|  1|             HR|
| Jane| 35|            1|  2|        Finance|
| Mark| 40|            2|  1|             HR|
| Mark| 40|            2|  2|        Finance|
|Smith| 23|            3|  1|             HR|
|Smith| 23|            3|  2|        Finance|
| Jane| 35|            1|  3|    Engineering|
| Jane| 35|            1|  4|      Marketing|
| Mark| 40|            2|  3|    Engineering|
| Mark| 40|            2|  4|      Marketing|
|Smith| 23|            3|  3|    E

## Соединение с использованием условия (Join with Condition)
Соединение с использованием условия позволяет задать более сложные условия соединения.

In [12]:
condition_join_df = people_df.join(departments_df, (people_df.department_id == departments_df.id) &
 (people_df.age > 30), "inner")
condition_join_df.show()

+----+---+-------------+---+---------------+
|name|age|department_id| id|department_name|
+----+---+-------------+---+---------------+
|Jane| 35|            1|  1|             HR|
|Mark| 40|            2|  2|        Finance|
+----+---+-------------+---+---------------+

