## Import Libraries

In [2]:
import datetime as dt
import os
import time
import warnings
from datetime import datetime

import geopandas as gpd
import pandas as pd
import sqlalchemy
from shapely.geometry import Point
from shapely.wkt import loads as wkt_loads
from sqlalchemy.engine import create_engine
from sqlalchemy.types import NVARCHAR
warnings.filterwarnings("ignore")

## Import WKT

In [3]:
# Engine for the Analytics database that holds the bus-stop polygons.
# The password is read from the BUSES_DB_PASSWORD environment variable so that
# no secret is stored in the notebook (BUSES_DB_USER optionally overrides the
# login). URL.create also escapes any special characters in the credentials.
engine_wkt = sqlalchemy.create_engine(
    sqlalchemy.engine.URL.create(
        "mssql+pyodbc",
        username=os.environ.get("BUSES_DB_USER", "dataanalytics"),
        password=os.environ["BUSES_DB_PASSWORD"],
        host="10.13.75.13",
        port=1433,
        database="Analytics",
        query={"driver": "ODBC Driver 17 for SQL Server"},
    )
)

In [4]:
# Load every row of the bus-stop polygon table once, up front.
# calculate_polygon_intersections reads its 'KML_File' and 'WKT' columns.
q_wkt = """
SELECT * FROM [Analytics].[buses_track].[WKT_Bus_stops]
"""
df_wkt = pd.read_sql(sql=q_wkt, con=engine_wkt)

### Engine and Query for Buses Data

In [5]:
# Engines for the two source databases on the same SQL Server instance:
# engine1 -> ELM-DWH (trip / manifest data, used with q1)
# engine2 -> naqaba  (live bus locations, used with q2)
# The password is read from the BUSES_DB_PASSWORD environment variable so that
# no secret is stored in the notebook (BUSES_DB_USER optionally overrides the
# login). URL.create also escapes any special characters in the credentials.
_source_db_settings = dict(
    drivername="mssql+pyodbc",
    username=os.environ.get("BUSES_DB_USER", "dataanalytics"),
    password=os.environ["BUSES_DB_PASSWORD"],
    host="10.13.75.13",
    port=1433,
    query={"driver": "ODBC Driver 17 for SQL Server"},
)
engine1 = sqlalchemy.create_engine(
    sqlalchemy.engine.URL.create(database="ELM-DWH", **_source_db_settings)
)
engine2 = sqlalchemy.create_engine(
    sqlalchemy.engine.URL.create(database="naqaba", **_source_db_settings)
)
# q1: buses that have a trip today, as the UNION of two sources.
#   1) hajj_trips dated today for season 1445, joined to the bus, stage and
#      pilgrim nationality tables.
#   2) manifests closed today for flights dated today at JED / MED with
#      HFD_DIRECTION = 'A' (presumably "arrival" -- TODO confirm), excluding
#      MANIFEST_STATE 512 (meaning not visible here). path_id is synthesised:
#      10 for JED, 11 for MED; the ELSE 99 branch cannot be reached because of
#      the airport filter in the WHERE clause.
# Because nationality is part of the DISTINCT column list, one bus/trip can
# appear on several rows (one per nationality).
q1 = """
SELECT  DISTINCT 
        bi_plate_no AS bus_plate_no,  
        ht_trip_date AS trip_date, 
        stg_name_ar AS trip_name, 
        ht_path_id  AS path_id,
        nt_nationality_name_ar AS nationality_ar, 
        nt_nationality_name_la AS nationality_en
FROM        [dbo].[hajtafweej.hajj_trips]
INNER JOIN  [dbo].[LU_BUSES_INFOS]                  ON ht_bus_ser_no = bi_id
INNER JOIN  [dbo].[hajtafweej.hajj_trips_stages]    ON HT_id=hts_ht_id
INNER JOIN  [dbo].[hajtafweej.lu_stages]            ON HTS_STG_ID=STG_ID
INNER JOIN  [DBO].[hajtafweej.hajj_trips_details]   ON HTD_HT_ID = HT_ID
INNER JOIN  [DBO].[HAJ_DATA]                        ON HTD_HD_ID = HD_ID
LEFT JOIN   [DBO].[CMD_CENTER_EH_LU_NATIONALITIES]  ON HD_CURRENT_NATIONALITY_ID = NT_ID
WHERE       CAST(ht_trip_date AS DATE) = CAST(GETDATE() AS DATE)
AND         ht_season = 1445
UNION
SELECT  DISTINCT 
        BI_PLATE_NO AS bus_plate_no, 
        MANIFESTS_CLOSE_DATE AS trip_date, 
        HFD_AIRPORT_CODE AS trip_name, 
        CASE WHEN HFD_AIRPORT_CODE = 'JED' THEN 10
             WHEN HFD_AIRPORT_CODE = 'MED' THEN 11
        ELSE 99 END AS path_id,
        nt_nationality_name_ar AS nationality_ar, 
        nt_nationality_name_la AS nationality_en
FROM        DBO.HAJ_DATA
LEFT JOIN   DBO.HM_HC_PRE_ARRIVAL_HAJ_DATA_MAP      ON HD_ID = HHPAHDM_HD_ID
LEFT JOIN   DBO.HM_HC_PRE_ARRIVAL_DATA              ON HHPAHDM_HHPAD_ID = HHPAD_ID
LEFT JOIN   DBO.MANIFEST_DETAILS                    ON MD_HD_ID = HD_ID
LEFT JOIN   DBO.MANIFESTS                           ON MD_MANIFEST_ID = MANIFEST_ID
LEFT JOIN   DBO.HAJJ_FLIGHTS_DETAILS                ON HHPAD_HFD_ID = HFD_ID
INNER JOIN  DBO.LU_AIR_TRANSPORTATION               ON HFD_AT_ID = AT_ID
INNER JOIN  DBO.LU_BUSES_INFOS                      ON BI_ID = MANIFEST_BI_ID
LEFT JOIN   DBO.CMD_CENTER_EH_LU_NATIONALITIES      ON HD_CURRENT_NATIONALITY_ID = NT_ID
WHERE        MANIFESTS_CLOSE_DATE is NOT NULL
AND         MANIFEST_STATE != 512
AND         HFD_DIRECTION = 'A'
AND         HFD_AIRPORT_CODE in ('JED', 'MED')
AND         CAST(HFD_FLIGHT_DATE AS DATE) = CAST(GETDATE() AS DATE)
AND         CAST(MANIFESTS_CLOSE_DATE AS DATE) = CAST(GETDATE() AS DATE)

"""
# q2: the most recent live-location snapshot.
# The CTE finds the maximum fetchTime in naqabalivelocations; the main query
# returns only the rows with exactly that fetchTime, joined to the buses table
# (bus_id = busId) and restricted to season_id 1445.
q2 = """
WITH temp AS(
SELECT  MAX(CAST(fetchTime AS DATETIME)) as max_dt
from naqaba.dbo.naqabalivelocations
)
SELECT  fetchTime, busLicense, long, lati, speed,
        bus_category_name_la, bus_category_name_ar, bus_plate_no, bus_is_active, bus_id, tc_name_ar, manufacturer_name_ar, 
        season_id, bus_model, bus_type_name_ar, tc_id, bus_plate_la, bus_seats
        --bi_naqaba_bus_id
FROM        naqaba.dbo.naqabalivelocations
INNER JOIN   naqaba.dbo.buses                            ON bus_id = busId
WHERE       CAST(fetchTime AS DATETIME) = (SELECT max_dt FROM temp)
AND         season_id = 1445
"""

