<a href="https://colab.research.google.com/github/Kuvuklija/Spark-Stepic/blob/main/Simple_RDD_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Устанавливаем PySpark

In [None]:
from pyspark import SparkConf, SparkContext

# Configure a local Spark context; "local[*]" is passed as the master URL.
conf = SparkConf()
conf.setAppName("Simple RDD Example")
conf.setMaster("local[*]")
sc = SparkContext(conf=conf)

# Distribute the integers 1..10 as an RDD.
numbers = list(range(1, 11))
rdd = sc.parallelize(numbers)


def _is_even(value):
    """Return True when *value* is divisible by two."""
    return value % 2 == 0


# Transformation: keep only the even numbers.
even_numbers_rdd = rdd.filter(_is_even)

# Actions: add the even numbers up, then bring them back to the driver.
sum_even_numbers = even_numbers_rdd.sum()
even_numbers = even_numbers_rdd.collect()

print('Четные числа:', even_numbers)
print('Сумма четных чисел:', sum_even_numbers)

sc.stop()

Четные числа: [2, 4, 6, 8, 10]
Сумма четных чисел: 30


In [None]:
# Three ways to create a DataFrame, followed by basic DataFrame operations.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]

# Explicit schema: one StructField (name, type, nullable) per column.
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Value", IntegerType(), True),
])

# 1. Create a DataFrame with the explicit schema.
df = spark.createDataFrame(data, schema)
#df.printSchema()

# 2. Let Spark infer the schema from a list of dicts (no schema is passed).
data = [
    {"name": "Alice", "age": 29},
    {"name": "John", "age": 32},
    {"name": "Bob", "age": 67},
]
df_auto = spark.createDataFrame(data)
#df_auto.printSchema()
#df_auto.show()

# 3. Create from an RDD of tuples, supplying the column names.
data = [("Alice", 29), ("Bob", 98), ("Cathy", 32), ("Molly", 32)]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(["name", "age"])
df.show()

# The next four calls only build new DataFrames; their results are discarded
# here because the trailing .show() is commented out.
#print(df.take(2))
df.select('name')#.show()
df.selectExpr("age + 50 as age_plus_50", "name")#.show()
df.filter(df["age"]>30)#.show()
df.where(df["age"] > 30)#.show()
df.groupBy("age").count().show()

# Aggregate functions are reached through the module alias F so that Spark's
# max() does not replace Python's built-in max() for the rest of the session.
df.agg(
    F.avg("age").alias("average_age"),
    F.max("age").alias("max_age"),
).show()

# Two equivalent ways to sort by age, highest first.
df.orderBy(df["age"].desc()).show()
df.sort('age', ascending=False).show()

spark.stop()


+-----+---+
| name|age|
+-----+---+
|Alice| 29|
|  Bob| 98|
|Cathy| 32|
|Molly| 32|
+-----+---+

+---+-----+
|age|count|
+---+-----+
| 29|    1|
| 98|    1|
| 32|    2|
+---+-----+

+-----------+-------+
|average_age|max_age|
+-----------+-------+
|      47.75|     98|
+-----------+-------+

+-----+---+
| name|age|
+-----+---+
|  Bob| 98|
|Cathy| 32|
|Molly| 32|
|Alice| 29|
+-----+---+

+-----+---+
| name|age|
+-----+---+
|  Bob| 98|
|Cathy| 32|
|Molly| 32|
|Alice| 29|
+-----+---+



In [None]:
# Prerequisites (run once in a shell):
#   pip install pyspark avro
#   sudo apt install python3-avro

# Write a DataFrame to Avro files.
from pyspark.sql import SparkSession

