In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import time as timer
import pandas as pd
import numpy as np
import argparse
import datetime
import json
import os

# 'value' 열의 JSON 문자열을 파싱하여 각각의 열로 만들기
schema = StructType(
    [
        StructField("index", IntegerType()),
        StructField("blk_no", StringType()),
        StructField("press3", IntegerType()),
        StructField("calc_press2", DoubleType()),
        StructField("press4", IntegerType()),
        StructField("calc_press1", DoubleType()),
        StructField("calc_press4", DoubleType()),
        StructField("calc_press3", DoubleType()),
        StructField("bf_gps_lon", DoubleType()),
        StructField("gps_lat", DoubleType()),
        StructField("speed", DoubleType()),
        StructField("in_dt", StringType()),
        StructField("move_time", DoubleType()),
        StructField("dvc_id", StringType()),
        StructField("dsme_lat", DoubleType()),
        StructField("press1", IntegerType()),
        StructField("press2", IntegerType()),
        StructField("work_status", IntegerType()),
        StructField("timestamp", StringType()),
        StructField("is_adjust", StringType()),
        StructField("move_distance", IntegerType()),
        StructField("weight", DoubleType()),
        StructField("dsme_lon", DoubleType()),
        StructField("in_user", StringType()),
        StructField("eqp_id", IntegerType()),
        StructField("blk_get_seq_id", IntegerType()),
        StructField("lot_no", StringType()),
        StructField("proj_no", StringType()),
        StructField("gps_lon", DoubleType()),
        StructField("seq_id", LongType()),
        StructField("bf_gps_lat", DoubleType()),
        StructField("blk_dvc_id", StringType()),
    ]
)

print("FILES IN THIS DIRECTORY")
print(os.listdir(os.getcwd()))

FILES IN THIS DIRECTORY
['.bashrc', '.bash_logout', '.profile', '.ipython', '.npm', '.bash_history', '.local', '.ipynb_checkpoints', 'config.json', '.jupyter', 'jars', '.conda', '.cache', '.config', '.wget-hsts', 'work']


In [2]:
# config.json 파일 읽기
with open("config.json", "r") as f:
    config = json.load(f)

jar_urls = ",".join(config["KAFKA_JAR_URLS"])
repartition_num = config["NUM_EXECUTORS"] * config["EXECUTOR_CORES"] * 2

In [3]:
# SparkSession 생성
spark = (
    SparkSession.builder.master("spark://spark-master-service:7077")
    .config("spark.driver.host", "10.42.2.119")
    .config("spark.driver.port", "39337")
    .config("spark.executor.instances", config["NUM_EXECUTORS"])
    .config("spark.executor.cores", config["EXECUTOR_CORES"])
    .config("spark.executor.memory", config["EXECUTOR_MEMORY"])
    .config("spark.defaul.parallelism", repartition_num)
    .config("spark.sql.shuffle.partitions", repartition_num)
    .config("spark.jars", jar_urls)  # JAR 파일 포함
    .appName("asdf")
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("ERROR")

24/05/05 17:07:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
sc

In [5]:
print("Current Spark configuration:")
for key, value in sorted(sc._conf.getAll(), key=lambda x: x[0]):
    print(f"{key} = {value}")

Current Spark configuration:
spark.app.id = app-20240505170756-0059
spark.app.initial.jar.urls = spark://10.42.2.119:39337/jars/commons-pool2-2.6.2.jar,spark://10.42.2.119:39337/jars/commons-logging-1.1.3.jar,spark://10.42.2.119:39337/jars/hadoop-client-runtime-3.3.1.jar,spark://10.42.2.119:39337/jars/jsr305-3.0.0.jar,spark://10.42.2.119:39337/jars/htrace-core4-4.1.0-incubating.jar,spark://10.42.2.119:39337/jars/kafka-clients-2.8.1.jar,spark://10.42.2.119:39337/jars/spark-streaming-kafka-0-10_2.12-3.2.4.jar,spark://10.42.2.119:39337/jars/hadoop-client-api-3.3.1.jar,spark://10.42.2.119:39337/jars/spark-sql-kafka-0-10_2.12-3.2.4.jar,spark://10.42.2.119:39337/jars/spark-token-provider-kafka-0-10_2.12-3.2.4.jar
spark.app.name = asdf
spark.app.startTime = 1714928875560
spark.defaul.parallelism = 96
spark.driver.host = 10.42.2.119
spark.driver.port = 39337
spark.executor.cores = 16
spark.executor.id = driver
spark.executor.instances = 3
spark.executor.memory = 24G
spark.jars = jars/jsr305-3.

In [6]:
# 그냥 가져오기
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "my-cluster-kafka-bootstrap.kafka.svc:9092")
    .option("subscribe", "my-topic")
    .load()
)  # 밀리초 단위 에포치 시간endingTimestamp
df = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")
df = df.withColumnRenamed("timestamp", "createTime")
df = df.withColumn("value", from_json(df["value"], schema))
for field in schema.fields:
    df = df.withColumn(field.name, df["value." + field.name])
