
### Retrieve data from Google Sheets using a service account

In [1]:
%pip install gspread gspread_dataframe

Collecting gspread
  Downloading gspread-6.2.1-py3-none-any.whl.metadata (11 kB)
Collecting gspread_dataframe
  Downloading gspread_dataframe-4.0.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting google-auth-oauthlib>=0.4.1 (from gspread)
  Downloading google_auth_oauthlib-1.2.4-py3-none-any.whl.metadata (3.1 kB)
Collecting requests-oauthlib>=0.7.0 (from google-auth-oauthlib>=0.4.1->gspread)
  Downloading requests_oauthlib-2.0.0-py2.py3-none-any.whl.metadata (11 kB)
Collecting oauthlib>=3.0.0 (from requests-oauthlib>=0.7.0->google-auth-oauthlib>=0.4.1->gspread)
  Downloading oauthlib-3.3.1-py3-none-any.whl.metadata (7.9 kB)
Downloading gspread-6.2.1-py3-none-any.whl (59 kB)
Downloading gspread_dataframe-4.0.0-py2.py3-none-any.whl (9.0 kB)
Downloading google_auth_oauthlib-1.2.4-py3-none-any.whl (19 kB)
Downloading requests_oauthlib-2.0.0-py2.py3-none-any.whl (24 kB)
Downloading oauthlib-3.3.1-py3-none-any.whl (160 kB)
Installing collected packages: oauthlib, requests-oauthlib, google-

## Step 1: Extract Data

In [2]:
import json
import gspread
from gspread_dataframe import get_as_dataframe
import pandas as pd
from pyspark.sql.functions import current_timestamp
from datetime import date


# --- Configuration ---
SPREADSHEET_ID = '1iWmtI5iHzgxA9S9HdXloq7ak8VHA5mYO01f1MMaZ5-A'
SHEET_NAME = 'ticket'
# The service-account key is read from a Databricks secret scope so that it
# never appears in the notebook source.
SERVICE_ACCOUNT_JSON = dbutils.secrets.get(scope="ingestdata", key="service-account-key")
service_account_dict = json.loads(SERVICE_ACCOUNT_JSON)

# Authenticate as the service account and open the configured worksheet.
gc = gspread.service_account_from_dict(service_account_dict)
sh = gc.open_by_key(SPREADSHEET_ID)
ws = sh.worksheet(SHEET_NAME)

# Extract the worksheet into a pandas DataFrame.
# `parse_dates` is the option spelling used by the gspread-dataframe docs; the
# earlier `parse_date` spelling is not a recognised parser option.
df = get_as_dataframe(ws, parse_dates=True, evaluate_formulas=True)

# Validate on the pandas frame, before the Spark conversion, so an empty sheet
# fails here with a clear message. (The previous check, `not count() < 0`,
# could never fail because a row count is never negative.)
assert not df.empty, "Extracted data is empty"

# Convert to Spark and stamp every row with the extraction time.
sheet_df = spark.createDataFrame(df)
sheet_df = sheet_df.withColumn('Extracted At', current_timestamp())

# count() is a Spark action, so run it once and reuse the result.
row_count = sheet_df.count()
print(f"Extracted {row_count} rows")


Extracted 7195 rows


## Step 2: Cleaning Column Names

In [None]:
# Per-character rewrite rules for column names: spaces, dots and "=" become
# underscores; brackets, braces, backticks, commas, semicolons, newlines and
# tabs are removed (a value of None means "delete this character").
_COLUMN_CHAR_TABLE = str.maketrans({
    " ": "_",
    ".": "_",
    "=": "_",
    "(": None,
    ")": None,
    "`": None,
    ",": None,
    ";": None,
    "{": None,
    "}": None,
    "\n": None,
    "\t": None,
})


def sanitize_column(col_name):
    """Return ``col_name`` with spaces and special characters rewritten.

    Spaces, ``.`` and ``=`` are replaced by ``_``; the characters
    ``( ) ` , ; { }``, newline and tab are dropped. All other characters are
    kept unchanged.
    """
    return col_name.translate(_COLUMN_CHAR_TABLE)

# Sanitize every column name of the extracted Spark DataFrame.
clean_columns = [sanitize_column(col) for col in sheet_df.columns]

# Two different sheet headers can sanitize to the same name (e.g. "a b" and
# "a_b"). Fail here, naming the offenders, instead of later inside the load.
duplicate_columns = sorted(
    {name for name in clean_columns if clean_columns.count(name) > 1}
)
assert not duplicate_columns, (
    f"Column names collide after sanitizing: {duplicate_columns}"
)

raw_jira_data_clean = sheet_df.toDF(*clean_columns)

# Register a temp view so the SQL statements can read the cleaned data by name.
raw_jira_data_clean.createOrReplaceTempView("raw_jira_data_clean")

## Step 3: Loading Raw Data to SCR_JIRA table

In [None]:
from delta.tables import DeltaTable
import pyspark.sql.functions as F
from datetime import date


TARGET_TABLE = "looker_management_prod.bronze.scr_jira"
# Not referenced by the statements in this cell; kept as a module-level name.
LOAD_DATE = date.today().isoformat()

# Create the Delta table only if it does not exist yet; the schema (and the
# initial rows) come from the cleaned temp view.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {TARGET_TABLE}
        USING DELTA
        COMMENT 'Jira Data - Weekly Load'
        TBLPROPERTIES (
            delta.autoOptimize.optimizeWrite = true,
            delta.autoOptimize.autoCompact = true
        )
        AS SELECT * FROM raw_jira_data_clean
