### Start SparkSession

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Build (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("halltape_pyspark_local")
    .config("spark.ui.showConsoleProgress", "true")
    .getOrCreate()
)

# The value printed here is the URL of the Spark web UI for this session's context.
print("Активные Spark сессии:", spark.sparkContext.uiWebUrl)

Активные Spark сессии: http://WIN-0SEIC1SUITU:4040


### Read

In [2]:
# Relative path to the source CSV that the next cells read.
PATH = 'data/customs_data.csv'

In [3]:
# Read with default options: no header row and the default delimiter, so every
# line (including the header line) ends up in a single column named _c0.
spark.read.csv(PATH).show()

+--------------------+
|                 _c0|
+--------------------+
|month;country;cod...|
|01/2016;IT;620469...|
|01/2016;CN;900190...|
|01/2016;BY;841430...|
|01/2016;US;901850...|
|01/2016;EE;902110...|
|01/2016;FR;381600...|
|01/2016;MX;852351...|
|01/2016;JP;620452...|
|01/2016;KR;611020...|
|01/2016;KG;852713...|
|01/2016;ZA;842123...|
|01/2016;CN;851810...|
|01/2016;TR;841790...|
|01/2016;IT;390610...|
|01/2016;CZ;870840...|
|01/2016;ES;640419...|
|01/2016;IT;940490...|
|01/2016;UA;820780...|
|01/2016;CN;330410...|
+--------------------+
only showing top 20 rows



In [4]:
# Same file, now parsed with ';' as the delimiter and the first line as header.
(
    spark.read
    .csv(PATH, header=True, sep=';')
    .show(n=5)
)

+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|  month|country|      code| value|netto|quantity|region|district|direction_eng|measure_eng|           load_date|
+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|01/2016|     IT|6204695000|   131|    1|       7| 46000|      01|           IM|        ShT|2024-07-01T00:00:...|
|01/2016|     CN|9001900009|112750|   18|       0| 46000|      01|           IM|          1|2024-01-01T00:00:...|
|01/2016|     BY|8414302004|   392|   57|       8| 50000|      06|           IM|        ShT|2024-06-01T00:00:...|
|01/2016|     US|9018509000| 54349|  179|       0| 40000|      02|           IM|          1|2024-04-01T00:00:...|
|01/2016|     EE|9021101000| 17304|  372|       0| 46000|      01|           IM|          1|2024-02-01T00:00:...|
+-------+-------+----------+------+-----+--------+------+--------+-------------+--------

In [5]:
# Load the CSV into a DataFrame (semicolon-delimited, first line is the header).
df = spark.read.csv(PATH, header=True, sep=';')

# Default number of rows, with cell contents shown in full.
df.show(truncate=False)

# Two rows, untruncated, in vertical layout (one field per line).
df.show(n=2, truncate=False, vertical=True)

+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|month  |country|code      |value |netto |quantity|region|district|direction_eng|measure_eng|load_date                    |
+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|01/2016|IT     |6204695000|131   |1     |7       |46000 |01      |IM           |ShT        |2024-07-01T00:00:00.000+03:00|
|01/2016|CN     |9001900009|112750|18    |0       |46000 |01      |IM           |1          |2024-01-01T00:00:00.000+03:00|
|01/2016|BY     |8414302004|392   |57    |8       |50000 |06      |IM           |ShT        |2024-06-01T00:00:00.000+03:00|
|01/2016|US     |9018509000|54349 |179   |0       |40000 |02      |IM           |1          |2024-04-01T00:00:00.000+03:00|
|01/2016|EE     |9021101000|17304 |372   |0       |46000 |01      |IM           |1          |2024-02-01T00:00:00.000+03:00|
|01/2016

### PrintSchema

In [6]:
# Print column names and types; without an explicit schema every CSV column is read as string.
df.printSchema()

root
 |-- month: string (nullable = true)
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)
 |-- netto: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- direction_eng: string (nullable = true)
 |-- measure_eng: string (nullable = true)
 |-- load_date: string (nullable = true)



