### Start SparkSession

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create new Spark session connected to cluster
# Build (or reuse) a SparkSession attached to the standalone cluster master.
# The driver runs here in client deploy mode and advertises itself under the
# "jupyterlab" hostname.
# NOTE: getOrCreate() hands back an already-running session if there is one;
# presumably that is why the saved output shows a different "App Name" than
# the appName passed here — confirm.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("docker-spark-cluster")
    .config("spark.driver.host", "jupyterlab")
    .config("spark.submit.deployMode", "client")
    .getOrCreate()
)

print(f"Active Spark session: {spark.sparkContext.uiWebUrl}")
print(f"Master URL: {spark.sparkContext.master}")
print(f"App Name: {spark.sparkContext.appName}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/02 14:14:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Active Spark session: http://jupyterlab:4040
Master URL: spark://spark-master:7077
App Name: halltape_pyspark_cluster


### Read

In [2]:
# Input file; the previews below show it is ';'-separated with a header line.
PATH = 'data/customs_data.csv'

In [3]:
# Naive read with default options: the separator is not set to ';', so each
# line lands in a single `_c0` column and the header line is shown as data
# (see output below).
spark.read.csv(PATH).show()

                                                                                

+--------------------+
|                 _c0|
+--------------------+
|month;country;cod...|
|01/2016;IT;620469...|
|01/2016;CN;900190...|
|01/2016;BY;841430...|
|01/2016;US;901850...|
|01/2016;EE;902110...|
|01/2016;FR;381600...|
|01/2016;MX;852351...|
|01/2016;JP;620452...|
|01/2016;KR;611020...|
|01/2016;KG;852713...|
|01/2016;ZA;842123...|
|01/2016;CN;851810...|
|01/2016;TR;841790...|
|01/2016;IT;390610...|
|01/2016;CZ;870840...|
|01/2016;ES;640419...|
|01/2016;IT;940490...|
|01/2016;UA;820780...|
|01/2016;CN;330410...|
+--------------------+
only showing top 20 rows



                                                                                

In [4]:
# Read with the real delimiter and take column names from the first line.
df = spark.read.csv(PATH, header=True, sep=';')

# Default preview (20 rows) without truncating long values.
df.show(truncate=False)

# First 2 rows, untruncated, printed vertically (one line per column).
df.show(n=2, truncate=False, vertical=True)

+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|month  |country|code      |value |netto |quantity|region|district|direction_eng|measure_eng|load_date                    |
+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|01/2016|IT     |6204695000|131   |1     |7       |46000 |01      |IM           |ShT        |2024-07-01T00:00:00.000+03:00|
|01/2016|CN     |9001900009|112750|18    |0       |46000 |01      |IM           |1          |2024-01-01T00:00:00.000+03:00|
|01/2016|BY     |8414302004|392   |57    |8       |50000 |06      |IM           |ShT        |2024-06-01T00:00:00.000+03:00|
|01/2016|US     |9018509000|54349 |179   |0       |40000 |02      |IM           |1          |2024-04-01T00:00:00.000+03:00|
|01/2016|EE     |9021101000|17304 |372   |0       |46000 |01      |IM           |1          |2024-02-01T00:00:00.000+03:00|
|01/2016

### PrintSchema

In [7]:
# No inferSchema was requested, so every column is a nullable string (see output).
df.printSchema()

root
 |-- month: string (nullable = true)
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)
 |-- netto: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- direction_eng: string (nullable = true)
 |-- measure_eng: string (nullable = true)
 |-- load_date: string (nullable = true)



In [8]:
# Strip the "_eng" suffix from the two English-labelled columns.
_renames = {"direction_eng": "direction", "measure_eng": "measure"}

result = df
for _old_name, _new_name in _renames.items():
    result = result.withColumnRenamed(_old_name, _new_name)

result.columns

['month',
 'country',
 'code',
 'value',
 'netto',
 'quantity',
 'region',
 'district',
 'direction',
 'measure',
 'load_date']

### Select

In [9]:
# Ten distinct country codes; there is no orderBy, so the rows shown are unsorted.
result.select(F.col('country')).distinct().show(n=10, truncate=False)

                                                                                

+-------+
|country|
+-------+
|LT     |
|MM     |
|DZ     |
|CI     |
|TC     |
|FI     |
|SC     |
|AZ     |
|PM     |
|UA     |
+-------+
only showing top 10 rows



### GroupBy

In [10]:
# Number of rows per country, largest first.
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .orderBy(F.desc('total_rows'))
    .show()
)

                                                                                