""")

# MERGE the fresh extract into the target table.
# `Updated` is compared with the null-safe operator `<=>`: with a plain `=`, a
# row whose `Updated` is NULL on both sides never matches and would be inserted
# again on every run.
# NOTE(review): because `Updated` is part of the match condition, a ticket whose
# `Updated` value changes is inserted as a new row and the older row is kept.
# Confirm that keeping every revision is intended.
spark.sql(f"""
    MERGE INTO {TARGET_TABLE} AS t
    USING (
        SELECT * FROM raw_jira_data_clean
    ) s
    ON t.key = s.key
    AND t.Updated <=> s.Updated
    WHEN MATCHED THEN
      UPDATE SET *

    WHEN NOT MATCHED THEN
      INSERT *
""")

# The figure reported is the table's total row count after the merge, not the
# number of rows the merge touched, so the message says so.
print(f"{TARGET_TABLE} now holds {spark.table(TARGET_TABLE).count()} rows")
display(spark.table(TARGET_TABLE))

## Step 4: Loading data to SCR_JIRA_TRNS
Unpivot the Sprint column of the SCR_JIRA table and load the result into SCR_JIRA_TRNS.


In [None]:
TARGET_TABLE = "looker_management_prod.bronze.scr_jira_trns"
SOURCE_TABLE = "looker_management_prod.bronze.scr_jira"

# Rebuild the transformed table from the raw Jira table in three steps:
#   JIRA_TRNSF_01 - rename the raw columns and cast them to their target types
#                   (date/timestamp columns use TRY_CAST).
#   JIRA_TRNSF_02 - replace NULLs with defaults. The date/timestamp defaults are
#                   typed literals so the columns keep the DATE / TIMESTAMP type
#                   produced in step 01; mixing them with a bare string literal
#                   leaves the result type to Spark's implicit coercion rules.
#   JIRA_TRNSF_03 - split the ';'-separated sprint value and emit one row per
#                   sprint. The LATERAL VIEW is given an explicit table alias.
# Rows without a ticket id are dropped in the final SELECT.
# NOTE(review): every row of the source table is read; if that table holds
# several revisions of one ticket, each revision appears here - confirm intended.
spark.sql(f"""
CREATE OR REPLACE TABLE {TARGET_TABLE}
USING DELTA
COMMENT 'Transformed Jira Data'
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true
)
AS
WITH JIRA_TRNSF_01 AS (
    SELECT
        CAST(Issue_Type AS STRING) AS TCK_TP,
        CAST(Summary AS STRING) AS TCK_NM,
        CAST(Assignee AS STRING) AS ASN_NM,
        CAST(Status AS STRING) AS TCK_STS,
        CAST(Sprint AS STRING) AS PI_ID,
        CAST(Est__Story_Points AS FLOAT) AS STR_PNT,
        CAST(`Key` AS STRING) AS TCK_ID,
        CAST(parent AS STRING) AS PRN_ID,
        TRY_CAST(Start_date AS DATE) AS STR_DT,
        TRY_CAST(End_date AS TIMESTAMP) AS END_DT,
        TRY_CAST(Updated AS TIMESTAMP) AS UPD_DT,
        CAST(Assignee_accountId AS STRING) AS DEV_ID
    FROM {SOURCE_TABLE}
),
JIRA_TRNSF_02 AS (
    SELECT
        TCK_ID,
        COALESCE(PRN_ID,'Undefined')  AS PRN_ID,
        COALESCE(DEV_ID,'Undefined')  AS DEV_ID,
        COALESCE(PI_ID,'Undefined')   AS PI_ID,
        COALESCE(ASN_NM,'Undefined')  AS ASN_NM,
        COALESCE(TCK_NM,'Undefined')  AS TCK_NM,
        COALESCE(TCK_TP,'Undefined')  AS TCK_TP,
        COALESCE(TCK_STS,'Undefined') AS TCK_STS,
        COALESCE(STR_PNT,0)           AS STR_PNT,
        COALESCE(STR_DT, DATE'1900-01-01')               AS STR_DT,
        COALESCE(END_DT, TIMESTAMP'1900-01-01 00:00:00') AS END_DT,
        COALESCE(UPD_DT, TIMESTAMP'1900-01-01 00:00:00') AS UPD_DT
    FROM JIRA_TRNSF_01
),
JIRA_TRNSF_03 AS (
    SELECT
        TCK_ID,
        PRN_ID,
        DEV_ID,
        PI_ID_EXPLODED AS PI_ID,
        ASN_NM,
        TCK_NM,
        TCK_TP,
        TCK_STS,
        STR_PNT,
        STR_DT,
        END_DT,
        UPD_DT
    FROM JIRA_TRNSF_02
    LATERAL VIEW explode(split(PI_ID, ';')) sprint_view AS PI_ID_EXPLODED
)
SELECT
    TCK_ID,
    PRN_ID,
    DEV_ID,
    PI_ID,
    ASN_NM,
    TCK_NM,
    TCK_TP,
    TCK_STS,
    STR_PNT,
    STR_DT,
    END_DT,
    UPD_DT
FROM JIRA_TRNSF_03
WHERE TCK_ID IS NOT NULL
""")