In [0]:
import calendar
import os
from datetime import datetime, timedelta

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, when, count, lpad, col, sum as _sum, expr, broadcast
from pyspark.sql.functions import round  # NOTE: shadows the builtin round() for the rest of the notebook
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

In [0]:
# 1. Load the silver-layer source tables and preview each one.
silver_schema = "`1team-postgresql-connection_catalog`.silver"

time_df = spark.table(f"{silver_schema}.silver_hourly")
month_df = spark.table(f"{silver_schema}.silver_monthly")
week_df = spark.table(f"{silver_schema}.silver_weekday")
airline_df = spark.table(f"{silver_schema}.silver_airline")
flight_df = spark.table(f"{silver_schema}.silver_flight_info")

for preview_df in (time_df, month_df, week_df, airline_df, flight_df):
    preview_df.display()

In [0]:
# 2. Zero-pad month and day to two characters (e.g. 1 -> "01") so they match the
#    two-digit date parts produced later in the notebook.
def _zero_pad_month_day(df):
    """Return `df` with its `month` and then its `day` column left-padded with '0' to width 2."""
    for key in ("month", "day"):
        df = df.withColumn(key, lpad(col(key), 2, "0"))
    return df


time_df, month_df, flight_df = (
    _zero_pad_month_day(frame) for frame in (time_df, month_df, flight_df)
)

for padded_df in (time_df, month_df, flight_df):
    padded_df.display()

In [0]:
# 3. Normalise weekday labels to English day names.
#    week_df.weekday holds Korean names; flight_df.day_of_week holds codes "1".."7"
#    with "1" = Sunday and "2" = Monday. Values not listed map to null (no ELSE branch).
korean_weekday_names = {
    "월요일": "Monday",
    "화요일": "Tuesday",
    "수요일": "Wednesday",
    "목요일": "Thursday",
    "금요일": "Friday",
    "토요일": "Saturday",
    "일요일": "Sunday",
}
day_code_names = {
    "2": "Monday",
    "3": "Tuesday",
    "4": "Wednesday",
    "5": "Thursday",
    "6": "Friday",
    "7": "Saturday",
    "1": "Sunday",
}


def _map_labels(column_name, mapping):
    """Build a CASE WHEN expression translating `column_name` through `mapping`; misses become null."""
    case_expr = None
    for source_value, english_name in mapping.items():
        matches = col(column_name) == source_value
        case_expr = when(matches, english_name) if case_expr is None else case_expr.when(matches, english_name)
    return case_expr


week_df = week_df.withColumn("weekday", _map_labels("weekday", korean_weekday_names))
flight_df = flight_df.withColumn("day_of_week", _map_labels("day_of_week", day_code_names))

week_df.display()
flight_df.display()

In [0]:
# 4. Early departures: when the scheduled time is later than the actual departure,
#    overwrite departure_time with scheduled_time; then set delayed_time to "0"
#    wherever the two times are now equal.
# NOTE(review): the time columns are compared directly. If they are "HH:MM" strings, a
#    flight scheduled 23:50 that leaves 00:10 the next day also counts as "early" and its
#    delay is erased — confirm the column format / cross-midnight handling.
departed_early = col("scheduled_time") > col("departure_time")
flight_df = flight_df.withColumn(
    "departure_time",
    when(departed_early, col("scheduled_time")).otherwise(col("departure_time")),
)

# Resolved against the updated departure_time from the statement above.
departed_on_schedule = col("scheduled_time") == col("departure_time")
flight_df = flight_df.withColumn(
    "delayed_time",
    when(departed_on_schedule, "0").otherwise(col("delayed_time")),
)

flight_df.display()

In [0]:
# 5. Add hour-of-day columns: the first two characters of the actual and scheduled times.
#    Assumes both times start with a two-digit hour (e.g. "HH:MM") — TODO confirm.
flight_df = (
    flight_df
    .withColumn("departure_hour", F.substring("departure_time", 1, 2))
    .withColumn("scheduled_hour", F.substring("scheduled_time", 1, 2))
)

flight_df.display()

In [0]:
# Row count of flight_df after the transforms in steps 2-5 (sanity check).
flight_df.count()

In [0]:
# 6. Build a calendar: one row per day for every (year, month) present in month_df,
#    holding the ISO date string and the English weekday name.
month_year_pairs = month_df.select("year", "month").distinct().collect()

