## Кеширование исходных данных в оперативную память

Витрины должны иметь приемлемое время отклика, спарк это медленый инструмент и не подходит для дашбордов. Спарк используется для многопоточной загрузки данных в оперативную память.

## Формирование исходных данных

Предварительно, необходимо сформировать исходный файл. Пример создания файла ./docs/examples/parquet/create_parquet.ipynb

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, LongType, StringType, IntegerType, DateType, TimestampType, FloatType, BooleanType
from pyspark.sql.functions import col, cast, date_trunc, sum, dayofweek, hour, dayofmonth, lit, sequence

import random

# запуск spark
spark = SparkSession.builder.appName('parquet').getOrCreate()


schemaData = StructType([ \
    StructField("label",StringType(),True),
    StructField("feature1",IntegerType(),True), 
    StructField("feature2",FloatType(),True), 
    StructField("feature3",BooleanType(),True), 
  ])

pathParquet = "/tmp/parquetFiles"

# формирование dataframe
values= []
df = spark.createDataFrame(values,schema=schemaData)
# запишем пустые данные, необходимо для очистки
df.write.mode("overwrite").parquet(pathParquet)

schemaIndex = StructType([ \
    StructField("indx1",LongType(),True),
    StructField("indx2",LongType(),True),
  ])

# увеличиваем набор данных
batchSize = 30
labelSize = 10
def writeParquet():
    indx = list((idx,idx) for idx in range(batchSize))
    df = spark.createDataFrame(indx, schemaIndex)
    def extract_features(row):
        return (
            'label'+str(random.randint(1, labelSize)), #label
            random.randint(1, N), #feature1
            random.random(), #feature2
            bool(random.getrandbits(1)) , #feature3
        )
    rdd = df.rdd.map(extract_features)
    df = rdd.toDF( ["label","feature1","feature2", "feature3"])
    # запись в parquet
    df.write.mode("append").parquet(pathParquet)


# делаем некоторое количество пачек
for i in range(5):
    writeParquet()

localPathParquet = "/tmp/singleparquet"
dfSingle = spark.read.parquet(pathParquet)
fileCount = 1 # число файлов
dfSingle.coalesce(1).write.mode("overwrite").parquet(localPathParquet)

spark.stop()

import os
import boto3

# имя файла в s3
filePathParquet = "/tmp/dataset.parquet"
# имя корзины в s3
bucketNameParquet = 'data'

s3_target = boto3.resource('s3', 
    endpoint_url=os.environ["AWS_ENDPOINT_URL"],
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_session_token=None,
    config=boto3.session.Config(signature_version='s3v4'),
    verify=False
)

# создание корзины
bucket = s3_target.Bucket(bucketNameParquet)
if (not bucket.creation_date):
   s3_target.create_bucket(Bucket=bucketNameParquet)

# загрузка файла на s3 
# в локальной папке находится единственный файл и закидывается на S3
fileNameParquetLocal = [x for x in os.listdir(localPathParquet) if x.endswith(".parquet")][0]
print('write to s3', 'backet='+bucketNameParquet, 'path='+filePathParquet)
s3_target.Bucket(bucketNameParquet).upload_file(localPathParquet+'/'+fileNameParquetLocal, filePathParquet)

## Кеширование данных

In [None]:
import os
import boto3
import pandas

bucket_name = 'data'

s3_target = boto3.resource('s3', 
    endpoint_url=os.environ["AWS_ENDPOINT_URL"],
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_session_token=None,
    config=boto3.session.Config(signature_version='s3v4'),
    verify=False
)

# чтение файла с s3 файла tmp/data.parquet в локальный файл 
localPath = '/tmp/data.parquet'
remotePath = 'tmp/dataset.parquet'
s3_target.Bucket(bucket_name).download_file(remotePath, localPath)



In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, LongType, StringType, IntegerType, DateType, TimestampType, FloatType
from pyspark.sql.functions import col, cast, date_trunc, sum, dayofweek, hour, dayofmonth, lit, monotonically_increasing_id

# запуск spark
spark = SparkSession.builder.appName('fill').getOrCreate()