In [7]:
# Drop the "_eng" suffix from the two column names.
result = (
    df
    .withColumnRenamed("direction_eng", "direction")
    .withColumnRenamed("measure_eng", "measure")
)

result.columns

['month',
 'country',
 'code',
 'value',
 'netto',
 'quantity',
 'region',
 'district',
 'direction',
 'measure',
 'load_date']

### Select

In [8]:
# Preview ten distinct country codes.
(
    result
    .select('country')
    .distinct()
    .show(n=10, truncate=False)
)

+-------+
|country|
+-------+
|LT     |
|MM     |
|DZ     |
|CI     |
|TC     |
|FI     |
|SC     |
|AZ     |
|UA     |
|RO     |
+-------+
only showing top 10 rows



### GroupBy

In [9]:
# Row count per country, largest first.
(
    result
    .groupBy('country')
    .agg(F.count('*').alias('total_rows'))
    .orderBy(F.col('total_rows').desc())
    .show()
)

+-------+----------+
|country|total_rows|
+-------+----------+
|     BY|   3509568|
|     KZ|   2519896|
|     CN|   2454792|
|     DE|   1542311|
|     UA|   1158498|
|     IT|   1102837|
|     US|    835936|
|     PL|    666690|
|     FR|    593040|
|     JP|    571756|
|     TR|    463432|
|     KR|    446907|
|     GB|    443091|
|     AM|    438705|
|     CZ|    407360|
|     KG|    403565|
|     ES|    401644|
|     IN|    374151|
|     NL|    365193|
|     UZ|    329707|
+-------+----------+
only showing top 20 rows



### Filter

In [10]:
# Two equivalent ways of writing the same filter.

# Variant 1: Column expressions.
df_de = (
    result
    .where(F.col('country') == 'DE')
    .where(F.col('value').isNotNull())
)

# Variant 2: SQL expression strings.
df_de2 = (
    result
    .where(''' country == "DE" ''')
    .where(''' value IS NOT NULL ''')
)

# Both variants must select the same number of rows.
print(df_de.count() == df_de2.count())

True


### Save to CSV

In [11]:
# Quick look at the filtered (country == DE) rows before saving.
df_de.show(2, truncate=False)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
|month  |country|code      |value|netto|quantity|region|district|direction|measure|load_date                    |
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
|01/2016|DE     |4016995709|5901 |172  |0       |46000 |01      |IM       |1      |2024-01-01T00:00:00.000+03:00|
|01/2016|DE     |8708809109|1213 |94   |0       |45000 |01      |IM       |1      |2024-01-01T00:00:00.000+03:00|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+-----------------------------+
only showing top 2 rows



In [12]:
# List the column names to copy into the explicit select below.
df_de.columns

['month',
 'country',
 'code',
 'value',
 'netto',
 'quantity',
 'region',
 'district',
 'direction',
 'measure',
 'load_date']

In [13]:
# Keep every column as-is, except load_date, which is cast from string to date.
final = df_de.select(
    'month', 'country', 'code', 'value', 'netto',
    'quantity', 'region', 'district', 'direction', 'measure',
    F.col('load_date').cast('date'),
)

final.show(2, truncate=False)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|month  |country|code      |value|netto|quantity|region|district|direction|measure|load_date |
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|01/2016|DE     |4016995709|5901 |172  |0       |46000 |01      |IM       |1      |2024-01-01|
|01/2016|DE     |8708809109|1213 |94   |0       |45000 |01      |IM       |1      |2024-01-01|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
only showing top 2 rows



In [14]:
# Confirm that load_date is now typed as date while the other columns stay string.
final.printSchema()

root
 |-- month: string (nullable = true)
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- value: string (nullable = true)
 |-- netto: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- region: string (nullable = true)
 |-- district: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- load_date: date (nullable = true)



In [15]:
# Preview the first four rows of the frame that will be written out.
final.show(4)