date_list = []
for row in month_year_pairs:
    y, m = int(row['year']), int(row['month'])
    # calendar.monthrange -> (weekday of the 1st, number of days in the month)
    days_in_month = calendar.monthrange(y, m)[1]
    for d in range(1, days_in_month + 1):
        day_dt = datetime(y, m, d)
        # year is emitted as str because date_schema declares it StringType; classic
        # PySpark schema verification rejects a Python int for a StringType field.
        # "%A" is locale-dependent: assumes an English/C locale so the names match the
        # English weekday labels produced in step 3.
        date_list.append((str(y), f"{m:02}", day_dt.strftime("%Y-%m-%d"), day_dt.strftime("%A")))

date_schema = StructType([
    StructField("year", StringType()),
    StructField("month", StringType()),
    StructField("date", StringType()),
    StructField("weekday", StringType())
])

date_df = spark.createDataFrame(date_list, schema=date_schema)

date_df.display()

In [0]:
# 7. Hour-of-day dimension: one row per hour, "00" through "23" as two-character strings.
hour_schema = StructType([StructField("hour", StringType())])
hours = [(str(h).zfill(2),) for h in range(24)]
hour_df = spark.createDataFrame(hours, hour_schema)

hour_df.display()

In [0]:
# 8. Distinct airlines observed in airline_df for each (year, month).
airline_list_df = airline_df.select("year", "month", "airline").dropDuplicates()

airline_list_df.display()

In [0]:
# 9. Add airlines that are missing from airline_df to every (year, month).
# NOTE(review): presumably these are names that occur in flight_df but not in airline_df
#    (compare the missing-key diagnostics later in the notebook) — confirm. Names are matched
#    exactly, so near-duplicate spellings in this list (e.g. "사우디아라비아 항공" vs
#    "사우디아라비아항공", "기타" vs "기타타") are intentionally kept as separate entries.
# NOTE(review): these airlines have no rows in airline_df, so the left join with
#    airline_ratio_df in step 15 gives them null airline ratios and therefore null estimates.
missing_airlines = [
    "공군기", "아틀라스화물항공", "에어아시아 엑스", "투르크메니스탄 항공", "아메리젯인터내셔널", "사우디아라비아 항공",
    "장쑤징동화물항공사", "실크웨이항공", "프랑스항공", "독일항공", "미얀마 항공", "유피에스화물항공", "폴라에어카고 월드와이드",
    "상해항공", "기타", "타이거 에어 타이완(스마트캣)", "에어홍콩화물항공", "DHL", "화물기", "스리랑칸항공", "뱀부항공",
    "천진에어라인", "사우디아라비아항공", "젯스타 에어웨이즈", "네덜란드항공", "옴니 에어 인터내셔널", "브루나이항공",
    "페덱스화물항공", "카고룩스 이탈리아", "핀란드항공", "말린도항공", "기타타", "전일본공수 주식회사", "하이난항공", 
    "말레이시아항공", "웨스턴글로벌항공", "싱가폴항공", "에티오피안항공", "에어인디아", "어썸카고", "에어아스타나항공",
    "한에어시스템즈", "카놋샤크항공", "내셔널 항공", "스쿠트타이거", "에어아시아버하드", "중국길상항공", "에어로케이항공(주)",
    "천진항공화물", "대만구항공", "롱하오 항공", "마스에어", "순풍항공"
]

ym_df = airline_list_df.select("year", "month").distinct()

# Missing-airline list -> single-column DataFrame
missing_airlines_df = spark.createDataFrame([(a,) for a in missing_airlines], ["airline"])

# Pair every (year, month) with every missing airline
manual_df = ym_df.crossJoin(missing_airlines_df)

# Merge with the observed airline list; dropDuplicates removes names already present
airline_list_df_fixed = airline_list_df.unionByName(manual_df).dropDuplicates(["year", "month", "airline"])

In [0]:
# 10. Build the full grid: every calendar date x every hour x every airline seen in
#     that date's (year, month).
base_df = (
    date_df
    # Unconditional cartesian product with the 24 hours; crossJoin states the intent
    # instead of join(on=[], how="cross").
    .crossJoin(hour_df)
    # Explicit equi-join on (year, month): a date only pairs with that month's airlines.
    # how="cross" combined with join keys was ambiguous and evaluated as an inner join.
    .join(airline_list_df_fixed, on=["year", "month"], how="inner")
    .withColumnRenamed("hour", "scheduled_hour")
)

base_df.display()

In [0]:
# Grid size after step 10: dates x 24 hours x airlines per (year, month).
base_df.count()

In [0]:
# 11. Attach the monthly totals to every grid row. month_df is broadcast, presumably
#     because it is small (one row per month) — confirm.
monthly_metric_cols = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
month_totals = month_df.select("year", "month", *monthly_metric_cols)
base_df = base_df.join(broadcast(month_totals), on=["year", "month"], how="left")

