In [1]:
import os
import socket

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [2]:
# Executors run in separate pods and connect back to this client-mode driver
# ("spark.submit.deployMode" = "client"), so advertise this pod's own IP as the driver host.
current_pod_ip = socket.gethostbyname(socket.gethostname())

# MinIO (S3A) credentials: read from the environment instead of hard-coding secrets in the
# notebook. The fallbacks are the previous hard-coded values so existing setups keep working.
# NOTE(review): remove the fallbacks once MINIO_ACCESS_KEY / MINIO_SECRET_KEY are provisioned.
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "admin")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "admin123")

# `sc` holds the SparkSession (not a SparkContext); the name is kept because later cells use it.
# NOTE(review): hadoop-aws 3.3.x is built against AWS SDK v1 (aws-java-sdk-bundle, pulled in
# transitively), so `software.amazon.awssdk:bundle` (SDK v2) and the extra hadoop-common look
# unnecessary next to the image's bundled Hadoop client — verify before removing.
sc = (
    SparkSession.builder.master("k8s://https://kubernetes.default.svc.cluster.local")
    .appName("spark")
    .config("spark.executor.instances", 1)
    .config("spark.submit.deployMode", "client")
    .config("spark.driver.host", current_pod_ip)
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.port", "29413")
    .config("spark.ui.port", "4040")
    .config("spark.kubernetes.namespace", "spark")
    .config("spark.kubernetes.driver.label.app", "spark")
    .config("spark.kubernetes.executor.label.app", "spark")
    .config("spark.kubernetes.container.image", "apache/spark:3.5.3")
    .config("spark.kubernetes.container.image.pullPolicy", "Always")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,software.amazon.awssdk:bundle:2.31.30,org.apache.hadoop:hadoop-common:3.3.4")
    # S3A -> MinIO: plain HTTP endpoint with path-style bucket addressing.
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio.minio:80")
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.kubernetes.debug", "true")
    .getOrCreate()
)

sc.sparkContext.setLogLevel("INFO")
print('Spark context started.')

Spark context started.


In [3]:
# Load the raw weather observations from MinIO.
# Spark's CSV option is `sep` (alias `delimiter`); the previous misspelled `delimeter`
# key was silently ignored and only worked because ',' is the default separator.
df = sc.read \
    .options(sep=',', header=True, inferSchema=True) \
    .csv("s3a://weather/weather_data_random.csv")

In [4]:
# Quick look at the raw weather data: the first 20 rows (truncated cells), then the
# schema that inferSchema produced.
df.show(n=20, truncate=True)
df.printSchema()

+----------+----------+----------+----------+-----+----------+--------+--------+---------+----+------+----------+----------+--------+----+----+-------+---------+
|  datetime|        dt|   sunrise|    sunset| temp|feels_like|pressure|humidity|dew_point| uvi|clouds|visibility|wind_speed|wind_deg|rain|snow|weather|isHoliday|
+----------+----------+----------+----------+-----+----------+--------+--------+---------+----+------+----------+----------+--------+----+----+-------+---------+
|2024-01-28|1706407200|1706395174|1706431791| 0.51|      0.51|    1030|      51|    -7.51|NULL|    75|   10000.0|      1.03|     350| 0.0| 0.0| Clouds|    false|
|2024-01-11|1704974400|1704926810|1704961915|  3.2|      0.27|    1016|      81|     0.26|NULL|    75|    8000.0|      3.09|     250| 0.0| 0.0| Clouds|    false|
|2024-01-13|1705158000|1705185974|1705221291|-2.71|     -2.71|    1026|      80|    -5.34|NULL|     0|   10000.0|      1.03|     200| 0.0| 0.0|  Clear|    false|
|2024-01-30|1706605200|17065

In [5]:
# `uvi` is NULL in all previewed rows, so it is dropped from the feature set.
df = df.drop('uvi')

# Impute missing visibility with the column mean. `mean` skips NULLs and returns None
# when every value is NULL; filling with None is not supported, so only fill when a
# mean actually exists.
visibility_mean = df.select(F.mean('visibility')).first()[0]
if visibility_mean is not None:
    df = df.na.fill({'visibility': visibility_mean})

# Turn the boolean holiday flag into the string labels used by the label encoders.
# Rows whose flag is NULL stay NULL (there is no `.otherwise`).
df = df.withColumn('isHoliday',
    F.when(F.col('isHoliday') == False, "No Holiday")
     .when(F.col('isHoliday') == True, "Holiday")
)

df.show()

# Sanity check: should be 0 whenever a mean was available for imputation.
print(f"num of null visibility: {df.filter(df.visibility.isNull()).count()}")

