### Start SparkSession

In [35]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# Create (or reuse) a local SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("halltape_pyspark_local")
    .getOrCreate()
)


# uiWebUrl is the address of this session's Spark web UI, so label it as such.
print("Spark UI:", spark.sparkContext.uiWebUrl)

### Read

In [3]:
# Source file for the notebook; it is read below with sep=';' and a header row.
PATH = "data/customs_data.csv"

In [36]:
# Read with no CSV options at all (default separator, no header row) to show
# how the file is parsed when the options are left unset.
spark.read.format('csv').load(PATH).show()

In [37]:
# Same file, now with the ';' separator and the first line used as the header.
spark.read.options(sep=';', header=True).csv(PATH).show(n=5)

In [38]:
df = spark.read.csv(PATH, header=True, sep=';')

# Default row count, with cell values shown in full (no truncation).
df.show(truncate=False)

# First two rows, untruncated, printed vertically (one field per line).
df.show(n=2, truncate=False, vertical=True)

### PrintSchema

In [39]:
# Print the DataFrame schema. No inferSchema option was passed when df was
# read, so the columns are presumably all strings — confirm in the output.
df.printSchema()

In [40]:
# Drop the "_eng" suffix from two column names.
result = (
    df
    .withColumnRenamed("direction_eng", "direction")
    .withColumnRenamed("measure_eng", "measure")
)

result.columns

### Select

In [41]:
# Distinct values of the country column (first 10, shown untruncated).
(
    result
    .select('country')
    .dropDuplicates()
    .show(10, truncate=False)
)

### GroupBy

In [42]:
# Number of rows per country, most frequent first.
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .orderBy(F.desc('total_rows'))
    .show()
)

### Filter

In [43]:
# Two equivalent ways to write the same filter.
# 1) Column expressions from pyspark.sql.functions:
df_de = (
    result
    .where(F.col('country') == 'DE')
    .where(F.col('value').isNotNull())
)

# 2) SQL expression strings:
df_de2 = (
    result
    .where(''' country == "DE" ''')
    .where(''' value IS NOT NULL ''')
)

# Sanity check: both variants keep the same number of rows.
print(df_de.count() == df_de2.count())

### Save to CSV

In [44]:
# Preview two rows of the country == 'DE' subset, untruncated.
df_de.show(n=2, truncate=False)

In [45]:
# List the column names of the filtered DataFrame (rendered as the cell output).
df_de.columns

In [46]:
# Keep the columns to be written out and cast load_date to DATE (the CSV was
# read without a schema, so it arrives as text).
final = df_de.select(
    *['month', 'country', 'code', 'value', 'netto', 'quantity',
      'region', 'district', 'direction', 'measure'],
    F.col('load_date').cast('date'),
)

final.show(2, truncate=False)

In [47]:
def _write_csv(frame, path, partition_col=None):
    """Write ``frame`` to ``path`` as ';'-separated CSV with a header row.

    The save mode is 'overwrite' so this cell can be re-run. With the default
    mode the write fails as soon as the output directory already exists.

    Args:
        frame: DataFrame to write.
        path: Output directory; Spark writes one or more part files into it.
        partition_col: Optional column name for ``partitionBy``. When given,
            each distinct value is written to its own sub-directory.
    """
    writer = frame.write.mode('overwrite').options(header='True', sep=';')
    if partition_col is not None:
        writer = writer.partitionBy(partition_col)
    # .csv() already selects the CSV format; no separate .format('csv') needed.
    writer.csv(path)


# 1) Uncontrolled file count: the number of part files follows the current
#    partitioning of `final`.
_write_csv(final, 'data/final_no_control')

partition_num = final.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

# 2) Controlled file count, ONE FILE: coalesce(1) leaves a single partition.
_write_csv(final.coalesce(1), 'data/final_one_file')

partition_num = final.coalesce(1).rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')


# 3) Partitioned output: one sub-directory per distinct load_date.
_write_csv(final, 'data/final_partitioned', partition_col='load_date')

print_df = final.select('load_date').distinct()
print(f'Load_date distinct: {print_df.count()}')


# 4) Partitioned output after repartition(1, 'load_date'): all rows end up in a
#    single partition, so each date directory receives one file. The trade-off
#    is that a single task performs the whole write.
_write_csv(
    final.repartition(1, 'load_date'),
    'data/final_partitioned_repart',
    partition_col='load_date',
)

partition_num = final.repartition(1, 'load_date').rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

### Read Transformed

In [48]:
def _read_day(path):
    """Read a ';'-separated CSV output (with header) and keep load_date 2024-01-01."""
    return (
        spark.read
        .csv(path, header=True, sep=';')
        .where(''' load_date = "2024-01-01" ''')
    )