base_df.display()

In [0]:
# Should match the previous count if month_df has one row per (year, month); a larger
# number would mean the left join duplicated grid rows.
base_df.count()

In [0]:
# 12. Hourly distribution.
# (1) Per (year, month) sums of every hourly metric — the denominators for the hourly shares.
hourly_metrics = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
time_total_df = time_df.groupBy("year", "month").agg(
    *[_sum(metric).alias(f"time_{metric}_sum") for metric in hourly_metrics]
)

time_total_df.display()

In [0]:
# (2) Join the monthly sums back onto the hourly rows; each ratio is that hour's share
#     of the month. hour_of_day is exposed as scheduled_hour so it joins with the grid;
#     assumes it is formatted "00".."23" like hour_df — TODO confirm.
hourly_metrics = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
time_ratio_df = (
    time_df
    .join(time_total_df, on=["year", "month"])
    .select(
        "year",
        "month",
        col("hour_of_day").alias("scheduled_hour"),
        *[(col(m) / col(f"time_{m}_sum")).alias(f"time_ratio_{m}") for m in hourly_metrics],
    )
)

time_ratio_df.display()

In [0]:
# 13. Weekday distribution: per (year, month) sums of each weekday metric.
#     Source columns use plural names (flight_arrivals, ...); the aliases are singular.
plural_suffix = {"arrival": "arrivals", "departure": "departures", "total": "total"}
weekday_total_df = week_df.groupBy("year", "month").agg(*[
    _sum(f"{kind}_{plural_suffix[direction]}").alias(f"w_{kind}_{direction}_sum")
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
])

weekday_total_df.display()

In [0]:
# Each weekday row's share of its (year, month) sum, per metric.
plural_suffix = {"arrival": "arrivals", "departure": "departures", "total": "total"}
weekday_ratio_df = (
    week_df
    .join(weekday_total_df, on=["year", "month"])
    .select(
        "year",
        "month",
        "weekday",
        *[
            (col(f"{kind}_{plural_suffix[direction]}") / col(f"w_{kind}_{direction}_sum"))
            .alias(f"weekday_ratio_{kind}_{direction}")
            for kind in ("flight", "passenger", "cargo")
            for direction in ("arrival", "departure", "total")
        ],
    )
)

weekday_ratio_df.display()

In [0]:
# 14. Airline distribution: per (year, month) sums of each airline metric.
#     Source columns use plural names (flight_arrivals, ...); the aliases are singular.
plural_suffix = {"arrival": "arrivals", "departure": "departures", "total": "total"}
airline_total_df = airline_df.groupBy("year", "month").agg(*[
    _sum(f"{kind}_{plural_suffix[direction]}").alias(f"a_{kind}_{direction}_sum")
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
])

airline_total_df.display()

In [0]:
# Each airline row's share of its (year, month) sum, per metric.
plural_suffix = {"arrival": "arrivals", "departure": "departures", "total": "total"}
airline_ratio_df = (
    airline_df
    .join(airline_total_df, on=["year", "month"])
    .select(
        "year",
        "month",
        "airline",
        *[
            (col(f"{kind}_{plural_suffix[direction]}") / col(f"a_{kind}_{direction}_sum"))
            .alias(f"airline_ratio_{kind}_{direction}")
            for kind in ("flight", "passenger", "cargo")
            for direction in ("arrival", "departure", "total")
        ],
    )
)

airline_ratio_df.display()

In [0]:
# 15. Attach the hourly, weekday and airline ratios to the grid. Left joins keep every
#     grid row; a missing ratio becomes null.
ratio_joins = [
    (time_ratio_df, ["year", "month", "scheduled_hour"]),
    (weekday_ratio_df, ["year", "month", "weekday"]),
    (airline_ratio_df, ["year", "month", "airline"]),
]
result_df = base_df
for ratio_df, ratio_keys in ratio_joins:
    result_df = result_df.join(ratio_df, on=ratio_keys, how="left")

result_df.display()

In [0]:
# Row count after the ratio joins; growth versus base_df would indicate duplicate keys
# in one of the ratio tables.
result_df.count()

In [0]:
# 16. Raw estimate per grid cell: monthly total x hourly share x weekday share x airline share.
#     These raw values are rescaled to the monthly totals in step 19/20.
estimate_metrics = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
for metric in estimate_metrics:
    result_df = result_df.withColumn(
        f"estimated_{metric}",
        col(metric)
        * col(f"time_ratio_{metric}")
        * col(f"weekday_ratio_{metric}")
        * col(f"airline_ratio_{metric}"),
    )

