### Написание приложения

In [46]:
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .getOrCreate()

df = spark.read.csv("video_game_sales.csv", header=True, inferSchema=True)
df.show(5)

# Применение трансформаций
result_df = df.groupBy("publisher").agg({"global_sales": "sum"})

# Запись результата в CSV
# Запись результата обратно в HDFS (или локальную файловую систему) с режимом "overwrite"
result_df.write.mode("overwrite").csv("hdfs://namenode:9000/path/to/output.csv", header=True)

# Завершение работы
spark.stop()

+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|rank|                name|platform|year|       genre|publisher|na_sales|eu_sales|jp_sales|other_sales|global_sales|
+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing| Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports| Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing| Nintendo|   11.27|    8.89|   10.22|        1.0|       31.37|
+----+--------------------+--------+----+------------+---------+

### Распределенные набора данных

In [47]:
from pyspark import SparkContext

# Создание SparkContext
sc = SparkContext("local", "RDD Example")

# Создание RDD из списка
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Применение map для преобразования значений
squared_rdd = rdd.map(lambda x: x ** 2)

# Получение результатов
result = squared_rdd.collect()  # Вернет список
print(result)

# Остановка SparkContext
sc.stop()

[1, 4, 9, 16, 25]


### Фреймы данных

In [44]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Создание SparkSession
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()

# Пример данных
data = [Row(value=1, category='A'),
        Row(value=2, category='B'),
        Row(value=3, category='A'),
        Row(value=4, category='B')]

# Создание DataFrame из списка
df = spark.createDataFrame(data)

# Выбор столбцов
df.select("value").show()

# Применение агрегирования
df.groupBy("category").agg({"value": "sum"}).show()

# Остановка SparkSession
spark.stop()

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
+-----+

+--------+----------+
|category|sum(value)|
+--------+----------+
|       A|         4|
|       B|         6|
+--------+----------+



### Расширение логики работы с данными с помощью UDF

UDF (User-Defined Functions) позволяют вам определять собственные функции для обработки данных в Spark. Они могут быть использованы как в RDD, так и в DataFrame.

In [24]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql import Row

# Создание Spark Context и Session
sc = SparkContext("local", "Example")
spark = SparkSession.builder.appName("Example").getOrCreate()

# Создание RDD
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# Преобразование RDD в DataFrame с использованием Row
df = spark.createDataFrame(rdd.map(lambda x: Row(value=x)))

# Определение UDF
def square(x):
    return x * x

square_udf = udf(square, IntegerType())

# Применение UDF к DataFrame
df_with_square = df.withColumn("squared_value", square_udf(df["value"]))

# Показать результат
df_with_square.show()

# Завершение работы
sc.stop()
spark.stop()

+-----+-------------+
|value|squared_value|
+-----+-------------+
|    1|            1|
|    2|            4|
|    3|            9|
|    4|           16|
|    5|           25|
+-----+-------------+



### Запись данных в HDFS

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Создание SparkSession
spark = SparkSession.builder \
    .appName("Hadoop Integration Example") \
    .getOrCreate()

# Создание данных в виде списка объектов Row
data = [
    Row(category='A', value=10),
    Row(category='B', value=20),
    Row(category='A', value=30),
    Row(category='B', value=40)
]

# Преобразование списка в DataFrame
df = spark.createDataFrame(data)
df.show()

# Применение трансформаций
result_df = df.groupBy("category").agg({"value": "sum"})

# Запись результата в HDFS
result_df.write.csv("hdfs://namenode:9000/user/kseo/dir/output.csv", header=True)

# Завершение работы
spark.stop()

+--------+-----+
|category|value|
+--------+-----+
|       A|   10|
|       B|   20|
|       A|   30|
|       B|   40|
+--------+-----+



In [45]:
#### Пример на Python

Вот простой пример приложения на Python, использующего PySpark для обработки данных:

from pyspark.sql import SparkSession

# Создание SparkSession

spark = SparkSession.builder \

    .appName("MySparkApp") \

    .getOrCreate()

# Чтение данных из CSV файла

df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Показать данные

df.show()

# Применение трансформаций

result_df = df.groupBy("category").agg({"value": "sum"})

# Запись результата в CSV

result_df.write.csv("output.csv", header=True)

# Завершение работы

spark.stop()

SyntaxError: invalid syntax (1249926768.py, line 3)