In [13]:
import os
import sqlite3
import sys
from datetime import datetime

import numpy as np
import pandas as pd

sys.path.append('../')
from user_packages import hashing


In [14]:
# set variables
target_model_name = 'excited.skirt.earth' # From bv_Model, make sure you are going to the right place
source_file_name = 'oh_attributes_analysis_import.csv'
source_file_path = ''

record_source = 'SQLServerAnalysis'


In [None]:
# db connect
conn = sqlite3.connect('../full_metadata.db')

In [26]:
# read table to df
dtypeDict = {
    'CHARACTER_MAXIMUM_LENGTH': 'Int64'
  , 'CHARACTER_OCTET_LENGTH': 'Int64'
  , 'NUMERIC_PRECISION': 'Int64'
  , 'NUMERIC_PRECISION_RADIX': 'Int64'
  , 'NUMERIC_SCALE': 'Int64'
  , 'DATETIME_PRECISION': 'Int64'
}
#, nrows=10
df = pd.read_csv(source_file_name, dtype = dtypeDict)
df = df.replace({np.nan: None})

# drop all the data analysis fields, as they update via their own pathway.
df = df.drop(
  [
      'SCAN_DATE'
    , 'NULL_COUNT'
    , 'NOT_NULL_COUNT'
    , 'NULL_PERCENT'
    , 'NOT_NULL_PERCENT'
    , 'BLANK_COUNT'
    , 'NOT_BLANK_COUNT'
    , 'BLANK_PERCENT'
    , 'NOT_BLANK_PERCENT'
    , 'NULL_OR_BLANK_COUNT'
    , 'NOT_NULL_OR_BLANK_COUNT'
    , 'NULL_OR_BLANK_PERCENT'
    , 'NOT_NULL_OR_BLANK_PERCENT'
    , 'COUNT_DISTINCT'
    , 'MIN_OF_NUMBER'
    , 'MAX_OF_NUMBER'
    , 'MIN_OF_DATE'
    , 'MAX_OF_DATE'
  ]
  , axis=1
)

#df

In [25]:
# add extra columns

# columns with variable data
df['RecordSource'] = record_source
df['LoadDate'] = datetime.now()

# hash the payload
df = hashing.add_md5_hash_column(
  df
  , md5_column_name = 'HashDiff'
  , columns = [
      'SERVER_NAME'
    , 'DATABASE_NAME'
    , 'SCHEMA_NAME'
    , 'TABLE_NAME'
    , 'COLUMN_NAME'

    , 'ORDINAL_POSITION'
    , 'COLUMN_DEFAULT'
    , 'IS_NULLABLE'
    , 'DATA_TYPE'
    , 'CHARACTER_MAXIMUM_LENGTH'
    , 'CHARACTER_OCTET_LENGTH'
    , 'NUMERIC_PRECISION'
    , 'NUMERIC_PRECISION_RADIX'
    , 'NUMERIC_SCALE'
    , 'DATETIME_PRECISION'
    , 'CHARACTER_SET_CATALOG'
    , 'CHARACTER_SET_SCHEMA'
    , 'CHARACTER_SET_NAME'
    , 'COLLATION_CATALOG'
    , 'COLLATION_SCHEMA'
    , 'COLLATION_NAME'
    , 'DOMAIN_CATALOG'
    , 'DOMAIN_SCHEMA'
    , 'DOMAIN_NAME'
    , 'COLUMN_DESCRIPTION'
  ]
)

#df

In [22]:
# Clear down and Write to staging
conn.execute("DELETE FROM stg_Py_SqlServerAnalysisCsvToPhysicalAttribute")
conn.commit()
df.to_sql('stg_Py_SqlServerAnalysisCsvToPhysicalAttribute', conn, if_exists='append', index=False)

1091

In [23]:
# Write to the Satellite (PhysicalAttribute_SqlServerScrape)
sql_query = """
INSERT INTO rv_s_PhysicalAttribute_SqlServerScrape
(
    PhysicalAttributeHashKey
  , LoadDate
  , RecordSource
  , HashDiff

  , SERVER_NAME
  , "DATABASE_NAME"
  , SCHEMA_NAME
  , TABLE_NAME
  , COLUMN_NAME

  , ORDINAL_POSITION
  , COLUMN_DEFAULT
  , IS_NULLABLE
  , DATA_TYPE
  , CHARACTER_MAXIMUM_LENGTH
  , CHARACTER_OCTET_LENGTH
  , NUMERIC_PRECISION
  , NUMERIC_PRECISION_RADIX
  , NUMERIC_SCALE
  , DATETIME_PRECISION
  , CHARACTER_SET_CATALOG
  , CHARACTER_SET_SCHEMA
  , CHARACTER_SET_NAME
  , COLLATION_CATALOG
  , COLLATION_SCHEMA
  , COLLATION_NAME
  , DOMAIN_CATALOG
  , DOMAIN_SCHEMA
  , DOMAIN_NAME
  , COLUMN_DESCRIPTION

)
SELECT DISTINCT
    stg.PhysicalAttributeHashKey
  , stg.LoadDate
  , stg.RecordSource
  , stg.HashDiff
  
  , stg.SERVER_NAME
  , stg."DATABASE_NAME"
  , stg.SCHEMA_NAME
  , stg.TABLE_NAME
  , stg.COLUMN_NAME

  , stg.ORDINAL_POSITION
  , stg.COLUMN_DEFAULT
  , stg.IS_NULLABLE
  , stg.DATA_TYPE
  , stg.CHARACTER_MAXIMUM_LENGTH
  , stg.CHARACTER_OCTET_LENGTH
  , stg.NUMERIC_PRECISION
  , stg.NUMERIC_PRECISION_RADIX
  , stg.NUMERIC_SCALE
  , stg.DATETIME_PRECISION
  , stg.CHARACTER_SET_CATALOG
  , stg.CHARACTER_SET_SCHEMA
  , stg.CHARACTER_SET_NAME
  , stg.COLLATION_CATALOG
  , stg.COLLATION_SCHEMA
  , stg.COLLATION_NAME
  , stg.DOMAIN_CATALOG
  , stg.DOMAIN_SCHEMA
  , stg.DOMAIN_NAME
  , stg.COLUMN_DESCRIPTION

FROM
  stg_Py_SqlServerAnalysisCsvToPhysicalAttribute AS stg
  LEFT OUTER JOIN rv_s_PhysicalAttribute_SqlServerScrape AS sat ON (
    stg.PhysicalAttributeHashKey = sat.PhysicalAttributeHashKey
    AND sat.LoadDate = (
      SELECT MAX(z.LoadDate)
      FROM rv_s_PhysicalAttribute_SqlServerScrape AS z
      WHERE z.PhysicalAttributeHashKey = sat.PhysicalAttributeHashKey
    )
  )
WHERE
  (
    sat.HashDiff != stg.HashDiff
    OR sat.HashDiff IS NULL
  )
""";
conn.execute(sql_query)
conn.commit()

