# **Data Retrieval and Preprocessing**
**Purpose**

This code retrieves and preprocesses construction schedule data from Databricks to create a dataset for machine learning. The final dataset includes trade classifications for activities, sampled to ensure balance across categories.

**Process Overview**
1. Database Connection:
* Connect to Databricks using secure credentials.
2. SQL Query:
* Merges activity data with trade classifications.
* Assigns custom trade labels based on activity names and codes.
* Filters out irrelevant or invalid entries.
* Samples up to 9,500 rows per trade category to keep the training data balanced and to achieve a minimum of half a million samples.
3. Data Storage:
* Saves the processed data as an HDF5 file for use in the modeling pipeline.

**Key Notes**
* Credentials should be securely managed using environment variables or a secrets file.
* The query can be customized to include additional filters or classifications.


In [None]:
import os

import pandas as pd
from databricks import sql

# Function to fetch project data from Databricks
def _build_project_query(rows_per_trade):
    """Return the SQL text that labels activities and samples them per trade.

    ``rows_per_trade`` is the maximum number of rows kept for each
    ``MergedActivityCodeValue``. It is interpolated into the SQL text, so it
    is validated as a positive ``int`` first.

    Raises:
        ValueError: if ``rows_per_trade`` is not a positive integer.
    """
    # bool is a subclass of int; reject it explicitly so True/False cannot be
    # used as a row limit by accident.
    if (
        isinstance(rows_per_trade, bool)
        or not isinstance(rows_per_trade, int)
        or rows_per_trade < 1
    ):
        raise ValueError(
            f"rows_per_trade must be a positive integer, got {rows_per_trade!r}"
        )

    # NOTE(review): the first CASE branch uses RLIKE (a regular-expression
    # match) with LIKE-style '%' wildcards. In a regex '%' is a literal
    # character, so this branch presumably matches far fewer names than
    # intended. Confirm the intended pattern before changing it, because it
    # takes precedence over every other label.
    return f"""
    WITH MergedActivities AS (
        SELECT
            A.ProjectObjectID,
            A.ActivityObjectID,
            A.ActivityCode,
            A.ActivityName,
            A.WbsobjectID,
            W.Name,
            -- Assigning custom trade labels based on ActivityName patterns
            CASE
                WHEN LOWER(A.ActivityName) RLIKE '%b(&)?g%' THEN 'GC' -- General Contractor activities
                WHEN LOWER(A.ActivityName) LIKE '%inspect%' THEN 'INSP' -- Inspection tasks
                WHEN LOWER(A.ActivityName) LIKE '%a/e%' OR LOWER(A.ActivityName) LIKE '%approv%' THEN 'ARCH' -- Architectural tasks
                WHEN LOWER(A.ActivityName) LIKE '%resubmit%' THEN 'GC'
                WHEN LOWER(A.ActivityName) LIKE '%pricing%' THEN 'OWN' -- Owner-related activities
                WHEN LOWER(A.ActivityName) LIKE '%fire alarm%' THEN 'SPRK' -- Fire Sprinkler tasks
                WHEN LOWER(A.ActivityName) LIKE '%waterproof%' THEN 'WTS' -- Waterproofing
                -- Check for location-independent concrete-related keywords
                WHEN LOWER(A.ActivityName) NOT LIKE '% - %' AND LOWER(A.ActivityName) NOT LIKE '% @ %' AND
                     (LOWER(A.ActivityName) LIKE '%pour%' OR LOWER(A.ActivityName) LIKE '%f/r/p%' OR
                      LOWER(A.ActivityName) LIKE '%sog%' OR LOWER(A.ActivityName) LIKE '%slab%') THEN 'CON'
                -- Location-specific concrete-related keywords
                WHEN (LOWER(A.ActivityName) LIKE '% - %' AND
                      (LOWER(A.ActivityName) LIKE '%pour%' OR LOWER(A.ActivityName) LIKE '%f/r/p%') AND
                      CHARINDEX('pour', LOWER(A.ActivityName)) < CHARINDEX(' - ', LOWER(A.ActivityName))) THEN 'CON'
                ELSE T.ActivityCodeValue -- Default to the original activity code
            END AS MergedActivityCodeValue,
            T.ActivityCodeDescription,
            T.ActivityCodeTypeName,
            T.ActivityCodeTypeScope,
            T.UpdateDate,
            T.IngestedDateTime
        FROM standardized.p6.dbo_activityspread AS A
        INNER JOIN standardized.p6.dbo_taskactvx AS T ON A.ActivityObjectID = T.TaskID
        INNER JOIN standardized.p6.dbo_wbsspread AS W ON A.Wbsobjectid = W.Objectid
        WHERE T.ActivityCodeValue IN ('ACC','ARCH','CON','ELEC','GC','MEP') -- Include relevant trade codes
    ),
    SampledActivities AS (
        SELECT *,
            ROW_NUMBER() OVER(PARTITION BY MergedActivityCodeValue ORDER BY RAND()) AS rn
        FROM MergedActivities
        WHERE LOWER(ActivityCodeTypeName) LIKE '%resp%' -- Filter responsibility codes
          AND NOT (ActivityName LIKE '(N/A)%' OR ActivityName LIKE '(Original)%') -- Exclude invalid activity names
    )
    SELECT * FROM SampledActivities
    WHERE rn <= {rows_per_trade}; -- Keep at most this many rows per trade classification
    """