### Engine to Push Data

In [6]:
# Engine used to write results into the Analytics database.
# The password is read from the BUSES_DB_PASSWORD environment variable so that
# no secret is stored in the notebook (BUSES_DB_USER optionally overrides the
# login). URL.create also escapes any special characters in the credentials.
engine_push = sqlalchemy.create_engine(
    sqlalchemy.engine.URL.create(
        "mssql+pyodbc",
        username=os.environ.get("BUSES_DB_USER", "dataanalytics"),
        password=os.environ["BUSES_DB_PASSWORD"],
        host="10.13.75.13",
        port=1433,
        database="Analytics",
        query={"driver": "ODBC Driver 17 for SQL Server"},
    )
)

### Global Dataframe

In [7]:
# Buffer of completed stops (one row per Inside -> Outside transition).
# update_dataframe appends to it through `global`; the main loop writes it to
# SQL and then resets it to an empty frame.
df_bus_stops = pd.DataFrame()

### Function to Check Intersecting Polygon with In/Out Timestamp

In [8]:
def calculate_polygon_intersections(df_wkt, df):
    """Tag every bus position with the bus-stop polygon(s) it falls inside.

    Parameters
    ----------
    df_wkt : DataFrame
        Polygon table with a 'KML_File' column (polygon name) and a 'WKT'
        column (polygon geometry as WKT text).
    df : DataFrame
        Bus positions with 'lati' and 'long' columns. It is not modified.

    Returns
    -------
    GeoDataFrame
        A copy of ``df`` with these columns added:
        'geometry' (the position as a Point), 'polygon_name' (comma-separated
        names of intersecting polygons, '' if none), 'location'
        ('Inside' / 'Outside'), 'timestamp_in' (set for Inside rows),
        'timestamp_out' (set for Outside rows) and 'time_hours' (float; an
        existing column is kept as-is, otherwise 0.0).
    """
    # Parse each polygon once. Keyed by name, so a repeated KML_File keeps only
    # its last geometry.
    polygons = {row['KML_File']: wkt_loads(row['WKT']) for _, row in df_wkt.iterrows()}
    all_polygons_gdf = gpd.GeoDataFrame(
        {'KML_File': list(polygons)},
        geometry=list(polygons.values()),
        crs='EPSG:4326',
    )
    spatial_index = all_polygons_gdf.sindex

    # NOTE(review): points are built as Point(lati, long). Shapely expects
    # (x=longitude, y=latitude); this only matches if the stored WKT uses the
    # same swapped axis order -- confirm against the polygon table.
    points = [Point(x, y) for x, y in zip(df['lati'], df['long'])]
    # assign() returns a new frame, so the caller's DataFrame is left untouched.
    geo_locations = gpd.GeoDataFrame(df.assign(geometry=points), crs='EPSG:4326')

    polygon_names = []
    inside_flags = []
    for point in geo_locations['geometry']:
        # Cheap bounding-box lookup first, exact intersection test second.
        candidate_rows = list(spatial_index.intersection(point.bounds))
        candidates = all_polygons_gdf.iloc[candidate_rows]
        hits = candidates[candidates.intersects(point)]
        names = [name for name in hits['KML_File'].tolist() if name is not None]
        polygon_names.append(', '.join(names))
        inside_flags.append(bool(names))

    # One timestamp (second precision) for the whole snapshot: all positions
    # passed in are classified in the same run.
    snapshot_time = dt.datetime.now().replace(microsecond=0)

    # Columns are assigned as whole lists (not per-row .at[] writes) so the
    # result is correct even when the input index is not unique.
    geo_locations['polygon_name'] = polygon_names
    geo_locations['location'] = ['Inside' if inside else 'Outside' for inside in inside_flags]
    geo_locations['timestamp_in'] = pd.to_datetime(
        [snapshot_time if inside else pd.NaT for inside in inside_flags])
    geo_locations['timestamp_out'] = pd.to_datetime(
        [pd.NaT if inside else snapshot_time for inside in inside_flags])

    # Keep a caller-supplied 'time_hours' unchanged. The previous code added
    # the column to itself, which doubled any existing value.
    if 'time_hours' not in geo_locations.columns:
        geo_locations['time_hours'] = 0
    geo_locations['time_hours'] = geo_locations['time_hours'].astype('float64')
    return geo_locations

