<a href="https://colab.research.google.com/github/Maks2811/test_nagaev/blob/main/spark_dataframe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ed024abd300c0d61f37cb6434085f2dccafd117cfd04123bad233f9339bdf898
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# **Создание датафрейма с описанием схемы**

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
.appName("schema") \
.config("spark.master", "local[*]") \
.getOrCreate()

# создаем набор данных
data =[("Alice", 25), ("Artem", 26), ("Ivan", 48)]

# описываем схему датафрейма
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Создаем датафрейм
df =spark.createDataFrame(data, schema)

# Выводим схему на печать
df.printSchema()

df.show()

spark.stop()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|Artem| 26|
| Ivan| 48|
+-----+---+



# **Создание датафрейма без явного указания схемы**

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
.appName("schema") \
.config("spark.master", "local[*]") \
.getOrCreate()

# создаем набор данных без уточнения схемы
data =[{"name":"Alice", "age":29},
       {"name":"Artem", "age":129},
       {"name":"Ivan", "age":56}
       ]

# Создаем датафрейм
df =spark.createDataFrame(data, schema)

# Выводим схему на печать
df.printSchema()

df.show()   # выводим, датафрейм, схема распознается автоматически

spark.stop()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-----+---+
| name|age|
+-----+---+
|Alice| 29|
|Artem|129|
| Ivan| 56|
+-----+---+



# **Создание датафрейма из RDD**

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
.appName("schema") \
.config("spark.master", "local[*]") \
.getOrCreate()

# создаем набор данных без уточнения схемы
data =[("Alice", 25), ("Artem", 26), ("Ivan", 48)]

rdd = spark.sparkContext.parallelize(data)  # создаем rdd

# Создаем датафрейм
df = rdd.toDF(["name", "age"])   # создаем датафрейм, в скобках указываем названия полей
# Выводим схему на печать
df.printSchema()

df.show(1)   # выводим только первую строку датафрейма
spark.stop()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
+-----+---+
only showing top 1 row



# **Различные методы**

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
.appName("schema") \
.config("spark.master", "local[*]") \
.getOrCreate()

# создаем набор данных без уточнения схемы
data =[("Alice", 25), ("Artem", 26), ("Ivan", 48)]

rdd = spark.sparkContext.parallelize(data)  # создаем rdd

# Создаем датафрейм
df = rdd.toDF(["name", "age"])   # создаем датафрейм, в скобках указываем названия полей
# Выводим схему на печать
df.printSchema()

df.show()
print(df.head(2))
print(df.take(2))
print(df.head())

spark.stop()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|Artem| 26|
| Ivan| 48|
+-----+---+

[Row(name='Alice', age=25), Row(name='Artem', age=26)]
[Row(name='Alice', age=25), Row(name='Artem', age=26)]
Row(name='Alice', age=25)


In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
.appName("schema") \
.config("spark.master", "local[*]") \
.getOrCreate()

# создаем набор данных без уточнения схемы
data =[("Alice", 25), ("Artem", 26), ("Ivan", 48)]

rdd = spark.sparkContext.parallelize(data)  # создаем rdd

# Создаем датафрейм
df = rdd.toDF(["name", "age"])   # создаем датафрейм, в скобках указываем названия полей
# Выводим схему на печать
df.printSchema()

df.show()

columns = df.columns
print(columns)    # выводим список названий колонок(полей)

print(df.dtypes)  # выводим наименования колонок с типами данных в них

# spark.stop()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|Artem| 26|
| Ivan| 48|
+-----+---+

['name', 'age']
[('name', 'string'), ('age', 'bigint')]


In [21]:
df.select("name").show()     # возвращает данные из столбца name

+-----+
| name|
+-----+
|Alice|
|Artem|
| Ivan|
+-----+



In [22]:
df.selectExpr("name", "age + 1 as age_plus_one").show()   # прибавялем 1 к возрасту и обозначаем alias для получившегося результата

+-----+------------+
| name|age_plus_one|
+-----+------------+
|Alice|          26|
|Artem|          27|
| Ivan|          49|
+-----+------------+