# The Avro data source is requested as an extra package via
# spark.jars.packages.
# NOTE(review): the artifact versions (Scala 2.12 / Spark 3.5.1) presumably
# have to match the installed Spark build - confirm.
spark = (
    SparkSession.builder
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.1")
        .appName("Read")
        .getOrCreate()
)
data = [("Alice", 25), ("Bob", 30), ("Vadim", 46)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Overwrite so the cell can be re-run: the default save mode raises an error
# when the target path already exists.
df.write.format("avro").mode("overwrite").save("./Avro_Files/1")

# Stop the session so that code creating a new SparkContext afterwards does
# not fail because a context is still running.
spark.stop()

In [None]:
# Read a text file with the RDD API and count how often each word occurs.
from pyspark import SparkContext

sc = SparkContext("local", "Read text file example")

# try/finally guarantees the context is stopped even if the input file is
# missing; a context left running would make the next SparkContext(...) fail.
try:
    rdd = sc.textFile("/content/sample_data/text.txt")

    # Process the data: split() without an argument splits on any run of
    # whitespace and never yields empty strings, so blank lines and repeated
    # spaces do not produce an empty "word".
    words = rdd.flatMap(lambda line: line.split())
    word_count = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    # Print the (word, count) pairs.
    print(word_count.collect())
finally:
    sc.stop()

[('John', 3), ('Snow', 1), ('my', 1), ('favorit', 1), ('hero', 1), ('Hello', 1), ('World', 2), ('broooo', 1), ('yoooo', 1), ('I', 1), ('love', 1), ('you', 1), ('persik', 1)]


In [None]:
# Working with RDDs: filter a text file, then parse and filter a CSV file.

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("File").setMaster("local[*]")
sc = SparkContext(conf=conf)

# try/finally guarantees the context is stopped even when a file is missing
# or a job fails; a context left running would block the next SparkContext().
try:
    # Keep only the README lines that mention "mnist".
    rdd = sc.textFile("./sample_data/README.txt")
    lines = rdd.filter(lambda line: "mnist" in line)
    print(lines.collect())

    # Split every CSV line on commas (no quoting support) and peek at 3 rows.
    rdd = sc.textFile("/content/test.csv")
    lines = rdd.map(lambda line: line.split(","))
    print(lines.take(3))

    # Look for the row whose column 1 equals 'Ariela'. The length check skips
    # blank or short lines, which would otherwise raise IndexError.
    filtered_rdd = lines.filter(lambda cols: len(cols) > 1 and cols[1] == 'Ariela')

    # Save the result: this produces several files inside a new directory
    # named "content".
    # NOTE(review): saveAsTextFile fails if that directory already exists -
    # remove it before re-running the cell.
    filtered_rdd.saveAsTextFile("content")

    print(filtered_rdd.collect())
finally:
    sc.stop()

['*   `mnist_*.csv` is a small sample of the', '    described at: http://yann.lecun.com/exdb/mnist/']
[['id', 'firstname', 'lastname', 'email', 'email2', 'profession'], ['100', 'Tera', 'Rozanna', 'Tera.Rozanna@yopmail.com', 'Tera.Rozanna@gmail.com', 'worker'], ['101', 'Cathyleen', 'Swigart', 'Cathyleen.Swigart@yopmail.com', 'Cathyleen.Swigart@gmail.com', 'worker']]
[['106', 'Ariela', 'Chrystel', 'Ariela.Chrystel@yopmail.com', 'Ariela.Chrystel@gmail.com', 'police officer']]


In [None]:
# Working with DataFrames: read a CSV file and write it back out.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('read')
    .getOrCreate()
)

# Read with the header option enabled.
csv_reader = spark.read.option("header", True)
df = csv_reader.csv("/content/test.csv")
df.show()

# Write as CSV (split into several partitions, so the data can be stored in
# HDFS); an existing "result_csv" output is overwritten.
csv_writer = df.write.mode("overwrite").option("header", 'true')
csv_writer.csv("result_csv")

spark.stop()


+---+---------+----------+--------------------+--------------------+--------------+
| id|firstname|  lastname|               email|              email2|    profession|
+---+---------+----------+--------------------+--------------------+--------------+
|100|     Tera|   Rozanna|Tera.Rozanna@yopm...|Tera.Rozanna@gmai...|        worker|
|101|Cathyleen|   Swigart|Cathyleen.Swigart...|Cathyleen.Swigart...|        worker|
|102|  Sherrie|      Cath|Sherrie.Cath@yopm...|Sherrie.Cath@gmai...|        doctor|
|103|    Marti|      Bluh|Marti.Bluh@yopmai...|Marti.Bluh@gmail.com|        worker|
|104|    Diena|Hieronymus|Diena.Hieronymus@...|Diena.Hieronymus@...|     developer|
|105|    Fayre|   Armanda|Fayre.Armanda@yop...|Fayre.Armanda@gma...|police officer|
|106|   Ariela|  Chrystel|Ariela.Chrystel@y...|Ariela.Chrystel@g...|police officer|
|107|   Valeda|    Philoo|Valeda.Philoo@yop...|Valeda.Philoo@gma...|police officer|
|108|    Codie|     Melan|Codie.Melan@yopma...|Codie.Melan@gmail...|        

In [1]:
# Using SQL-like DataFrame functions: filter, group/aggregate, sort.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col


spark = SparkSession.builder.appName("DataFrame API Example").getOrCreate()

df = spark.read.json("/content/people.json")

# Filtering: keep rows with age above 30.
filtered_df = df.filter(col("age") > 30)

# Grouping: head count and average age per department. Explicit aliases give
# the columns their final names and a fixed order, instead of renaming the
# auto-generated "count(name)" / "avg(age)" columns afterwards.
grouped_df = df.groupBy("department").agg(
    F.count("name").alias("count"),
    F.avg("age").alias("avg_age"),
)

