In [None]:
#Example on how to extract from Teradata and load into a table in Cassandra

In [1]:
from sqlalchemy import create_engine
import pandas as pd
from cassandra.cluster import Cluster

In [2]:
username = "your lanid"
password = "your password"
hostname = "hostname"

In [3]:
#Usually the first time you try to connect it will fail. Try running it again and you should be able to connect
db_engine = create_engine("teradata://{username}:{password}@{hostname}/;authentication=LDAP;CHARSET=UTF8;TMODE=ANSI"\
                         .format(username=username, password=password, hostname=hostname))


#While loop statement. Try to connect to Teradata until connection is successful.
db_connection = None

while db_connection is None:
    try:
        db_connection = db_engine.connect()
    except:
        pass

In [4]:
#Extract data to a list data structure example
raw_sql = "select top 10 * from dlbis_mytime.vol_drv_act"
raw_data = list(db_connection.execute(raw_sql))


In [5]:
raw_data

[('T0607', datetime.date(2012, 10, 21), 'UNITS DEPT', 'DELI', Decimal('135')),
 ('T1784', datetime.date(2014, 10, 5), 'UNITS DEPT', 'SOFTLINES', Decimal('16038')),
 ('T1804', datetime.date(2015, 10, 25), 'CARTONS', 'GMLAWN', Decimal('6')),
 ('T2476', datetime.date(2016, 7, 24), 'TRANSACTIONS', 'SDGC', Decimal('90')),
 ('T1972', datetime.date(2016, 10, 16), 'BACKSTOCK TRIPS', 'DAYBULK', Decimal('158')),
 ('T1220', datetime.date(2015, 2, 22), 'UNITS DEPT', 'HARDHARD', Decimal('37974')),
 ('T1100', datetime.date(2016, 1, 17), 'TRANSACTIONS', 'SDCRED', Decimal('102')),
 ('T0669', datetime.date(2016, 3, 27), 'UNITS DEPT', 'DELI', Decimal('103')),
 ('T0227', datetime.date(2012, 11, 18), 'PULL ITEMS WORKED', 'OTHER', Decimal('68')),
 ('T2011', datetime.date(2017, 1, 8), 'TRANSACTIONS', 'REGCRED', Decimal('2219'))]

In [6]:
#Extract data to a pandas dataframe example
pandas_sql = "select top 10 * from dlbis_mytime.vol_drv_Act"
pandas_df = pd.read_sql_query(pandas_sql, db_connection)

pandas_df

Unnamed: 0,location,cal_wk_beg_d,vd,node_td,actual_units
0,T0607,2012-10-21,UNITS DEPT,DELI,135.0
1,T1784,2014-10-05,UNITS DEPT,SOFTLINES,16038.0
2,T1804,2015-10-25,CARTONS,GMLAWN,6.0
3,T2476,2016-07-24,TRANSACTIONS,SDGC,90.0
4,T1972,2016-10-16,BACKSTOCK TRIPS,DAYBULK,158.0
5,T1220,2015-02-22,UNITS DEPT,HARDHARD,37974.0
6,T1100,2016-01-17,TRANSACTIONS,SDCRED,102.0
7,T0669,2016-03-27,UNITS DEPT,DELI,103.0
8,T0227,2012-11-18,PULL ITEMS WORKED,OTHER,68.0
9,T2011,2017-01-08,TRANSACTIONS,REGCRED,2219.0


In [7]:
#Connect to a Cassandra cluster
#12.34.56.78 = Bentobox node
cluster = Cluster(
    contact_points=['12.34.56.78'], port=9042)
#Connect to grocery keyspace
session = cluster.connect('mykeyspace')

In [8]:
def load_rows():
    'Create list of batch numbers from the index column'
    batch_num = pandas_df.index
    batch_list = batch_num.tolist()
    'Set end parameter to the max value of the batch list'
    end = max(batch_list)
    n = 0

    while n <= end:
        getRow = pandas_df[pandas_df.index == n]
        'Convert columns to string so we can get rid of the index column'
        data = getRow['location'].to_string(index=False, header=False)
        data2 = getRow['cal_wk_beg_d'].to_string(index=False, header=False)
        data3 = getRow['vd'].to_string(index=False, header=False)
        data4 = getRow['node_td'].to_string(index=False, header=False)
        data5 = getRow['actual_units'].to_string(index=False, header=False)
        
        'Convert strings into int and float'
        actual_units = float(data5)
        
        'Load row into Cassandra table'
        session.execute("""INSERT INTO vol_drv_act (cal_wk_beg_d, location, vd, node_td, actual_units) 
        values (%s, %s, %s, %s, %s)""", 
                        (data2,data,data3,data4,actual_units))
        print('Row',n,'loaded into Cassandra')
        n = n + 1


In [47]:
load_rows()

Row 0 loaded into Cassandra
Row 1 loaded into Cassandra
Row 2 loaded into Cassandra
Row 3 loaded into Cassandra
Row 4 loaded into Cassandra
Row 5 loaded into Cassandra
Row 6 loaded into Cassandra
Row 7 loaded into Cassandra
Row 8 loaded into Cassandra
Row 9 loaded into Cassandra


In [48]:
#Check results in Cassandra
getresults = session.execute("select * from vol_drv_act")

for x in getresults:
    print(x)

Row(cal_wk_beg_d=datetime.datetime(2016, 1, 10, 0, 0), location='T1858', vd='CARTONS', node_td='REPSHOE', actual_units=107.21175384521484)
Row(cal_wk_beg_d=datetime.datetime(2014, 8, 10, 0, 0), location='T1091', vd='CARTONS', node_td='GMSB', actual_units=724.888916015625)
Row(cal_wk_beg_d=datetime.datetime(2012, 4, 8, 0, 0), location='T0055', vd='TRANSACTIONS', node_td='REGCOU', actual_units=1592.0)
Row(cal_wk_beg_d=datetime.datetime(2016, 3, 20, 0, 0), location='T1378', vd='BACKSTOCK TRIPS', node_td='DAYUPPER', actual_units=309.0)
Row(cal_wk_beg_d=datetime.datetime(2013, 2, 3, 0, 0), location='T1960', vd='PULL TRIPS', node_td='DAYUPPER', actual_units=1216.0)
Row(cal_wk_beg_d=datetime.datetime(2016, 11, 13, 0, 0), location='T1878', vd='PRODUCTION UNITS', node_td='SBBLENDER', actual_units=355.0)
Row(cal_wk_beg_d=datetime.datetime(2012, 4, 29, 0, 0), location='T1360', vd='PULL ITEMS WORKED', node_td='MEAT', actual_units=632.0)
Row(cal_wk_beg_d=datetime.datetime(2014, 4, 13, 0, 0), locati

In [49]:
#Close out teradata connection when finish
db_connection.close()