# Day of month ("DD") taken from the "YYYY-MM-DD" date string, used as a join key in step 17.
result_df = result_df.withColumn("day", F.substring(col("date"), 9, 2))

result_df.display()

In [0]:
# Row count after adding the estimate columns (withColumn does not add or remove rows).
result_df.count()

In [0]:
# 17. Attach the grid estimates to every actual flight. flight_df's `date` column is
#     dropped first, presumably to avoid clashing with result_df's own `date` column.
#     Every flight in the same (date, hour, airline) slot receives the same estimate row.
flight_df2 = flight_df.drop("date")
slot_keys = ["year", "month", "day", "scheduled_hour", "airline"]
result_df2 = flight_df2.join(result_df, on=slot_keys, how="left")

result_df2.display()

In [0]:
# 18. Sum the raw estimates over all flight rows in each (year, month).
estimate_metrics = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
monthly_estimate_sum_df = result_df2.groupBy("year", "month").agg(
    *[_sum(f"estimated_{m}").alias(f"sum_est_{m}") for m in estimate_metrics]
)

monthly_estimate_sum_df.display()

In [0]:
# 19. Per-month scaling factor = actual monthly total / sum of raw estimates, so that the
#     scaled estimates in step 20 add up to month_df's totals.
estimate_metrics = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
scaling_df = (
    monthly_estimate_sum_df
    .join(month_df.select("year", "month", *estimate_metrics), on=["year", "month"])
    .select(
        "year",
        "month",
        *[(col(m) / col(f"sum_est_{m}")).alias(f"scale_{m}") for m in estimate_metrics],
    )
)

scaling_df.display()

In [0]:
# 20. Final estimate = raw estimate x its month's scaling factor. The inner join drops
#     months that have no scaling factor.
estimate_metrics = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
final_df = result_df2.join(scaling_df, on=["year", "month"], how="inner")
for metric in estimate_metrics:
    final_df = final_df.withColumn(
        f"final_estimated_{metric}",
        col(f"estimated_{metric}") * col(f"scale_{metric}"),
    )

final_df.display()

In [0]:
# Row count of final_df; the inner join with scaling_df drops months without a scaling factor.
final_df.count()

In [0]:
# 21. Round the final estimates to 2 decimals and drop the intermediate columns
#     (monthly totals, ratios, raw estimates and scaling factors).
estimate_metrics = [
    f"{kind}_{direction}"
    for kind in ("flight", "passenger", "cargo")
    for direction in ("arrival", "departure", "total")
]
intermediate_prefixes = ("time_ratio", "weekday_ratio", "airline_ratio", "estimated", "scale")
intermediate_cols = estimate_metrics + [
    f"{prefix}_{metric}" for prefix in intermediate_prefixes for metric in estimate_metrics
]

final_df2 = final_df
for metric in estimate_metrics:
    final_col = f"final_estimated_{metric}"
    final_df2 = final_df2.withColumn(final_col, F.round(col(final_col), 2))
final_df2 = final_df2.drop(*intermediate_cols)

final_df2.display()

In [0]:
# Row count after rounding/dropping columns (unchanged from final_df).
final_df2.count()

In [0]:
# Diagnostics: (year, month, day, scheduled_hour, airline) slots that have actual flights
# but no row in the estimate grid (result_df). Cached because several cells below reuse it.
slot_cols = ["year", "month", "day", "scheduled_hour", "airline"]
flight_keys = flight_df2.select(*slot_cols).distinct()
final_keys = result_df.select(*slot_cols).distinct()

missing_keys = flight_keys.subtract(final_keys)
missing_keys.cache().count()
missing_keys.display()

In [0]:
# Airlines that appear in missing slots but never appear anywhere in the estimate grid.
missing_keys.select("airline").distinct().exceptAll(final_keys.select("airline").distinct()).display()

In [0]:
# Dates that appear in missing slots but are absent from the estimate grid entirely.
missing_keys.select("year", "month", "day").distinct().exceptAll(final_keys.select("year", "month", "day").distinct()).display()

In [0]:
# scheduled_hour values that appear in missing slots but never in the estimate grid
# (e.g. hour strings not in "00".."23").
missing_keys.select("scheduled_hour").distinct().exceptAll(final_keys.select("scheduled_hour").distinct()).display()

In [0]:
# Number of individual flights that fall into slots missing from the estimate grid.
flight_df.join(missing_keys, on=["year", "month", "day", "scheduled_hour", "airline"], how="inner").count()

In [0]:
# The individual flights that fall into slots missing from the estimate grid.
flight_df.join(missing_keys, on=["year", "month", "day", "scheduled_hour", "airline"], how="inner").display()

