In [9]:
import adal, datetime, json, re, time
from delta.tables import *
from pyspark.sql.types import *

from pyspark.sql.functions import *

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [10]:
%run Functions/ReferenceFunctions

In [13]:
def _quote_identifier(name):
    """Wrap a table/column name in T-SQL [brackets], doubling any ']' so it cannot close the identifier early."""
    return "[" + str(name).replace("]", "]]") + "]"


def _sql_literal(val):
    """Render one pandas/Python value as a T-SQL literal for an INSERT ... VALUES list.

    - bool / numpy.bool_        -> 1 / 0 (T-SQL has no TRUE/FALSE literal)
    - list / tuple / dict / array-like (e.g. Spark array or struct columns after
      toPandas()) -> JSON text as an N'...' literal
    - None / NaN / NaT / pd.NA  -> NULL
    - str                       -> N'...' with embedded single quotes doubled; the N
      prefix keeps non-ASCII text intact for NVARCHAR columns
    - datetime / date / time (incl. pd.Timestamp) -> quoted str() form
    - anything else (ints, floats, Decimal, ...) -> str(val), unquoted
    """
    # Local imports: the notebook's star imports could shadow module-level names.
    import datetime as _dt
    import json as _json

    def _nstring(text):
        return "N'" + text.replace("'", "''") + "'"

    # Checked first: bool is a subclass of int and would otherwise render as True/False.
    if pd.api.types.is_bool(val):
        return "1" if val else "0"
    # Nested values must be handled before pd.isna(), which returns an array for them.
    if isinstance(val, (list, tuple, dict)) or (
        hasattr(val, "tolist") and not pd.api.types.is_scalar(val)
    ):
        if hasattr(val, "asDict"):  # pyspark Row (a tuple subclass): keep field names
            val = val.asDict(recursive=True)
        elif hasattr(val, "tolist"):  # numpy arrays -> plain Python lists
            val = val.tolist()
        return _nstring(_json.dumps(val, default=str, ensure_ascii=False))
    if pd.isna(val):
        return "NULL"
    if isinstance(val, str):
        return _nstring(val)
    if isinstance(val, (_dt.datetime, _dt.date, _dt.time)):
        return "'" + str(val).replace("'", "''") + "'"
    return str(val)


def generate_sql_inserts(df, table_name, stmt):
    """Insert every row of a pandas DataFrame into a SQL Server table, one INSERT per row.

    Each row becomes ``INSERT INTO [table] ([col], ...) VALUES (...);`` and is run
    immediately with ``stmt.executeUpdate``. Values are inlined as escaped literals
    (see ``_sql_literal``) rather than bound parameters.

    Args:
        df: pandas DataFrame whose column names match the target table's columns.
        table_name: target table name; it is bracket-quoted as a single identifier.
        stmt: an object with ``executeUpdate(sql)``, e.g. a JDBC Statement from
            ``con.createStatement()``.

    Returns:
        The list of SQL statements that were executed, in row order.

    Raises:
        Whatever ``stmt.executeUpdate`` raises. Rows inserted before the failure are
        not rolled back by this function.
    """
    target = _quote_identifier(table_name)
    # Column list is the same for every row, so build it once.
    columns = ", ".join(_quote_identifier(col) for col in df.columns)
    insert_statements = []
    # itertuples (unlike iterrows) keeps each column's own dtype, so an int column is
    # not upcast to float just because another column in the same row is a float.
    for row in df.itertuples(index=False, name=None):
        values_str = ", ".join(_sql_literal(val) for val in row)
        sql = f"INSERT INTO {target} ({columns}) VALUES ({values_str});"
        insert_statements.append(sql)
        stmt.executeUpdate(sql)
    return insert_statements

In [None]:
# --- Configuration (fill in before running) ---------------------------------
TenantID = ""
SynapseSPNClientID = ""
KeyVaultName = ""
ClientIDKeyVaultSecretName = ""
AzureSQLJDBCURLSecretName = ""
METADATA_JSON_PATH = ""  # source JSON for the file_metadata table

# Step 1: Load the source data *before* opening the SQL connection, so a bad path or
# malformed JSON cannot leave an open connection behind.
metadata_sdf = spark.read.json(METADATA_JSON_PATH)
# NOTE(review): toPandas() collects the whole dataset onto the driver; confirm the
# expected volume stays small.
df = metadata_sdf.toPandas()

# Step 2: Get the Azure SQL JDBC URL from Key Vault (third argument: linked service name).
tokenlibrary = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
AzureSQLjdbcurl = tokenlibrary.getSecret(KeyVaultName, AzureSQLJDBCURLSecretName, "ls_kv_dag")

# Step 3: Get the access token and open the connection. jdbc_authorization is presumably
# provided by the %run Functions/ReferenceFunctions cell; only `con` is used below.
access_token, exec_statement, con = jdbc_authorization(
    KeyVaultName, AzureSQLjdbcurl, ClientIDKeyVaultSecretName, TenantID, SynapseSPNClientID, "SELECT 1"
)

