# Minio 라이브러리를 이용한 Bucket 확인 및 생성

In [28]:
from minio import Minio
from glob import glob
import os

BUCKET_NAME = "savepaint-bucket"

client = Minio(
    "localhost:9000",
    access_key="admin", secret_key="changeme", secure=False
)

client.bucket_exists(BUCKET_NAME)

True

In [29]:
if not client.bucket_exists(BUCKET_NAME):
    client.make_bucket(BUCKET_NAME)

client.bucket_exists(BUCKET_NAME)

True

In [30]:
buckets = client.list_buckets()

for bucket in buckets:
    print(bucket.name)

savepaint-bucket


---
# Spark Session 생성 (Minio <-> Delta Lake)

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

minio_access_key = "admin"
minio_secret_key = "changeme"

# spark session 생성시 aws와 연동하기
def s3_connect_spark(minio_access_key, minio_secret_key):
    # 설정
    conf = (
        SparkConf()
        .setAppName("MY_APP") # replace with your desired name
        .set("spark.hadoop.fs.s3a.access.key", minio_access_key)
        .set("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
        .set("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
        .set("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1,org.apache.hadoop:hadoop-aws:3.3.1")
        .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") # Deltalake로 Apache Spark 설정
        .set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog") # Deltalake로 Apache Spark 설정
        .set("spark.databricks.delta.properties.defaults.columnMapping.mode","name") # header 공백 및 특수문자 인식
        .set('spark.sql.parquet.columnarReaderBatchSize',100) # ?
        .set("spark.executor.memory", "4g") # 각 Spark worker의 memory 크기
        .set("spark.memory.offHeap.enabled", False)
        # .set("spark.driver.memory", "50g") # Spark Driver의 크기
    )

    # spark 생성
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    return spark

In [2]:
spark = s3_connect_spark(minio_access_key, minio_secret_key)

your 131072x1 screen size is bogus. expect trouble


24/03/05 16:15:12 WARN Utils: Your hostname, DESKTOP-JJQA3IT resolves to a loopback address: 127.0.1.1; using 172.25.190.30 instead (on interface eth0)
24/03/05 16:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/donghee/work/deltalake1/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/donghee/.ivy2/cache
The jars for the packages stored in: /home/donghee/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a3600823-19c0-4bce-b04b-54e53da54b50;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.1 in central
	found io.delta#delta-storage;2.1.1 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 254ms :: artifacts dl 9ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	io.delta#delta-core_2.12;2.1.1 from central in [default]
	io.delta#delta-storage;2.1.1 from central in [default]
	org.antlr#antl

24/03/05 16:15:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark

24/03/05 20:32:00 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 6998378 ms exceeds timeout 120000 ms
24/03/05 20:32:00 WARN SparkContext: Killing executors is not supported by current scheduler.
24/03/05 20:32:03 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.

---
# CSV Convert to pyspark.sql.dataframe

In [4]:
# local에서 spark로 csv 읽기
local_data_path = "/home/donghee/work/deltalakeproject/data/complaints.csv"

df = spark.read.format("csv").option("header",True).load(local_data_path)

df.show()
type(df)
df.count()

+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------------+--------------------+--------------------+----------------------------+--------------------+--------------------+------------+
|       Date received|             Product|         Sub-product|               Issue|           Sub-issue|Consumer complaint narrative|Company public response|             Company|               State|            ZIP code|                Tags|Consumer consent provided?|       Submitted via|Date sent to company|Company response to consumer|    Timely response?|  Consumer disputed?|Complaint ID|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+-----

                                                                                

6762146

---
# Save Delta Table to Minio

In [26]:
# minio에 csv -> delta table로 넣기
save_path = "s3a://savepaint-bucket/complaints-table"

df.write.format("delta").save(save_path)

24/03/04 14:20:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/03/04 14:20:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/04 14:20:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/03/04 14:20:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/03/04 14:20:31 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers


[Stage 64:>                                                       (0 + 12) / 23]

24/03/04 14:20:50 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers


[Stage 64:==>                                                     (1 + 12) / 23]

24/03/04 14:20:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/03/04 14:20:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers


[Stage 64:====>                                                   (2 + 12) / 23]

24/03/04 14:20:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/03/04 14:20:57 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers




24/03/04 14:20:58 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers




24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers




24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/04 14:20:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




24/03/04 14:21:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/03/04 14:21:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/04 14:21:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/03/04 14:21:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/03/04 14:21:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers




24/03/04 14:21:17 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/04 14:21:17 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

---

# Minio에서 다시 Delta Table 불러오기

In [5]:
from delta.tables import *
from pyspark.sql.functions import *

s3table_path = "s3a://savepaint-bucket/complaints-table"

# delta Table load
raw_deltaTable = DeltaTable.forPath(spark, s3table_path)
print(raw_deltaTable)

# convert to DataFrame
test = raw_deltaTable.toDF()
print(type(test))

<delta.tables.DeltaTable object at 0x7f59a1f3dbb0>
<class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
test.show()

[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+--------------------+
|       Date received|             Product|         Sub-product|               Issue|           Sub-issue|Consumer complaint narrative|Company public response|             Company|               State|            ZIP code|                Tags|Consumer consent provided?|Submitted via|Date sent to company|Company response to consumer|Timely response?|Consumer disputed?|        Complaint ID|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+---------------

                                                                                

---

## Version 별 불러오기

In [7]:
# verison 0 -> Raw Data
df0 = spark.read.format("delta").option("versionAsOf", 0).load(s3table_path)
df0.show()
df0.count()

+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+--------------------+
|       Date received|             Product|         Sub-product|               Issue|           Sub-issue|Consumer complaint narrative|Company public response|             Company|               State|            ZIP code|                Tags|Consumer consent provided?|Submitted via|Date sent to company|Company response to consumer|Timely response?|Consumer disputed?|        Complaint ID|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+---------------

6762146

In [8]:
# version 1 -> na drop version write
# version_1 = test.na.drop()
df1 = spark.read.format("delta").option("versionAsOf", 1).load(s3table_path)
df1.show()
df1.count()

                                                                                

+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+--------------------+
|       Date received|             Product|         Sub-product|               Issue|           Sub-issue|Consumer complaint narrative|Company public response|             Company|               State|            ZIP code|                Tags|Consumer consent provided?|Submitted via|Date sent to company|Company response to consumer|Timely response?|Consumer disputed?|        Complaint ID|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+--------------------+---------------

6762146

---

# Compare optimize Query Time : Compaction

#### Optimize : Not Compaction

In [9]:
(
    spark.read.format("delta")
    .option("versionAsOf","0")
    .load(s3table_path)
    .createOrReplaceTempView("x0")
)

In [10]:
spark.sql(
    "select * from x0 where Issue = 'Closing an account'"
).collect()

                                                                                

[Row(Date received='2023-03-04', Product='Checking or savings account', Sub-product='Checking account', Issue='Closing an account', Sub-issue='Company closed your account', Consumer complaint narrative='My accounts were improperly closed and falsely charged overdraft fees, the company closed my account and my account number was XXXX service fees in which Im eligible for refunds. Wells Fargo owes me between {$800.00} and {$2000.00} and they improperly closed my account and I want something to be done', Company public response='Company has responded to the consumer and the CFPB and chooses not to provide a public response', Company='WELLS FARGO & COMPANY', State='MD', ZIP code='207XX', Tags=None, Consumer consent provided?='Consent provided', Submitted via='Web', Date sent to company='2023-03-04', Company response to consumer='Closed with explanation', Timely response?='Yes', Consumer disputed?='N/A', Complaint ID='6645682'),
 Row(Date received='2022-02-19', Product='Checking or savings 

#### Optimize : Compaction

In [19]:
from delta.tables import *

# version 3 compaction
s3table_path = "s3a://savepaint-bucket/complaints-table"


# delta Table load
raw_deltaTable = DeltaTable.forPath(spark, s3table_path)
raw_deltaTable.optimize().executeCompaction()

                                                                                

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>>]

In [11]:
# 이전 버전 삭제
# raw_deltaTable.vacuum(0)

In [11]:
(
    spark.read.format("delta")
    .option("versionAsOf","1")
    .load(s3table_path)
    .createOrReplaceTempView("x1")
)

In [12]:
spark.sql(
    "select * from x1 where Issue = 'Closing an account'"
).collect()



24/03/05 16:10:09 ERROR Executor: Exception in task 0.0 in stage 42.0 (TID 560)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
	at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1696)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:925)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:956)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase$ParquetRowGroupReaderImpl.readNextRowGroup(SpecificParquetRecordReaderBase.java:271)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:388)
	at org.apache.spark.sql.executio

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/donghee/work/deltalake1/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/donghee/work/deltalake1/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/donghee/work/deltalake1/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/donghee/work/deltalake1/li

ConnectionRefusedError: [Errno 111] Connection refused

---

# Compare optimize Query Time : Z-ordering

In [27]:
(
    delta.DeltaTable.forPath(spark, s3table_path)
    .optimize()
    .executeZOrderBy("Issue")
)



24/03/04 16:53:54 ERROR Executor: Exception in task 3.0 in stage 81.0 (TID 1430)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
	at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1696)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:925)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:956)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase$ParquetRowGroupReaderImpl.readNextRowGroup(SpecificParquetRecordReaderBase.java:271)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:388)
	at org.apache.spark.sql.executi

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/donghee/work/deltalake1/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/donghee/work/deltalake1/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/donghee/work/deltalake1/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/donghee/work/deltalake1/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
  

ConnectionRefusedError: [Errno 111] Connection refused