In [1]:
import os
import sys
from pyspark.sql import SparkSession

# Path to the Microsoft SQL Server JDBC driver jar. Set the MSSQL_JDBC_JAR
# environment variable to point at the jar on another machine; the default is
# the original local install location.
jar_path = os.environ.get(
    "MSSQL_JDBC_JAR",
    r"C:\Users\Administrator\Downloads\sqljdbc_12.10.0.0_enu\sqljdbc_12.10\enu\jars\mssql-jdbc-12.10.0.jre8.jar",
)
if not os.path.isfile(jar_path):
    # Surface the most likely cause of a later driver-load failure up front.
    print(f"Warning: JDBC driver jar not found at {jar_path}")

# Local Spark session with the driver jar registered both on the driver
# classpath and in spark.jars.
spark = (
    SparkSession.builder
    .appName("SQLServerJDBC")
    .master("local[*]")
    .config("spark.driver.extraClassPath", jar_path)
    .config("spark.jars", jar_path)
    .getOrCreate()
)

# NOTE(review): the JVM is already running at this point, so setting this
# system property presumably cannot change the driver classpath any more.
# Kept for parity with the original cell - confirm and remove if unneeded.
spark.sparkContext._jvm.System.setProperty("spark.driver.extraClassPath", jar_path)

# Sanity check: ask the JVM to load the driver class and report the outcome.
try:
    spark.sparkContext._jvm.java.lang.Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
    print("Successfully loaded SQLServerDriver class!")
except Exception as e:
    print(f"Failed to load SQLServerDriver class: {e}")

# Connection settings come from the environment so that credentials never
# have to be typed into (and saved with) the notebook. Unset variables fall
# back to the original empty-string placeholders.
sql_server_db_config = {
    'server': os.environ.get('SQLSERVER_HOST', ''),
    'user': os.environ.get('SQLSERVER_USER', ''),
    'password': os.environ.get('SQLSERVER_PASSWORD', ''),
    'database': os.environ.get('SQLSERVER_DATABASE', '')
}

# JDBC URL on port 1433; encryption is switched off in the URL itself.
jdbc_url = f"jdbc:sqlserver://{sql_server_db_config['server']}:1433;databaseName={sql_server_db_config['database']};encrypt=false;trustServerCertificate=true"

# Properties handed to every spark.read.jdbc call in this notebook.
connection_properties = {
    "user": sql_server_db_config['user'],
    "password": sql_server_db_config['password'],
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "encrypt": "false",
    "trustServerCertificate": "true"
}

# Smoke test: run a trivial query through the JDBC source and show the result.
try:
    query = "(SELECT 1 AS test) AS test_query"
    df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)
    print("Connection successful!")
    df.show()
except Exception as e:
    print(f"Error connecting to database: {e}")


Successfully loaded SQLServerDriver class!
Connection successful!
+----+
|test|
+----+
|   1|
+----+



In [41]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def get_ranked_customer_orders(customerID = None, outletID = None):
    """Return order line items with a per-order row number.

    Reads the orders, customers, order_details, services and service_types
    tables of the ``gert_sibande`` schema over JDBC, inner-joins them, and
    numbers the line items of each ``order_number``.

    Args:
        customerID: When not None, keep only rows whose ``customer_id``
            equals this value.
        outletID: When not None, keep only rows whose ``outlet_id`` equals
            this value.

    Returns:
        A Spark DataFrame with the columns order_number, service_name,
        service_type_name, customer_name, service_price, total, customer_id,
        order_date and RankedPriceByCustomer, sorted by order_number and then
        by RankedPriceByCustomer.
    """
    def _read_table(table_name):
        # Every table is read with the notebook-wide JDBC URL and properties.
        return spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)

    o_df = _read_table("gert_sibande.orders")
    c_df = _read_table("gert_sibande.customers")
    od_df = _read_table("gert_sibande.order_details")
    s_df = _read_table("gert_sibande.services")
    st_df = _read_table("gert_sibande.service_types")

    joined_df = o_df\
        .join(c_df, o_df.customer_id == c_df.id)\
        .join(od_df, o_df.id == od_df.order_id)\
        .join(s_df, od_df.service_id == s_df.id)\
        .join(st_df, od_df.service_type_id == st_df.id)\
        .select(
            o_df.id,
            o_df.order_number,
            o_df.total,
            o_df.customer_id,
            o_df.order_date,
            c_df.name.alias('customer_name'),
            o_df.outlet_id,
            s_df.service_name,
            st_df.service_type_name,
            od_df.service_price
        )

    # order_date is still the primary sort key. The extra keys only break
    # ties: rows that share order_number and order_date would otherwise get
    # row numbers in an arbitrary, run-to-run unstable order.
    window_spec = Window.partitionBy('order_number').orderBy(
        F.desc('order_date'),
        F.desc('service_price'),
        F.asc('service_name'),
        F.asc('service_type_name'),
    )
    ranked_df = joined_df.withColumn(
        'RankedPriceByCustomer',
        F.row_number().over(window_spec)
    )

    # Optional filters are applied after ranking, as in the original.
    if outletID is not None:
        ranked_df = ranked_df.filter(ranked_df.outlet_id == outletID)
    if customerID is not None:
        ranked_df = ranked_df.filter(ranked_df.customer_id == customerID)

    result_df = ranked_df.select(
        'order_number',
        'service_name',
        'service_type_name',
        'customer_name',
        'service_price',
        'total',
        'customer_id',
        'order_date',
        'RankedPriceByCustomer'
    ).orderBy('order_number', 'RankedPriceByCustomer')  # stable order within each order

    return result_df

