In [7]:
import os, subprocess, json
from datetime import datetime, timedelta
from pyarrow import fs
import pyarrow as pa
import pyspark
from pyspark.sql import SparkSession

In [20]:
def connect_hdfs(hdfs_info):
    user = hdfs_info["user"]
    host = hdfs_info["host"]
    port = hdfs_info["port"]
    
    try:
        classpath = subprocess.Popen([hdfs_info["hdfs_path"], "classpath", "--glob"], stdout=subprocess.PIPE).communicate()[0]
        os.environ["CLASSPATH"] = classpath.decode("utf-8")
        hdfs = fs.HadoopFileSystem(host=hdfs_info["host"], port=hdfs_info["port"], user=hdfs_info["user"])
        
        return hdfs
    except Exception as e:
        #print(f"Failed to connect hdfs {user}@{host}:{port}")
        #log(f"Failed to connect hdfs {user}@{host}:{port}", 1)
        return None


def compare_file_date(filename, target_date):
    # 파일명에서 날짜 추출 (예: kbs_2024-07-04_0012.csv)
    try:
        file_date_str = filename.split('_')[1]  # "2024-07-04"
        file_date = datetime.strptime(file_date_str, "%Y-%m-%d").date()  # datetime.date(2024, 7, 4)
        
        # 현재 날짜 가져오기
        current_date = target_date.date()  # 현재 날짜 (예: datetime.date(2024, 7, 4))
        
        # 날짜 비교
        if file_date == current_date:
            return True
        else:
            return False
    except Exception as e:
        return False


def get_file_list_from_hdfs(hdfs, hdfs_path):
    # HDFS 경로에서 파일 목록 가져오기
    try:
        file_infos = hdfs.get_file_info(pa.fs.FileSelector(hdfs_path, recursive=False))
        # if not file_info_list.is_directory:
        #     raise Exception(f"{hdfs_path} is not a directory")
        
        file_list = [file_info.path for file_info in file_infos]
        
        return file_list
    except Exception as e:
        print(f"Error getting file list from HDFS: {e}")
    
def filter_files_by_date(file_list, target_date):
    filtered_files = [file for file in file_list if compare_file_date(file, target_date)]
    return filtered_files

def str_preprocess(_str):
    return ' '.join(_str.replace("\n", " ").replace("\t", " ").replace("/", "").split(" "))

In [3]:
hdfs_info_path = "./API_KEYS/HDFS_INFO.json"
target_hdfs_dir_path = '/P3T5'
with open(hdfs_info_path, 'r') as header_f:
    hdfs_info = json.load(header_f)

conf = pyspark.SparkConf() \
            .setAppName("hdfs2db") \
            .setMaster("spark://master:7077") \
            .set("spark.blockManager.port", "10025") \
            .set("spark.driver.blockManager.port", "10026") \
            .set("spark.driver.port", "10027") \
            .set("spark.cores.max", "2") \
            .set("spark.jars", "/opt/spark/jars")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
hdfs_connection = connect_hdfs(hdfs_info)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/05 09:08:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2024-07-05 09:08:46,191 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [37]:
#spark
spark.stop()

In [9]:
target_date = datetime.now() - timedelta(1)
total_file_list = get_file_list_from_hdfs(hdfs_connection, target_hdfs_dir_path)
target_file_list = filter_files_by_date(total_file_list, target_date)

In [10]:
target_file_list

['/P3T5/cnn_2024-07-04_1715.csv',
 '/P3T5/cnn_2024-07-04_1736.csv',
 '/P3T5/cnn_2024-07-04_1759.csv',
 '/P3T5/cnn_2024-07-04_2300.csv',
 '/P3T5/kbs_2024-07-04_1720.csv',
 '/P3T5/kbs_2024-07-04_1721.csv',
 '/P3T5/kbs_2024-07-04_1725.csv',
 '/P3T5/kbs_2024-07-04_1745.csv',
 '/P3T5/kbs_2024-07-04_1746.csv',
 '/P3T5/kbs_2024-07-04_2300.csv',
 '/P3T5/mbc_2024-07-04_1705.csv',
 '/P3T5/mbc_2024-07-04_1720.csv',
 '/P3T5/mbc_2024-07-04_1742.csv',
 '/P3T5/mbc_2024-07-04_1754.csv',
 '/P3T5/mbc_2024-07-04_1804.csv',
 '/P3T5/mbc_2024-07-04_2300.csv']

