### Start SparkSession

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# Create (or reuse, via getOrCreate) a SparkSession named "pyspark_local".
spark = (
    SparkSession.builder
    .appName("pyspark_local")
    .getOrCreate()
)

# uiWebUrl is the address of this session's Spark UI — not a list of sessions,
# so label it accordingly.
print("Адрес Spark UI:", spark.sparkContext.uiWebUrl)

### Read

In [4]:
# Input CSV read by the cells below (path relative to the notebook's working directory).
PATH = 'data/orders.csv'

In [None]:
# Raw read with default CSV options: no header handling, so the header line shows up as a data row.
spark.read.format('csv').load(PATH).show()

In [None]:
# Same file, first line used as column names; show the first 5 rows.
spark.read.options(sep=',', header=True).csv(PATH).show(n=5)

In [None]:
# Base DataFrame for the rest of the notebook: comma-separated, with header row.
df = spark.read.option('header', True).option('sep', ',').csv(PATH)

In [None]:
# Print the schema; inferSchema is not enabled, so every CSV column comes back as string.
df.printSchema()

In [None]:
# Rename English-suffixed columns.
# NOTE(review): the orders.csv schema printed below has no `direction_eng` /
# `measure_eng` columns. withColumnRenamed is a silent no-op for absent names,
# so report them explicitly instead of pretending the rename happened.
renames = {"direction_eng": "direction", "measure_eng": "measure"}
missing = [old for old in renames if old not in df.columns]
if missing:
    print("Колонки не найдены:", missing)

result = df
for old, new in renames.items():
    result = result.withColumnRenamed(old, new)

result.columns

### PrintSchema

In [27]:
# Re-print the source schema (recorded output below) before choosing columns to rename.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- address: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- country: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- country_code: string (nullable = true)



In [None]:
# Shorter names for a few columns of the orders data.
result = (
    df
    .withColumnRenamed("registration_date", "registered_at")
    .withColumnRenamed("birthdate", "dob")
    .withColumnRenamed("country_code", "iso_code")
)

result.columns


### Select

In [None]:
# Up to 10 unique company names, printed without truncating long values.
result.select(F.col('company')).distinct().show(n=10, truncate=False)

### GroupBy

In [None]:
# Number of rows per country, largest groups first.
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .orderBy(F.desc('total_rows'))
    .show()
)

### Filter

In [None]:
# Two equivalent ways to write a filter: Column expressions vs. SQL strings.
# Parentheses instead of trailing backslashes: the original chain ended with a
# dangling `\` that only worked because a blank line followed it.
df_de_salary = (
    result
    .where(F.col("country") == "Korea")
    .where(F.col("salary").isNotNull())
)

df_de_salary2 = (
    result
    .where('country = "Korea"')
    .where('salary IS NOT NULL')
)

# count() launches a Spark job; run it once per DataFrame and reuse the numbers.
count_columns = df_de_salary.count()
count_sql = df_de_salary2.count()

print('df_de_salary', count_columns)
print('df_de_salary2', count_sql)
print(count_columns == count_sql)

### Save to CSV

In [38]:
# Peek at two filtered rows with full-width values.
df_de_salary.show(n=2, truncate=False)

+-----------------+------------------------+---------------------------------------------+-----------------------------+-----------------+---------------------------+-----------------------------+----------+-------+------------------------------------+------+--------+
|name             |email                   |address                                      |registered_at                |phone            |company                    |job                          |dob       |country|uuid                                |salary|iso_code|
+-----------------+------------------------+---------------------------------------------+-----------------------------+-----------------+---------------------------+-----------------------------+----------+-------+------------------------------------+------+--------+
|Rhonda Foley     |jonathan92@example.org  |1234 Cox Light Suite 381, Brownfurt, NV 24776|2024-08-30T00:00:00.000+03:00|513-361-0719     |White-Oneill               |Academic librarian         

In [40]:
# Column names of the filtered DataFrame (includes the renamed registered_at / dob / iso_code).
df_de_salary.columns

