In [5]:
#=============================================================================
#=== Install and import necessary modules
#=============================================================================
from import_neccessary_modules import *
modules = ['os', 'pandas', 'pyodbc', 'numpy', 'glob', 'csv', 'tarfile', 'docker']
for module in modules:
    import_neccessary_modules(module)

import numpy as np
import docker


In [1]:
import os
import pandas
import pyodbc
import numpy as np
import glob
import csv
import tarfile
import docker

In [28]:
def rename(data, oldnames, newname):
    if type(oldnames) == str: # Input can be a string or list of strings
        oldnames = [oldnames] # When renaming multiple columns
        newname = [newname] # Make sure you pass the corresponding list of new names
    i = 0
    for name in oldnames:
        oldvar = [c for c in data.columns if name in c]
        if len(oldvar) == 0:
            raise ValueError("Sorry, couldn't find that column in the dataset")
        if len(oldvar) > 1: # Doesn't have to be an exact match
            print("Found multiple columns that matched " + str(name) + ": ")
            for c in oldvar:
                print(str(oldvar.index(c)) + ": " + str(c))
            ind = input('Please enter the index of the column you would like to rename: ')
            oldvar = oldvar[int(ind)]
        if len(oldvar) == 1:
            oldvar = oldvar[0]
        data = data.rename(columns = {oldvar : newname[i]})
        i += 1
    return data

In [2]:
# Set path
my_path = r"C:\MyDataFiles\Data_CCBIS_202107"
if not os.path.exists(my_path):
    os.makedirs(my_path)
    print("Directory created: " + my_path)
# Create data directors
my_path_CCBIS = my_path + "\CCBIS"
my_path_CCBISDW = my_path + "\CCBISDW"
my_path_cleaned = my_path + "\cleaned"
directors =  [my_path_CCBIS, my_path_CCBISDW, my_path_cleaned]
for director in directors:
    if not os.path.exists(director):
        os.makedirs(director)
        print('Directory created: ' + director)
# Set up SQL Server connector
sql_conn = pyodbc.connect('DRIVER={SQL Server}; SERVER=localhost; DATABASE=CCBIS; UID=sa; PWD=SQLServer2019') 
sql_dw_conn = pyodbc.connect('DRIVER={SQL Server}; SERVER=localhost; DATABASE=CCBISDW; UID=sa; PWD=SQLServer2019')   
# Set up transform tables
mergeTables = {
    1: {
        "mergeFrom":    "DimGeography_clean.csv",
        "mergeTo":      "DimCustomer_clean.csv",
        "dw_new":       "DimCustomer_DW.csv",
        "mergeBy":      "GeographyKey",
        "tableName":    "DimCustomer",
        "column_2":     "no",
        "column_new":   "no"  
    },
    2: {
        "mergeFrom":    "DimProductGroup_clean.csv",
        "mergeTo":      "DimProduct_clean.csv",
        "dw_new":       "DimProduct_DW.csv",
        "mergeBy":      "ProductGroup_Key",
        "tableName":    "DimProduct",
        "column_2":     "Name_2",   
        "column_new":   "ProductGroup"
    },
    3: {
        "mergeFrom":    "CDR_clean.csv",
        "mergeTo":      "",
        "dw_new":       "FactCDR_DW.csv",
        "mergeBy":      "",
        "tableName":    "FactCDR",
        "column_2":     "",   
        "column_new":   ""
    },
    4: {
        "mergeFrom":    "DimAgent_clean.csv",
        "mergeTo":      "",
        "dw_new":       "DimAgent_DW.csv",
        "mergeBy":      "",
        "tableName":    "DimAgent",
        "column_2":     "",   
        "column_new":   ""
    },
    5: {
        "mergeFrom":    "DimHandleType_clean.csv",
        "mergeTo":      "",
        "dw_new":       "DimHandleType_DW.csv",
        "mergeBy":      "",
        "tableName":    "DimHandleType",
        "column_2":     "",   
        "column_new":   ""
    },
    6: {
        "mergeFrom":    "DimServiceType_clean.csv",
        "mergeTo":      "",
        "dw_new":       "DimServiceType_DW.csv",
        "mergeBy":      "",
        "tableName":    "DimServiceType",
        "column_2":     "",   
        "column_new":   ""
    },
    7: {
        "mergeFrom":    "DimSeverifyType_clean.csv",
        "mergeTo":      "",
        "dw_new":       "DimSeverifyType_DW.csv",
        "mergeBy":      "",
        "tableName":    "DimSeverifyType",
        "column_2":     "",   
        "column_new":   ""
    }
}
transTables = {
    "CDR_clean.csv":                "FactCDR",
    "DimAgent_clean.csv":           "DimAgent",
    "DimHandleType_clean.csv":      "DimHandleType",
    "DimServiceType_clean.csv":     "DimServiceType",
    "DimSeverifyType_clean.csv":    "DimSeverifyType"
}