# Sorting: largest departments first.
# NOTE(review): the isNotNull filter presumably drops the group built from
# records that could not be parsed (the recorded output shows a
# _corrupt_record column) - confirm against people.json.
sorted_df = grouped_df.filter(col("avg_age").isNotNull()).orderBy(col("count").desc())

filtered_df.show()
sorted_df.show()

# Overwrite so the cell can be re-run: the default save mode raises an error
# when the target path already exists.
sorted_df.write.csv('output.csv', header=True, mode="overwrite")

spark.stop()

+---------------+---+----------+----+
|_corrupt_record|age|department|name|
+---------------+---+----------+----+
|           NULL| 35|        HR|Jane|
|           NULL| 40|   Finance|Mark|
+---------------+---+----------+----+

+-----------+-----+-------+
| department|count|avg_age|
+-----------+-----+-------+
|         HR|    2|   32.5|
|    Finance|    2|   32.5|
|Engineering|    1|   23.0|
+-----------+-----+-------+



In [4]:
# Using SQL: register DataFrames as temporary views and query them.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQLAPIExample").getOrCreate()

people_df = spark.read.json("/content/people.json")
department_df = spark.read.json("/content/department.json")

# Expose both DataFrames to Spark SQL as temporary views.
people_df.createOrReplaceTempView("people")
department_df.createOrReplaceTempView("departments")

# Inner join people to departments on the department name.
join_df = spark.sql("""
select p.name, p.age, d.department_name
from people as p
inner join departments as d on d.department_name = p.department
""")

join_df.show()

# Overwrite so the cell can be re-run: the default save mode raises an error
# when the target path already exists.
join_df.write.csv("output_join.csv", header=True, mode="overwrite")

spark.stop()

+-----+---+---------------+
| name|age|department_name|
+-----+---+---------------+
| John| 30|             HR|
|  Doe| 25|        Finance|
| Jane| 35|             HR|
| Mark| 40|        Finance|
|Smith| 23|    Engineering|
+-----+---+---------------+



In [12]:
# The different join types.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Join Examples").getOrCreate()

# People: (name, age, department_id).
people_columns = ["name", "age", "department_id"]
people_data = [
    ("John", 30, 1),
    ("Doe", 25, 2),
    ("Jane", 35, 1),
    ("Mark", 40, 2),
    ("Smith", 23, 3),
]
people_df = spark.createDataFrame(people_data, people_columns)

# Departments: (id, department_name); id 4 is not referenced by any person.
department_columns = ['id', 'department_name']
department_data = [
    (1, "HR"),
    (2, "Finance"),
    (3, "Engineering"),
    (4, "Marketing"),
]
department_df = spark.createDataFrame(department_data, department_columns)

people_df.show()
department_df.show()

# INNER JOIN, with people on the left side.
person_in_department = people_df["department_id"] == department_df["id"]
inner_join_df = people_df.join(department_df, on=person_in_department, how="inner")
inner_join_df.show()

# The remaining joins put departments on the left and share one condition.
department_has_person = department_df["id"] == people_df["department_id"]

# LEFT OUTER JOIN
left_join_df = department_df.join(people_df, on=department_has_person, how="left_outer")
left_join_df.show()

# RIGHT OUTER JOIN
right_join_df = department_df.join(people_df, on=department_has_person, how="right_outer")
right_join_df.show()

# FULL OUTER JOIN
full_join_df = department_df.join(people_df, on=department_has_person, how="outer")
full_join_df.show()

# CROSS JOIN: every department paired with every person.
cross_join_df = department_df.crossJoin(people_df)
cross_join_df.show()

# JOIN WITH CONDITION: matching department AND age above 30.
older_than_30 = people_df["age"] > 30
condition_join_df = department_df.join(
    people_df,
    on=department_has_person & older_than_30,
    how="inner",
)
condition_join_df.show()


+-----+---+-------------+
| name|age|department_id|
+-----+---+-------------+
| John| 30|            1|
|  Doe| 25|            2|
| Jane| 35|            1|
| Mark| 40|            2|
|Smith| 23|            3|
+-----+---+-------------+

+---+---------------+
| id|department_name|
+---+---------------+
|  1|             HR|
|  2|        Finance|
|  3|    Engineering|
|  4|      Marketing|
+---+---------------+

+-----+---+-------------+---+---------------+
| name|age|department_id| id|department_name|
+-----+---+-------------+---+---------------+
| John| 30|            1|  1|             HR|
| Jane| 35|            1|  1|             HR|
|  Doe| 25|            2|  2|        Finance|
| Mark| 40|            2|  2|        Finance|
|Smith| 23|            3|  3|    Engineering|
+-----+---+-------------+---+---------------+

+---+---------------+-----+----+-------------+
| id|department_name| name| age|department_id|
+---+---------------+-----+----+-------------+
|  1|             HR| Jane|  35|