In [24]:
df.filter(df["age"]>30).show()        # фильтрация по условию (age>30)

+----+---+
|name|age|
+----+---+
|Ivan| 48|
+----+---+



In [25]:
df.where(df["age"]>30).show()     # фильтрация по условию (age>30)

+----+---+
|name|age|
+----+---+
|Ivan| 48|
+----+---+



In [26]:
df.groupBy("age").count().show()    # группировка с агрегацией

+---+-----+
|age|count|
+---+-----+
| 25|    1|
| 26|    1|
| 48|    1|
+---+-----+



In [29]:
from pyspark.sql.functions import avg, max
df.groupby("name").agg(
    avg("age").alias("average_age"),
    max("age").alias("max_age")
).show()

+-----+-----------+-------+
| name|average_age|max_age|
+-----+-----------+-------+
|Alice|       25.0|     25|
|Artem|       26.0|     26|
| Ivan|       48.0|     48|
+-----+-----------+-------+



In [33]:
df.orderBy(df["name"].desc()).show()   # сортировка в обратном порядке

+-----+---+
| name|age|
+-----+---+
| Ivan| 48|
|Artem| 26|
|Alice| 25|
+-----+---+



In [34]:
df.orderBy("name").show()   # сортировка

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
|Artem| 26|
| Ivan| 48|
+-----+---+



In [35]:
df.sort("age", ascending=False).show()      # сортировка по убыванию

+-----+---+
| name|age|
+-----+---+
| Ivan| 48|
|Artem| 26|
|Alice| 25|
+-----+---+



In [39]:
from pyspark.sql.functions import col

df.withColumn("age_plus_one", df["age"]+1).show()   # добавляем новую колонку с алиасом

df_1 = df.withColumn("age_*_two", df["age"]*2).show()  # присваиваем переменной значение df с новой колонкой
print(df_1)

+-----+---+------------+
| name|age|age_plus_one|
+-----+---+------------+
|Alice| 25|          26|
|Artem| 26|          27|
| Ivan| 48|          49|
+-----+---+------------+

+-----+---+---------+
| name|age|age_*_two|
+-----+---+---------+
|Alice| 25|       50|
|Artem| 26|       52|
| Ivan| 48|       96|
+-----+---+---------+

None


In [40]:
df.withColumnRenamed("age", "fio").show()   # переименовываем колонку

+-----+---+
| name|fio|
+-----+---+
|Alice| 25|
|Artem| 26|
| Ivan| 48|
+-----+---+



In [41]:
df.drop("name").show()   # удаляем столбец

+---+
|age|
+---+
| 25|
| 26|
| 48|
+---+



In [42]:
df.distinct().show()    # вывод уникальных записей

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
| Ivan| 48|
|Artem| 26|
+-----+---+



In [43]:
df.limit(1).show()    # вывод первой строки (записи)

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
+-----+---+



In [45]:
df1 = spark.createDataFrame([(1,"Artem"), (2, "Bob")], ["id","Ivan"])
df2 = spark.createDataFrame([(1,100), (2, 200)], ["id","Ivan"])

df_join = df1.join(df2, "id", "inner")
df_join.show()

+---+-----+----+
| id| Ivan|Ivan|
+---+-----+----+
|  1|Artem| 100|
|  2|  Bob| 200|
+---+-----+----+



In [48]:
df1 = spark.createDataFrame([(1,"Artem"), (2, "Bob")], ["id","Ivan"])
df2 = spark.createDataFrame([(1,100), (2, 200)], ["id","Ivan"])

df_join = df1.join(df2)           # делаем join
df_join.show()

+---+-----+---+----+
| id| Ivan| id|Ivan|
+---+-----+---+----+
|  1|Artem|  1| 100|
|  1|Artem|  2| 200|
|  2|  Bob|  1| 100|
|  2|  Bob|  2| 200|
+---+-----+---+----+



In [50]:
df.sample(withReplacement=False, fraction=0.1).show()    # сэмплирование. Случайная выборка данных из датафрейма

+----+---+
|name|age|
+----+---+
+----+---+