df = df.drop("value")
# 이거쓰면 df가 repartition_num 수만큼 쪼개져서 병렬처리가능한 상태가 됨.
df = df.repartition(repartition_num)
df.printSchema()

root
 |-- createTime: string (nullable = true)
 |-- index: integer (nullable = true)
 |-- blk_no: string (nullable = true)
 |-- press3: integer (nullable = true)
 |-- calc_press2: double (nullable = true)
 |-- press4: integer (nullable = true)
 |-- calc_press1: double (nullable = true)
 |-- calc_press4: double (nullable = true)
 |-- calc_press3: double (nullable = true)
 |-- bf_gps_lon: double (nullable = true)
 |-- gps_lat: double (nullable = true)
 |-- speed: double (nullable = true)
 |-- in_dt: string (nullable = true)
 |-- move_time: double (nullable = true)
 |-- dvc_id: string (nullable = true)
 |-- dsme_lat: double (nullable = true)
 |-- press1: integer (nullable = true)
 |-- press2: integer (nullable = true)
 |-- work_status: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- is_adjust: string (nullable = true)
 |-- move_distance: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- dsme_lon: double (nullable = true)
 |-- in_user: string (nul

In [7]:
df.show(truncate=False)

                                                                                

+-----------------------+-----+------+------+-----------+------+-----------+-----------+-----------+----------+--------+-----+-----------------------+---------+-----------+----------------+------+------+-----------+-----------------------+---------+-------------+-------+----------------+-------------------------------------------------+------+--------------+------+-------+----------+---------+----------+----------+
|createTime             |index|blk_no|press3|calc_press2|press4|calc_press1|calc_press4|calc_press3|bf_gps_lon|gps_lat |speed|in_dt                  |move_time|dvc_id     |dsme_lat        |press1|press2|work_status|timestamp              |is_adjust|move_distance|weight |dsme_lon        |in_user                                          |eqp_id|blk_get_seq_id|lot_no|proj_no|gps_lon   |seq_id   |bf_gps_lat|blk_dvc_id|
+-----------------------+-----+------+------+-----------+------+-----------+-----------+-----------+----------+--------+-----+-----------------------+---------+--

타임스탬프를 밀리초 단위로 에포치(epoch)하는 것은 Unix epoch 시간 형식을 사용하는 것을 의미합니다. Unix epoch 시간은 1970년 1월 1일 00:00:00 UTC를 기준으로 경과된 시간을 초 또는 밀리초 단위로 나타낸 것입니다.

Unix 에포치 시간
Unix 에포치 시간은 특정 시점의 시간을 나타내는 일반적인 방식으로, 아래와 같이 두 가지 방법으로 나타낼 수 있습니다:

초 단위로 표현:
예: 1633052800 (이는 2021년 10월 1일 00:00:00 UTC에 해당합니다)
밀리초 단위로 표현:
예: 1633052800000 (이는 2021년 10월 1일 00:00:00 UTC에 해당합니다)
밀리초 단위는 초 단위보다 정확한 표현으로, 초 단위의 값에 1000을 곱하여 밀리초 단위로 변환할 수 있습니다.

startingTimestamp에 밀리초 단위 Unix 타임스탬프 사용
startingTimestamp 옵션은 Unix epoch 시간을 밀리초 단위로 받습니다. 예를 들어, 2021년 10월 1일 00:00:00 UTC 이후에 생성된 메시지를 읽으려면 startingTimestamp를 1633052800000으로 설정해야 합니다.

In [10]:
from datetime import datetime


def to_milliseconds_epoch(timestamp_str):
    # 주어진 문자열을 datetime 객체로 변환
    dt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S.%f")
    # 에포치 시간을 구한 뒤 밀리초 단위로 변환
    milliseconds_epoch = int(dt.timestamp() * 1000)
    return milliseconds_epoch

In [11]:
# 테스트
result = to_milliseconds_epoch("2024-05-05 05:50:20.652")
result

1714888220652

In [8]:
# 메세지 생성 시간을 Unix 밀리초 단위로 에포치한 값으로 변환하여, 시간 범위로 데이터 가져오기!!
# 메세지 생성이 2024-05-05 05:50:20.652 이때 된 메세지의 createTime을 startingTimestamp로 설정
# endingTimestamp는 대충 임의로 10_000_000 밀리초정도 더 높게 설정.
# startingOffsetsByTimestampStrategy 설정을 통해서, No offset matched from request of topic 오류 해결.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "my-cluster-kafka-bootstrap.kafka.svc:9092")
    .option("subscribe", "my-topic")
    .option("startingOffsetsByTimestampStrategy", "latest")
    .option("startingTimestamp", "1714888220652")
    .option("endingTimestamp", "1714898220652")
    .load()
)  # 밀리초 단위 에포치 시간endingTimestamp
df = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")
df = df.withColumnRenamed("timestamp", "createTime")
df = df.withColumn("value", from_json(df["value"], schema))
for field in schema.fields:
    df = df.withColumn(field.name, df["value." + field.name])
