In [1]:
# Install required packages
#!pip install kaggle mysql-connector-python numpy pandas boto3 sqlalchemy pymysql

In [2]:
# Import all the required moules
import pandas as pd
import numpy as np
import mysql.connector #For database connection(data modelling)
import pymysql
from sqlalchemy import create_engine
import os
import shutil 
from kaggle.api.kaggle_api_extended import KaggleApi #To connect to Kaggle and get the dataset
import boto3 #for S3 conection
from tqdm import tqdm

In [3]:
# GETTING DATA FROM KAGGLE

# A function that gets the data from cagle and saves it in the current working dirsctory. It returns a dataframe with the dataset

def get_data_from_kaggle():
    # Initialize and autenticate the Kaggle API client

    api = KaggleApi()
    api.authenticate()

    # Create a variable 'dataset_name' for the dataset we want from Kaggle
    dataset_name = 'thedevastator/analyzing-credit-card-spending-habits-in-india'


    # Specify the csv file we want to load from the dataset and store in 'file_name' variable
    file_name = 'Credit card transactions - India - Simple.csv'

    # we will download the file. It wll be stored in the path 'Credit card transactions - India - Simple.csv\Credit card transactions - India - Simple.csv'
    # on our current working directory
    api.dataset_download_files(dataset_name, file_name, unzip=True)
    
    csv_file_path = r"Credit card transactions - India - Simple.csv\Credit card transactions - India - Simple.csv"
    
    return pd.read_csv(csv_file_path, index_col = 0)


get_data_from_kaggle()

Unnamed: 0_level_0,City,Date,Card Type,Exp Type,Gender,Amount
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
0,"Delhi, India",29-Oct-14,Gold,Bills,F,82475
1,"Greater Mumbai, India",22-Aug-14,Platinum,Bills,F,32555
2,"Bengaluru, India",27-Aug-14,Silver,Bills,F,101738
3,"Greater Mumbai, India",12-Apr-14,Signature,Bills,F,123424
4,"Bengaluru, India",5-May-15,Gold,Bills,F,171574
...,...,...,...,...,...,...
26047,"Kolkata, India",22-Jun-14,Silver,Travel,F,128191
26048,"Pune, India",3-Aug-14,Signature,Travel,M,246316
26049,"Hyderabad, India",16-Jan-15,Silver,Travel,M,265019
26050,"Kanpur, India",14-Sep-14,Silver,Travel,M,88174


In [4]:
# Initialize AWS S3 client

# Create a function that creates a connecton with s3. It returns a tuple with S3 resource object and S3Client object
def initialize_sc3():
    # Create a S3 resource object for downloading data
    s3 = boto3.resource('s3',
          aws_access_key_id='',
          aws_secret_access_key='')

    # Create a S3 client object for uploading data 
    s3client = boto3.client('s3',
          aws_access_key_id='',
          aws_secret_access_key='')

    # Create a variable for S3 Bucket name
    bucket_name = 'credit-card-data-project'
    
    return s3, s3client, bucket_name

initialize_sc3()

(s3.ServiceResource(),
 <botocore.client.S3 at 0x203b657be20>,
 'credit-card-data-project')

In [5]:
# This code segments is for uploading files to S3. 

# Create a program that incrementally ingests data to S3 Bucket. It should run on lambda
# The function first checks if the file 'last_transactio_id.txt' is in our S3 bucket. If it is not, then this is the first time 
# data is being uploaded to theS3 bucket. It will create a new text file with only the last transaction id in the csv file. It
# will then upload the csv file and the created txt file.
# If the txt file was found in the S3 bucket, then it is not the first time uploading data. The function will read the txt file 
# which contains the last transaction id only. It will fetch data from kaggle and if it finds new data, it will append it to the
# csv file in S3 Bucket, not replacing the csv, hence achieving incremental ingestion

