### Part 1

**Querying Data from a DB into a Pandas DataFrame or a CSV file**

In [None]:
import os
import csv
import pyodbc
import pandas as pd


#### DATABASE CREDENTIALS ####
server = 'SERVERNAME'
database = 'DATABASENAME'
username = 'USER'
password = 'PASSWORD'
# Replace the DRIVER value with your installed ODBC driver, braces included,
# e.g. {ODBC Driver 17 for SQL Server}
conn_str = ('DRIVER={DRIVER NAME WITH BRACKETS INCLUDED};SERVER=' + server +
            ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password)


#### CREATE A FUNCTION THAT FETCHES DATA USING AN SQL QUERY AND RETURNS IT IN A PYTHON LIST ####
def run_sql(sql):
    # Open a connection, run the query and return every row as a plain list
    with pyodbc.connect(conn_str) as conn:
        conn.autocommit = True
        cursor = conn.cursor()
        cursor.execute(sql)
        rows = [list(row) for row in cursor.fetchall()]
        cursor.close()
    return rows


#### WRITE YOUR SQL ####
sql = ''' WRITE YOUR SQL HERE '''


#### USE THE FUNCTION TO PUT YOUR QUERY DATA INTO A LIST ####
data = run_sql(sql)

#### IF YOU WANT IT IN A PANDAS DATAFRAME DO THIS ####

#### Write the list of lists into a CSV file
# open() does not expand '~', so expand it explicitly
write_file = os.path.expanduser('~/directory/filename.csv')
with open(write_file, 'w', newline='') as f:
    writer = csv.writer(f, delimiter=',', lineterminator='\n')
    writer.writerows(data)


#### Create a list of column names based on your data source
names = ['column1', 'column2', 'column3']  # one name per column in your query

#### Read the file into a DataFrame
df = pd.read_csv(write_file, names=names, usecols=names)

When SQL isn’t sufficient for an analysis or a complex data transformation, Python is probably the answer. But before you can wrangle any data, you have to get it into memory so you can work with it. Using pyodbc if your database is on MS SQL Server, or psycopg2 if you’re on Postgres, you can write queries and pull data easily with Python. From there it’s just a matter of getting your data into a format that’s easy to work with. For this I like pandas. Getting the query data into pandas is as simple as writing the list to a CSV and then loading it with the pandas read_csv function. Alternatively, you could build the read_csv step right into your run_sql function if desired.
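If you would rather skip the CSV round trip, the column names can come straight from the cursor. This is a minimal sketch, assuming any DB-API 2.0 connection (pyodbc and psycopg2 both follow that spec); `run_sql_df` is a hypothetical name, and the in-memory sqlite3 database only stands in for your server so the example runs anywhere.

```python
import sqlite3

import pandas as pd


def run_sql_df(conn, sql):
    """Run a query on a DB-API 2.0 connection and return a DataFrame.

    Column names are read from cursor.description, so no CSV file or
    hand-written names list is needed.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        columns = [col[0] for col in cursor.description]
        rows = [list(row) for row in cursor.fetchall()]
    finally:
        cursor.close()
    return pd.DataFrame(rows, columns=columns)


# sqlite3 stands in for pyodbc here so the example runs without a server
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE sales (region TEXT, amount REAL)')
conn.executemany('INSERT INTO sales VALUES (?, ?)', [('east', 10.5), ('west', 7.0)])
df = run_sql_df(conn, 'SELECT region, amount FROM sales ORDER BY region')
print(df)
```

pandas also offers `pd.read_sql`, which does this in one call; recent pandas versions only officially support SQLAlchemy connectables and sqlite3 connections there and warn when given other raw DB-API connections such as pyodbc.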

**Sending email alerts with the full error code output**

In [None]:
import html
import smtplib
import traceback
from email.mime.text import MIMEText

#### THIS IS THE FUNCTION THAT WILL USE SMTPLIB TO SEND EMAILS VIA PYTHON ####
def send_alert(subject, body, receivers):
    # Build an HTML email addressed to everyone in the receivers list
    msg = MIMEText(body, 'html')
    msg['Subject'] = subject
    msg['From'] = USER
    msg['To'] = ','.join(receivers)
    # Send the message through Gmail's SMTP server using STARTTLS on port 587
    try:
        with smtplib.SMTP('smtp.gmail.com', 587) as s:
            s.starttls()
            s.login(USER, PASSWORD)
            s.send_message(msg)
    except OSError as e:
        # smtplib.SMTPException subclasses OSError, so this also catches network errors
        print("Error: unable to send email: {}".format(e))

#### INPUT THE EMAIL USER NAME AND PASSWORD AND WHO SHOULD RECEIVE THE EMAIL HERE ####
USER = 'USER'
PASSWORD = 'password'
RECEIVERS = ['receiver1@example.com', 'receiver2@example.com']


#### THIS IS YOUR MAIN CODE CHUNK OR FUNCTION WITH TRY AND EXCEPT CRITERIA
try:
    some_function()  # replace with the code you want to monitor
except Exception:
    tb = traceback.format_exc()
    subject = 'There was an error on the script'
    # <pre> keeps the traceback's line breaks in an HTML email, and escape()
    # stops characters such as < and > in the error text being read as HTML
    body = f'<p>Here is the full traceback:</p><pre>{html.escape(tb)}</pre>'
    send_alert(subject, body, RECEIVERS)

Monitoring errors in automated jobs (cron or other) is essential to running data pipelines or other code. I rely heavily on email alerts to tell me when any script I have running on an automated cadence breaks.

To get to the source of an issue quickly, I also like to include Python's full traceback in the email so I know exactly what to look for when I go to fix the script. The traceback module provides this through traceback.format_exc(), which returns the traceback as a string you can place in your email message.

The best way to use the code snippet is to import the functions into the script that contains the code you want to monitor. Wrap that code in a try/except statement, and in the except block call the send_alert function to send a full error report to your email.
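The try/except wrapper can also be written once and reused across scripts as a decorator. This is a minimal sketch, assuming you pass in a sender with the same (subject, body, receivers) signature as send_alert; `alert_on_error` and `fake_send` are hypothetical helpers, not part of any library. Re-raising after the alert is a design choice so the scheduler still sees the job fail.

```python
import functools
import html
import traceback


def alert_on_error(send, receivers):
    """Email the full traceback whenever the decorated function raises.

    `send` is any callable taking (subject, body, receivers), such as the
    send_alert function above. The exception is re-raised after the alert
    so cron (or any scheduler) still records the run as failed.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                tb = traceback.format_exc()
                subject = f'There was an error in {func.__name__}'
                send(subject, f'<pre>{html.escape(tb)}</pre>', receivers)
                raise
        return wrapper
    return decorator


# A stand-in sender that records alerts instead of emailing them
sent = []


def fake_send(subject, body, receivers):
    sent.append((subject, body, receivers))


@alert_on_error(fake_send, ['me@example.com'])
def nightly_job():
    return 1 / 0


try:
    nightly_job()
except ZeroDivisionError:
    print('Job failed; alert subject:', sent[0][0])
```

In a real script you would pass send_alert and your list of receivers instead of `fake_send`.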

Before using this code you need to have a dummy email account set up. Gmail is the simplest. Google no longer offers the ‘Less Secure App access’ setting, so turn on 2-Step Verification for the account and use an app password as PASSWORD.
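However the account is set up, it helps to keep its login out of the script itself. A minimal sketch, assuming the credentials are exported as environment variables in whatever environment runs your cron job; `load_email_credentials`, `ALERT_EMAIL_USER` and `ALERT_EMAIL_PASSWORD` are hypothetical names.

```python
import os


def load_email_credentials():
    """Read the alert account's login from environment variables.

    ALERT_EMAIL_USER and ALERT_EMAIL_PASSWORD are hypothetical variable
    names; set them in the environment your scheduled job runs under.
    """
    user = os.environ.get('ALERT_EMAIL_USER')
    password = os.environ.get('ALERT_EMAIL_PASSWORD')
    if not user or not password:
        raise RuntimeError('Set ALERT_EMAIL_USER and ALERT_EMAIL_PASSWORD first')
    return user, password


# These would normally be set outside Python (e.g. in the crontab or a shell profile)
os.environ['ALERT_EMAIL_USER'] = 'alerts@example.com'
os.environ['ALERT_EMAIL_PASSWORD'] = 'app-password-here'
USER, PASSWORD = load_email_credentials()
```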

**Writing a CSV file into a DB (Postgres or SQL Server)**

In [None]:
import os
import pyodbc
# BlockBlobService belongs to the legacy azure-storage-blob 2.x SDK;
# version 12+ replaced it with BlobServiceClient
from azure.storage.blob import BlockBlobService

#### These are your Azure Blob storage credentials ####
AZB_CREDS = {
    'STORAGEACCOUNTNAME': "ACCOUNT NAME",
    'STORAGEKEY': "###################",
    'CREDENTIAL': 'AzureStorageCredential',  # database scoped credential that can read the container
    'FILEFORMAT': 'generic_csv_format',
}


#### This function creates an external data source on your database that points to blob storage ####
def asdb_create_external_data_source(table_name, container_name):
    # The CREDENTIAL must already exist in the database; remove that line
    # if the container allows anonymous reads
    q_extds = '''
    CREATE EXTERNAL DATA SOURCE {table_name}_asdb_storage
    WITH
    (
        TYPE = BLOB_STORAGE,
        LOCATION = 'https://{account_name}.blob.core.windows.net/{container_name}',
        CREDENTIAL = {credential}
    )
    '''.format(table_name=table_name, container_name=container_name,
               account_name=AZB_CREDS['STORAGEACCOUNTNAME'],
               credential=AZB_CREDS['CREDENTIAL'])
    with pyodbc.connect(conn_str) as conn:
        conn.autocommit = True
        cursor = conn.cursor()
        try:
            cursor.execute(q_extds)
            print('External data source created for the following container: {}'.format(container_name))
        except pyodbc.Error:
            # This errors when the data source already exists, which is fine
            pass
        cursor.close()


#### This function drops a temporary version of a table with the string _tmp at the end ####
def drop_temp_table(schema, table):
    # IF EXISTS (SQL Server 2016+ and Azure SQL Database) avoids an error when there is nothing to drop
    sql = 'DROP TABLE IF EXISTS {schema}.{table}_tmp;'.format(schema=schema, table=table)
    with pyodbc.connect(conn_str) as conn:
        conn.autocommit = True
        cursor = conn.cursor()
        cursor.execute(sql)
        print('Table {schema}.{table}_tmp dropped (if it existed).'.format(schema=schema, table=table))
        cursor.close()


#### This function creates a temporary table from the target table you want to push data to ####
def create_temp_table(schema, table_name):
    # SELECT ... INTO with WHERE 1 = 0 copies the columns but no rows
    sql = '''
    select *
    into {schema}.{table_name}_tmp
    from {schema}.{table_name}
    where 1 = 0;
    '''.format(schema=schema, table_name=table_name)
    with pyodbc.connect(conn_str) as conn:
        conn.autocommit = True
        cursor = conn.cursor()
        cursor.execute(sql)
        print('Created temp table [{}].[{}]'.format(schema, table_name + '_tmp'))
        cursor.close()


#### This function inserts a csv file from blob storage into the target table ####
def bulk_insert(schema, table_name, data_source_name, container_file_name,
                field_terminator=',', row_terminator='0x0a'):
    sql = '''
    BULK INSERT {schema}.{table_name}
    FROM '{container_file_name}'
    WITH (DATA_SOURCE = '{data_source_name}', ROWTERMINATOR = '{row_terminator}',
          FIELDTERMINATOR = '{field_terminator}', FORMAT = 'CSV', CODEPAGE = '65001')
    '''.format(schema=schema, table_name=table_name,
               container_file_name=container_file_name, data_source_name=data_source_name,
               field_terminator=field_terminator, row_terminator=row_terminator)
    with pyodbc.connect(conn_str) as conn:
        conn.autocommit = True
        cursor = conn.cursor()
        message = 'Success'
        try:
            cursor.execute(sql)
        except pyodbc.Error as e:
            message = 'Error with bulk insert: {} \nHere is the error: {}'.format(container_file_name, e)
            print(message)
        cursor.close()
    return message


#### This function merges the temporary table with the main table using a list of unique keys ####
def merge_temp_table_with_main(schema, table_name, list_of_unique_fields):
    tbl = '{schema}.{table_name}'.format(schema=schema, table_name=table_name)
    tmp_tbl = tbl + '_tmp'
    # Match rows on every unique field; two NULLs count as a match
    # (type-safe, unlike coalescing numeric columns to the string 'NULL')
    unique_where_filter_string = '\nAND '.join(
        '({tbl}.{col} = {tmp_tbl}.{col} OR ({tbl}.{col} IS NULL AND {tmp_tbl}.{col} IS NULL))'.format(
            col=c, tbl=tbl, tmp_tbl=tmp_tbl)
        for c in list_of_unique_fields)
    sql = '''
    delete from {tbl}
    where exists (select 1 from {tmp_tbl} where {unique_where_filter_string});
    insert into {tbl} select * from {tmp_tbl};
    drop table {tmp_tbl};
    '''.format(tbl=tbl, tmp_tbl=tmp_tbl, unique_where_filter_string=unique_where_filter_string)
    with pyodbc.connect(conn_str) as conn:
        cursor = conn.cursor()
        try:
            # autocommit stays off so the delete, insert and drop commit together
            cursor.execute(sql)
            conn.commit()
            message = 'Successful merge into {}.'.format(tbl)
        except pyodbc.Error as e:
            conn.rollback()
            message = 'Error with the merge into {}\nError: {}'.format(tbl, e)
        print(message)
        cursor.close()
    return message


write_file = os.path.expanduser('~/directory/file.csv')

### STEP 1: Create a new blob in your container of choice in azure blob storage using your csv file. This uses the create_blob_from_path function from the azure blob api
container_name = 'NAME OF YOUR CONTAINER'
container_file_name = 'file.csv'
blob_service = BlockBlobService(account_name=AZB_CREDS['STORAGEACCOUNTNAME'], account_key=AZB_CREDS['STORAGEKEY'])

blob_service.create_blob_from_path(container_name, container_file_name, write_file)  # overwrites any existing blob with this name
print('Done sending CSV file up to AZB ...')

### STEP 2: Create an external data source that ties a specified database table to a container in blob storage
target_schema = 'schema'
table_name = 'table'
data_source_name = table_name + '_asdb_storage'  # the name the next line creates
asdb_create_external_data_source(table_name, container_name)

### STEP 3: If a temporary table already exists, drop it. Then create a new temporary table to push the CSV file to
drop_temp_table(target_schema, table_name)
create_temp_table(target_schema, table_name)

### STEP 4: Insert the CSV file from blob storage into the temporary table
bulk_insert(target_schema, table_name + '_tmp', data_source_name, container_file_name)

### STEP 5: Merge the CSV data from the temporary table into the existing table using the merge criteria
unique_columns = ['column1', 'column2', 'column3']  # the columns that uniquely identify a row
merge_temp_table_with_main(target_schema, table_name, unique_columns)

This is A LOT of code, but it is one of the most important techniques I use on a daily basis to get data from any source and push it into a database. The example above is best suited for MS SQL Server, specifically Azure SQL Database with Azure Blob Storage. Below is another example, a bit simpler, that you can use for Postgres databases.

The basic idea here is to get the data from a CSV file into your database using a few steps:

1. Push the CSV file into blob storage.
2. Tie the target database table to blob storage by creating an external data source on your SQL database.
3. Create a temporary table where you will insert the CSV file.
4. Bulk insert the CSV file into the temporary table.
5. Finally, merge the temporary table into the final table using merge criteria.
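Steps 3 to 5 can be tried end to end without Azure. This is a minimal sketch, assuming an in-memory sqlite3 database in place of Azure SQL Database and a CSV string in place of the blob; `load_and_merge` is a hypothetical helper, and sqlite's `CREATE TABLE ... AS` and `executemany` stand in for `SELECT ... INTO` and `BULK INSERT`. Table and column names are interpolated straight into the SQL, so they must come from trusted code, never from user input.

```python
import csv
import io
import sqlite3


def load_and_merge(conn, table, csv_text, unique_cols):
    """Load CSV rows into <table>_tmp, then replace matching rows in <table>."""
    tmp = f'{table}_tmp'
    rows = list(csv.reader(io.StringIO(csv_text)))
    cur = conn.cursor()
    # Step 3: empty copy of the target table (same columns, no rows)
    cur.execute(f'DROP TABLE IF EXISTS {tmp}')
    cur.execute(f'CREATE TABLE {tmp} AS SELECT * FROM {table} WHERE 1 = 0')
    # Step 4: "bulk insert" the CSV rows into the temp table
    placeholders = ','.join('?' * len(rows[0]))
    cur.executemany(f'INSERT INTO {tmp} VALUES ({placeholders})', rows)
    # Step 5: delete target rows sharing a unique key, insert the new rows, drop the temp table
    # (plain = means NULL keys never match in this sketch)
    match = ' AND '.join(f'{table}.{c} = {tmp}.{c}' for c in unique_cols)
    cur.execute(f'DELETE FROM {table} WHERE EXISTS (SELECT 1 FROM {tmp} WHERE {match})')
    cur.execute(f'INSERT INTO {table} SELECT * FROM {tmp}')
    cur.execute(f'DROP TABLE {tmp}')
    conn.commit()


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE orders (id INTEGER, name TEXT, amount REAL)')
conn.executemany('INSERT INTO orders VALUES (?, ?, ?)', [(1, 'a', 5.0), (2, 'b', 6.0)])
conn.commit()

# Row 2 is updated, row 3 is new and row 1 is left alone
load_and_merge(conn, 'orders', '2,b,9.5\n3,c,1.0\n', ['id'])
print(conn.execute('SELECT * FROM orders ORDER BY id').fetchall())
```

Deleting and re-inserting keeps the merge logic identical for every table, at the cost of rewriting unchanged rows that happen to appear in the file.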

There are countless ways to get data where you want it to go. This is the method that I depend on almost every day.

In [None]:
# for the postgres databases
import psycopg2

HOST = 'HOSTNAME'
DB = 'DBNAME'
USER = 'USERNAME'
PASSWORD = 'PASSWORD'
PORT = 'PORT'

conn = psycopg2.connect(host=HOST, database=DB, user=USER, password=PASSWORD, port=PORT)

# In the functions below, `with conn` commits on success and rolls back on
# error (psycopg2 2.5+); it does not close the connection.


# this can bulk insert a file into a table
def copy_file_into(schema, table, file, delimiter=',', null='NULL', columns=None):
    cols_str = ' ({})'.format(','.join(columns)) if columns else ''
    # CSV mode with QUOTE '"' lets quoted fields contain embedded commas
    copy_sql = f'''COPY {schema}.{table}{cols_str} FROM STDIN WITH CSV DELIMITER E'{delimiter}' QUOTE '"' NULL '{null}';'''
    print(copy_sql)
    with open(file, 'r') as f, conn, conn.cursor() as cursor:
        cursor.copy_expert(copy_sql, f)
    print('Done with the copy')


# use this to merge data from one table to another - you just need to define the sources, targets and primary key fields
def merge_one_table_with_main(source_schema, target_schema, source_table, target_table, list_of_unique_fields):
    unique_where_filter_string = '\nAND '.join(
        f'{target_schema}.{target_table}.{col} = {source_schema}.{source_table}.{col}'
        for col in list_of_unique_fields)
    sql = f'''
    delete from {target_schema}.{target_table}
    where exists (select 1 from {source_schema}.{source_table} where {unique_where_filter_string});
    insert into {target_schema}.{target_table} select * from {source_schema}.{source_table};
    drop table {source_schema}.{source_table};
    '''
    # The delete, insert and drop run in a single transaction
    with conn, conn.cursor() as cursor:
        cursor.execute(sql)


# creates an empty copy of schema.table named schema.temp_table
def create_temp_table(schema, table, temp_table):
    sql = f'''
    DROP TABLE IF EXISTS {schema}.{temp_table};
    SELECT * INTO {schema}.{temp_table} FROM {schema}.{table} WHERE 1 = 0;
    '''
    with conn, conn.cursor() as cursor:
        cursor.execute(sql)

**Pulling back multiple pages of data from a REST API**

In [None]:
import csv
import json
import time
import requests


# Authentication parameters shared by both examples
params = {
    'access_token': '############'
}


#### PULLING FROM A SYNCHRONOUS ENDPOINT

endpoint = '#######'
url = f'https://place.com/v11.0/{endpoint}/insights'
r = requests.get(url, params=params)
apiResults = r.json()


data = []

while True:
    try:
        data.extend(apiResults['data'])
        # Attempt to make a request to the next page of data, if it exists.
        apiResults = requests.get(apiResults['paging']['next']).json()
    except KeyError:
        # When there are no more pages (['paging']['next']), break from the
        # loop and end the script.
        break

#### PULLING FROM AN ASYNCHRONOUS ENDPOINT (REQUEST IS PROCESSED IN THE BACKGROUND)

endpoint = '#######'
url = f'https://place.com/v11.0/{endpoint}'
r = requests.post(url, params=params)
if r.status_code != 200:
    print(f'{r.status_code} error with request: {r.text}\nExiting the program ...')
    raise SystemExit
apiResults = r.json()
print(apiResults)

# The POST response carries the ID of the background job
report_run_id = apiResults.get('report_run_id')
job_status = None

while job_status != 'Job Completed':
    url = f'https://place.com/v11.0/{report_run_id}'
    r = requests.get(url, params=params)
    apiResults = r.json()
    job_status = apiResults.get('status')
    if job_status == 'Job Completed':
        print('Job completed...breaking the loop and retrieving the data')
        break
    else:
        print('Job not completed....percent completed: {}'.format(apiResults.get('percent_completion')))
        time.sleep(30)


# Retrieve the finished report (swap in the results endpoint your API uses)
url = f'https://place.com/v11.0/{report_run_id}'
r = requests.get(url, params=params)
apiResults = r.json()

Whenever I need to create an ETL pipeline with an API integration, I use some iteration of the two pieces of code above. A lot of APIs use asynchronous requests (where you submit a request and then wait for it to complete in the background), which is why I included that example too. To get the code to work, you’ll need to swap out the endpoint URLs and the specific pagination key for the API that you’re using. If the API requires authentication using a client_id/client_secret, you’ll also need to follow its authentication process to acquire an access token. Once you’ve done that, the code examples above serve as a great starting point for your integration.
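Both loops can be pulled into small helpers that are easier to reuse and test. This is a sketch that assumes the API uses the `data` / `paging.next` page shape and the `'Job Completed'` status string from the examples above; `fetch_all_pages` and `wait_for_job` are hypothetical names. The timeout is an addition the original polling loop does not have: it stops a stuck job from polling forever. The HTTP call and the sleep are passed in as functions, so fakes can replace them here.

```python
import time


def fetch_all_pages(first_page, get_json):
    """Collect every row from a paged JSON API.

    Assumes each page looks like {'data': [...], 'paging': {'next': url}}.
    `get_json` takes a URL and returns parsed JSON, for example
    lambda url: requests.get(url).json().
    """
    rows = []
    page = first_page
    while True:
        rows.extend(page.get('data', []))
        next_url = page.get('paging', {}).get('next')
        if not next_url:
            return rows
        page = get_json(next_url)


def wait_for_job(get_status, done='Job Completed', interval=30, timeout=3600,
                 sleep=time.sleep):
    """Poll get_status() until it returns `done`.

    Raises TimeoutError once `timeout` seconds have been spent sleeping.
    """
    waited = 0
    while True:
        status = get_status()
        if status == done:
            return status
        if waited >= timeout:
            raise TimeoutError(f'Job still "{status}" after {waited} seconds')
        sleep(interval)
        waited += interval


# Fake pages and statuses stand in for real HTTP calls
pages = {
    'page2': {'data': [3, 4], 'paging': {'next': 'page3'}},
    'page3': {'data': [5]},
}
rows = fetch_all_pages({'data': [1, 2], 'paging': {'next': 'page2'}}, pages.get)

statuses = iter(['Job Running', 'Job Running', 'Job Completed'])
final = wait_for_job(lambda: next(statuses), sleep=lambda seconds: None)
print(rows, final)
```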

**Cleaning and formatting REST API JSON results for database insertion**

In [None]:
import csv
import os
import requests


##### API URL and URL PARAMETERS GO HERE #####
params = {'######':'######'}
url = 'request_url'


##### CREATE AN EMPTY LIST TO STORE THE JSON DATA #####
table = []

##### MAKE THE REQUEST AND THEN PUT THE REQUEST DATA INTO JSON FORMAT #####
r = requests.get(url, params=params)
data = r.json()

##### LOOP THROUGH EACH JSON OBJECT IN THE LIST AND PUT IT CLEANLY INTO ITS OWN ROW  #####
for item in data['key_where_the_data_lives']:
    # print(item.keys()) shows which keys are available
    # .get() returns None instead of raising KeyError when a key is missing
    row = [
        item.get('key1'),
        item.get('key2'),
        item.get('key3'),
        item.get('key4'),
        item.get('key5'),
        item.get('keyN'),
    ]
    table.append(row)


##### WRITE THE ROWS OF THE LIST (TABLE) INTO A CSV FILE ######
write_file = os.path.expanduser('~/directory/filename.csv')
with open(write_file, 'w', newline='') as f:
    writer = csv.writer(f, lineterminator='\n')
    writer.writerows(table)

I use a variation on the above code snippet for almost any API integration that I write. It assumes the data comes back in JSON format, so it should work for about 75% of API requests for data. The script organizes a JSON data structure into a tabular format (a list of lists) that can then be written straight to a CSV, ready for pushing to a DB. There’s probably a much simpler way to do this, but this is the method I’ve been using for years and it has yet to fail me.
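One simpler route is pandas' `pd.json_normalize`, which flattens a list of JSON records, including nested objects, into a DataFrame. This is a swapped-in alternative to the loop above rather than the same method, and the response shape and key names below are made up for illustration.

```python
import pandas as pd

# A made-up API response shaped like the one the loop above expects
data = {
    'key_where_the_data_lives': [
        {'id': 1, 'name': 'Ad A', 'stats': {'clicks': 10, 'spend': 2.5}},
        {'id': 2, 'name': 'Ad B', 'stats': {'clicks': 4}},
    ]
}

# Nested objects become dotted column names (stats.clicks, stats.spend);
# keys missing from a record become NaN, much like .get() returning None
df = pd.json_normalize(data['key_where_the_data_lives'])
df = df[['id', 'name', 'stats.clicks', 'stats.spend']]  # choose and order the columns

# Returns the CSV as a string; pass a file path instead to write it to disk
csv_text = df.to_csv(index=False, header=False)
print(csv_text)
```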

**Cleaning a CSV file for insertion into a DB**

In [None]:
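import pandas as pd
write_file = '~/directory/filename.csv'  # pandas expands '~' itself
df = pd.read_csv(write_file)
# With regex=True each key is a pattern and matches are removed inside string values
df = df.replace({r'\$': ''}, regex=True)  # dollar signs
df = df.replace({'%': ''}, regex=True)    # percent signs
df = df.replace({',': ''}, regex=True)    # thousands separators
df = df.replace({"'": ''}, regex=True)    # single quotes
# Missing values are NaN, not the string "nan", so fill them instead of replacing text
df = df.fillna('')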

**Creating To and From Dates**

In [None]:
import datetime
#### Set today's date ####
today = datetime.datetime.now().date()
#### Create to_date by adding or subtracting days from today's date
to_date = today - datetime.timedelta(days=1)
#### Create your from date by subtracting the number of days back you want to start
from_date = today - datetime.timedelta(days=7)
#### Create a timestamp of today's date using the desired format
todaysdate = today.strftime('%Y-%m-%d')

Most APIs require date parameters when requesting data. This example uses Python’s datetime module to create to and from dates by subtracting datetime.timedelta objects from today's date. I use this all the time to build automatic to/from date calculations into my scripts based on the day the script runs.
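Wrapping those calculations in a function keeps the window configurable per job. A minimal sketch; `date_window` and the `since`/`until` parameter names are hypothetical, so swap in whatever date parameters your API expects.

```python
import datetime


def date_window(days_back, today=None, fmt='%Y-%m-%d'):
    """Return (from_date, to_date) strings ending yesterday.

    Ending at yesterday means a run at any time of day only requests
    complete days.
    """
    if today is None:
        today = datetime.date.today()
    to_date = today - datetime.timedelta(days=1)
    from_date = today - datetime.timedelta(days=days_back)
    return from_date.strftime(fmt), to_date.strftime(fmt)


# Pinning `today` makes the output predictable
from_date, to_date = date_window(7, today=datetime.date(2024, 3, 10))
params = {'since': from_date, 'until': to_date}  # parameter names vary by API
print(params)
```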

**Process a big Python list in chunks**

In [None]:
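def chunker(seq, size):
    # Return a generator of consecutive slices of seq, each at most `size` items long
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

for group in chunker(order_items, 100):  # order_items is your big list
    for item in group:
        pass  # Do something to each item in the group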

This is a little function I use quite a bit to chunk up really big lists. It goes really well with cleaning and formatting JSON data (shown above), since sometimes you’ll be pulling back a lot of data from an API and want to process it and push it into your DB in manageable chunks. The chunker function lets you pick how many items each chunk holds and then run your code over each chunk. This example uses 100-item chunks, but you can adjust that number to whatever you want.
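chunker relies on slicing and len(), so it needs a list or other sequence. When the data arrives as a generator (for example, rows streamed from an API), a variant built on itertools.islice works instead. `chunked` is a hypothetical name for this sketch.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of up to `size` items from any iterable, including generators.

    Unlike slicing, this never calls len() or needs the whole input in memory.
    """
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Works on a range (a sequence) and on a generator alike
print(list(chunked(range(7), 3)))
print(list(chunked((x * x for x in range(4)), 2)))
```

On Python 3.12 and later, `itertools.batched(iterable, n)` does the same job but yields tuples instead of lists.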