create_table = """
CREATE TABLE file_metadata (
    doc_id NVARCHAR(100),
    cabinet NVARCHAR(100),
    folder_id NVARCHAR(100),
    doc_name NVARCHAR(4000),
    doc_number NVARCHAR(100),
    doc_type NVARCHAR(100),
    created_date NVARCHAR(100),
    updated_date NVARCHAR(100),
    created_by NVARCHAR(100),
    facility NVARCHAR(100),
    sub_type NVARCHAR(100),
    sheet NVARCHAR(50),
    major_rev NVARCHAR(50),
    minor_rev NVARCHAR(50),
    status_date NVARCHAR(100),
    status NVARCHAR(50),
    doc_date NVARCHAR(100),
    unit NVARCHAR(50),
    d_code NVARCHAR(50),
    rev_index NVARCHAR(50),
    sec_flag NVARCHAR(50),
    r_version_label NVARCHAR(255),
    title NVARCHAR(4000),
    subject NVARCHAR(255),
    authors NVARCHAR(MAX),
    r_content_size FLOAT,
    owner_name NVARCHAR(100),
    r_modifier NVARCHAR(100),
    r_access_date NVARCHAR(100),
    a_storage_type NVARCHAR(100),
    r_immutable_flag NVARCHAR(50),
    active_flag NVARCHAR(10) DEFAULT 'active'
);
 
"""

stmt = None  # so `finally` is safe even if createStatement() fails
try:
    print(f"Connection object type: {type(con)}")
    print("Creating statement and executing DDL...")
    stmt = con.createStatement()
    # NOTE: CREATE TABLE is not idempotent. A re-run fails here (table already exists)
    # rather than appending duplicate rows.
    stmt.executeUpdate(create_table)
    generate_sql_inserts(df, "file_metadata", stmt)
except Exception as e:
    # Print the error (for a Py4J exception this includes the Java error), then re-raise
    # so the notebook/pipeline run is marked failed instead of silently succeeding.
    print("An error occurred:")
    print(e)
    raise
finally:
    # Close resources even if an error occurred.
    print("Closing resources...")
    if stmt is not None:
        stmt.close()
    if con is not None:
        con.close()
    print("Resources closed.")
 

In [None]:
# Explicit read schema so Spark does not infer types. event_ts stays a string to avoid
# timestamp-parsing issues; the insert relies on SQL Server converting the string
# literal into the DATETIMEOFFSET column.
schema = StructType([
    StructField("doc_uid", StringType(), True),
    StructField("stage", StringType(), True),
    StructField("status", StringType(), True),
    StructField("event_ts", StringType(), True),
    StructField("error_msg", StringType(), True),
])

# --- Configuration (fill in before running) ---------------------------------
TenantID = ""
SynapseSPNClientID = ""
KeyVaultName = ""
ClientIDKeyVaultSecretName = ""
AzureSQLJDBCURLSecretName = ""
path = ""  # source JSON for the pipeline_activity table

# Step 1: Load the source data *before* opening the SQL connection, so a failed read
# cannot leave an open connection behind.
df_spark = (
    spark.read
    .option("multiline", "true")
    .schema(schema)
    .json(path)
)
df = df_spark.toPandas()
print(df.head())  # preview only; the full frame can be large

# Step 2: Get the Azure SQL JDBC URL from Key Vault (third argument: linked service name).
tokenlibrary = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
AzureSQLjdbcurl = tokenlibrary.getSecret(KeyVaultName, AzureSQLJDBCURLSecretName, "ls_kv_dag")

# Step 3: Get the access token and open the connection. jdbc_authorization is presumably
# provided by the %run Functions/ReferenceFunctions cell; only `con` is used below.
access_token, exec_statement, con = jdbc_authorization(
    KeyVaultName, AzureSQLjdbcurl, ClientIDKeyVaultSecretName, TenantID, SynapseSPNClientID, "SELECT 1"
)

# DDL kept for reference only: this cell assumes pipeline_activity already exists and
# does not execute it.
create_table = """CREATE TABLE pipeline_activity (
    doc_uid UNIQUEIDENTIFIER NOT NULL,
    stage NVARCHAR(100),
    status NVARCHAR(50),
    event_ts DATETIMEOFFSET,
    error_msg NVARCHAR(MAX)
);

 """

# Reset explicitly: otherwise `finally` could see a statement left over from an earlier
# cell (or raise NameError on a fresh kernel) if createStatement() fails.
stmt = None
try:
    print(f"Connection object type: {type(con)}")
    print("Creating statement and inserting rows...")
    stmt = con.createStatement()
    generate_sql_inserts(df, "pipeline_activity", stmt)
except Exception as e:
    # Report, then re-raise so a failed load does not look like a successful run.
    print(e)
    raise
finally:
    if stmt is not None:
        stmt.close()
    if con is not None:
        con.close()
    print("Resources closed.")
