In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# specify substep parameters for interactive run
# this cell will be replaced during job run with the parameters from json within params subfolder
substep_params={}

In [3]:
# load pipeline and step parameters - do not edit
from sinara.substep import get_pipeline_params, get_step_params
pipeline_params = get_pipeline_params(pprint=True)
step_params = get_step_params(pprint=True)

**Pipeline params:**


{'X': 'something',
 'env_name': 'user',
 'pipeline_name': 'pipeline',
 'zone_name': 'zone'}




**Step params:**


{'Y': 'something_else'}




In [4]:
#3 define substep interface
from sinara.substep import NotebookSubstep, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

substep = NotebookSubstep(pipeline_params, step_params, substep_params)

substep.interface(
   
    tmp_outputs =
    [
        { ENTITY_NAME: "test_data" },
    ]
)

substep.print_interface_info()

substep.exit_in_visualize_mode()

**STEP NAME:**


'sinara_quick_test'




**TMP OUTPUTS:**


[{'tmp:user.pipeline.zone.sinara_quick_test.test_data': '/tmp/env/user/pipeline/zone/sinara_quick_test/run-24-08-12-072026/test_data'}]




In [5]:
from sinara.spark import SinaraSpark

spark = SinaraSpark.run_session(0)
SinaraSpark.ui_url()

Session is run


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/12 07:20:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 34698)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pyspark/serializers.py",

In [6]:
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
from pyspark.sql import functions as F

In [7]:
tmp_outputs = substep.tmp_outputs()

# Генерация данных

In [8]:
n_rows = 30000 #3000000
n_cols = 200

df_spark = spark.createDataFrame(np.random.sample((n_rows, 1)))
for i in tqdm(range(n_cols-1), total=n_cols-1):
    df_spark = df_spark.withColumn(f'value_{i}', F.col('value'))

df_spark \
    .write \
    .mode("overwrite") \
    .parquet(tmp_outputs.test_data)

  0%|          | 0/199 [00:00<?, ?it/s]