# Parameters for this run: a single customer, with no outlet restriction.
customer_id = 285
outlet_id = None

# Build the ranked line-item frame for those parameters.
ranked_orders_df = get_ranked_customer_orders(
    customerID=customer_id,
    outletID=outlet_id,
)

# Print at most 55 rows of the result.
ranked_orders_df.show(55)

+------------+------------+--------------------+-------------+-------------+-----+-----------+-------------------+---------------------+
|order_number|service_name|   service_type_name|customer_name|service_price|total|customer_id|         order_date|RankedPriceByCustomer|
+------------+------------+--------------------+-------------+-------------+-----+-----------+-------------------+---------------------+
|    GSC-0187|  TYRE PATCH|XTRA SEAL unirves...|      NO NAME|        60.00|60.00|        285|2024-09-04 00:00:00|                    1|
|    GSC-0355|  TYRE PATCH|XTRA SEAL unirves...|      NO NAME|        60.00|60.00|        285|2024-10-11 00:00:00|                    1|
+------------+------------+--------------------+-------------+-------------+-----+-----------+-------------------+---------------------+



In [37]:
def get_guesthouse_room_availability(startdate, enddate, outletID, only_available=False):
    """Flag each room service type as booked or free for a date range.

    Room service types are those whose ``service_type_name`` starts with
    "room", compared case-insensitively. A room is treated as booked when
    some order with ``order_date < enddate`` and ``check_out_date > startdate``
    has an order-detail row that references the room's service type id.

    Args:
        startdate: Start of the requested period; compared against
            ``orders.check_out_date``.
        enddate: End of the requested period; compared against
            ``orders.order_date``.
        outletID: When not None, keep only rows whose ``outlet_id`` equals
            this value.
        only_available: When True, return only rows with ``is_booked == 0``.
            Defaults to False, which returns every room with its flag.

    Returns:
        A Spark DataFrame with the columns service_id, service_name,
        service_type_id, service_type_name, outlet_id,
        booked_service_type_id and is_booked (1 = booked, 0 = free).
    """
    def _read_table(table_name, alias):
        # Every table is read with the notebook-wide JDBC URL and properties.
        return spark.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties).alias(alias)

    s_df = _read_table("gert_sibande.services", "s")
    sd_df = _read_table("gert_sibande.service_details", "sd")
    st_df = _read_table("gert_sibande.service_types", "st")
    sc_df = _read_table("gert_sibande.service_categories", "sc")
    o_df = _read_table("gert_sibande.orders", "o")
    od_df = _read_table("gert_sibande.order_details", "od")

    # Spark's LIKE is case-sensitive, while the stored names look like
    # "Room 1". Lower-casing the column makes the match independent of
    # whether the predicate is evaluated by Spark or by the database.
    is_room = F.lower(F.col("st.service_type_name")).like("room%")

    # NOTE(review): service_categories is joined but none of its columns are
    # selected; the join is kept so row multiplicity matches the original.
    base_df = s_df \
        .join(sd_df, F.col("s.id") == F.col("sd.service_id"), "left") \
        .join(st_df, F.col("sd.service_type_id") == F.col("st.id"), "left") \
        .join(sc_df, F.col("st.category_id") == F.col("sc.id"), "left") \
        .filter(is_room) \
        .select(
            F.col("s.id").alias("service_id"),
            F.col("s.service_name"),
            F.col("st.id").alias("service_type_id"),
            F.col("st.service_type_name"),
            F.col("s.outlet_id")
        )

    # Distinct service type ids referenced by orders overlapping the period.
    booked_df = o_df \
        .join(od_df, F.col("o.id") == F.col("od.order_id")) \
        .filter((F.col("o.order_date") < F.lit(enddate)) & (F.col("o.check_out_date") > F.lit(startdate))) \
        .select(F.col("od.service_type_id").alias("booked_service_type_id")) \
        .distinct()

    # Left join keeps every room; a missing match means the room is free.
    availability_df = base_df \
        .join(booked_df, base_df.service_type_id == booked_df.booked_service_type_id, "left") \
        .withColumn("is_booked", F.when(F.col("booked_service_type_id").isNotNull(), 1).otherwise(0))

    if outletID is not None:
        availability_df = availability_df.filter(F.col("outlet_id") == outletID)

    if only_available:
        availability_df = availability_df.filter(F.col("is_booked") == 0)

    return availability_df


    

In [38]:
# Requested stay period and the outlet to inspect.
startdate = '2025-05-01'
enddate = '2025-05-10'
outletID = 3

# One row per room service type, each carrying an is_booked flag.
available_rooms_df = get_guesthouse_room_availability(
    startdate=startdate,
    enddate=enddate,
    outletID=outletID,
)

# Print at most 100 rows of the result.
available_rooms_df.show(100)

+----------+------------+---------------+-----------------+---------+----------------------+---------+
|service_id|service_name|service_type_id|service_type_name|outlet_id|booked_service_type_id|is_booked|
+----------+------------+---------------+-----------------+---------+----------------------+---------+
|       221|       ROOMS|            474|           Room 1|        3|                   474|        1|
|       221|       ROOMS|            487|          Room 14|        3|                   487|        1|
|       221|       ROOMS|            486|          Room 13|        3|                   486|        1|
|       221|       ROOMS|            488|          Room 15|        3|                   488|        1|
|       221|       ROOMS|            485|          Room 12|        3|                   485|        1|
|       221|       ROOMS|            490|          Room 17|        3|                  NULL|        0|
|       221|       ROOMS|            481|           Room 8|        3|    