In [0]:
# Rows where any of the flight/passenger/cargo total estimates is null
# (e.g. a missing grid slot or a null ratio upstream).
total_estimate_cols = [
    "final_estimated_flight_total",
    "final_estimated_passenger_total",
    "final_estimated_cargo_total",
]
all_totals_present = (
    col(total_estimate_cols[0]).isNotNull()
    & col(total_estimate_cols[1]).isNotNull()
    & col(total_estimate_cols[2]).isNotNull()
)
null_rows = final_df2.filter(~all_totals_present)

null_rows.select("year", "month", "day", "departure_hour", "airline").display()

In [0]:
# Number of rows with at least one null total estimate.
null_rows.count()

In [0]:
# Total rows in final_df2, for comparison with the null-row count above.
final_df2.count()

In [0]:
# Load the holiday list and hourly temperature tables from the silver layer and preview them.
silver_schema = "`1team-postgresql-connection_catalog`.silver"
holiday_df = spark.table(f"{silver_schema}.silver_holiday_list")
weather_df = spark.table(f"{silver_schema}.silver_temperature")

for preview_df in (holiday_df, weather_df):
    preview_df.display()

In [0]:
# Drop columns not used downstream from the holiday and weather tables.
holiday_unused = ["date", "seq", "date_name"]
weather_unused = ["dotw", "sea_level_pressure", "spot_atmos_pressure", "datetime"]
holiday_df2 = holiday_df.drop(*holiday_unused)
weather_df2 = weather_df.drop(*weather_unused)

holiday_df2.display()
weather_df2.display()

In [0]:
# Flag holidays: date_kind is "01" when (year, month, day) is in the holiday list, else "02".
holiday_keys = (
    holiday_df2
    .select("year", "month", "day")
    # A date can appear more than once in a holiday list (two holidays on the same day);
    # de-duplicate so the left join below cannot multiply flight rows.
    .distinct()
    .withColumn("is_holiday", F.lit(1))
)

gold_df = (
    final_df2
    .join(holiday_keys, ["year", "month", "day"], "left")
    .withColumn("date_kind", F.when(F.col("is_holiday").isNotNull(), "01").otherwise("02"))
    .drop("is_holiday")
)
# NOTE(review): final_df2 month/day are zero-padded two-character strings (step 2);
#    confirm holiday_df2 uses the same format, otherwise no dates will match.

gold_df.display()

In [0]:
# Row count of gold_df after the holiday flag (gold_df_u is never defined in this notebook,
# so counting it raised NameError on Restart & Run All).
gold_df.count()

In [0]:
# Attach flights to hourly weather. The weather "time" column becomes scheduled_hour, and
# month AND day are zero-padded to two characters, matching the padding applied to
# flight_df in step 2. Previously only month was padded, so days 1-9 could fail to join.
weather_df2 = (
    weather_df2
    .withColumnRenamed("time", "scheduled_hour")
    .withColumn("month", lpad(col("month"), 2, "0"))
    .withColumn("day", lpad(col("day"), 2, "0"))
)
# NOTE(review): gold_df.scheduled_hour holds the first two characters of scheduled_time;
#    confirm the weather "time" column uses the same two-digit hour format.

# Weather is the left side: every weather hour is kept, with null flight columns when no
# flight matches. Re-running this cell alone would join weather onto gold_df a second time.
gold_df = weather_df2.join(gold_df, on=["year", "month", "scheduled_hour", "day"], how="left")

gold_df.display()

In [0]:
# Row count of gold_df after the weather join (the row set written to PostgreSQL below).
gold_df.count()

In [0]:
# Write gold_df to Azure PostgreSQL over JDBC.
# Connection settings are read from the environment instead of being hard-coded; populate
# them from a secret store (e.g. a Databricks secret scope). The password previously
# committed in this cell is exposed and should be rotated.
host = os.environ.get("PG_HOST", "1dt-2nd-team1-postgres.postgres.database.azure.com")
port = os.environ.get("PG_PORT", "5432")
database = os.environ.get("PG_DATABASE", "postgres")
user = os.environ.get("PG_USER", "azureuser")
password = os.environ.get("PG_PASSWORD")
if not password:
    raise RuntimeError(
        "PG_PASSWORD is not set; provide the PostgreSQL password via the environment "
        "or a secret scope before writing gold.gold_v1."
    )
jdbc_url = f"jdbc:postgresql://{host}:{port}/{database}?sslmode=require"

# mode("append"): re-running this cell inserts the same rows into gold.gold_v1 again.
(
    gold_df.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "gold.gold_v1")
    .option("user", user)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .mode("append")
    .save()
)