In [54]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark Oracle Connection").config("spark.driver.extraClassPath", "/home/jovyan/work/mysql-connector-java-8.4.0.jar") \
    .getOrCreate()

try:
    #url = "jdbc:oracle:thin:@mysql-container:1521/test"
    url = "jdbc:mysql://mysql-container:3306/test"
    connection = spark._jvm.java.sql.DriverManager.getConnection(url, "pppsh", "1234")
    print("mysql 데이터베이스 연결 성공!")
    connection.close()
except Exception as e:
    print("mysql 데이터베이스 연결 실패:", e)
finally:
    spark.stop()

mysql 데이터베이스 연결 성공!


In [None]:
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

# Attribution 3.0 Unported (CC BY 3.0)
# https://www.kaggle.com/datasets/csanhueza/the-marvel-universe-social-network

spark = SparkSession.builder.appName("df_most_popular").getOrCreate()
csv_file_path = "file:///home/jovyan/work/sample/hero-network.csv"
# read file
df = spark.read\
            .option("header", "true")\
            .option("inferSchema", "true").csv(csv_file_path)



In [None]:
df.show()

In [56]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import random
import string
from datetime import datetime, timedelta

# SparkSession 생성
spark = SparkSession.builder.appName("Sample DataFrame Generation").getOrCreate()

# 데이터 스키마 정의
data_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("email", StringType(), True),
    StructField("date_of_birth", DateType(), True),
    StructField("salary", DoubleType(), True),  # DoubleType()으로 변경
    StructField("is_active", BooleanType(), True),
    StructField("registration_date", TimestampType(), True)
])

# 샘플 데이터 생성
def generate_random_data(num_rows):
    data = []
    for _ in range(num_rows):
        id = random.randint(1, 100000)
        name = ''.join(random.choices(string.ascii_letters, k=10))
        age = random.randint(18, 65)
        gender = random.choice(["Male", "Female"])
        email = f"{name}@{random.choice(['gmail.com', 'yahoo.com', 'outlook.com'])}"
        date_of_birth = (datetime.now() - timedelta(days=random.randint(365*18, 365*65))).date()
        salary = random.uniform(30000, 100000) # round 제거
        is_active = random.choice([True, False])
        registration_date = datetime.now() - timedelta(days=random.randint(0, 365)) 
        data.append((id, name, age, gender, email, date_of_birth, salary, is_active, registration_date))
    return data

# 샘플 데이터 (10,000건) 생성 및 DataFrame 생성
sample_data = generate_random_data(1000000)
df = spark.createDataFrame(sample_data, schema=data_schema)

# 생성된 DataFrame 출력 (선택 사항)
df.show()


+-----+----------+---+------+--------------------+-------------+------------------+---------+--------------------+
|   id|      name|age|gender|               email|date_of_birth|            salary|is_active|   registration_date|
+-----+----------+---+------+--------------------+-------------+------------------+---------+--------------------+
|40142|CCfUflVopl| 22|  Male|CCfUflVopl@gmail.com|   1995-05-16| 92407.39332873066|    false|2023-09-24 13:32:...|
|80773|aRcdknCgfA| 27|  Male|aRcdknCgfA@outloo...|   1976-08-10| 36622.11028766857|    false|2024-02-19 13:32:...|
|93016|ECDQmEhUgE| 33|Female|ECDQmEhUgE@outloo...|   1971-04-22| 82296.71382900636|    false|2023-10-16 13:32:...|
|60745|WOfRkFtwDT| 24|  Male|WOfRkFtwDT@yahoo.com|   1963-01-28| 52258.76529687612|     true|2024-01-11 13:32:...|
|58565|kPsJrHiawB| 44|  Male|kPsJrHiawB@yahoo.com|   1976-06-21|  85310.3197032046|     true|2024-02-20 13:32:...|
|71447|weqweVyZlI| 61|  Male|weqweVyZlI@gmail.com|   1980-08-29| 72488.143059601

In [57]:
df.count()

1000000

In [None]:
df.groupBy('registration_date').count().orderBy('count').show()

In [63]:
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
import time

# MySQL 연결 정보 설정
url = "jdbc:mysql://mysql-container:3306/test"
properties = {
    "user": "pppsh",
    "password": "1234",
    "driver": "com.mysql.cj.jdbc.Driver",
    "rewriteBatchedStatements": "true",  # 배치 삽입 활성화
    "useServerPrepStmts": "false",
    "cachePrepStmts": "true",
    "prepStmtCacheSize": "250",
    "prepStmtCacheSqlLimit": "2048",
    "useSSL": "false"
}

# 데이터 삽입 및 로깅
batch_size = 100000  # 한 번에 삽입할 row 수

start_time = time.time()