reader_no_control = _read_day('data/final_no_control/')
reader_final_one_file = _read_day('data/final_one_file/')
reader_partitioned = _read_day('data/final_partitioned')
reader_partitioned_repart = _read_day('data/final_partitioned_repart')

# Jupyter renders only the last expression of a cell, so print every count.
# Scan metrics noted from the Spark UI for each query:
#   no_control:          files read 16 | 88.4 MiB | 2.5 s  (90 ms, 301 ms, 384 ms)
#   final_one_file:      files read 1  | 88.4 MiB | 3.2 s  (306 ms, 407 ms, 420 ms)
#   partitioned:         files read 16 | 16.4 MiB | 305 ms (32 ms, 39 ms, 54 ms)
#   partitioned_repart:  files read 1  | 16.4 MiB | 179 ms (9 ms, 43 ms, 44 ms)
# The partitioned layouts read far less data, presumably because partition
# pruning limits the scan to the load_date=2024-01-01 directory.
for label, frame in [
    ('no_control', reader_no_control),
    ('final_one_file', reader_final_one_file),
    ('partitioned', reader_partitioned),
    ('partitioned_repart', reader_partitioned_repart),
]:
    print(f'{label}: {frame.count()}')

### JOIN

In [None]:
# Small lookup table keyed by region_id, used for the join with customs_data.region.
data = [
    (14000, "Северный"),
    (11000, "Южный"),
    (10000, "Восточный"),
    (26000, "Западный"),
    (56000, "Центральный"),
]

region_df = spark.createDataFrame(data, schema='region_id long, name string')

region_df.show()


# Reload the raw customs data. Reuse the PATH constant instead of repeating the
# literal path, so every read of this file stays in sync.
customs_data = spark.read.csv(PATH, header=True, sep=';')

customs_data.show(2)

In [23]:
import time

# Turn off automatic broadcast joins: a threshold of -1 disables size-based
# broadcasting, so a join is broadcast only when explicitly hinted.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [50]:
# Time the join without a broadcast. This assumes
# spark.sql.autoBroadcastJoinThreshold was set to -1 beforehand.
# perf_counter() is a monotonic clock meant for measuring intervals, unlike
# time.time(), which can jump when the system clock is adjusted.
# This is a single run, and it includes re-reading the CSV source.
start_time = time.perf_counter()

customs_data.join(region_df, customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for join operation: {end_time - start_time:.2f} seconds")

In [51]:
# Time the same join with an explicit broadcast hint on the small region_df.
# perf_counter() is a monotonic clock meant for measuring intervals, unlike
# time.time(), which can jump when the system clock is adjusted.
# This is a single run, so compare the two timings with care.
start_time = time.perf_counter()

customs_data.join(F.broadcast(region_df), customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for broadcast join operation: {end_time - start_time:.2f} seconds")

### Cache | Persist

In [52]:
# Mark customs_data for caching. cache() is lazy, so the count() action is what
# actually materializes the cached data.
customs_data.cache().count()

In [53]:
# Drop the cached data so it can be persisted again with a different storage level.
customs_data.unpersist()

In [54]:
from pyspark.storagelevel import StorageLevel

# Persist to disk only this time; count() is the action that materializes it.
customs_data.persist(storageLevel=StorageLevel.DISK_ONLY).count()

### Repartition & Coalesce

In [56]:
# Nine (id, number) rows: ids 1..9 paired with their English word.
data = list(enumerate(
    ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine'],
    start=1,
))

df = spark.createDataFrame(data, ['id', 'number'])

df.show()

In [57]:
# Deliberately shuffle the rows into 8 partitions. glom() turns each partition
# into a list, so the collected result shows how rows are distributed.
mix = df.repartition(numPartitions=8)
mix.rdd.glom().collect()

In [58]:
# repartition(3): redistribute the rows into 3 partitions via a shuffle.
mix.repartition(numPartitions=3).rdd.glom().collect()

In [59]:
# coalesce(3): merge the existing partitions down to 3 (compare with repartition).
mix.coalesce(numPartitions=3).rdd.glom().collect()

In [60]:
# Convert to a pandas DataFrame (this brings every row to the driver) and show
# the first rows. That is harmless here because mix holds only nine rows.
mix.toPandas().head()

In [None]:
# OUT OF MEMORY demo: collect() returns every row of the dataset to the driver
# process, which can exhaust driver memory on large inputs.
# NOTE(review): sep='\t' differs from the ';' separator used elsewhere in this
# notebook, so each line is presumably read into a single column. Confirm
# whether that is intentional for the demo.

d = spark.read.csv('data/customs_data.csv', header=True, sep='\t')
d.collect()

In [34]:
# Shut down the SparkSession created at the top of the notebook.
spark.stop()