24/08/12 07:21:55 WARN BaseSessionStateBuilder$$anon$2: Max iterations (100) reached for batch LocalRelation early, please set 'spark.sql.optimizer.maxIterations' to a larger value.
24/08/12 07:22:05 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/08/12 07:22:06 WARN TaskSetManager: Stage 0 contains a task of very large size (4420 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

# Чтение данных через pandas

In [9]:
%%time

df_pandas_1 = pd.read_parquet(tmp_outputs.test_data)

CPU times: user 263 ms, sys: 556 ms, total: 819 ms
Wall time: 245 ms


In [10]:
df_pandas_1.shape

(30000, 200)

In [11]:
df_pandas_1.head()

Unnamed: 0,value,value_0,value_1,value_2,value_3,value_4,value_5,value_6,value_7,value_8,...,value_189,value_190,value_191,value_192,value_193,value_194,value_195,value_196,value_197,value_198
0,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,...,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726
1,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,...,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577
2,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,...,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396
3,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,...,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841
4,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,...,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637


# Чтение данных через spark и преобразование в pandas

In [12]:
%%time

df_pandas_2 = spark \
    .read \
    .parquet(tmp_outputs.test_data) \
    .toPandas()

[Stage 2:>                                                        (0 + 11) / 11]

CPU times: user 85.2 ms, sys: 322 ms, total: 407 ms
Wall time: 2.83 s


                                                                                

In [13]:
df_pandas_2.head()

Unnamed: 0,value,value_0,value_1,value_2,value_3,value_4,value_5,value_6,value_7,value_8,...,value_189,value_190,value_191,value_192,value_193,value_194,value_195,value_196,value_197,value_198
0,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,...,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832,0.093832
1,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,...,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171,0.382171
2,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,...,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135,0.357135
3,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,...,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755,0.312755
4,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,...,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973,0.687973


In [14]:
df_pandas_2.shape

(30000, 200)

In [15]:
%%time

df_pandas_1.equals(df_pandas_2)

CPU times: user 769 µs, sys: 30.3 ms, total: 31.1 ms
Wall time: 31.4 ms


False

In [16]:
(df_pandas_1.dtypes == df_pandas_2.dtypes).mean()

1.0

In [17]:
(df_pandas_1.values == df_pandas_2.values).mean()

0.0

In [18]:
df_pandas_3 = df_pandas_2 \
    .set_index('value').loc[df_pandas_1['value'].values] \
    .reset_index()

In [19]:
df_pandas_3.head()

Unnamed: 0,value,value_0,value_1,value_2,value_3,value_4,value_5,value_6,value_7,value_8,...,value_189,value_190,value_191,value_192,value_193,value_194,value_195,value_196,value_197,value_198
0,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,...,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726,0.6726
1,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,...,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577,0.30577
2,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,...,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396,0.140396
3,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,...,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841,0.417841
4,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,...,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637,0.766637


In [20]:
%%time

df_pandas_1.equals(df_pandas_3)

CPU times: user 20.3 ms, sys: 653 µs, total: 20.9 ms
Wall time: 19.8 ms


True

# Чтение данных через spark -> pandas_api -> pandas

In [21]:
%%time

df_pandas = spark \
    .read \
    .parquet(tmp_outputs.test_data) \
    .pandas_api() \
    .to_pandas()

                                                                                

CPU times: user 2.09 s, sys: 551 ms, total: 2.64 s
Wall time: 3.35 s


# Сохранение датафрейма pandas при помощи spark

In [22]:
%%time

# выполняется очень долго без arrow оптимизации
spark.createDataFrame(df_pandas) \
    .write \
    .mode("overwrite") \
    .parquet(tmp_outputs.test_data)

24/08/12 07:22:17 WARN TaskSetManager: Stage 6 contains a task of very large size (4420 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

CPU times: user 105 ms, sys: 79.3 ms, total: 185 ms
Wall time: 2.16 s


# Уберем arrow оптимизацию

## Чтение

In [23]:
spark.version

'3.4.0'

In [24]:
spark.sparkContext.getConf().getAll()

[('spark.app.name', 'SinaraML Spark App'),
 ('spark.app.id', 'local-1723447230087'),
 ('spark.executor.id', 'driver'),
 ('spark.master', 'local[11]'),
 ('spark.driver.maxResultSize', '1g'),
 ('spark.app.submitTime', '1723447228860'),
 ('spark.driver.host', '9316bc880a1a'),
 ('spark.app.startTime', '1723447228972'),
 ('spark.driver.memory', '3g'),
 ('spark.sql.warehouse.dir',
  'file:/home/sinarian/work/pipeline-sinara_quick_test/spark-warehouse'),
 ('spark.executor.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --

In [25]:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'false')

In [26]:
%%time

df_pandas_4 = spark \
    .read \
    .parquet(tmp_outputs.test_data) \
    .toPandas()

CPU times: user 1.9 s, sys: 228 ms, total: 2.12 s
Wall time: 3 s


## Сохранение

In [27]:
%%time

spark.createDataFrame(df_pandas) \
    .write \
    .mode("overwrite") \
    .parquet(tmp_outputs.test_data)

24/08/12 07:23:04 WARN TaskSetManager: Stage 9 contains a task of very large size (3613 KiB). The maximum recommended task size is 1000 KiB.

CPU times: user 41.6 s, sys: 94.4 ms, total: 41.7 s
Wall time: 45 s


                                                                                

# Эксперименты с разными типами

In [28]:
spark

In [29]:
import os
import json
from pyspark.sql.types import StructType, StructField, StringType, StringType, DataType, ArrayType, LongType, DoubleType, TimestampType
from pyspark.sql import functions as F

## Генерируем данные как в MES Honeywell

In [30]:
path_row_data = 'row_data'

In [31]:
def data_generator(ts_start, ts_end, freq, n_tags, n_devices, path):
    os.makedirs(path, exist_ok=True)
    ts = pd.date_range(ts_start, ts_end, freq=freq)
    ts = ts.astype(str).tolist()
    
    tags = [f'tag_{i}_device_{j}' for i in range(n_devices) for j in range(n_tags)]
    for tag in tqdm(tags):
        data = [{
            'tag': tag,
            'vl': list(np.random.sample(len(ts))),
            'ts': ts,
        }]
        data = json.dumps(data)
    
        with open(f'{path}/{tag}.json', 'w') as f:
            f.write(data)

In [32]:
data_generator(
    ts_start='2023-01-01 00:00:00', 
    ts_end='2024-01-01 00:00:00', 
    freq='15s', 
    n_tags=4, 
    n_devices=2, 
    path=path_row_data,
)

  0%|          | 0/8 [00:00<?, ?it/s]

In [33]:
schema = StructType([
    StructField('tag', StringType()), 
    StructField('ts', ArrayType(TimestampType())), 
    StructField('vl', ArrayType(DoubleType())), 
])

In [34]:
df_test_spark = spark.read.json(path_row_data, schema=schema, multiLine=True)

In [35]:
df_test_spark.show()

                                                                                

+--------------+--------------------+--------------------+
|           tag|                  ts|                  vl|
+--------------+--------------------+--------------------+
|tag_1_device_1|[2023-01-01 00:00...|[0.94215778586690...|
|tag_0_device_2|[2023-01-01 00:00...|[0.89420245064995...|
|tag_1_device_0|[2023-01-01 00:00...|[0.22069186908002...|
|tag_1_device_3|[2023-01-01 00:00...|[0.17303179917034...|
|tag_0_device_1|[2023-01-01 00:00...|[4.12911518605008...|
|tag_1_device_2|[2023-01-01 00:00...|[0.48173084303695...|
|tag_0_device_3|[2023-01-01 00:00...|[0.65951874847680...|
|tag_0_device_0|[2023-01-01 00:00...|[0.07290907681984...|
+--------------+--------------------+--------------------+



In [36]:
vl = F.col('tmp.vl').alias('vl')
ts = F.col('tmp.ts').alias('ts')

df_test_spark_2 = df_test_spark \
    .withColumn('tmp', F.arrays_zip('ts', 'vl')) \
    .withColumn('tmp', F.explode('tmp')) \
    .select('tag', vl, ts)

In [37]:
df_test_spark_2.show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------------+-------------------+-------------------+
|           tag|                 vl|                 ts|
+--------------+-------------------+-------------------+
|tag_1_device_1| 0.9421577858669075|2023-01-01 00:00:00|
|tag_1_device_1|  0.840270745087203|2023-01-01 00:00:15|
|tag_1_device_1|0.31965728693470064|2023-01-01 00:00:30|
|tag_1_device_1| 0.4137678799237364|2023-01-01 00:00:45|
|tag_1_device_1| 0.9927946192378685|2023-01-01 00:01:00|
|tag_1_device_1| 0.7339230699200996|2023-01-01 00:01:15|
|tag_1_device_1| 0.5846028291030303|2023-01-01 00:01:30|
|tag_1_device_1| 0.8306693181577471|2023-01-01 00:01:45|
|tag_1_device_1| 0.5909516805792417|2023-01-01 00:02:00|
|tag_1_device_1| 0.7403757799634568|2023-01-01 00:02:15|
|tag_1_device_1| 0.1925497310544838|2023-01-01 00:02:30|
|tag_1_device_1|  0.319616781456898|2023-01-01 00:02:45|
|tag_1_device_1|  0.204129309887762|2023-01-01 00:03:00|
|tag_1_device_1| 0.5600396858431801|2023-01-01 00:03:15|
|tag_1_device_1|  0.85847028380

                                                                                

## Преобразование в pandas

In [38]:
%%time

df_test_pandas = df_test_spark_2.toPandas()

Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.status.AppStatusStore.activeStages(AppStatusStore.scala:169)
	at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:64)
	at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:52)
	at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
	at java.base/java.util.TimerThread.run(Timer.java:506)
24/08/12 07:25:07 ERROR Executor: Exception in task 7.0 in stage 14.0 (TID 95)
java.lang.OutOfMemoryError: Java heap space
24/08/12 07:25:07 ERROR Executor: Exception in task 5.0 in stage 14.0 (TID 93)
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:492)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source)
	at org.apache.

ConnectionRefusedError: [Errno 111] Connection refused

In [39]:
df_test_pandas.head()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-src/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-src/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-src/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


NameError: name 'df_test_pandas' is not defined

In [None]:
df_test_pandas.shape

In [None]:
df_test_pandas.dtypes

## Преобразование в spark

In [None]:
%%time

df_spark_ts = spark.createDataFrame(df_test_pandas)

In [None]:
df_spark_ts.show()

In [None]:
df_spark_ts.printSchema()

типы данных `string`, `double`, `timestamp` корректно распознаны

In [None]:
SinaraSpark.stop_session()

# Итого

Конвертация датафрейма из pandas в spark и обратно занимает значительное время. Это нужно перед подачей данных в модель.  
**Проблема заметна только при работе с большими датафреймами**

Проблему можно решить двумя способами:
- делать промежуточное сохранение на диск в parquet и считывать уже с диска при помощи pandas или spark.
- задать параметр `spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')`

Для датафрейма 3 000 000 х 200

|Действие|Время, секунды|
|-|--------|
|Чтение parquet файлов через pandas|1|
|Чтение parquet файлов через spark и преобразование в pandas|3|
|Чтение parquet файлов через spark, преобразование pandas_api, преобразование в pandas|180|
|Сохнанение pandas датафрейма при помощи spark|$\infty$|
|Чтение parquet файлов через spark и преобразование в pandas (arrow=True)|12|
|Сохнанение pandas датафрейма при помощи spark (arrow=True)|35|