# Loading Config Manager data to the Azure Database

## Prerequisites

- Python dependencies:
    - pandas: 2.0.0
    - json: 2.0.9 (part of the Python standard library; no separate install needed)
    - pyarrow: 12.0.1
    - pyodbc: 4.0.39
    - azure: 5.0.0
- `fun.json` for access credentials
- `/create_sql` folder for SQL code to create tables
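
Before starting, it may help to confirm that `fun.json` contains every key the code in this notebook reads. The following is a minimal sketch, not part of the original workflow: the helper names `missing_credential_keys` and `load_credentials` are hypothetical, and the key list is taken from the cells in Sections 2 and 3.

```python
import json
from pathlib import Path

# Keys that the cells in this notebook read from fun.json.
REQUIRED_KEYS = ("in_the_sun", "fun", "server", "db", "uid", "pwd")


def missing_credential_keys(cred, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty in the credentials dict."""
    return [key for key in required if not cred.get(key)]


def load_credentials(path="assets/fun.json"):
    """Load the credentials file and fail early if a key is missing."""
    cred_path = Path(path)
    if not cred_path.is_file():
        raise FileNotFoundError(f"Expected credentials file at {cred_path}")
    with cred_path.open() as f:
        cred = json.load(f)
    missing = missing_credential_keys(cred)
    if missing:
        raise KeyError(f"fun.json is missing keys: {missing}")
    return cred
```

Calling `load_credentials()` in place of the plain `json.load` cell would surface a missing key before any download or database connection is attempted.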

I assigned each of us an equal number of tables to load; the assignments are listed in the Historic Config Manager Summary Excel document in our shared Google Drive. That document also lists the names of the parquet files in Azure Blob Storage.
Link: https://docs.google.com/spreadsheets/d/1GTahzraMPXh9RTNQHnqzBrZaee4qBTkjZV6v58WjhfM/edit?usp=sharing



### Section 1 - Workflow Instructions 

1. Assemble prerequisites. Ensure the Python dependencies listed above are installed in your working environment. Create a subdirectory called `/assets` in your working directory and place the `fun.json` file there.

2. Download the Config Manager parquet files from Azure Blob Storage. Use the code in Section 2. The credentials needed to access the storage container are provided in `fun.json`.

3. Create the SQL table in the Azure database for each table. Connect to the Azure SQL Database using DBeaver or another database development environment of your choice. Run the SQL code for the corresponding table in `/assets/create_sql` to create each Config Manager database table.

4. Load the SQL table with data from the parquet file. Use the code in Section 3. Feel free to optimize this code to make the load process faster!

5. Repeat steps 2 through 4 for all Config Manager tables that were assigned to you.
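
The repetition in step 5 could be scripted along the following lines. This is a sketch under assumptions rather than a tested part of the workflow: `process_tables` and the `assignments` structure are hypothetical, the cleaning cells from Section 2 are not included, and the table for each entry is assumed to exist already (step 3 is still done by hand in the SQL editor).

```python
def process_tables(assignments, download, load):
    """Run the download and load steps for each assigned table.

    assignments: list of dicts with 'blob_name', 'local_file' and 'table_name'.
    download, load: callables, for example download_blob and a wrapper
    around load_table_v2 from this notebook.
    Returns a dict mapping each table name to 'ok' or an error message.
    """
    results = {}
    for item in assignments:
        try:
            download(item["blob_name"], item["local_file"])
            load(item["table_name"], item["local_file"])
            results[item["table_name"]] = "ok"
        except Exception as exc:
            # Keep going so that one failing table does not stop the rest.
            results[item["table_name"]] = f"failed: {exc}"
    return results


# Hypothetical assignment list; fill in the names from the summary spreadsheet.
assignments = [
    {
        "blob_name": "Persist_WorkstationStatus_Data.parquet",
        "local_file": "assets/WorkstationStatusData.parquet",
        "table_name": "dbo.[Persist.WorkstationStatus_DATA]",
    },
]

# Example call, once download_blob, load_table_v2 and conn are defined:
# results = process_tables(
#     assignments,
#     download_blob,
#     lambda table, path: load_table_v2(conn, table, path),
# )
```

Passing the two steps in as callables keeps the loop independent of Azure and pyodbc, so it can be tried with stand-in functions first.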



### Section 2 - Download parquet files from Azure Storage

In [1]:
import pandas as pd
import numpy as np
import json
import time

from azure.storage.blob import BlobServiceClient
import pyodbc
import pyarrow.parquet as pq

In [2]:
# Read in credentials
with open('assets/fun.json') as f:
    cred = json.load(f)

In [3]:
def download_blob(blob_name, local_file_name):
    '''
    Downloads the parquet file from the blob container.
    '''

    # Declare variables
    STORAGEACCOUNTURL= cred['in_the_sun']
    STORAGEACCOUNTKEY= cred['fun']
    LOCALFILENAME= local_file_name
    CONTAINERNAME= 'configmanagertest1'
    BLOBNAME= blob_name

    # Download from blob
    t1=time.time()
    blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
    blob_client_instance = blob_service_client_instance.get_blob_client(CONTAINERNAME, BLOBNAME, snapshot=None)
    with open(LOCALFILENAME, "wb") as my_blob:
        blob_data = blob_client_instance.download_blob()
        blob_data.readinto(my_blob)
    t2=time.time()
    print(f"It takes {t2 - t1} seconds to download {BLOBNAME}")

# Call the function
download_blob(blob_name='Persist_WorkstationStatus_Data.parquet', local_file_name='assets/WorkstationStatusData.parquet')

It takes 2.1784846782684326 seconds to download Persist_WorkstationStatus_Data.parquet


In [4]:
# Read in parquet file as DataFrame.
df = pd.read_parquet('./assets/WorkstationStatusData.parquet', engine='pyarrow')
df

| | RWB_WorkstationStatus_Data_ID | RWB_ETL_EVENT_DESTINATION_IDENTIFIER | RWB_CREATE_TIMESTAMP | RWB_EFFECTIVE_DATE | MachineID | InstanceKey | RevisionID | AgentID | TimeKey | rowversion | LastHWScan | SystemDefaultLCID | TimezoneOffset | LastReportVersion |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 310 | 879 | 2023-01-19 09:35:25 -06:00 | 2023-01-19 | 16783564 | 5 | 27 | 1 | 2023-01-18 14:44:10 | `b'\x00\x00\x00\x01\x1b$\xe8"'` | 2023-01-18 14:43:36 | 1033 | -360 | 30064771098 |
| 1 | 311 | 879 | 2023-01-19 09:35:25 -06:00 | 2023-01-19 | 16783835 | 6 | 2 | 1 | 2023-01-18 12:45:55 | `b'\x00\x00\x00\x01\x1b\x1d\xd7\xcc'` | 2023-01-18 12:44:59 | 1033 | -360 | 34359738369 |
| 2 | 312 | 879 | 2023-01-19 09:35:25 -06:00 | 2023-01-19 | 16784339 | 4 | 29 | 1 | 2023-01-19 05:44:24 | `b'\x00\x00\x00\x01\x1bN=@'` | 2023-01-19 05:43:05 | 1033 | -360 | 30064771100 |
| 3 | 313 | 879 | 2023-01-19 09:35:25 -06:00 | 2023-01-19 | 16784657 | 4 | 29 | 1 | 2023-01-19 06:33:40 | `` b'\x00\x00\x00\x01\x1bP\xd3`' `` | 2023-01-19 06:31:46 | 1033 | -360 | 25769803804 |
| 4 | 314 | 879 | 2023-01-19 09:35:25 -06:00 | 2023-01-19 | 16785825 | 3 | 29 | 1 | 2023-01-18 05:28:00 | `b'\x00\x00\x00\x01\x1a\xf9q\xfd'` | 2023-01-18 05:26:20 | 1033 | -360 | 21474836508 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1071355 | 1257955 | 10723 | 2023-07-02 08:03:25 -05:00 | 2023-07-02 | 16819021 | 2 | 3 | 1 | 2023-07-02 03:15:40 | `b'\x00\x00\x00\x01ByE\x0f'` | 2023-07-02 02:53:04 | 1033 | -300 | 8589934594 |
| 1071356 | 1257956 | 10723 | 2023-07-02 08:03:25 -05:00 | 2023-07-02 | 16819022 | 1 | 1 | 1 | 2023-06-30 14:28:00 | `b'\x00\x00\x00\x01B8\x80c'` | 2023-06-30 12:26:27 | 1033 | -420 | 4294967296 |
| 1071357 | 1257957 | 10723 | 2023-07-02 08:03:25 -05:00 | 2023-07-02 | 16819023 | 1 | 4 | 1 | 2023-07-02 07:48:11 | `b'\x00\x00\x00\x01B~*\x88'` | 2023-07-02 05:46:22 | 1033 | -420 | 4294967299 |
| 1071358 | 1257958 | 10723 | 2023-07-02 08:03:25 -05:00 | 2023-07-02 | 16819024 | 1 | 3 | 1 | 2023-07-02 04:31:28 | `b'\x00\x00\x00\x01Bzs\x08'` | 2023-07-02 02:30:37 | 1033 | -420 | 4294967298 |


In [None]:
# Replace empty strings with None, then confirm that none remain.
df = df.applymap(lambda x: None if x == "" else x)
count = (df.eq("")).sum().sum()
print("Number of empty strings remaining:", count)

In [None]:
# Replace any np.inf values with np.nan.
df.replace({np.inf: np.nan, -np.inf: np.nan}, inplace=True)

In [None]:
# pq = pq.where(pd.notnull(pq), None)
# pq.fillna(0, inplace=True)

In [None]:
# Convert float64 columns to int. Missing values are filled with 0 first.
float_columns = df.select_dtypes(include=['float64']).columns
df[float_columns] = df[float_columns].fillna(0)
df[float_columns] = df[float_columns].astype(int)
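
Because the cast above turns missing values into 0 and truncates any fractional part, it can be worth checking the float columns before running it. The following is a minimal sketch of such a check; `audit_float_columns` is a hypothetical helper, and the small DataFrame stands in for the real `df`.

```python
import numpy as np
import pandas as pd


def audit_float_columns(df):
    """Count missing and fractional values in each float64 column."""
    report = {}
    for col in df.select_dtypes(include=["float64"]).columns:
        values = df[col]
        non_null = values.dropna()
        report[col] = {
            "missing": int(values.isna().sum()),
            # Values with a non-zero remainder would be truncated by astype(int).
            "fractional": int((non_null % 1 != 0).sum()),
        }
    return report


# Toy data: column "a" has a missing value, "b" has a fractional value,
# and "c" is already an integer column, so it is not audited.
example = pd.DataFrame({"a": [1.0, np.nan, 3.0], "b": [0.5, 2.0, 4.0], "c": [1, 2, 3]})
report = audit_float_columns(example)
print(report)
```

A column with a non-zero `missing` count will show 0 where the source had no value, and a column with a non-zero `fractional` count probably should not be cast to int at all.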

In [None]:
df.info()

In [None]:
# Save out parquet file.
df.to_parquet('./assets/WorkstationStatusCorrected.parquet', engine='pyarrow')

### Section 3 - Load data to the Azure SQL table


#### Make the database table.



Connect to database.
1. Connect using SQL editor of choice (DBeaver, DataGrip).
2. Select Azure SQL using built-in connection type.
3. In pop-up dialog window, enter connection credentials using values from `fun.json` to authenticate.
4. Connect to Azure DB.


Add table.
1. Find `Tables` subdirectory under `dbo` schema.
2. Right-click to open context window. Highlight "Add...". Click "New table".
3. Dialog window appears. Provide SQL template code from `create_sql` directory for the corresponding table.
    - Change the table name in the dialog window to `Persist.Table_Name`.
    - You may need to edit the SQL to create the table successfully:
        - Remove `IDENTITY(1,1)`.
        - Remove the `ON [filegroup]` statement from `WITH` near the bottom.
        - Change the table name to `dbo.[Persist.Table_Name]`.
4. Once the table is created, run the function below to push your data to the table.
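
The SQL edits listed in step 3 can also be applied to the template text programmatically. The following is a rough sketch only: `adapt_create_sql` is a hypothetical helper, the sample statement is invented for illustration and is not one of the real files in `create_sql`, and the regular expressions assume the table name contains no spaces. Review the output before running it against the database.

```python
import re


def adapt_create_sql(sql, table_name):
    """Apply the listed edits to a CREATE TABLE statement."""
    # Remove the IDENTITY(1,1) column property.
    sql = re.sub(r"\s*IDENTITY\s*\(\s*1\s*,\s*1\s*\)", "", sql, flags=re.IGNORECASE)
    # Remove filegroup clauses such as "ON [PRIMARY]".
    sql = re.sub(r"\s+ON\s+\[[^\]]+\]", "", sql, flags=re.IGNORECASE)
    # Point the statement at dbo.[Persist.Table_Name].
    sql = re.sub(
        r"(CREATE\s+TABLE\s+)[^\s(]+",
        lambda m: f"{m.group(1)}dbo.[Persist.{table_name}]",
        sql,
        count=1,
        flags=re.IGNORECASE,
    )
    return sql


# Invented template for illustration only.
template = (
    "CREATE TABLE [Persist].[Example_Data] (\n"
    "    [ID] INT IDENTITY(1,1) NOT NULL,\n"
    "    [Name] NVARCHAR(50) NULL,\n"
    "    CONSTRAINT [PK_Example] PRIMARY KEY CLUSTERED ([ID] ASC)\n"
    "    WITH (PAD_INDEX = OFF) ON [PRIMARY]\n"
    ") ON [PRIMARY]"
)
adapted = adapt_create_sql(template, "Example_Data")
print(adapted)
```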



#### Load the data from the parquet file into the table.


In [None]:
# Define the connection string
conn_string = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={cred['server']};DATABASE={cred['db']};UID={cred['uid']};PWD={cred['pwd']}"

# Establish a connection to the SQL Server database
conn = pyodbc.connect(conn_string)

In [None]:
def load_table_v2(conn_obj, table_name, pq_file_name):
    '''
    Loads a full table of data into table_name in the database
    that conn_obj is connected to. Uses chunking for an
    efficient load.
    '''

    # Create a cursor object
    cursor = conn_obj.cursor()

    # Read in the parquet file
    parquet_data = pq.ParquetFile(pq_file_name)

    # Get the field names from the parquet file schema
    pq_field_names = parquet_data.schema_arrow.names
    col_tup_str_curr = '(' + ','.join(pq_field_names) +')'

    # Get the number of ? placeholders needed in the insert query
    num_qs_rep = len(pq_field_names)

    # Make the values string
    values_ques_str = ','.join(tuple(['?']*num_qs_rep))

    # Define the SQL insert statement
    insert_query = "INSERT INTO " + table_name + " " + col_tup_str_curr + " VALUES (" + values_ques_str + ")"

    # Read the Parquet file in chunks
    chunk_size = 10000
    num_rows = parquet_data.metadata.num_rows
    num_chunks = num_rows // chunk_size + 1

    # Track the rows loaded during the insert
    rows_loaded = 0
    track_to_100k = 0

    # Process and insert the data in chunks
    for batch in parquet_data.iter_batches(chunk_size):

        # Read the chunk of data from Parquet
        chunk = batch.to_pandas()

        # Convert the chunk to a list of tuples
        records = [tuple(row) for row in chunk.itertuples(index=False)]

        # Execute the INSERT statement with executemany
        cursor.executemany(insert_query, records)
        conn_obj.commit()

        # Notify each time another 100K rows have been loaded
        rows_loaded += len(chunk)
        track_to_100k += len(chunk)
        if track_to_100k >= 100000:
            print(f"{rows_loaded} rows loaded.")
            track_to_100k = 0

    # Close the cursor (the connection is left open for reuse)
    cursor.close()

# Call the function
load_table_v2(conn_obj=conn, table_name='dbo.[Persist.WorkstationStatus_DATA]', pq_file_name='assets/WorkstationStatusCorrected.parquet')
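
The statement that `load_table_v2` builds is a parameterized `INSERT` with one `?` per column. The following standalone sketch shows that construction on its own, with two assumed variations that are not in the function above: column names are wrapped in square brackets, and float `NaN` values are converted to `None` so that they are sent as SQL `NULL`. The helper names `build_insert_query` and `nan_to_none` are hypothetical.

```python
import math


def build_insert_query(table_name, field_names):
    """Build a parameterized INSERT with one ? placeholder per column."""
    columns = ", ".join(f"[{name}]" for name in field_names)
    placeholders = ", ".join("?" for _ in field_names)
    return f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"


def nan_to_none(record):
    """Replace float NaN values with None so they are sent as SQL NULL."""
    return tuple(
        None if isinstance(value, float) and math.isnan(value) else value
        for value in record
    )


query = build_insert_query(
    "dbo.[Persist.WorkstationStatus_DATA]", ["MachineID", "InstanceKey"]
)
print(query)
print(nan_to_none((16783564, float("nan"), "x")))
```

As a possible optimization, pyodbc cursors expose a `fast_executemany` attribute that may speed up `executemany` with the Microsoft ODBC drivers; whether it helps for these tables has not been tested here.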

In [None]:
# Document dependencies in a jupyter notebook
# Dependencies for this notebook
%load_ext watermark
%watermark
%watermark --iversions