+-------+----------+
|country|total_rows|
+-------+----------+
|     BY|   3509568|
|     KZ|   2519896|
|     CN|   2454792|
|     DE|   1542311|
|     UA|   1158498|
|     IT|   1102837|
|     US|    835936|
|     PL|    666690|
|     FR|    593040|
|     JP|    571756|
|     TR|    463432|
|     KR|    446907|
|     GB|    443091|
|     AM|    438705|
|     CZ|    407360|
|     KG|    403565|
|     ES|    401644|
|     IN|    374151|
|     NL|    365193|
|     UZ|    329707|
+-------+----------+
only showing top 20 rows



### Filter

In [None]:
# Two equivalent ways to write the same filter.

# 1) Column expressions from pyspark.sql.functions.
df_de = result.where(
    (F.col('country') == 'DE') & F.col('value').isNotNull()
)

# 2) A SQL expression string.
df_de2 = result.where(''' country == "DE" AND value IS NOT NULL ''')

# Both variants should keep exactly the same rows.
print(df_de.count() == df_de2.count())

### Save to CSV

In [None]:
# Preview two rows of the DE subset without truncating values.
df_de.show(2, truncate=False)

In [None]:
# Column names after the earlier renames (direction / measure without "_eng").
df_de.columns

In [None]:
# Keep every column in its current position; only load_date changes type, from
# a timestamp-like string (e.g. "2024-07-01T00:00:00.000+03:00") to a DATE —
# presumably keeping just the calendar-date part; verify on the output below.
final = df_de.withColumn('load_date', F.col('load_date').cast('date'))

final.show(2, truncate=False)

In [None]:
def _write_csv(frame, path, partition_col=None):
    """Write `frame` to `path` as ';'-separated CSV with a header row.

    mode('overwrite') replaces any previous output at `path`, so the cell can
    be re-run; with Spark's default save mode a second run fails because the
    output directory already exists.

    Args:
        frame: DataFrame to write.
        path: Output directory.
        partition_col: Optional column name passed to partitionBy(), which
            writes one sub-directory per distinct value of that column.
    """
    writer = frame.write.mode('overwrite').options(header='True', sep=';')
    if partition_col is not None:
        writer = writer.partitionBy(partition_col)
    # .csv() already selects the CSV format; a separate .format('csv') is redundant.
    writer.csv(path)


# 1) Uncontrolled number of files: roughly one output file per partition of `final`.
_write_csv(final, 'data/final_no_control')

partition_num = final.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

# 2) Controlled number of files - ONE FILE: coalesce to a single partition
#    first (the whole write then runs in a single task).
final_one_partition = final.coalesce(1)
_write_csv(final_one_partition, 'data/final_one_file')

partition_num = final_one_partition.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')


# 3) Partitioned write: one load_date=<value> directory per distinct load_date.
_write_csv(final, 'data/final_partitioned', partition_col='load_date')

distinct_load_dates = final.select('load_date').distinct()
print(f'Load_date distinct: {distinct_load_dates.count()}')


# 4) Partitioned write with a repartition on load_date first: hash
#    partitioning sends all rows of one date to the same task, so each
#    load_date directory receives a single file. Repartitioning by the column
#    alone (instead of repartition(1, 'load_date')) spreads the dates over many
#    tasks, so the write stays parallel rather than running in one task.
final_by_date = final.repartition('load_date')
_write_csv(final_by_date, 'data/final_partitioned_repart', partition_col='load_date')

partition_num = final_by_date.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

### Read Transformed

In [None]:
# Read each output layout back, keeping only one load_date, to compare how
# much data Spark has to scan for each layout.
LOAD_DATE = '2024-01-01'


def _read_load_date(path, load_date=LOAD_DATE):
    """Return the ';'-separated CSV output at `path`, filtered to `load_date`.

    For the partitioned layouts the filter is on the partition column, so only
    the matching load_date=... directory needs to be read; the flat layouts
    are scanned in full (compare the recorded metrics below).
    """
    return (
        spark
        .read
        .csv(path, header=True, sep=';')
        .where(F.col('load_date') == load_date)
    )


reader_no_control = _read_load_date('data/final_no_control/')
reader_final_one_file = _read_load_date('data/final_one_file/')
reader_partitioned = _read_load_date('data/final_partitioned')
reader_partitioned_repart = _read_load_date('data/final_partitioned_repart')

# Spark UI metrics recorded for these counts in an earlier run:
#   no_control          files read: 16 | size read: 88.4 MiB | 2.5 s  (90 ms, 301 ms, 384 ms)
#   final_one_file      files read: 1  | size read: 88.4 MiB | 3.2 s  (306 ms, 407 ms, 420 ms)
#   partitioned         files read: 16 | size read: 16.4 MiB | 305 ms (32 ms, 39 ms, 54 ms)
#   partitioned_repart  files read: 1  | size read: 16.4 MiB | 179 ms (9 ms, 43 ms, 44 ms)
# A cell only displays its final expression, so print every count explicitly.
for _label, _reader in [
    ('no_control', reader_no_control),
    ('final_one_file', reader_final_one_file),
    ('partitioned', reader_partitioned),
    ('partitioned_repart', reader_partitioned_repart),
]:
    print(f'{_label}: {_reader.count()} rows')

### JOIN

In [None]:
# Small lookup table: region id -> region name (names are Russian for
# Northern, Southern, Eastern, Western, Central).
region_rows = [
    (14000, "Северный"),
    (11000, "Южный"),
    (10000, "Восточный"),
    (26000, "Западный"),
    (56000, "Центральный"),
]

region_df = spark.createDataFrame(region_rows, schema='region_id long, name string')

region_df.show()

# Same source file as `df`, re-read without the column renames. Reuse PATH
# instead of repeating the literal path.
# NOTE(review): `region` is read as a string (no inferSchema) while
# `region_id` is long, so the join below relies on Spark's implicit cast; and
# region codes seen in the preview (46000, 50000, 40000) are not in this
# lookup, so a left join leaves their `name` null.
customs_data = (
    spark
    .read
    .csv(PATH, header=True, sep=';')
)

customs_data.show(2)

In [None]:
# Disable automatic broadcast joins (threshold -1) so the plain join below is
# not broadcast on its own; the timing cells below also need `time`.
import time
spark.conf.set(key="spark.sql.autoBroadcastJoinThreshold", value=-1)

In [None]:
# Time the join WITHOUT broadcast (automatic broadcast was disabled above).
# perf_counter() is a monotonic high-resolution clock intended for measuring
# durations; time.time() follows the system clock, which can be adjusted.
# NOTE(review): this run goes first, so later runs may benefit from warmed-up
# caches — keep that in mind when comparing timings.
start_time = time.perf_counter()

rows_joined = customs_data.join(region_df, customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Rows after join: {rows_joined}")
print(f"Elapsed time for join operation: {end_time - start_time:.2f} seconds")

In [None]:
# Time the same join WITH an explicit broadcast hint on the small region table.
# perf_counter() is a monotonic high-resolution clock intended for measuring
# durations; time.time() follows the system clock, which can be adjusted.
start_time = time.perf_counter()

rows_joined = customs_data.join(F.broadcast(region_df), customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Rows after join: {rows_joined}")
print(f"Elapsed time for broadcast join operation: {end_time - start_time:.2f} seconds")

### Cache | Persist

In [None]:
# Mark customs_data for caching (default storage level); count() is an action,
# so it runs the scan and fills the cache.
customs_data.cache().count()

In [None]:
# Drop the cached data so the next cell can persist it again with a different storage level.
customs_data.unpersist()

In [None]:
from pyspark import StorageLevel

# Persist on disk only (no in-memory copy), then run count() to materialize it.
customs_data.persist(StorageLevel.DISK_ONLY).count()

### Repartition & Coalesce

In [None]:
# Nine (id, number word) rows for the repartition / coalesce demos below.
# NOTE: this rebinds `df` (previously the customs DataFrame) and `data`.
_number_words = ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine']
data = list(enumerate(_number_words, start=1))

df = spark.createDataFrame(data, ['id', 'number'])

df.show()

In [None]:
# Deliberately shuffle into 8 partitions, then list the rows held by each
# partition (glom() turns every partition into a list).
mix = df.repartition(numPartitions=8)
mix.rdd.glom().collect()

In [None]:
# repartition() shuffles: rows are redistributed across 3 new partitions.
mix.repartition(numPartitions=3).rdd.glom().collect()

In [None]:
# coalesce() merges existing partitions down to 3 without a full shuffle.
mix.coalesce(numPartitions=3).rdd.glom().collect()

In [None]:
# toPandas() brings every row of `mix` to the driver as a pandas DataFrame;
# harmless here because `mix` holds only the 9 rows created above.
mix.toPandas().head()

In [None]:
# OUT OF MEMORY demo: collect() pulls the entire customs dataset into the
# driver process.
# Guarded by a flag so "Restart & Run All" does not take the driver down
# before spark.stop() below; set RUN_OOM_DEMO = True to run it on purpose.
# NOTE(review): the file is ';'-separated (see the reads above); with
# sep='\t' each line is read as one string column — confirm that is intended.
RUN_OOM_DEMO = False

d = spark.read.csv(PATH, header=True, sep='\t')
if RUN_OOM_DEMO:
    d.collect()

In [None]:
# Stop the SparkSession and its underlying SparkContext at the end of the notebook.
spark.stop()