In [None]:
target_file_list = [file for file in target_file_list if file.startwith("kbs")]

In [15]:
read_options = {
            "header": True,
            "inferSchema": True,
            "sep": "|",
            "ignoreLeadingWhiteSpace": True,
            "ignoreTrailingWhiteSpace": True,
            "multiLine": False,
        }

if target_file_list:
    df_list = [spark.read.csv(f"hdfs://{file}", header=True, inferSchema=True, sep="|", ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, multiLine=True) for file in target_file_list]
    combined_df = df_list[0]
    for df in df_list[1:]:
        combined_df = combined_df.union(df)
    
    # 결과 DataFrame 보여주기
    combined_df.show()
else:
    print("No files found for the current date.")

                                                                                

+-----------+---------------------------------+-----------------------------------+--------+-------------------+--------------------+
|institution|                     articleTitle|                    articleContents|category|            regDate|             getDate|
+-----------+---------------------------------+-----------------------------------+--------+-------------------+--------------------+
|        KBS|    "(슈퍼5시)""집값 추세 상승...|       " [앵커]   정부가 올해 하...|    경제|2024-07-04 17:14:18|2024-07-04 17:20:...|
|        KBS|   국토부, 65세이상 버스·택시 ...|     정부가 만 65세 이상 운수업 ...|    경제|2024-07-04 17:13:03|2024-07-04 17:20:...|
|        KBS|  금감원 '뻥튀기 상장' 파두 관...|"금융감독원 자본시장특별사법경찰...|    경제|2024-07-04 17:13:02|2024-07-04 17:20:...|
|        KBS|    코스피 연고점 경신 2,820대...|  간밤 미국의 기술주 강세와 국채...|    경제|2024-07-04 17:08:44|2024-07-04 17:20:...|
|        KBS|    "금감원, ""직원 사칭 '가상...|  "금융감독원 직원을 사칭해 가상...|    경제|2024-07-04 16:42:26|2024-07-04 17:20:...|
|        KBS|   "'화성 아리셀 화재' 피의자 ...|     "31

In [16]:
combined_df.limit(1).show(truncate=False)

+-----------+---------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [27]:
from pyspark.sql.functions import col, udf, date_format
from pyspark.sql.types import StringType

udf_str_preprocess = udf(str_preprocess, StringType())
processed_df = combined_df.\
                withColumn("articleTitle", udf_str_preprocess(col("articleTitle"))).\
                withColumn("articleContents", udf_str_preprocess(col("articleContents"))).\
                withColumn("regDate", date_format(col("regDate"), "yyyy-MM-dd HH:mm:ss"))

In [28]:
processed_df.limit(1).show()

+-----------+-----------------------------+----------------------------+--------+-------------------+--------------------+
|institution|                 articleTitle|             articleContents|category|            regDate|             getDate|
+-----------+-----------------------------+----------------------------+--------+-------------------+--------------------+
|        KBS|"(슈퍼5시)""집값 추세 상승...|" [앵커]   정부가 올해 하...|    경제|2024-07-04 17:14:18|2024-07-04 17:20:...|
+-----------+-----------------------------+----------------------------+--------+-------------------+--------------------+



In [29]:
processed_df.printSchema()

root
 |-- institution: string (nullable = true)
 |-- articleTitle: string (nullable = true)
 |-- articleContents: string (nullable = true)
 |-- category: string (nullable = true)
 |-- regDate: string (nullable = true)
 |-- getDate: string (nullable = true)



In [None]:
jdbc_url = "jdbc:mariadb://:3306/your_database"
table_name = "crawling"
connection_properties = {
    "user": "encore",
    "password": "3playdata!!",
    "port": ""
    "driver": "org.mariadb.jdbc.Driver"
}