+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|  month|country|      code|value|netto|quantity|region|district|direction|measure| load_date|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
|01/2016|     DE|4016995709| 5901|  172|       0| 46000|      01|       IM|      1|2024-01-01|
|01/2016|     DE|8708809109| 1213|   94|       0| 45000|      01|       IM|      1|2024-01-01|
|01/2016|     DE|7013419000| 7020| 1611|     492| 45000|      01|       IM|    ShT|2024-02-01|
|01/2016|     DE|3923309090|46294| 8048|       0| 45000|      01|       IM|      1|2024-04-01|
+-------+-------+----------+-----+-----+--------+------+--------+---------+-------+----------+
only showing top 4 rows



In [16]:
# Save without controlling the number of output files:
# the file count is left to Spark and follows the DataFrame's partitioning.
(
    final.write
    .format('csv')
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_no_control')
)

# Report how many partitions the DataFrame currently has.
partition_num = final.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')



Кол-во партиций 20


In [17]:
# Save with a controlled number of output files: coalesce(5) reduces the frame
# to 5 partitions before the write (so this is five files, not a single file).
#
# repartition(5) could be used instead of coalesce(5). The difference is that
# repartition spreads the rows evenly so the files come out similar in size,
# but to do so it performs an extra shuffle, which costs compute resources.
final_coalesced = final.coalesce(5)

(
    final_coalesced.write
    .format('csv')
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_one_file_5files')
)

# Report the partition count of the exact frame that was written.
partition_num = final_coalesced.rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

Кол-во партиций 5


In [18]:
# List the distinct load_date values; this column is used as the partition key below.
final.select('load_date').distinct().show()

+----------+
| load_date|
+----------+
|2024-08-01|
|2024-09-01|
|2024-10-01|
|2024-04-01|
|2024-05-01|
|2024-07-01|
|2024-02-01|
|2024-03-01|
|2024-01-01|
|2024-06-01|
+----------+



In [19]:
# Partitioned save. In effect this builds a lookup keyed by a field's values,
# here by the date column.
(
    final.write
    .partitionBy('load_date')
    .format('csv')
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_partitioned_date')
)

# Count the distinct partition-key values.
print_df = final.select('load_date').distinct()
print(f'Load_date distinct: {print_df.count()}')



Load_date distinct: 10


In [20]:
# Partitioned save combined with a repartition on the partition key.
# Note: the date column itself is dropped from the written data.
(
    final
    .repartition(1, 'load_date')
    .write
    .partitionBy('load_date')
    .format('csv')
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_partitioned_repart')
)

# Partition count of an identically repartitioned frame.
partition_num = final.repartition(1, 'load_date').rdd.getNumPartitions()
print(f'Кол-во партиций {partition_num}')

Кол-во партиций 1


### Read Transformed