def upload_data_to_S3(S3_resource_object, S3_client_object, bucket_name):
    # Lets get the files in our S3 bucket
    s3_bucket = S3_resource_object.Bucket(bucket_name)
    bucket_files = [s3_file.key for s3_file in s3_bucket.objects.all()]

    if 'last_transaction_id.txt' not in bucket_files:
        
        # This will run for the first time only.
        # This code segment will upload the initial csv file and a txt file that tracks the last transaction ID n the csv file to 
        # S3 bucket
        
        print('First time initialization')
        
        #Get data from Kaggle
        data = get_data_from_kaggle()
    
        #Get the last transaction ID of the data
        latest_transaction_id = data.index[-1]


        # Create the txt file that shows the last entered transaction ID. This will be used to achieve ingestion incremental
        with open('last_transaction_id.txt', "w") as last_transaction_id:
            # Write text to the file
            last_transaction_id.write(str(latest_transaction_id))


        csv_file_path = r"Credit card transactions - India - Simple.csv\Credit card transactions - India - Simple.csv"
        csv_filename = 'data.csv'

        txt_file_path = r"last_transaction_id.txt"
        txt_filename = 'last_transaction_id.txt'


        # Upload the txt file and the transaction csv to S3
        S3_client_object.upload_file(csv_file_path, bucket_name, csv_filename)
        S3_client_object.upload_file(txt_file_path, bucket_name, txt_filename)

        print('Files uploaded successfuly')

    else:
        print('Initializing incremental ingestion')        

        #Get the last transaction id of the data in S3 bucket
        response = S3_client_object.get_object(Bucket=bucket_name, Key='last_transaction_id.txt')
        textfile_content = response['Body'].read().decode('utf-8')
        last_inserted_id= int(textfile_content)
        
        #Get new updated data from Kaggle and store it in updated_df dataframe
        updated_df = get_data_from_kaggle()
        
        #Get the id of the last transaction 
        latest_transaction_id = updated_df.index[-1]
        
        
        if latest_transaction_id > last_inserted_id:
            new_data = updated_df.loc[last_inserted_id+1:,:]
            
            print(f'New data found: {len(new_data)} new records')
            
            # Get the csv to add new data
            response = S3_client_object.get_object(Bucket=bucket_name, Key='data.csv')
            object_content = response['Body']
            old_data = pd.read_csv(object_content)
           
            # Add the new data to the csv
            updated_data = pd.concat([old_data, new_data])
            
            # Update last transaction id in txt_file
            with open('last_transaction_id.txt', "w") as last_transaction_id:
                # Write text to the file
                last_transaction_id.write(str(latest_transaction_id))
                
            
            updated_data.to_csv('updated.csv')

            # Update the txt file and the transaction csv to S3
            S3_client_object.upload_file('updated.csv', bucket_name, 'data.csv')
            S3_client_object.upload_file('last_transaction_id.txt', bucket_name, 'last_transaction_id.txt')

            print('Data ingested sucessfully')
        else:
            print('Data is up to date!')
    try:
        shutil.rmtree('Credit card transactions - India - Simple.csv')
        os.remove('last_transaction_id.txt')
        os.remove('updated.csv')
        
    except:
        pass
            
               
s3_objects = initialize_sc3()
upload_data_to_S3(s3_objects[0], s3_objects[1], s3_objects[2]) 

Initializing incremental ingestion
Data is up to date!


In [25]:
# data modelling


# This function performs all the data modelling tasks. 

# First, it checks if it is the first time and created the database tables 
# and their relationships. It then models the data and inserts the data into appropriate tables.

# If it finds that the database tables were created and have data in them, it checks if there is 
# new data from the provided dataframe. If it finds new data, it moels it appropriately and uppend data into appropriate 
# tables.

