### This notebook is about steps involved in taking file from local file system and writing it to local database

In [5]:
#It uses database connectivity libraries, python and pandas

import pandas as pd
import psycopg2
import os

In [8]:
os.listdir(os.curdir)

['.ipynb_checkpoints',
 'clusterdash.config',
 'excelconvertScheduler.ipynb',
 'fileCSV',
 'fileupload',
 'helperFunctions.py']

Problem statement:

Excel file is uploaded to particular folder on regular intervals, say 15 minutes
File needs to be converted to CSV file and written back to same folder
The data in the file needs to be written to a database connected to Airflow

1) Locate the file location where it will be regularly updated by the absolute path

2) Write the function to read the file into pandas, create dataframe, drop unncessary columns

3) Write the function that writes the dataframe to csv file and stores it back to destination location

4) Write the database connectivity function that creates the table in Postgresql database

5) Deploy the Airflow DAG

5) Write the unit tests for these functions 

6) Build a Ci-Cd pipeline for checking these functions

7) Re-deploy the DAG


In [2]:
source = "/run/media/solverbot/repoA/gitFolders/dashBoard Designs/fsPipelines/fileupload/"
dest = "/run/media/solverbot/repoA/gitFolders/dashBoard Designs/fsPipelines/fileCSV/"

In [4]:
#Take the last file in the directory list
os.listdir(source)[1]

'our compay made progress in 2020.xlsx'

In [4]:
sourcePath = source + os.listdir(source)[-1]

In [5]:
name = os.listdir(source)[-1]

In [6]:
sourceDF = pd.read_excel(sourcePath,sheet_name='DataSource',header=0)

In [7]:
sourceDF.head()

Unnamed: 0,Year,Ticket Types,Age-Group,Gender,Location,Ticket Price,Audience,Location-Filter,Year-Filter,Ticket-Filter,Unnamed: 10,Unnamed: 11
0,2020,VIP,39-41,Male,Delta,100000,1,False,True,True,,
1,2020,VVIP,39-42,Female,Ebonyi,500000,3,False,True,True,,120140000.0
2,2020,Regular,43-45,Male,Edo,20000,1,False,True,True,,
3,2020,VIP,46-48,Female,Ekiti,100000,1,False,True,True,,
4,2020,VIP,49-51,Male,Imo,100000,1,False,True,True,,


In [8]:
frameCols = sourceDF.columns
dropColumns = []
for x in frameCols:
    if x.split(':')[0] == 'Unnamed':
        dropColumns.append(x)
sourceDF.drop(dropColumns,inplace=True,axis=1)

In [9]:
#Formatting the columns headers to 
frameCols = sourceDF.columns
cols = []
for x in frameCols:
    temp = x.replace(" ",'_')
    temp = temp.replace("-","_")
    temp = temp.replace(":","_")
    cols.append(temp)
sourceDF.columns = cols

In [10]:
#transformation function
def transformXL(fileLocation, worksheet = 'DataSource'):
    
    sourceDF = pd.read_excel(sourcePath,sheet_name='DataSource',header=0)
    
    frameCols = sourceDF.columns
    dropColumns = []
    
    for x in frameCols:
        if x.split(':')[0] == 'Unnamed':
            dropColumns.append(x)
    sourceDF.drop(dropColumns,inplace=True,axis=1)
    
    #Formatting the columns headers to 
    frameCols = sourceDF.columns
    cols = []
    for x in frameCols:
        temp = x.replace(" ",'_')
        temp = temp.replace("-","_")
        temp = temp.replace(":","_")
        cols.append(temp)
        
    sourceDF.columns = cols
    
    return sourceDF

In [11]:
destname = name.split('.')[0]+'.csv'
destname = destname.replace(' ','_')
destname = dest + destname
destname

'/run/media/solverbot/repoA/gitFolders/dashBoard Designs/fsPipelines/fileCSV/Tick_Sales_Dashboard.csv'

In [12]:
sourceDF.to_csv(destname,index=False)

In [17]:
def writeFS(dataframe, filename):
    dataframe.to_csv(filename,index=False)

In [16]:
os.listdir(dest)

['Tick_Sales_Dashboard.csv']

In [18]:
import configparser
import warnings
warnings.filterwarnings('ignore')

In [19]:
config = configparser.ConfigParser()
config.read('clusterdash.config')

['clusterdash.config']

In [None]:
os

In [20]:
db = config['POSTGRES']['PG_DB']
user = config['POSTGRES']['PG_UNAME']
passwd = config['POSTGRES']['PG_PASS']
port = config['POSTGRES']['PG_PORT']
host = config['POSTGRES']['PG_HOST']

In [23]:
credentials = "postgresql://{}:{}@{}:{}/{}".format(user,passwd,host,port,db)