In [21]:
# Read back the "no control" output and keep a single load_date.
reader_no_control = (
    spark.read
    .csv('data/final_no_control/', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)
# Recorded run: number of files read: 16 | size of files read: 88.4 MiB | 2.5 s (90 ms, 301 ms, 384 ms)
reader_no_control.count()

350998

In [22]:
# Read back the coalesce(5) output and keep a single load_date.
reader_final_one_file = (
    spark.read
    .csv('data/final_one_file_5files/', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)
# Recorded run: number of files read: 5 | size of files read: 88.4 MiB | 3.2 s (306 ms, 407 ms, 420 ms )
reader_final_one_file.count()

350998

In [23]:
# Read back the output partitioned by load_date and keep a single date.
reader_partitioned = (
    spark.read
    .csv('data/final_partitioned_date', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)
# Recorded run: number of files read: 16 | size of files read: 16.4 MiB | 305 ms (32 ms, 39 ms, 54 ms )
reader_partitioned.count()

350998

In [24]:
# Read back the repartitioned + partitioned output and keep a single date.
reader_partitioned_repart = (
    spark.read
    .csv('data/final_partitioned_repart', header=True, sep=';')
    .where(''' load_date = "2024-01-01" ''')
)

# Recorded run: number of files read: 1 | size of files read: 16.4 MiB | 179 ms (9 ms, 43 ms, 44 ms )
reader_partitioned_repart.count()

350998

### JOIN

In [25]:
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Small lookup table: region id -> region name.
data = [
    (14000, "Северный"),
    (11000, "Южный"),
    (10000, "Восточный"),
    (26000, "Западный"),
    (56000, "Центральный"),
]

# Schema written out with explicit type objects (shown as an alternative to the DDL string).
my_schema = StructType(
    [
        StructField("region_id", LongType(), True),
        StructField("name", StringType(), True),
    ]
)

# The same schema given as a DDL string.
# Alternative call using the object form: spark.createDataFrame(data, schema=my_schema)
region_df = spark.createDataFrame(data, schema='region_id long, name string')

region_df.show()


# Large fact table, re-read from the source CSV.
customs_data = spark.read.csv('data/customs_data.csv', header=True, sep=';')

customs_data.show(2)

+---------+-----------+
|region_id|       name|
+---------+-----------+
|    14000|   Северный|
|    11000|      Южный|
|    10000|  Восточный|
|    26000|   Западный|
|    56000|Центральный|
+---------+-----------+

+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|  month|country|      code| value|netto|quantity|region|district|direction_eng|measure_eng|           load_date|
+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
|01/2016|     IT|6204695000|   131|    1|       7| 46000|      01|           IM|        ShT|2024-07-01T00:00:...|
|01/2016|     CN|9001900009|112750|   18|       0| 46000|      01|           IM|          1|2024-01-01T00:00:...|
+-------+-------+----------+------+-----+--------+------+--------+-------------+-----------+--------------------+
only showing top 2 rows



In [26]:
# Disable the automatic broadcast join by setting the threshold to -1.
# (In a broadcast join the tables are joined on each executor separately; in this
# case the small table would be copied to every executor.)
import time
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [27]:
# Time the join without a broadcast join.
# perf_counter() is a monotonic, high-resolution clock, so the measured interval
# cannot be distorted by system clock adjustments the way time.time() can.

start_time = time.perf_counter()

# count() is the action that actually triggers the join.
customs_data.join(region_df, customs_data.region==region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for join operation: {end_time - start_time:.2f} seconds")

Elapsed time for join operation: 15.49 seconds


In [28]:
# Time the same join with an explicit broadcast hint on the small table.
# perf_counter() is a monotonic, high-resolution clock, so the measured interval
# cannot be distorted by system clock adjustments the way time.time() can.

start_time = time.perf_counter()

# count() is the action that actually triggers the join.
customs_data.join(F.broadcast(region_df), customs_data.region == region_df.region_id, "left").count()

end_time = time.perf_counter()

print(f"Elapsed time for broadcast join operation: {end_time - start_time:.2f} seconds")

Elapsed time for broadcast join operation: 11.24 seconds


### Cache | Persist

In [29]:
# cache() alone does nothing, because Spark evaluates lazily;
# calling count() is what actually triggers the caching.
customs_data.cache().count()

26392290

In [30]:
# Remove the DataFrame from the cache.
customs_data.unpersist()

DataFrame[month: string, country: string, code: string, value: string, netto: string, quantity: string, region: string, district: string, direction_eng: string, measure_eng: string, load_date: string]

In [31]:
from pyspark.storagelevel import StorageLevel

# persist() is also caching; the difference is the extra storage-level setting,
# which lets us say where to cache (here: disk only). count() triggers it.
customs_data.persist(StorageLevel.DISK_ONLY).count()

26392290

### Repartition & Coalesce

In [46]:
# Nine (id, word) pairs for the partitioning experiments below.
data = [
    (1, 'one'),
    (2, 'two'),
    (3, 'three'),
    (4, 'four'),
    (5, 'five'),
    (6, 'six'),
    (7, 'seven'),
    (8, 'eight'),
    (9, 'nine'),
]

df = spark.createDataFrame(data, schema=['id', 'number'])

df.show()

+---+------+
| id|number|
+---+------+
|  1|   one|
|  2|   two|
|  3| three|
|  4|  four|
|  5|  five|
|  6|   six|
|  7| seven|
|  8| eight|
|  9|  nine|
+---+------+



In [47]:
# Number of partitions of the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

20

In [48]:
# glom() turns each partition into a list of its rows, so the collected result
# shows how the rows are spread across partitions (empty lists = empty partitions).
df.rdd.glom().collect()

[[],
 [],
 [Row(id=1, number='one')],
 [],
 [Row(id=2, number='two')],
 [],
 [Row(id=3, number='three')],
 [],
 [Row(id=4, number='four')],
 [],
 [],
 [Row(id=5, number='five')],
 [],
 [Row(id=6, number='six')],
 [],
 [Row(id=7, number='seven')],
 [],
 [Row(id=8, number='eight')],
 [],
 [Row(id=9, number='nine')]]

In [49]:
# Deliberately shuffle the rows and split them into 8 partitions.
# repartition accepts a number either larger or smaller than the current one;
# coalesce, by contrast, can only reduce the partition count.
mix = df.repartition(8)
# collect() brings everything to the driver (i.e. this machine); do not do this with large volumes of data.
mix.rdd.glom().collect()

[[Row(id=1, number='one')],
 [Row(id=8, number='eight')],
 [],
 [],
 [],
 [Row(id=2, number='two'),
  Row(id=4, number='four'),
  Row(id=6, number='six'),
  Row(id=7, number='seven')],
 [Row(id=5, number='five'), Row(id=9, number='nine')],
 [Row(id=3, number='three')]]

In [50]:
# Repartition the 8-partition frame into 3 partitions and inspect the resulting layout.
mix.repartition(3).rdd.glom().collect()

[[Row(id=3, number='three'),
  Row(id=4, number='four'),
  Row(id=5, number='five'),
  Row(id=6, number='six'),
  Row(id=8, number='eight')],
 [Row(id=9, number='nine')],
 [Row(id=1, number='one'), Row(id=2, number='two'), Row(id=7, number='seven')]]

In [51]:
# Reduce the same frame to 3 partitions with coalesce and inspect the layout for comparison.
mix.coalesce(3).rdd.glom().collect()

[[Row(id=1, number='one')],
 [Row(id=8, number='eight'),
  Row(id=5, number='five'),
  Row(id=9, number='nine')],
 [Row(id=2, number='two'),
  Row(id=4, number='four'),
  Row(id=6, number='six'),
  Row(id=7, number='seven'),
  Row(id=3, number='three')]]

In [53]:
# Convert to a pandas DataFrame and show the first rows.
# Do not do this with large data: it will run out of memory.
mix.toPandas().head()

Unnamed: 0,id,number
0,1,one
1,8,eight
2,2,two
3,4,four
4,6,six


In [2]:
# OUT OF MEMORY demo: collect() pulls every row of the full dataset into the
# driver process. The failure shown below is the point of this cell; for real
# inspection use show() or take(n) instead of collect().

# The file is semicolon-delimited, so it is read with sep=';' like every other read of it.
d = spark.read.csv('data/customs_data.csv', header=True, sep=';')
d.collect()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\ladez\AppData\Local\Programs\Python\Python39\lib\site-packages\pyspark\errors\exceptions\captured.py", line 179, in deco
    return f(*a, **kw)
  File "C:\Users\ladez\AppData\Local\Programs\Python\Python39\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\ladez\AppData\Local\Programs\Python\Python39\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\ladez\AppData\Local\Programs\Python\Python39\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] Удаленный хост принудительно разорвал существующее подключение

During handling of the ab

ConnectionRefusedError: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение

In [3]:
# Stop the Spark session.
# NOTE(review): the recorded output is a ConnectionRefusedError, presumably because the
# backend had already died in the out-of-memory cell above — confirm on a fresh run.
spark.stop()

ConnectionRefusedError: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение