In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
import time
import os
import pandas as pd

# Датасет
## DAIGT V2 Train Dataset - датасет для обучения детекции сгенерированного текста.

In [9]:
pd.set_option('display.max_colwidth', 200)
df = pd.read_csv("data.csv")
df.head(3)

Unnamed: 0,text,label,prompt_name,source,RDizzl3_seven
0,Phones\n\nModern humans today are always on their phone. They are always on their phone more than 5 hours a day no stop .All they do is text back and forward and just have group Chats on social me...,0,Phones and driving,persuade_corpus,False
1,"This essay will explain if drivers should or should not be able to use electronic devices while operating a vehicle. Using a phone while driving can cause very bad wrecks, because they are putting...",0,Phones and driving,persuade_corpus,False
2,"Driving while the use of cellular devices\n\nToday, most of the society is thoughtless. Especially new drivers, all driver for that matter do not understand the dangers of looking at a cell phone ...",0,Phones and driving,persuade_corpus,False


### Функция, которая добавлет length случайных символов в text
```python
def random_insert(text, length):
    if(len(text) < length):
        length = len(text) - 10
    chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    result = list(text)
    positions = random.sample(range(len(text) + 1), length)
    positions.sort(reverse=True)
    for pos in positions:
        char = random.choice(chars)
        result.insert(pos, char)
    return "".join(result)
```
### Процесс создания dataXL.csv
```python
df = pd.read_csv("data.csv")
df_xl = df.copy()
for i in range(26):
    sample = df.sample(frac=1)
    df_xl = pd.concat([df_xl, sample], ignore_index=True)
df_xl.to_csv('dataXL.csv', index=False)
```
### Процесс создания dataXL_w_rand.csv
```python
df = pd.read_csv("data.csv")
df_xl = df.copy()
for i in range(26):
    sample = df.sample(frac=1)
    sample['text'] = sample['text'].apply(lambda x: random_insert(x, 64))
    df_xl = pd.concat([df_xl, sample], ignore_index=True)
df_xl.to_csv('dataXL_w_rand.csv', index=False)
```

In [2]:
def get_MiB(path):
    return round(os.path.getsize(path) / 2 ** 20, 2)
def get_dir_size(dir_path):
    files = os.listdir(dir_path)
    total_size = 0
    for file in files:
        file_path = os.path.join(dir_path, file)
        if os.path.isfile(file_path):
            file_size = os.path.getsize(file_path)
            total_size += file_size
    total_size_mb = round(total_size / 2**20, 2)
    return total_size_mb

# Работа с Apache Spark

In [None]:
# Создаем сессию
spark = SparkSession.builder.appName("Testing").master("local").config("spark.executor.memory", '2g').getOrCreate()
# .config("spark.executor.memory", '2g').config('spark.executor.cores', '6')
schema = StructType([
    StructField("text", StringType(), True),
    StructField("label", IntegerType(), True),
    StructField("prompt_name", StringType(), True),
    StructField("source", StringType(), True),
    StructField("RDizzl3_seven", BooleanType(), True)
])

# Читаем датасеты
dfXL = spark.read.csv("dataXL.csv", header=True, inferSchema=False, schema = schema)
dfXL_w_rand = spark.read.csv("dataXL_w_rand.csv", header=True, inferSchema=False, schema = schema)

# Сохраняем данные в ORC
time_orc_wr_start = time.time()
dfXL.write.orc("file//orc/dataXL", mode="overwrite", compression="snappy")
time_orc_wr = time.time() - time_orc_wr_start

time_orc_wr_rand_start = time.time()
dfXL_w_rand.write.orc("file//orc/dataXL_w_rand", mode="overwrite", compression="snappy")
time_orc_wr_rand = time.time() - time_orc_wr_rand_start

# Сохраняем данные в Parquet
time_parc_wr_start = time.time()
dfXL.write.parquet("file//parc/dataXL", mode="overwrite", compression="snappy")
time_parc_wr = time.time() - time_parc_wr_start

time_parc_wr_rand_start = time.time()
dfXL_w_rand.write.parquet("file//dataXL_w_rand", mode="overwrite", compression="snappy")
time_parc_wr_rand = time.time() - time_parc_wr_rand_start

# Загружаем данные в датафреймы
df_orc = spark.read.orc("file//orc/dataXL")
df_orc_rand = spark.read.orc("file//orc/dataXL_w_rand")
df_parquet = spark.read.parquet("file//parc/dataXL")
df_parquet_rand = spark.read.parquet("file//parc/dataXL_w_rand")

# Получаем размеры в МиБ   
size_csv = get_MiB("dataXL.csv")
size_csv_rand = get_MiB("dataXL_w_rand.csv")
size_orc = get_dir_size("file\orc\dataXL")
size_orc_rand = get_dir_size("file\orc\dataXL_w_rand")
size_par = get_dir_size("file\parc\dataXL")
size_par_rand = get_dir_size("file\parc\dataXL_w_rand")

# Измеряем скорость чтения

# Для csv
t_s = time.time()
dfXL.show()
t_e = time.time()
t_csv = t_e - t_s
speed_csv = dfXL.count() / t_csv

t_s = time.time()
dfXL_w_rand.show()
t_e = time.time()
t_csv_rand = t_e - t_s
speed_csv_rand = dfXL_w_rand.count() / t_csv_rand

# Для orc
t_s = time.time()
df_orc.show()
t_e = time.time()
t_orc = t_e - t_s
speed_orc = df_orc.count() / t_orc

t_s = time.time()
df_orc_rand.show()
t_e = time.time()
t_orc_rand = t_e - t_s
speed_orc_rand = df_orc_rand.count() / t_orc_rand

# Для parquet
t_s = time.time()
df_parquet.show()
t_e = time.time()
t_parc = t_e - t_s
speed_parc = df_parquet.count() / t_parc

t_s = time.time()
df_parquet_rand.show()
t_e = time.time()
t_parc_rand = t_e - t_s
speed_parc_rand = df_parquet_rand.count() / t_parc_rand

# Завершаем сессию
spark.stop()

# Результаты

In [4]:
results = pd.DataFrame(
    {
        'Формат' : ['CSV', 'CSV rand', 'ORC', 'ORC rand', 'Parquet', 'Parquet rand'],
        'Время записи, s' : ['-', '-' , round(time_orc_wr, 4),  round(time_orc_wr_rand, 4), round(time_parc_wr, 4), round(time_parc_wr_rand, 4)],
        'Занимаеиое место, МиБ' : [size_csv, size_csv_rand, size_orc, size_orc_rand, size_par, size_par_rand],
        'Время чтения, s': [round(t_csv, 4), round(t_csv_rand, 4), round(t_orc, 4), round(t_orc_rand, 4), round(t_parc, 4), round(t_parc_rand, 4)],
        'Скорость чтения, строк/s' : [round(speed_csv), round(speed_csv_rand), round(speed_orc), round(speed_orc_rand), round(speed_parc), round(speed_parc_rand)]
    }
)
results

Unnamed: 0,Формат,"Время записи, s","Занимаеиое место, МиБ","Время чтения, s","Скорость чтения, строк/s"
0,CSV,-,2625.21,0.8133,8473088
1,CSV rand,-,2696.41,0.0413,170640018
2,ORC,86.8813,658.8,0.2717,25362459
3,ORC rand,84.2769,969.52,0.0784,89960237
4,Parquet,76.8745,912.21,0.1655,41645160
5,Parquet rand,75.1775,1061.09,0.2102,33557619


## Вывод: при выполнении работы были получены следующие результаты:
* ### Преобразование в формат ORC занимает больше времени, чем в Parquet ~ на 10%
* ### На ORC значительно повлияло наличие повт. значений в датасете. Размер вырос в 1.5 раза, по сравнению с уникальными данными. Размер же Parquet не сильно изменился.
* ### Результаты по времени чтения неоднозначны. С одной стороны ORC оказался медленее Parquet при чтении датасета с потв. значениями, с другой результат противоположный, когда датасет получается уникальным. 