In [11]:
#=============================================================================
#=== Transform - Merge table
#=============================================================================

print('==== Transform: Merge tables ====')
os.chdir(my_path_cleaned)
for i in mergeTables.values():
    mergeFrom = i.get("mergeFrom")
    mergeTo = i.get("mergeTo")
    dw_new = i.get("dw_new")
    mergeBy = i.get("mergeBy")
    tableName = i.get("tableName")

    # Get data
    df_from = pandas.read_csv(mergeFrom)
    size_org = df_from.shape[0]
    print('\nFrom: ' + mergeFrom + str(df_from.shape))
    print(df_from.columns)
    if mergeTo =="":
        df_new = df_from
    else:
        df_to = pandas.read_csv(mergeTo)
        size_org = df_to.shape[0]
        print('To: ' + mergeTo + str(df_to.shape))
        print(df_to.columns)

        # Merge two tables
        df_new = pandas.merge(df_to, df_from, how = 'left', on = mergeBy, suffixes=('', '_2'))
    
        #df_new.head()

        # Deal with duplicated column names
    
        if mergeFrom == 'DimProductGroup_clean.csv':
            df_new = rename(df_new, ['Name_2'], ['ProductGroup'])
        #    df_new = rename(df_new, column_2, column_new)
    toNew_column_name = list(df_new.columns)
    #for column in toNew_column_name:
    #    if column in column_2:
    
    print(len(toNew_column_name))    
    print(toNew_column_name)
 
    # Check destination DW columns
    query = "SELECT * FROM [dbo].[" + tableName + "]"
    #print(query)
    df_dw = pandas.read_sql(query, sql_dw_conn)
    dw_column_name = list(df_dw.columns)
    print(len(dw_column_name))    
    print(dw_column_name)

    for column in toNew_column_name:
        if column not in dw_column_name:
            print(column)
            df_new.drop(column, axis = 1, inplace = True)
    #print(df_new.head())

    # Rearragnge Columns
    df_new = df_new[dw_column_name]
    print(len(df_new.head()))
    print(df_new.head())

    # Export to DW .csv
    os.chdir(my_path_CCBISDW)
    df_new.to_csv(dw_new, index = False)
    df_new = pandas.read_csv(dw_new)
    size_org = df_new.shape[0]
    print('\nCreate DW file: ' + dw_new + str(df_new.shape))
    os.chdir(my_path_cleaned)


==== Transform: Merge tables ====

From: DimGeography_clean.csv(655, 10)
Index(['GeographyKey', 'City', 'StateProvinceCode', 'StateProvinceName',
       'CountryRegionCode', 'EnglishCountryRegionName',
       'SpanishCountryRegionName', 'FrenchCountryRegionName', 'PostalCode',
       'SalesTerritoryKey'],
      dtype='object')