In [None]:
dfInput = spark.read.parquet(localPath)
df = dfInput.select(monotonically_increasing_id().alias('id'.upper()), 
                    col('label').alias('label'.upper()), 
                    col('feature1').alias('feature1'.upper()), 
                    col('feature2').alias('feature2'.upper()), 
                    col('feature3').alias('feature3'.upper()) ) 
df.printSchema

In [None]:
# подключение к БД Tarantool
HOST=os.environ['TARANTOOL_HOST']
PORT=os.environ['TARANTOOL_PORT']
LOGIN=os.environ['TARANTOOL_LOGIN']
PASSWORD=os.environ['TARANTOOL_PASSWORD']

In [None]:
# для tarantol необходимо создавать таблицу
tableName = 'dataset'

import tarantool
print("подключение к tarantool")
connection = tarantool.connect(host = HOST, port=PORT, user=LOGIN, password=PASSWORD)
table = tableName.upper()
# текст запроса на создание таблицы, поля в верхнем регистре
sqlCreateTable = f"""
create table {table} (  
    ID int primary key, 
    LABEL string, 
    FEATURE1 int, 
    FEATURE2 number, 
    FEATURE3 boolean 
)"""    

print("проверяется таблица:" + table)
rst = connection.execute(f"""SELECT EXISTS (select true from "_space" where "name" = '{table}')""")
tableExist = rst[0][0]
print('таблица'+tableName.upper()+' существует '+str(tableExist))
if tableExist: 
    print("удаляется таблица:"+tableName)
    connection.execute(f"drop table {table}")
print("запрос создания: "+sqlCreateTable)
connection.execute(sqlCreateTable)
print("создана таблица: "+table)
#создание дополнительного индекса
createIndexSQL = f'''CREATE INDEX {table}_label ON {table} (label)'''
print(createIndexSQL)
connection.execute(createIndexSQL)
#отключение
connection.close()

In [None]:
df.show()

In [None]:
# заполняем таблицу
jdbcURLTarantool = f'jdbc:tarantool://{HOST}:{PORT}?user={LOGIN}&password={PASSWORD}'

df.write.format("jdbc").mode("append") \
              .option('url', jdbcURLTarantool) \
              .option('driver', 'org.tarantool.jdbc.SQLDriver') \
              .option("dbtable",tableName.upper()).save()

In [None]:
# спарк завершает работу
spark.stop()

## Работа с кешированными данными

In [None]:
# открытие соединения
connection = tarantool.connect(host = HOST, port=int(PORT), user=LOGIN, password=PASSWORD)

sql = f"""
SELECT ID, LABEL, FEATURE1, FEATURE2, FEATURE3 from {tableName.upper()}
"""
rst = connection.execute(sql)
df = pandas.DataFrame( rst, columns=['ID', 'LABEL', 'FEATURE1', 'FEATURE2', 'FEATURE3'])
display(df)
# обязательное закрытие соединения
connection.close()

In [None]:
# открытие соединения
connection = tarantool.connect(host = HOST, port=int(PORT), user=LOGIN, password=PASSWORD)

sql = f"""
SELECT distinct label 
from dataset
order by label
"""
rst = connection.execute(sql)
pdf = pandas.DataFrame( rst, columns=['label'])
display(pdf)


In [None]:
label = pdf['label'][0]
print('выбрана запись: '+ label)

In [None]:
# все сложности кода предварительного кеширования данных раскрываются на этом примере
# для выбора данных используется sql запрос, что упрощает работу с данными
# запрос по индексированному полу в памяти будет выполняться на порядки быстрее, пользователь получит почти мгновенный отклик
# управление памятью обработки перекладывается на сторону СУБД, тем самым увеличивается предсказуемость выделения ресурсов
# СУБД в большинстве случаев для обработки использует многоботочные алгоритмы в отличии от pandas
# в данном случае pandas будет обрабатывать ограниченную выборку данных
sql = f"""
SELECT  feature1, feature2
from dataset
where label = '{label}'
order by feature3
"""
rst = connection.execute(sql)
df = pandas.DataFrame( rst, columns=[ 'feature1', 'feature2'])
# обязательное закрытие соединения
connection.close()

display(df)

In [None]:
import matplotlib.pyplot as plt
df.boxplot(meanline=True, showmeans=True)