## Functions to Load Data and to Update Stop Times (Total and Per-Stop)

In [9]:
def load_data():
    """Fetch the latest bus positions and classify them against the stop polygons.

    Runs q1 (today's trips) and q2 (latest live locations), keeps only buses
    that have a trip today, and passes them to
    calculate_polygon_intersections together with the module-level df_wkt.

    Returns
    -------
    DataFrame
        Columns 'bus_plate_no', 'lati', 'long', 'trip_name' plus the columns
        added by calculate_polygon_intersections, with 'geometry' converted to
        its string form so it can be written to SQL.
    """
    df_ELM = pd.read_sql(q1, con=engine1)
    bus_data = pd.read_sql(q2, con=engine2)

    # q1 returns one row per bus/trip/nationality. Without de-duplicating the
    # two columns used here, the merge repeats each bus position once per
    # nationality and the duplicates end up in the pushed table.
    trips = df_ELM[['bus_plate_no', 'trip_name']].drop_duplicates()
    bus_data = bus_data.merge(trips, on='bus_plate_no', how='left')

    # Buses without a trip today have no trip_name after the left merge.
    bus_data = bus_data.dropna(subset=['trip_name'])
    bus_data = bus_data[['bus_plate_no', 'lati', 'long', 'trip_name']]
    bus_data = calculate_polygon_intersections(df_wkt, bus_data)

    # To test with mock data, replace the two reads above with a read of
    # [Analytics].[buses_track].[test] and pass it to
    # calculate_polygon_intersections directly.
    bus_data['geometry'] = bus_data['geometry'].astype(str)
    return bus_data