def fetch_project_data(
    server_hostname=None,
    http_path=None,
    access_token=None,
    rows_per_trade: int = 9500,
) -> pd.DataFrame:
    """Fetch a per-trade random sample of labelled activities from Databricks.

    The query joins the activity, activity-code and WBS tables, derives a
    ``MergedActivityCodeValue`` label from keywords in the activity name
    (falling back to the original activity code), and keeps a random sample
    of at most ``rows_per_trade`` rows for each label.

    Args:
        server_hostname: Databricks SQL hostname. When omitted, the
            ``DATABRICKS_SERVER_HOSTNAME`` environment variable is used, then
            the ``"insert_hostname"`` placeholder.
        http_path: HTTP path of the SQL endpoint. When omitted, the
            ``DATABRICKS_HTTP_PATH`` environment variable is used, then the
            ``"insert_http_path"`` placeholder.
        access_token: Access token. When omitted, the ``DATABRICKS_TOKEN``
            environment variable is used, then the ``"insert_access_token"``
            placeholder.
        rows_per_trade: Maximum number of rows kept per trade label.

    Returns:
        A DataFrame with one row per sampled activity. When the query returns
        no rows, "No data found" is printed and an empty DataFrame carrying
        the query's column names is returned, so callers can still chain
        DataFrame operations.

    Raises:
        ValueError: if ``rows_per_trade`` is not a positive integer.
    """
    # Build (and validate) the query before opening any connection so that bad
    # arguments fail fast without touching the network.
    query = _build_project_query(rows_per_trade)

    connection_args = {
        "server_hostname": server_hostname
        or os.environ.get("DATABRICKS_SERVER_HOSTNAME", "insert_hostname"),
        "http_path": http_path
        or os.environ.get("DATABRICKS_HTTP_PATH", "insert_http_path"),
        "access_token": access_token
        or os.environ.get("DATABRICKS_TOKEN", "insert_access_token"),
    }

    with sql.connect(**connection_args) as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
            # cursor.description holds one tuple per result column; the first
            # item of each tuple is the column name.
            columns = [desc[0] for desc in cursor.description]
            data = cursor.fetchall()

    if not data:
        print("No data found")
    # With no rows this still yields an empty frame with the expected columns
    # instead of None.
    return pd.DataFrame(data, columns=columns)

# Fetch the sampled dataset from Databricks
df = fetch_project_data()

# Stop with a clear message when the query produced nothing, rather than
# failing later on a missing DataFrame or writing an empty training file.
if df is None or df.empty:
    raise RuntimeError(
        "No data returned from Databricks; training_data_V4.h5 was not written."
    )

# Store every column as text so the HDF5 file has a uniform string schema
df = df.astype(str)

# Save the data to an HDF5 file for later use (mode='w' overwrites any existing file)
df.to_hdf('training_data_V4.h5', key='df', mode='w')