In [24]:
#using psycopg2 to establish connection and keep the cursor ready

import psycopg2
try:
    conn = psycopg2.connect(host=host,dbname=db,user=user,password=passwd,port=port)
except Exception as e:
    print(e)
    
conn.set_session(autocommit=True)

try:
    cur = conn.cursor()
    
except:
    print(e)

In [62]:
#Using pandas read_sql for getting schema
def getSchema(tableName, credentials):
    schema = pd.read_sql("""SELECT * FROM information_schema.columns where table_name='{}'""".format(tableName),con=credentials)
    return schema

#Issue is in using pd.read_sql to write data to the database. so using psycopg2
def queryTable(query):
    try:
        schema = cur.execute(query)

    except Exception as e:
        print(e)
        
#This doesn't return anything

#Using the pd.read_sql for getting data from db
def queryBase(query):
    requiredTable = pd.read_sql(query,con=credentials)
    return requiredTable

#This returns the dataframe

def schemaGen(dataframe, schemaName):
    localSchema = pd.io.sql.get_schema(dataframe,schemaName)
    localSchema = localSchema.replace('TEXT','VARCHAR(255)').replace('INTEGER','VARCHAR').replace('\n','').replace('"',"")
    return "".join(localSchema)

def createCopyQuery(csvFilePath, tableName):
    copyCsvData = f"""COPY {tableName} from '{csvFilePath}' DELIMITER ',' CSV HEADER"""
    return copyCsvData

In [64]:
def writeToDb(config, dataframe, tableName, fileLocation):
    import psycopg2
    
    #database connection data
    db = config['POSTGRES']['PG_DB']
    user = config['POSTGRES']['PG_UNAME']
    passwd = config['POSTGRES']['PG_PASS']
    port = config['POSTGRES']['PG_PORT']
    host = config['POSTGRES']['PG_HOST']

    try:
        conn = psycopg2.connect(host=host,dbname=db,user=user,password=passwd,port=port)
        cur = conn.cursor()

    except Exception as e:
        print(e)

    conn.set_session(autocommit=True)
    
    tableCreationQuery = schemaGen(dataframe,tableName)
    
    #Create table
    queryTable(tableCreationQuery)
    
    #Create file copy query
    copyQuery = createCopyQuery(fileLocation,tableName)
    
    #Write data to the database
    
    queryTable(copyQuery)
    
    print(f'Completed. Check the database by querying it with Select * FROM {tableName}')

In [65]:
table_name = name.split('.')[0].replace(' ','_')

In [56]:
tableCreationQuery = schemaGen(sourceDF,name.split('.')[0].replace(' ','_'))

In [57]:
tableCreationQuery

'CREATE TABLE Tick_Sales_Dashboard (Year VARCHAR,  Ticket_Types VARCHAR(255),  Age_Group VARCHAR(255),  Gender VARCHAR(255),  Location VARCHAR(255),  Ticket_Price VARCHAR,  Audience VARCHAR,  Location_Filter VARCHAR,  Year_Filter VARCHAR,  Ticket_Filter VARCHAR)'

In [58]:
queryTable(tableCreationQuery)

In [60]:
copyQuery = createCopyQuery(destname,name.split('.')[0].replace(' ','_'))

In [61]:
queryTable(copyQuery)

In [67]:
writeToDb(config, sourceDF, table_name, destname)

Completed. Check the database by querying it with Select * FROM Tick_Sales_Dashboard


In [71]:
callSource = source + os.listdir(source)[1]

In [72]:
callCenterDF = transformXL(callSource)

In [73]:
callCenterDF.head(2)

Unnamed: 0,Year,Ticket_Types,Age_Group,Gender,Location,Ticket_Price,Audience,Location_Filter,Year_Filter,Ticket_Filter
0,2020,VIP,39-41,Male,Delta,100000,1,False,True,True
1,2020,VVIP,39-42,Female,Ebonyi,500000,3,False,True,True


In [77]:
csvFile = os.listdir(source)[1].split('.')[0]+".csv"
newDest = dest + csvFile.replace(' ','_')
newDest

'/run/media/solverbot/repoA/gitFolders/dashBoard Designs/fsPipelines/fileCSV/Call_Center_KPI_Dashboard.csv'

In [78]:
#write csv file
writeFS(callCenterDF,newDest)

In [80]:
os.listdir(dest)

['Call_Center_KPI_Dashboard.csv', 'Tick_Sales_Dashboard.csv']

In [81]:
newTableName = os.listdir(source)[1].split('.')[0].replace(' ','_')
newTableName

'Call_Center_KPI_Dashboard'

In [82]:
writeToDb(config, callCenterDF, newTableName, newDest)

Completed. Check the database by querying it with Select * FROM Call_Center_KPI_Dashboard