['name',
 'email',
 'address',
 'registered_at',
 'phone',
 'company',
 'job',
 'dob',
 'country',
 'uuid',
 'salary',
 'iso_code']

In [41]:
# Project the columns to export and derive `reg_date` by casting the
# ISO-8601 `registered_at` string to DATE (see the recorded output below).
final = df_de_salary.select(
    *['registered_at', 'country', 'iso_code', 'salary',
      'job', 'company', 'name', 'email'],
    F.col('registered_at').cast('date').alias('reg_date'),
)

final.show(n=2, truncate=False)


+-----------------------------+-------+--------+------+-----------------------------+---------------------------+-----------------+------------------------+----------+
|registered_at                |country|iso_code|salary|job                          |company                    |name             |email                   |reg_date  |
+-----------------------------+-------+--------+------+-----------------------------+---------------------------+-----------------+------------------------+----------+
|2024-08-30T00:00:00.000+03:00|Korea  |LV      |2300  |Academic librarian           |White-Oneill               |Rhonda Foley     |jonathan92@example.org  |2024-08-30|
|2024-09-03T00:00:00.000+03:00|Korea  |PE      |14000 |Development worker, community|Parker, Mcdowell and Moreno|Christine Hopkins|davisvanessa@example.com|2024-09-03|
+-----------------------------+-------+--------+------+-----------------------------+---------------------------+-----------------+------------------------+----

In [42]:
# Write with an uncontrolled number of files: Spark typically writes one
# part-file per (non-empty) partition of `final`.
# mode('overwrite') lets the cell be re-run; the default mode fails when the
# target directory already exists. `.csv()` already sets the format, so no
# separate `.format('csv')` is needed.
(
    final
    .write
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_no_control')
)

partition_num = final.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

# Write with a controlled number of files - a SINGLE file.
# (
#     final
#     .coalesce(1)
#     .write
#     .mode('overwrite')
#     .options(header='True', sep=';')
#     .csv('data/final_one_file')
# )

# partition_num = final.coalesce(1).rdd.getNumPartitions()
# print(f'Кол-во партиций {partition_num}')


# Write with partitioning: one sub-directory per load_date value.
# NOTE(review): `final` currently has `reg_date`, not `load_date`; add a
# `load_date` column (or partition by `reg_date`) before enabling these.
# (
#     final
#     .write
#     .mode('overwrite')
#     .partitionBy('load_date')
#     .options(header='True', sep=';')
#     .csv('data/final_partitioned')
# )

# print_df = final.select('load_date').distinct()
# print(f'Load_date distinct: {print_df.count()}')


# Partitioned write with repartition(1, 'load_date') first, so all rows are in
# one partition and each load_date directory receives a single file.
# (
#     final
#     .repartition(1, 'load_date')
#     .write
#     .mode('overwrite')
#     .partitionBy('load_date')
#     .options(header='True', sep=';')
#     .csv('data/final_partitioned_repart')
# )

# partition_num = final.repartition(1, 'load_date').rdd.getNumPartitions()
# print(f'Кол-во партиций {partition_num}')

[Stage 66:>                                                         (0 + 8) / 8]

Кол-во партиций 8


                                                                                

### Read Transformed

In [48]:
# Read back each output layout with the same filter to compare how much data
# Spark scans. The trailing comments record metrics from a previous run.
# NOTE(review): `final` above derives `reg_date`, not `load_date`, and only
# `data/final_no_control` is written by the active code. These readers assume
# outputs written with a `load_date` column (see the commented-out writers) — confirm.
LOAD_DATE_FILTER = 'load_date = "2024-01-01"'


def _read_filtered(path):
    """Read a ';'-separated CSV output directory (with header) and keep one load_date."""
    return spark.read.csv(path, header=True, sep=';').where(LOAD_DATE_FILTER)


reader_no_control = _read_filtered('data/final_no_control/')
reader_final_one_file = _read_filtered('data/final_one_file/')
reader_partitioned = _read_filtered('data/final_partitioned')
reader_partitioned_repart = _read_filtered('data/final_partitioned_repart')