In [24]:
# Get the incoming tags from the staging table,
# split them into a list of tags
# insert tags, taking out the ones that are currently active.
# NOTE: This does not END any tags.
sql_query = """
WITH RECURSIVE
source_table AS (
	SELECT 
	  PhysicalAttributeHashKey AS id
	  , Tags as string
	FROM stg_Py_SqlServerAnalysisCsvToPhysicalAttribute
)
, splitstring (id, string, remaining_string) AS (
  SELECT
		id
    , CASE
      WHEN a.string LIKE '%;%'
      THEN SUBSTRING(a.string, 1, INSTR(a.string, ';')-1)
      ELSE a.string
    END AS string
    , SUBSTRING(SUBSTRING(a.string, INSTR(a.string, ';') + 1), 1) AS remaining_string
  FROM source_table AS a
UNION ALL
  SELECT 
		id
    , CASE 
      WHEN c.remaining_string LIKE '%;%' 
      THEN SUBSTRING(c.remaining_string, 1, INSTR(c.remaining_string, ';')-1) 
      ELSE c.remaining_string
    END AS string
    , CASE
      WHEN c.remaining_string LIKE '%;%' 
      THEN SUBSTRING( SUBSTRING(c.remaining_string, INSTR(c.remaining_string, ';')+1), 1)
    END AS remaining_string
  FROM splitstring c
  WHERE
    c.string <> ''
    AND c.string IS NOT NULL
)
, incoming_tags AS (
  SELECT
    s.id AS PhysicalAttributeHashKey
		, stg.LoadDate 
		, stg.RecordSource
		
    , s.string AS Tag
    , stg.LoadDate AS StartDate
  --   , ROW_NUMBER() OVER (
  -- 		PARTITION BY s.id
  -- 		ORDER BY s.string DESC
  -- 	) AS ROW_id
  FROM
    splitstring s
    LEFT JOIN stg_Py_SqlServerAnalysisCsvToPhysicalAttribute stg ON (s.id = stg.PhysicalAttributeHashKey)
  WHERE
    s.string <> ''
    AND s.string IS NOT NULL
)
, current_tags AS ( -- gets the active tags that are on the Attributes in the stg table
  SELECT
      sat.PhysicalAttributeHashKey
    , sat.Tag
    --, sat.StartDate
    --, sat.EndDate
  FROM
    rv_s_PhysicalAttribute_AnalysisTag sat
    INNER JOIN stg_Py_SqlServerAnalysisCsvToPhysicalAttribute stg ON (
      sat.PhysicalAttributeHashKey = stg.PhysicalAttributeHashKey
      AND sat.LoadDate = (
        SELECT MAX(z.LoadDate)
        FROM rv_s_PhysicalAttribute_AnalysisTag AS z
        WHERE z.PhysicalAttributeHashKey = sat.PhysicalAttributeHashKey
      )
    )
  WHERE
    sat.EndDate IS NULL
)
INSERT INTO rv_s_PhysicalAttribute_AnalysisTag
(
    PhysicalAttributeHashKey
  , LoadDate
  , RecordSource

  , Tag
  , StartDate
)
SELECT DISTINCT
    stg.PhysicalAttributeHashKey
  , stg.LoadDate
  , stg.RecordSource
  
  , stg.Tag
  , stg.LoadDate AS StartDate
FROM
  incoming_tags AS stg
  LEFT OUTER JOIN current_tags AS sat ON (
    stg.PhysicalAttributeHashKey = sat.PhysicalAttributeHashKey
		AND stg.Tag = sat.Tag
  )
-- "Where in incoming_tags and not in current_tags"	
WHERE
	sat.PhysicalAttributeHashKey IS NULL
	AND sat.Tag IS NULL
""";
conn.execute(sql_query)
conn.commit()