def update_dataframe(old_df, new_df):
    """Merge the latest bus snapshot into the running per-bus state.

    For every row of ``new_df``:
    * a bus not present in ``old_df`` is appended as-is;
    * a bus whose 'location' is unchanged is left untouched;
    * a bus whose 'location' changed gets its 'location', 'polygon_name',
      'timestamp_in', 'timestamp_out' and 'geometry' replaced by the new
      values. If a duration can be computed (new 'timestamp_out' minus the
      previously stored 'timestamp_in'), it is added to 'time_hours';
      otherwise the accumulated 'time_hours' is kept.

    Each Inside -> Outside transition with a non-zero duration is also
    appended to the module-level ``df_bus_stops`` with columns
    'bus_plate_no', 'location', 'polygon' (the stop that was left) and
    'time_hours' (time spent in that stop).

    NOTE: a bus that moves straight from one polygon to another while staying
    'Inside' is treated as unchanged, because only 'location' is compared.

    Parameters
    ----------
    old_df : DataFrame
        Current state, one row per bus. May be empty. Not modified.
    new_df : DataFrame
        Latest snapshot with the same columns.

    Returns
    -------
    DataFrame
        The updated state.
    """
    global df_bus_stops
    updated_df = old_df.copy()
    completed_stops = []

    for _, row in new_df.iterrows():
        bus_plate_no = row['bus_plate_no']
        new_location = row['location']

        # A state frame without columns (e.g. pd.DataFrame()) has no known buses.
        if 'bus_plate_no' in updated_df.columns:
            is_bus = updated_df['bus_plate_no'] == bus_plate_no
            known_bus = bool(is_bus.any())
        else:
            is_bus, known_bus = None, False

        if not known_bus:
            updated_df = pd.concat([updated_df, pd.DataFrame([row])], ignore_index=True)
            continue

        old_row = updated_df.loc[is_bus].iloc[0]
        old_location = old_row['location']
        if old_location == new_location:
            continue

        # Read the previous values before they are overwritten below.
        old_timestamp_in = old_row['timestamp_in']
        old_polygon = old_row['polygon_name']

        for column in ('location', 'polygon_name', 'timestamp_in', 'timestamp_out', 'geometry'):
            updated_df.loc[is_bus, column] = row[column]

        # NaT on either side (e.g. Outside -> Inside, where the new row has no
        # timestamp_out) yields NaN here.
        time_delta = row['timestamp_out'] - old_timestamp_in
        elapsed_hours = row['time_hours'] + time_delta.total_seconds() / 3600
        if pd.isnull(elapsed_hours):
            # No measurable duration: keep the accumulated total. The previous
            # code used `==` here, a comparison with no effect.
            continue

        updated_df.loc[is_bus, 'time_hours'] += elapsed_hours

        # Record the time spent in the stop that was just left.
        if old_location == 'Inside' and new_location == 'Outside' and elapsed_hours != 0:
            completed_stops.append({
                'bus_plate_no': bus_plate_no,
                'location': new_location,
                'polygon': old_polygon,
                'time_hours': elapsed_hours,
            })

    # Append all new stops in one concat instead of once per row.
    if completed_stops:
        df_bus_stops = pd.concat([df_bus_stops, pd.DataFrame(completed_stops)], ignore_index=True)

    return updated_df


# Take a first snapshot to seed the running state. saved_df is the frame the
# main loop keeps updating; initial_df stays as the untouched first snapshot.
initial_df = load_data()
saved_df = initial_df.copy()

## Main

In [10]:
while True:
    # Pull the latest snapshot and fold it into the running per-bus state.
    new_df = load_data()
    saved_df = update_dataframe(saved_df, new_df)

    # Overwrite the live table with the current state on every cycle.
    saved_df['date_latest'] = datetime.now().strftime('%m-%d-%Y')
    txt_cols = saved_df.select_dtypes(include=['object']).columns
    saved_df.to_sql(
        'live_buses',
        engine_push,
        schema='buses_track',
        if_exists="replace",
        index=False,
        dtype={col_name: NVARCHAR for col_name in txt_cols},
    )

    # Completed stops are appended to their own table, then the buffer is reset.
    if not df_bus_stops.empty:
        df_bus_stops['date_latest'] = datetime.now().strftime('%m-%d-%Y')
        txt_cols = df_bus_stops.select_dtypes(include=['object']).columns
        df_bus_stops.to_sql(
            'live_buses_stops',
            engine_push,
            schema='buses_track',
            if_exists="append",
            index=False,
            dtype={col_name: NVARCHAR for col_name in txt_cols},
        )
        df_bus_stops = pd.DataFrame()
    else:
        print('Bus Stops will be pushed on the next run !')

    # Log the cycle time, then wait five minutes before polling again.
    print(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    time.sleep(300)

Bus Stops will be pushed on the next run !
2024-06-15 20:05:49


KeyboardInterrupt: 