+----------+----------+----------+----------+-----+----------+--------+--------+---------+------+-----------------+----------+--------+----+----+-------+----------+
|  datetime|        dt|   sunrise|    sunset| temp|feels_like|pressure|humidity|dew_point|clouds|       visibility|wind_speed|wind_deg|rain|snow|weather| isHoliday|
+----------+----------+----------+----------+-----+----------+--------+--------+---------+------+-----------------+----------+--------+----+----+-------+----------+
|2024-01-28|1706407200|1706395174|1706431791| 0.51|      0.51|    1030|      51|    -7.51|    75|          10000.0|      1.03|     350| 0.0| 0.0| Clouds|No Holiday|
|2024-01-11|1704974400|1704926810|1704961915|  3.2|      0.27|    1016|      81|     0.26|    75|           8000.0|      3.09|     250| 0.0| 0.0| Clouds|No Holiday|
|2024-01-13|1705158000|1705185974|1705221291|-2.71|     -2.71|    1026|      80|    -5.34|     0|          10000.0|      1.03|     200| 0.0| 0.0|  Clear|No Holiday|
|2024-01-3

In [7]:
# `dt` holds epoch seconds; turn it into a timestamp column.
df = df.withColumn('dt_timestamp', F.to_timestamp(F.col('dt')))

# Hours elapsed since 2017-12-01 00:00, with both ends interpreted in the Spark session time zone.
# NOTE(review): presumably the start of the period the model was trained on — confirm.
reference_epoch_s = F.unix_timestamp(F.to_timestamp(F.lit('2017-12-01')))
elapsed_s = F.unix_timestamp('dt_timestamp') - reference_epoch_s
df = df.withColumn('hour diff', elapsed_s / 3600)

df.select('dt_timestamp', 'hour diff').show()

+-------------------+---------+
|       dt_timestamp|hour diff|
+-------------------+---------+
|2024-01-28 02:00:00|  53978.0|
|2024-01-11 12:00:00|  53580.0|
|2024-01-13 15:00:00|  53631.0|
|2024-01-30 09:00:00|  54033.0|
|2024-01-24 16:00:00|  53896.0|
|2024-01-08 17:00:00|  53513.0|
|2024-01-15 18:00:00|  53682.0|
|2024-01-17 09:00:00|  53721.0|
|2024-01-31 05:00:00|  54053.0|
|2024-01-05 06:00:00|  53430.0|
|2024-01-10 14:00:00|  53558.0|
|2024-01-04 13:00:00|  53413.0|
|2024-01-17 05:00:00|  53717.0|
|2024-01-05 01:00:00|  53425.0|
|2024-01-29 04:00:00|  54004.0|
|2024-01-13 06:00:00|  53622.0|
|2024-01-31 04:00:00|  54052.0|
|2024-01-16 03:00:00|  53691.0|
|2024-01-18 04:00:00|  53740.0|
|2024-01-27 07:00:00|  53959.0|
+-------------------+---------+
only showing top 20 rows



In [9]:
# Calendar months belonging to each season, checked in this order.
SEASON_MONTHS = (
    ('Winter', (12, 1, 2)),
    ('Spring', (3, 4, 5)),
    ('Summer', (6, 7, 8)),
    ('Fall', (9, 10, 11)),
)

# Source weather column -> feature name expected downstream, in output order.
# NOTE(review): 'Visibility (10m)' suggests units of 10 m, but the raw values (e.g. 10000.0)
# look like metres, and 'Snowfall (cm)' receives `snow` without any unit conversion —
# confirm both against the data the label encoders / model were built on.
RENAMED_COLUMNS = (
    ('temp', 'Temperature(C)'),
    ('humidity', 'Humidity(%)'),
    ('wind_speed', 'Wind speed (m/s)'),
    ('visibility', 'Visibility (10m)'),
    ('dew_point', 'Dew point temperature(C)'),
    ('rain', 'Rainfall(mm)'),
    ('snow', 'Snowfall (cm)'),
)

# Build the season label from the record's month as a CASE WHEN chain; months that
# match no season (or a NULL date) yield NULL.
month_of_record = F.month('datetime')
(first_season, first_months), *other_seasons = SEASON_MONTHS
season_col = F.when(month_of_record.isin(*first_months), first_season)
for season_name, months in other_seasons:
    season_col = season_col.when(month_of_record.isin(*months), season_name)

df_x = df.select(
    # Split the observation timestamp into a date and an hour-of-day.
    F.to_date('dt_timestamp').alias('Date'),
    F.hour('dt_timestamp').alias('Hour'),
    # Existing weather columns under their feature names.
    *[F.col(source).alias(target) for source, target in RENAMED_COLUMNS],
    season_col.alias('Seasons'),
    F.col('isHoliday').alias('Holiday'),
    F.col("hour diff"),
    # Every row is marked as a functioning day (adjust if real data becomes available).
    F.lit('Yes').alias('Functioning Day'),
)
df_x.show()

