# Setup

In [None]:
### Temporary helper functions so this notebook can run on its own

def edw_pull(sql:str, numPartitions:int = 8, fetchsize:int = 10000):  # pragma: no cover
    """
    Run a SQL query against the Oracle EDW over JDBC and return the result.

    The Oracle user name and password are read from the "CommercialAnalytics"
    secret scope on every call, so credentials are maintained in one place
    rather than in each individual workflow.

    Args:
        sql: The SQL text, handed to the JDBC reader as its "query" option.
        numPartitions: Handed to the JDBC reader as its "numPartitions" option.
            NOTE(review): no partitionColumn / lowerBound / upperBound is set,
            so this may not actually parallelise the read -- confirm against
            the Spark JDBC data source documentation.
        fetchsize: Handed to the JDBC reader as its "fetchsize" option.

    Returns:
        The Spark DataFrame produced by the JDBC reader's ``load()``.

    Raises:
        Exception: Any error raised while fetching the secrets is logged via
            ``logging.error`` and then re-raised unchanged.
    """
    import logging

    secret_scope = "CommercialAnalytics"
    try:
        user_name = dbutils.secrets.get(scope=secret_scope, key="edw-commarsa")
        password = dbutils.secrets.get(scope=secret_scope, key="edw-commarsa-pw")
    except Exception as e:
        logging.error(f"Failed to retrieve secrets: {e}")
        raise

    # Options are applied one by one, in this order, to the JDBC reader.
    # partitionColumn, lowerBound and upperBound are deliberately not set.
    jdbc_options = {
        "url": "jdbc:oracle:thin:@reporting.datawarehouse.db.insideaag.com:1522/edwprod",
        "user": user_name,
        "password": password,
        "query": sql,
        "driver": "oracle.jdbc.driver.OracleDriver",
        "fetchsize": fetchsize,          # default is 10
        "numPartitions": numPartitions,  # max of 8 for oracle
    }

    reader = spark.read.format("jdbc")
    for option_name, option_value in jdbc_options.items():
        reader = reader.option(option_name, option_value)

    return reader.load()

In [0]:
import os
import sys

In [0]:
# Make Python modules that live in the notebook's working directory importable.
local_lib = os.getcwd()
# module_src_path = os.path.join(local_lib, "../projects", "src")

print(f"local_lib: {local_lib}")
print("File paths have been set.")
# Guard the append so re-running this cell does not keep adding duplicate
# entries to sys.path.
if local_lib not in sys.path:
    sys.path.append(local_lib)

# The shared `functions` module import below is commented out; a temporary
# local edw_pull() is defined at the top of this notebook.
#import functions as fn
from pyspark.sql.functions import col

In [0]:
# Read the environment name from the secret scope and turn it into a catalog
# suffix: "_<env>" when a value is present, otherwise an empty string.
env = dbutils.secrets.get(scope="CommercialAnalytics", key="env-databricks")
db_env = ("_" + env) if env else ""

In [0]:
# Make the environment-specific ODS schema the default for unqualified table
# names used later in this notebook (db_env is "" or "_<env>").
spark.sql(
    f"USE business_revenuemanagement{db_env}.an_revenuemanagement_ods"
)

# Industry Schedule - OA ASMs
> Loading `INDUSTRY_DW.SCHEDULE_FUTURE_AND_HISTORICAL_FACT` from Oracle.

In [0]:
# Quick check: the latest flight date after today that is present in the
# industry schedule fact table.
oa_sql = """
select max(flight_dt)
from INDUSTRY_DW.SCHEDULE_FUTURE_AND_HISTORICAL_FACT
where flight_dt > TRUNC(SYSDATE)
"""

# Use the notebook-local edw_pull(): the `import functions as fn` line is
# commented out, so `fn.edw_pull(...)` fails with NameError on a fresh kernel.
# NOTE(review): a bare DataFrame expression only renders its schema; wrap the
# call in display(...) if the actual date value should be shown -- confirm.
edw_pull(oa_sql)