To: DimCustomer_clean.csv(18484, 34)
Index(['CustomerKey', 'GeographyKey', 'CustomerAlternateKey', 'Title',
       'FirstName', 'MiddleName', 'LastName', 'NameStyle', 'BirthDate',
       'MaritalStatus', 'Suffix', 'Gender', 'EmailAddress', 'YearlyIncome',
       'TotalChildren', 'NumberChildrenAtHome', 'EnglishEducation',
       'SpanishEducation', 'FrenchEducation', 'EnglishOccupation',
       'SpanishOccupation', 'FrenchOccupation', 'HouseOwnerFlag',
       'NumberCarsOwned', 'AddressLine1', 'AddressLine2', 'Phone',
       'DateFirstPurchase', 'CommuteDistance', 'Title_ismissing',
       'MiddleName_ismissing', 'LastName_ismissing', 'Suffix_ismissing',
       '

In [143]:
#=============================================================================
#=== Validation
#=============================================================================

os.chdir(my_path_CCBISDW)
print("==== Validation Tables ====")

for file in glob.glob("*.csv"):
    # Get table info
    tableName = str(file)[:-4]
    pkNameQuery = "SELECT Col.Column_Name as PkName from INFORMATION_SCHEMA.TABLE_CONSTRAINTS Tab, INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE Col WHERE Col.Constraint_Name = Tab.Constraint_Name AND Col.Table_Name = Tab.Table_Name AND Constraint_Type = 'PRIMARY KEY' AND Col.Table_Name = '" + tableName +"'"
    pkList = list(pandas.read_sql(pkNameQuery, sql_conn)["PkName"])

    # Get data
    df = pandas.read_csv(file, index_col = pkList)
    size_org = df.shape[0]
    print('\nFrom: ' + file + str(df.shape))

==== Validation Tables ====

From: DimCustomer_DW.csv(18484, 37)

From: DimProduct_DW.csv(25, 3)


In [236]:
#=============================================================================
#=== SQL Bulk Insert Procedue
#=============================================================================
class c_bulk_insert:
    def __init__(self, csv_file_nm, db_nm, db_table_nm):
        # Connect to the database, perform the insert, and update the log table.
        
        conn = self.connect_db()
        self.insert_data(conn, csv_file_nm, db_table_nm)
        conn.close
    def connect_db(self):
        # Connect to the server and database with Windows authentication.
        # conn_string = 'DRIVER={SQL Server}; SERVER = localhost; DATABASE=' + db_nm + '; UID=sa; PWD=SQLServer2019; Trusted_Connection=yes'
        conn = pyodbc.connect('DRIVER={SQL Server}; SERVER=localhost; DATABASE=CCBISDW; UID=sa; PWD=SQLServer2019') 
        # conn = pyodbc.connect(conn_string)
        return conn
    def insert_data(self, conn, csv_file_nm, db_table_nm):
        # Insert the data from the CSV file into the database table.
        # Assemble the BULK INSERT query. Be sure to skip the header row by specifying FIRSTROW = 2.
        qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV', FIRSTROW = 2)"
        # Execute the query
        cursor = conn.cursor()
        success = cursor.execute(qry)
        conn.commit()
        cursor.close

In [4]:
import docker
client = docker.from_env()
client.containers.run("alpine", "echo hello world")

b'hello world\n'

In [8]:
#=============================================================================
#=== Copy .csv to Docker Container Procedue
#=============================================================================
import docker
client = docker.from_env()
# src - from file name (in local), shall be an absolute path of fromFile
# dst - to dir (in docker container)
def copy_to_container(src, dst):
    name, dst = dst.split(':')
    container = client.containers.get(name)

    os.chdir(os.path.dirname(src))
    srcname = os.path.basename(src)
    tar = tarfile.open(src + '.tar', mode='w')
    try:
        tar.add(srcname)
    finally:
        tar.close()

    data = open(src + '.tar', 'rb').read()
    container.put_archive(os.path.dirname(dst), data)
# To use
# copy_to_container("C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\DimCustomer_DW.csv", 'SQL_Server_2019:/var/tmp')

In [9]:
# Copy .csv to Docker Container
for file in glob.glob("*.csv"):
    os.chdir(my_path_CCBISDW)
    fileName = os.path.join(my_path_CCBISDW, file)
    tableName = str(file)[:-7]
    toContainerDir = 'SQL_Server_2019:/var/tmp'
    print(tableName)
    print(fileName)
    print(toContainerDir)
    copy_to_container(fileName, toContainerDir)
    #bulk_insert = c_bulk_insert(fileName, 'CCBISDW', tableName) 

DimAgent
C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\DimAgent_DW.csv
SQL_Server_2019:/var/tmp
DimCustomer
C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\DimCustomer_DW.csv
SQL_Server_2019:/var/tmp
DimHandleType
C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\DimHandleType_DW.csv
SQL_Server_2019:/var/tmp
DimProduct
C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\DimProduct_DW.csv
SQL_Server_2019:/var/tmp
DimServiceType
C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\DimServiceType_DW.csv
SQL_Server_2019:/var/tmp
DimSeverifyType
C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\DimSeverifyType_DW.csv
SQL_Server_2019:/var/tmp
FactCDR
C:\MyDataFiles\Data_CCBIS_202107\CCBISDW\FactCDR_DW.csv
SQL_Server_2019:/var/tmp


In [7]:
#=============================================================================
#=== Load CSV to DW
#=============================================================================
os.chdir(my_path_CCBISDW)
print("==== Load to WD ====")
# Set up SQL Server connector
#sql_dw_conn = pyodbc.connect('DRIVER={SQL Server}; SERVER=localhost; DATABASE=CCBISDW; UID=sa; PWD=SQLServer2019')  

cursor = sql_dw_conn.cursor()
for file in glob.glob("*.csv"):
    # Get table info
    tableName = str(file)[:-7]
    #fileName = str(os.path.join(my_path_CCBISDW, file))
    #fileName = os.path.join(toContainerDir,file)
    #toName = 'my-container:/tmp/CCBISDW/DimCustomer_DW.csv'
    #copy_to(fileName, toName)
    toContainerDir = 'SQL_Server_2019:/var/tmp'
    fileName = str(toContainerDir + "/" + tableName + '_DW.csv')
    fileName = str(toContainerDir + "/" + tableName + '_DW.csv')
    #pkNameQuery = "SELECT Col.Column_Name as PkName from INFORMATION_SCHEMA.TABLE_CONSTRAINTS Tab, INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE Col WHERE Col.Constraint_Name = Tab.Constraint_Name AND Col.Table_Name = Tab.Table_Name AND Constraint_Type = 'PRIMARY KEY' AND Col.Table_Name = '" + tableName +"'"
    #pkList = list(pandas.read_sql(pkNameQuery, sql_dw_conn)["PkName"])
    print(tableName)
    print(fileName)
    #from c_bulk_insert import c_bulk_insert
    ########bulk_insert = c_bulk_insert(fileName, 'CCBISDW', tableName) 
    # Get data
    #qry = "SELECT @@SERVERNAME"
    #qry = "use CCBISDW BULK INSERT dbo." + tableName + " FROM '" + toContainerDir + "' WITH (datafiletype = 'char', FIRSTROW = 2, FIELDTERMINATOR ='\t', ROWTERMINATOR ='\n')"
    qry = "use CCBISDW BULK INSERT DimAgent FROM 'SQL_Server_2019:/var/tmp/DimAgent_DW.csv' "
    print(qry)
    cursor.execute(qry)
    qry = "select top (10) * from " + tableName
    print(qry)
    cursor.execute(qry)
    #success = cursor.execute(qry)
    #sql_dw_conn.commit()
cursor.close
    #df = pandas.read_csv(file, index_col = pkList)
    #with open (file, 'r') as f:
    #    reader = csv.reader(f)
    #    columns = next(reader) 

    # Insert DataFrame to Table
    #insert_data(file, sql_dw_conn, fileName, tableName)
        #cursor = sql_dw_conn.cursor()
        #query = "INSERT INTO " + tableName  + 'values ({1})'
        #query = query.format(','.join(columns), ','.join('?' * len(columns)))
        #query = "Use CCBISDW bulk insert dbo." + tableName + "From '" + my_path_CCBISDW + "\" + file + "' With(Datafile = "char", FIRSTROW = 2, FIELDTERMINATOR = ",", ROWTERMINATOR = "0x0a")" 
        #for data in reader:
        #    cursor.execute(query, data)
        #sql_dw_conn.commit()

==== Load to WD ====
DimAgent
SQL_Server_2019:/var/tmp/DimAgent_DW.csv
use CCBISDW BULK INSERT DimAgent FROM 'SQL_Server_2019:/var/tmp/DimAgent_DW.csv' 
select top (10) * from DimAgent
DimCustomer
SQL_Server_2019:/var/tmp/DimCustomer_DW.csv
use CCBISDW BULK INSERT DimAgent FROM 'SQL_Server_2019:/var/tmp/DimAgent_DW.csv' 
select top (10) * from DimCustomer
DimHandleType
SQL_Server_2019:/var/tmp/DimHandleType_DW.csv
use CCBISDW BULK INSERT DimAgent FROM 'SQL_Server_2019:/var/tmp/DimAgent_DW.csv' 
select top (10) * from DimHandleType
DimProduct
SQL_Server_2019:/var/tmp/DimProduct_DW.csv
use CCBISDW BULK INSERT DimAgent FROM 'SQL_Server_2019:/var/tmp/DimAgent_DW.csv' 
select top (10) * from DimProduct
DimServiceType
SQL_Server_2019:/var/tmp/DimServiceType_DW.csv
use CCBISDW BULK INSERT DimAgent FROM 'SQL_Server_2019:/var/tmp/DimAgent_DW.csv' 
select top (10) * from DimServiceType
DimSeverifyType
SQL_Server_2019:/var/tmp/DimSeverifyType_DW.csv
use CCBISDW BULK INSERT DimAgent FROM 'SQL_Serv

<function Cursor.close>