def data_modelling(dataframe):
    # Database credentials
    # Host: sql9.freesqldatabase.com
    # Database name: sql9648234
    # Database user: sql9648234
    # Database password: eCsWegQqsf
    # Port number: 3306

    # create a connection to the database
    
    df = dataframe
    db = mysql.connector.connect(
        host = 'myhost',
        user = 'myusername',
        password = 'pass',
        database = 'db'
    )

    # create a cursor object to eecute SQL commannds
    mycursor = db.cursor()

    #Create an sql_engine
    engine = create_engine(r'mysql+pymysql://user:pass@dbhost/db')


    # mycursor.execute('DROP TABLE IF EXISTS transactions, card_type, expense_type, gender;')

    # Check the tables in the database. This is to determine if it is a first time set up or update
    mycursor.execute("SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_type = 'BASE TABLE'")
    tables = [i[0] for i in mycursor]

    if 'card_type' not in tables:
        #Database tables not found. This means it is a first time set up. Create all the tables and relationships.
        print('Creating database tables')
        mycursor.execute("CREATE TABLE card_type(card_type_id INT PRIMARY KEY,card_name VARCHAR(100))")
        mycursor.execute("CREATE TABLE expense_type("
                         "expense_type_id INT PRIMARY KEY,"
                         "expense_type VARCHAR(100));")
        mycursor.execute("CREATE TABLE gender("
                         "gender_id INT PRIMARY KEY,"
                         "gender VARCHAR(100));")
        mycursor.execute("CREATE TABLE transactions("
                        "transaction_id INT PRIMARY KEY,"
                        "city VARCHAR(1000),"
                        "Date DATE,"
                        "card_type_id INT,"
                        "expense_type_id INT,"
                        "gender_id INT,"
                        "amount INT,"
                        "FOREIGN KEY (card_type_id) REFERENCES card_type(card_type_id) ON DELETE SET NULL,"
                        "FOREIGN KEY (expense_type_id) REFERENCES expense_type(expense_type_id) ON DELETE SET NULL,"
                        "FOREIGN KEY (gender_id) REFERENCES gender(gender_id) ON DELETE SET NULL) ;")

        print('Tables created successfully')

        # Insert data into the tables.
        # The database was normalized, so I have to get data for all the tables from the database

        #create a dictionary of the different card types and corresponding index. The dictionary will look like {card type(eg. Gold): card_number(eg. 1)}
        card_type_dict = {value:index for index, value in enumerate(list(df['Card Type'].unique()), start=1)}

        #create a dictionary of the different expense types and corresponding index. The dictionary will look like {'Bills': 1,'Food': 2, ...}
        expense_type_dict = {value:index for index, value in enumerate(list(df['Exp Type'].unique()), start=1)}

        #create a dictionary of the different genders and corresponding index. The dictionary will look like {'M': 1,'F': 2}
        gender_dict = {value:index for index, value in enumerate(list(df['Gender'].unique()), start=1)}

        #Make a copy of the dataframe to transactions_df. This datarame will contain the normalized data for the transactions table
        transactions_df = df.copy()

        #Update the transactions dataframe to incorporate the foreign keys of the other tables. Instead of gender column showing M 
        #and F, It will show 1 for male and 2 for Female. Same forExpense type and card type, instead of Gold, silver,Platinum etc 
        #it will show the corresponding keys(numbes).

        transactions_df.Gender = df.Gender.apply(lambda x: x.replace(x, f'{gender_dict[x]}'))
        transactions_df['Exp Type'] = df['Exp Type'].apply(lambda x: x.replace(x, f'{expense_type_dict[x]}'))
        transactions_df['Card Type'] = df['Card Type'].apply(lambda x: x.replace(x, f'{card_type_dict[x]}'))

        #Convert the date column to the right format YYYY-MM-DD
        transactions_df.Date = df.Date.apply(lambda x: pd.to_datetime(x))

        #Replace transactions_df columnheader tomatchheaders in the database
        col = ['transaction_id', 'city', 'Date', 'card_type_id', 'expense_type_id', 'gender_id', 'amount']
        transactions_df.reset_index(inplace = True)
        transactions_df.columns= col
        transactions_df = transactions_df.set_index('transaction_id')


        #Insert the card_type data into the card type table
        for index in card_type_dict:
            mycursor.execute(f"INSERT INTO card_type VALUES({card_type_dict[index]},'{index}')")
            db.commit()

        print(f'{len(card_type_dict)} records inserted into card_type table successfully')

        #Insert the expense_type data into the expense_type table
        for index in expense_type_dict:
            mycursor.execute(f"INSERT INTO expense_type VALUES({expense_type_dict[index]},'{index}')")
            db.commit()

        print(f'{len(expense_type_dict)} records inserted into expense_type table successfully')


        #Insert the gender data into the gender table
        for index in gender_dict:
            mycursor.execute(f"INSERT INTO gender VALUES({gender_dict[index]},'{index}')")
            db.commit()

        print(f'{len(gender_dict)} records inserted into gender table successfully')

        #Insert the transactions data into the transactions table

        transactions_df.to_sql('transactions', con=engine,if_exists='append')

        print(f'Inserted {len(transactions_df.reset_index().values.tolist())} records into transactions table.')

    else:
        print('Data update')
        # Check if there are new records in the data

        # Get the transaction ID of the last record in the database 
        mycursor.execute('SELECT transaction_id FROM transactions ORDER BY transaction_id DESC LIMIT 1;')
        last_transaction_id_in_database = [i for i in mycursor][0][0]

        # Get the transaction ID of the last record in the dataframe 
        last_id_in_dataframe = df.index[-1]

        if last_id_in_dataframe > last_transaction_id_in_database:
            # It means there is some new records. get the new rexords only and append to our db.
            new_data_df = df.loc[last_transaction_id_in_database+1:,:]





            # From the new data, check if there is a new card type. It there is one, add it to db.
            card_types_in_new_data = list(new_data_df['Card Type'].unique())


            mycursor.execute('SELECT card_name FROM card_type')
            card_types = [i[0] for i in mycursor]
            mycursor.execute('SELECT card_type_id FROM card_type ORDER BY card_type_id DESC LIMIT 1')
            last_card_type_id = [i[0] for i in mycursor][0]
            new_card_types = []
            for i in card_types_in_new_data:
                if i not in card_types:
                    new_card_types.append(i)

            new_card_types_dict = {}
            if len(new_card_types) != 0:
                new_card_types_dict = {value:index for index, value in enumerate(new_card_types, start=last_card_type_id+1)}

                for index in new_card_types_dict:
                    mycursor.execute(f"INSERT INTO card_type VALUES({new_card_types_dict[index]},'{index}')")
                    db.commit()


                print(f'{len(new_card_types_dict)} new records inserted into card_type table successfully')
            else:
                print('No new card types')


            # From the new data, check if there is a new expense type. It there is one, add it to db.

            expense_types_in_new_data = list(new_data_df['Exp Type'].unique())


            mycursor.execute('SELECT expense_type FROM expense_type')
            expense_types = [i[0] for i in mycursor]
            mycursor.execute('SELECT expense_type_id FROM expense_type ORDER BY expense_type_id DESC LIMIT 1')
            last_expense_type_id = [i[0] for i in mycursor][0]
            new_expense_types = []

            for i in expense_types_in_new_data:
                if i not in expense_types:
                    new_expense_types.append(i)

            new_expense_types_dict = {}
            if len(new_expense_types) != 0:
                new_expense_types_dict = {value:index for index, value in enumerate(new_expense_types, start=last_expense_type_id+1)}

                for index in new_expense_types_dict:
                    mycursor.execute(f"INSERT INTO expense_type VALUES({new_expense_types_dict[index]},'{index}')")
                    db.commit()


                print(f'{len(new_expense_types_dict)} new records inserted into expense_type table successfully')
            else:
                print('No new expense types')


            # From the new data, check if there is a new expense type. It there is one, add it to db.

            genders_in_new_data = list(new_data_df['Gender'].unique())


            mycursor.execute('SELECT gender FROM gender')
            gender_types = [i[0] for i in mycursor]
            mycursor.execute('SELECT gender_id FROM gender ORDER BY gender_id DESC LIMIT 1')
            last_gender_id = [i[0] for i in mycursor][0]
            new_genders = []

            for i in genders_in_new_data:
                if i not in gender_types:
                    new_genders.append(i)

            new_genders_dict = {}
            if len(new_genders) != 0:
                new_genders_dict = {value:index for index, value in enumerate(new_genders, start=last_gender_id+1)}

                for index in new_genders_dict:
                    mycursor.execute(f"INSERT INTO gender VALUES({new_genders_dict[index]},'{index}')")
                    db.commit()


                print(f'{len(new_genders_dict)} new records inserted into gender table successfully')
            else:
                print('No new genders')

            #Model the transactions dataframe to have freign keys appropriatelty and append the new records into the db

            #create a dictionary of the updated card types and corresponding index from the db. The dictionary will look like {'Gold': 1,'Silver': 2, ...}
            mycursor.execute('SELECT * FROM card_type')
            card_types = [i for i in mycursor]
            card_type_dict = {i[1]:i[0] for i in card_types}

            #create a dictionary of the updated expense types and corresponding index from the db. The dictionary will look like {'Bills': 1,'Food': 2, ...}

            mycursor.execute('SELECT * FROM expense_type')
            expense_types = [i for i in mycursor]
            expense_type_dict = {i[1]:i[0] for i in expense_types}
            print(expense_type_dict)

            #create a dictionary of the updated genders and corresponding index from the db. The dictionary will look like {'M': 1,'F': 2, ...}

            mycursor.execute('SELECT * FROM gender')
            gender_types = [i for i in mycursor]
            gender_dict = {i[1]:i[0] for i in gender_types}
            print(gender_dict)


            #Make a copy of the dataframe to transactions_df. This datarame will contain the normalized data for the transactions table
            transactions_df = new_data_df.copy()

            #Update the transactions dataframe to incorporate the foreign keys of the other tables. Instead of gender column showing M 
            #and F, It will show 1 for male and 2 for Female. Same forExpense type and card type, instead of Gold, silver,Platinum etc 
            #it will show the corresponding keys(numbes).

            transactions_df.Gender = df.Gender.apply(lambda x: x.replace(x, f'{gender_dict[x]}'))
            transactions_df['Exp Type'] = df['Exp Type'].apply(lambda x: x.replace(x, f'{expense_type_dict[x]}'))
            transactions_df['Card Type'] = df['Card Type'].apply(lambda x: x.replace(x, f'{card_type_dict[x]}'))

            #Convert the date column to the right format YYYY-MM-DD
            transactions_df.Date = df.Date.apply(lambda x: pd.to_datetime(x))

            #Replace transactions_df columnheader tomatchheaders in the database
            col = ['transaction_id', 'city', 'Date', 'card_type_id', 'expense_type_id', 'gender_id', 'amount']
            transactions_df.reset_index(inplace = True)
            transactions_df.columns= col
            transactions_df = transactions_df.set_index('transaction_id')


            #Append the new transactions data into the transactions table

            transactions_df.to_sql('transactions', con=engine,if_exists='append')

            print(f'Added {len(transactions_df.reset_index().values.tolist())} new records into transactions table.')

        else:
            print('No new data to ingest')

            
df = get_data_from_kaggle()

data_modelling(df)


Data update
No new data to ingest


In [29]:
df.groupby('Card Type').count()

Unnamed: 0_level_0,City,Date,Exp Type,Gender,Amount
Card Type,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Gold,6367,6367,6367,6367,6367
Platinum,6398,6398,6398,6398,6398
Signature,6447,6447,6447,6447,6447
Silver,6840,6840,6840,6840,6840


In [34]:
df.groupby('Exp Type').count().sort_values(by='Date', ascending = False)

Unnamed: 0_level_0,City,Date,Card Type,Gender,Amount
Exp Type,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Food,5463,5463,5463,5463,5463
Fuel,5257,5257,5257,5257,5257
Bills,5078,5078,5078,5078,5078
Entertainment,4762,4762,4762,4762,4762
Grocery,4754,4754,4754,4754,4754
Travel,738,738,738,738,738