In [0]:
# Monthly, non-stop, non-codeshare schedule totals (ASMs, distance, seats) per
# origin-destination pair and carrier group ('AS', 'HA', 'OA' or NULL), from
# 2025-01 through two months after the current month.
# The CASE expression is repeated verbatim in the GROUP BY so that it matches
# the carr_group expression in the SELECT list.
oa_sql = """
 select
    trunc(s.flight_dt, 'MONTH') as dptr_month,
    concat(s.orig_apt_cd, s.dest_apt_cd) as OD,
    CASE
      WHEN s.oper_carr_cd IN ('AS', 'QX', 'VX') THEN 'AS'
      WHEN s.oper_carr_cd = 'HA' THEN 'HA'
      WHEN s.mktg_carr_cd IN ('AS', 'QX', 'VX') AND s.oper_carr_cd IN ('OO') THEN 'AS'
      WHEN s.oper_carr_cd IS NULL AND s.mktg_carr_cd IN ('AS', 'QX', 'VX') THEN 'AS'
      WHEN s.oper_carr_cd IS NULL AND s.mktg_carr_cd IN ('HA') THEN 'HA'
      WHEN s.oper_carr_cd is null and s.mktg_carr_cd is null THEN null
    ELSE 'OA' END as carr_group,
    sum(s.RPT_TOTAL_ASM_CNT) as ASM,
    sum(s.TOTAL_DISTANCE_MILE) as TOTAL_DISTANCE_MILE,
    sum(s.TOTAL_SEAT_CNT) as TOTAL_SEAT_CNT
  from INDUSTRY_DW.SCHEDULE_FUTURE_AND_HISTORICAL_FACT s
  where
    -- CICD TESTING --
    trunc(s.flight_dt, 'MONTH') BETWEEN TO_DATE('2025-01-01', 'YYYY-MM-DD') and ADD_MONTHS(TRUNC(SYSDATE, 'month'), 2)
    AND s.SERVICE_TP_CD = 'J' -- iata service type codes
    AND s.IS_CODE_SHARE_IND = 0 -- removes codeshare flight stats
    AND s.INTERMEDIATE_STOP_CNT = 0
  group by
    trunc(s.flight_dt, 'MONTH'),
    concat(s.orig_apt_cd, s.dest_apt_cd),
    CASE
      WHEN s.oper_carr_cd IN ('AS', 'QX', 'VX') THEN 'AS'
      WHEN s.oper_carr_cd = 'HA' THEN 'HA'
      WHEN s.mktg_carr_cd IN ('AS', 'QX', 'VX') AND s.oper_carr_cd IN ('OO') THEN 'AS'
      WHEN s.oper_carr_cd IS NULL AND s.mktg_carr_cd IN ('AS', 'QX', 'VX') THEN 'AS'
      WHEN s.oper_carr_cd IS NULL AND s.mktg_carr_cd IN ('HA') THEN 'HA'
      WHEN s.oper_carr_cd is null and s.mktg_carr_cd is null THEN null
    ELSE 'OA' END
"""

# Use the notebook-local edw_pull(): the `import functions as fn` line is
# commented out, so `fn.edw_pull(...)` fails with NameError on a fresh kernel.
ind_schd = edw_pull(oa_sql)

In [0]:
# Target Spark type for each column pulled from Oracle; applied in this order.
# NOTE(review): "int" is 32-bit -- confirm the summed ASM values always fit.
target_types = {
    "dptr_month": "date",
    "ASM": "int",
    "TOTAL_DISTANCE_MILE": "int",
    "TOTAL_SEAT_CNT": "int",
}
for column_name, spark_type in target_types.items():
    ind_schd = ind_schd.withColumn(column_name, ind_schd[column_name].cast(spark_type))

# Rename every column to its lower-case form.
lowercased_columns = [col(name).alias(name.lower()) for name in ind_schd.columns]
ind_schd = ind_schd.select(lowercased_columns)

display(ind_schd)

In [0]:

# Replace the contents of the test table with the freshly pulled schedule,
# with the writer's "mergeSchema" option enabled.
(
    ind_schd.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("cicd_schd_fut_hist_mth_test")
)