df = df.repartition(10, col("id")).cache()

# DataFrame 캐싱
#df.cache()
# 또는 메모리 부족 시 디스크 캐싱: df.persist(StorageLevel.DISK_ONLY)

total_rows = df.count()
inserted_rows = 0

for i in range(0, total_rows, batch_size):
    batch_start_time = time.time()

    batch_df = df.limit(batch_size)
    batch_df.write.jdbc(url=url, table="test.new_table", mode="append", properties=properties)
    inserted_rows += batch_size

    batch_end_time = time.time()
    batch_elapsed_time = batch_end_time - batch_start_time

    print(f"{inserted_rows}/{total_rows} rows inserted (Batch time: {batch_elapsed_time:.2f} seconds)")

# 캐시 해제
df.unpersist()

end_time = time.time()
elapsed_time = end_time - start_time
print(f"\n전체 데이터 삽입 완료 (총 시간: {elapsed_time:.2f} seconds)")


100000/1000000 rows inserted (Batch time: 1.85 seconds)
200000/1000000 rows inserted (Batch time: 1.69 seconds)
300000/1000000 rows inserted (Batch time: 1.86 seconds)
400000/1000000 rows inserted (Batch time: 1.64 seconds)
500000/1000000 rows inserted (Batch time: 1.70 seconds)
600000/1000000 rows inserted (Batch time: 1.71 seconds)
700000/1000000 rows inserted (Batch time: 1.79 seconds)
800000/1000000 rows inserted (Batch time: 1.98 seconds)
900000/1000000 rows inserted (Batch time: 1.69 seconds)
1000000/1000000 rows inserted (Batch time: 1.69 seconds)

전체 데이터 삽입 완료 (총 시간: 20.08 seconds)


In [None]:
df.groupBy('registration_date').count().show()

# 병렬 수행

In [None]:
df.show()

In [64]:
import time
from pyspark.sql import SparkSession

# 데이터프레임 설정
df = df  # 삽입할 DataFrame

# 테이블 이름 설정
table_name = "test.new_table"

start_time = time.time()

# Spark 설정
spark.conf.set("spark.sql.shuffle.partitions", "10")

# JDBC 연결 설정
target_url = "jdbc:mysql://mysql-container:3306/test"
target_properties = {
    "user": "pppsh",
    "password": "1234",
    "driver": "com.mysql.cj.jdbc.Driver",
    "rewriteBatchedStatements": "true",
    "useServerPrepStmts": "false",
    "cachePrepStmts": "true",
    "prepStmtCacheSize": "250",
    "prepStmtCacheSqlLimit": "2048",
    "useSSL": "false"
}

# 리파티셔닝 (필요한 경우)
df_partitioned = df.repartition(12, 'id')

# 데이터 쓰기 및 시간 측정
write_start = time.time()

# 데이터 쓰기 (최종 수정)
df_partitioned.write.mode("append").jdbc(url=target_url, table=table_name, properties=target_properties)

write_end = time.time()
print(f"Total write time for {table_name}: {write_end - write_start} seconds")

end_time = time.time()
total_time = end_time - start_time
print(f"Total execution time: {total_time} seconds")


Total write time for test.new_table: 8.831433773040771 seconds
Total execution time: 8.90052056312561 seconds


# 캐시 + 병렬 살짝 더 느려 ? 

In [66]:
import time
from pyspark.sql import SparkSession

# 데이터프레임 설정
df = df  # 삽입할 DataFrame

# 테이블 이름 설정
table_name = "test.new_table"

start_time = time.time()

# Spark 설정
spark.conf.set("spark.sql.shuffle.partitions", "10")

# JDBC 연결 설정
target_url = "jdbc:mysql://mysql-container:3306/test"
target_properties = {
    "user": "pppsh",
    "password": "1234",
    "driver": "com.mysql.cj.jdbc.Driver",
    "rewriteBatchedStatements": "true",
    "useServerPrepStmts": "false",
    "cachePrepStmts": "true",
    "prepStmtCacheSize": "250",
    "prepStmtCacheSqlLimit": "2048",
    "useSSL": "false"
}

# 리파티셔닝 및 캐싱
df_partitioned = df.repartition(12, 'id').cache()

# 데이터 쓰기 및 시간 측정
write_start = time.time()

# 데이터 쓰기
df_partitioned.write.mode("append").jdbc(url=target_url, table=table_name, properties=target_properties)

write_end = time.time()
print(f"Total write time for {table_name}: {write_end - write_start} seconds")

# 캐시 해제
df_partitioned.unpersist()

end_time = time.time()
total_time = end_time - start_time
print(f"Total execution time: {total_time} seconds")


Total write time for test.new_table: 10.109902381896973 seconds
Total execution time: 10.22282075881958 seconds