df = df.drop("value")
# 이거쓰면 df가 repartition_num 수만큼 쪼개져서 병렬처리가능한 상태가 됨.
df = df.repartition(repartition_num)
df.printSchema()

root
 |-- createTime: string (nullable = true)
 |-- index: integer (nullable = true)
 |-- blk_no: string (nullable = true)
 |-- press3: integer (nullable = true)
 |-- calc_press2: double (nullable = true)
 |-- press4: integer (nullable = true)
 |-- calc_press1: double (nullable = true)
 |-- calc_press4: double (nullable = true)
 |-- calc_press3: double (nullable = true)
 |-- bf_gps_lon: double (nullable = true)
 |-- gps_lat: double (nullable = true)
 |-- speed: double (nullable = true)
 |-- in_dt: string (nullable = true)
 |-- move_time: double (nullable = true)
 |-- dvc_id: string (nullable = true)
 |-- dsme_lat: double (nullable = true)
 |-- press1: integer (nullable = true)
 |-- press2: integer (nullable = true)
 |-- work_status: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- is_adjust: string (nullable = true)
 |-- move_distance: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- dsme_lon: double (nullable = true)
 |-- in_user: string (nul

In [9]:
df.show(truncate=False)



+-----------------------+-----+------+------+-----------+------+-----------+-----------+-----------+----------+--------+-----+-----------------------+---------+-----------+----------------+------+------+-----------+-----------------------+---------+-------------+-------+----------------+-------------------------------------------------+------+--------------+------+-------+----------+---------+----------+----------+
|createTime             |index|blk_no|press3|calc_press2|press4|calc_press1|calc_press4|calc_press3|bf_gps_lon|gps_lat |speed|in_dt                  |move_time|dvc_id     |dsme_lat        |press1|press2|work_status|timestamp              |is_adjust|move_distance|weight |dsme_lon        |in_user                                          |eqp_id|blk_get_seq_id|lot_no|proj_no|gps_lon   |seq_id   |bf_gps_lat|blk_dvc_id|
+-----------------------+-----+------+------+-----------+------+-----------+-----------+-----------+----------+--------+-----+-----------------------+---------+--

                                                                                

In [12]:
# SparkSession 종료
spark.stop()