+----------+----+--------------+-----------+----------------+-----------------+------------------------+------------+-------------+-------+----------+---------+---------------+
|      Date|Hour|Temperature(C)|Humidity(%)|Wind speed (m/s)| Visibility (10m)|Dew point temperature(C)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|hour diff|Functioning Day|
+----------+----+--------------+-----------+----------------+-----------------+------------------------+------------+-------------+-------+----------+---------+---------------+
|2024-01-28|   2|          0.51|         51|            1.03|          10000.0|                   -7.51|         0.0|          0.0| Winter|No Holiday|  53978.0|            Yes|
|2024-01-11|  12|           3.2|         81|            3.09|           8000.0|                    0.26|         0.0|          0.0| Winter|No Holiday|  53580.0|            Yes|
|2024-01-13|  15|         -2.71|         80|            1.03|          10000.0|                   -5.34|         0.

In [10]:
import pickle

# Encoders saved at training time; the code below relies on their `classes_` attribute and
# `transform` method (presumably scikit-learn LabelEncoder objects — confirm).
# NOTE(review): pickle.load can execute arbitrary code — only load this file from a trusted source.
with open('label_encoders.pkl', 'rb') as f:
    encoders = pickle.load(f)

# Categorical columns of df_x that are replaced by the encoders' integer codes.
encoded_columns = ('Seasons', 'Holiday', 'Functioning Day')

# {label: code} per column, exactly as each encoder maps its known classes. Codes are
# converted to plain Python ints so F.lit does not have to handle NumPy scalars.
mappings = {
    col: {
        label: int(code)
        for label, code in zip(encoders[col].classes_,
                               encoders[col].transform(encoders[col].classes_))
    }
    for col in encoded_columns
}

# One literal MAP(label -> code) per column. create_map takes a flat sequence
# key1, value1, key2, value2, ...; the nested comprehension flattens the pairs without
# the quadratic tuple concatenation of sum(..., ()).
mapping_expr = {
    col: F.create_map([F.lit(item) for pair in mapping.items() for item in pair])
    for col, mapping in mappings.items()
}

# Apply all mappings. Labels the encoder never saw presumably look up to NULL
# (Spark's default non-ANSI map lookup) — check for unexpected NULLs downstream.
for col, expr in mapping_expr.items():
    df_x = df_x.withColumn(col, expr.getItem(F.col(col)).cast('integer'))



In [11]:
# Hourly rental counts. Spark's CSV option is `sep` (alias `delimiter`); the previous
# misspelled `delimeter` key was silently ignored (',' is the default anyway).
df_y = sc.read \
    .options(sep=',', header=True, inferSchema=True) \
    .csv("s3a://weather/rental_bike_count.csv")

# `datetime` is presumably a number like 20240128; parse it as a yyyyMMdd date.
df_y = df_y.withColumn("datetime", F.to_timestamp(F.col("datetime").cast("string"), "yyyyMMdd"))


def _date_hour_key(date_col, hour_col):
    """Build a 'yyyy-MM-dd HH' join key from a date (or yyyy-MM-dd string) column and an hour column."""
    return F.concat(date_col, F.lit(" "), F.lpad(hour_col.cast("string"), 2, "0"))


# Both sides use the same key builder so the keys are formatted identically.
df_y = df_y.withColumn("date_hour", _date_hour_key(F.date_format("datetime", "yyyy-MM-dd"), F.col("hour")))
df_x = df_x.withColumn("date_hour", _date_hour_key(F.col("Date"), F.col("Hour")))

# Left join keeps every weather row; hours without rental data get a NULL rental_cnt.
# NOTE(review): if df_y has several rows for the same date_hour, weather rows get duplicated.
merged_df = df_x.join(df_y, df_x.date_hour == df_y.date_hour, "left")

# The rental table's `hour` is dropped by reference rather than by name: Spark resolves
# names case-insensitively by default, so "hour" would also match df_x's `Hour` feature.
merged_df = merged_df.drop(df_y.hour, "Date", "date_hour", "datetime")
merged_df.show()

+----+--------------+-----------+----------------+-----------------+------------------------+------------+-------------+-------+-------+---------+---------------+----------+
|Hour|Temperature(C)|Humidity(%)|Wind speed (m/s)| Visibility (10m)|Dew point temperature(C)|Rainfall(mm)|Snowfall (cm)|Seasons|Holiday|hour diff|Functioning Day|rental_cnt|
+----+--------------+-----------+----------------+-----------------+------------------------+------------+-------------+-------+-------+---------+---------------+----------+
|   2|          0.51|         51|            1.03|          10000.0|                   -7.51|         0.0|          0.0|      3|      1|  53978.0|              1|       761|
|  12|           3.2|         81|            3.09|           8000.0|                    0.26|         0.0|          0.0|      3|      1|  53580.0|              1|      3153|
|  15|         -2.71|         80|            1.03|          10000.0|                   -5.34|         0.0|          0.0|      3|  

In [12]:
# Persist the preprocessed feature table as Parquet, replacing any previous run's output.
output_path = 's3a://weather/weather_data_random_preprocessing'
merged_df.write.parquet(output_path, mode='overwrite')

# Release the executor pods and the driver's Spark resources.
sc.stop()