# Jupyter only echoes a cell's last expression, so print every count explicitly.
print('no_control', reader_no_control.count())  # number of files read: 16 | size of files read: 88.4 MiB | 2.5 s (90 ms, 301 ms, 384 ms)

print('final_one_file', reader_final_one_file.count())  # number of files read: 1 | size of files read: 88.4 MiB | 3.2 s (306 ms, 407 ms, 420 ms )

print('partitioned', reader_partitioned.count())  # number of files read: 16 | size of files read: 16.4 MiB | 305 ms (32 ms, 39 ms, 54 ms )

print('partitioned_repart', reader_partitioned_repart.count())  # number of files read: 1 | size of files read: 16.4 MiB | 179 ms (9 ms, 43 ms, 44 ms )

### JOIN

In [None]:
# Small region lookup table (id -> name) used as the right side of the joins below.
data = [
    (14000, "Северный"),
    (11000, "Южный"),
    (10000, "Восточный"),
    (26000, "Западный"),
    (56000, "Центральный"),
]

region_df = spark.createDataFrame(data, 'region_id long, name string')
region_df.show()


# Larger ';'-separated dataset that is joined against region_df.
customs_data = (
    spark.read
    .option('header', True)
    .option('sep', ';')
    .csv('data/customs_data.csv')
)
customs_data.show(2)

In [23]:
import time

# Disable automatic broadcast joins (threshold -1) so joins are only
# broadcast when explicitly requested with F.broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [50]:
# Time the left join WITHOUT broadcast (autoBroadcastJoinThreshold was set to -1).
# perf_counter is a monotonic high-resolution clock intended for measuring
# intervals; time.time() follows the wall clock and can jump.
# NOTE(review): this first run also pays the cold CSV read, which biases the
# comparison with the broadcast run below.

start_time = time.perf_counter()

customs_data.join(region_df, customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for join operation: {end_time - start_time:.2f} seconds")

In [51]:
# Time the same left join WITH an explicit broadcast of the small region_df.
# perf_counter is a monotonic high-resolution clock intended for measuring intervals.

start_time = time.perf_counter()

customs_data.join(F.broadcast(region_df), customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for broadcast join operation: {end_time - start_time:.2f} seconds")

### Cache | Persist

In [52]:
# Mark customs_data for caching; count() is an action, so it materializes the cache.
customs_data.cache().count()

In [53]:
# Release the cached data so it can be persisted again with a different storage level.
customs_data.unpersist()

In [54]:
from pyspark.storagelevel import StorageLevel

# Persist on disk only (no memory copy) and materialize it with an action.
customs_data.persist(storageLevel=StorageLevel.DISK_ONLY).count()

### Repartition & Coalesce

In [56]:
# Nine (id, word) rows, ids starting at 1, for the partitioning demos below.
data = list(enumerate(
    ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine'],
    start=1,
))

df = spark.createDataFrame(data, schema=['id', 'number'])

df.show()

In [57]:
# Deliberately shuffle the rows into 8 partitions, then list each partition's rows.
mix = df.repartition(numPartitions=8)
mix.rdd.glom().collect()

In [58]:
# repartition redistributes all rows with a full shuffle into 3 partitions.
mix.repartition(numPartitions=3).rdd.glom().collect()

In [59]:
# coalesce merges existing partitions down to 3 (per Spark docs, without a full shuffle).
mix.coalesce(numPartitions=3).rdd.glom().collect()

In [60]:
# Bring only the first 5 rows to the driver: limit() is applied by Spark before
# converting, instead of materializing the whole DataFrame in pandas and then
# calling head().
mix.limit(5).toPandas()

In [None]:
# OUT OF MEMORY demo: collect() pulls every row of the file into driver memory,
# which can exhaust it on large inputs. Prefer show() / take(n) / limit(n).
# The file is ';'-separated (same as the customs_data read above); the previous
# sep='\t' would have loaded each line as a single column.

d = spark.read.csv('data/customs_data.csv', header=True, sep=';')
d.collect()

In [34]:
# Shut down the SparkSession and its SparkContext.
spark.stop()