In [1]:
import os
from datetime import datetime

def get_latest_log_file(base_dir="LOGS"):
    """Return the path of the newest ``.log`` file in today's log directory.

    Today's directory is ``<base_dir>/<YYYY-MM-DD>`` (local date from
    ``datetime.now()``). It is created if it does not exist yet, so callers
    can write a new log file there afterwards.

    Args:
        base_dir: Root folder holding one sub-folder per day. Defaults to
            ``"LOGS"``, relative to the current working directory.

    Returns:
        Full path of the regular ``.log`` file with the greatest
        ``os.path.getctime`` value, or ``None`` if there is no such file.
    """
    today_date = datetime.now().strftime("%Y-%m-%d")
    log_dir = os.path.join(base_dir, today_date)

    # Create the directory if it doesn't exist
    os.makedirs(log_dir, exist_ok=True)

    # Only regular files count: a sub-directory whose name happens to end in
    # ".log" is not a log file and must never be returned.
    log_files = [
        path
        for path in (os.path.join(log_dir, name) for name in os.listdir(log_dir))
        if path.endswith('.log') and os.path.isfile(path)
    ]

    if not log_files:
        return None

    # getctime is the creation time on Windows but the inode-change time on
    # Unix, so on Unix this favours the most recently modified/appended file.
    return max(log_files, key=os.path.getctime)


In [2]:
import logging
from datetime import datetime

# Get the latest log file or create a new one
def setup_logging(force=False):
    """Configure root logging to write to today's ingestion log file.

    Reuses the most recent ``.log`` file returned by ``get_latest_log_file()``
    (records are appended to it). If there is none, the path
    ``LOGS/<YYYY-MM-DD>/<YYYY-MM-DD>_<HH-MM-SS>_data_ingestion.log`` is chosen
    and ``logging.basicConfig`` opens it.

    Note: ``logging.basicConfig`` does nothing when the root logger already
    has handlers. This happens, for example, after this function ran in an
    earlier cell or after another library configured logging. Pass
    ``force=True`` (Python 3.8+) to replace existing root handlers, so the
    file handler is always installed.

    Args:
        force: When True, forwarded as ``force=True`` to ``logging.basicConfig``.

    Returns:
        The path of the log file that was selected or newly chosen.
    """
    log_file = get_latest_log_file()
    appending = log_file is not None

    if not appending:
        # Take a single timestamp so the folder, the date prefix and the time
        # part of the file name cannot disagree if midnight passes in between.
        now = datetime.now()
        today_date = now.strftime("%Y-%m-%d")
        log_dir = os.path.join("LOGS", today_date)
        os.makedirs(log_dir, exist_ok=True)

        timestamp = now.strftime("%H-%M-%S")
        log_file = os.path.join(log_dir, f"{today_date}_{timestamp}_data_ingestion.log")

    config = {
        "filename": log_file,
        "level": logging.INFO,
        "format": '%(asctime)s - %(levelname)s - %(message)s',
    }
    if force:
        # `force` was added in Python 3.8, so only pass it when requested.
        config["force"] = True
    logging.basicConfig(**config)

    logging.info("Appending to existing log file." if appending else "Created new log file.")
    return log_file




In [3]:
import pandas as pd
import sys
import requests
import io
import logging
import os
import zipfile

# Route log records to today's ingestion log (setup_logging is defined in an earlier cell).
setup_logging()

# Kaggle dataset download endpoint.
dataset_url = "https://www.kaggle.com/api/v1/datasets/download/thedevastator/analyzing-credit-card-spending-habits-in-india"
# Read the API key from the environment instead of hardcoding it in the notebook.
kaggle_api_key = os.environ.get("KAGGLE_API_KEY", "<your-kaggle-api-key>")
headers = {'Authorization': f'Bearer {kaggle_api_key}'}

# Rows per chunk when parsing the CSV (lower it if memory is tight).
chunk_size = 10000
# Seconds to wait on the server; without a timeout requests can block forever.
request_timeout = 60

# Raw CSV column -> column name used by the database table.
column_map = {
    'index': 'transaction_id',
    'City': 'city',
    'Date': 'transaction_date',
    'Card Type': 'card_type',
    'Exp Type': 'exp_type',
    'Gender': 'gender',
    'Amount': 'amount',
}

final_df = pd.DataFrame()

try:
    logging.info("#" * 50)
    logging.info("STEP 1 : Extracting data from Kaggle")
    logging.info("#" * 50)
    logging.info("Starting data download...")
    # The archive is opened from memory below, so the full body is needed anyway;
    # the `with` block releases the connection once the content has been read.
    with requests.get(dataset_url, headers=headers, timeout=request_timeout) as response:
        response.raise_for_status()  # Raise an error for bad responses
        archive_bytes = response.content

    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zip_ref:
        file_list = zip_ref.namelist()
        logging.info(f"Files in the zip archive: {file_list}")

        # Use the first CSV member rather than blindly the first archive entry.
        csv_members = [name for name in file_list if name.lower().endswith('.csv')]
        if not csv_members:
            raise ValueError(f"No CSV file found in the zip archive: {file_list}")

        cleaned_chunks = []
        with zip_ref.open(csv_members[0]) as file:
            # Read data in chunks for memory-efficient parsing
            logging.info("Processing the data in chunks...")
            for chunk in pd.read_csv(file, chunksize=chunk_size):
                cleaned_chunk = (
                    chunk.rename(columns=column_map)
                    # The source 'index' column starts at 0; transaction IDs start at 1.
                    .assign(transaction_id=lambda frame: frame['transaction_id'] + 1)
                )
                cleaned_chunks.append(cleaned_chunk)
                logging.info(f"Processed {len(cleaned_chunk)} rows of data.")

    # Concatenate once: concatenating inside the loop re-copies every earlier row each time.
    if cleaned_chunks:
        final_df = pd.concat(cleaned_chunks)
    final_df.to_csv('output.csv', index=False, header=True)
    logging.info("Data processing and transmission to NiFi completed successfully.")

except requests.exceptions.RequestException as e:
    logging.error(f"Failed to download dataset: {e}")
except pd.errors.EmptyDataError:
    logging.error("No data found in the response.")
except zipfile.BadZipFile:
    logging.error("The downloaded file is not a valid zip file.")
except Exception as e:
    # logging.exception also records the traceback for diagnosis.
    logging.exception(f"An unexpected error occurred: {e}")


In [6]:
import pyodbc
import pandas as pd
import logging
from datetime import datetime
import os

# Route log records to today's ingestion log (setup_logging is defined in an earlier cell).
setup_logging()

# Database connection parameters; override via environment variables if needed.
server = os.environ.get('DB_SERVER', 'localhost')
database = os.environ.get('DB_NAME', 'DEProject')

# Only needed for SQL Server authentication. The connection string below uses
# Windows authentication (Trusted_Connection=yes), so these are not sent.
# Never hardcode credentials in the notebook.
username = os.environ.get('DB_USERNAME')
password = os.environ.get('DB_PASSWORD')

# Connect to the database
conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};Trusted_Connection=yes;'
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

# Target table name
table_name = 'transactions'

def table_exists(cursor, table_name):
    """Return True if a table named ``table_name`` exists in the current database.

    The name is looked up in ``INFORMATION_SCHEMA.TABLES`` (any schema). It
    is sent as a bound parameter rather than formatted into the SQL text, so
    quotes in the name cannot break or alter the query.

    Args:
        cursor: An open DB-API cursor using ``?`` placeholders (pyodbc here).
        table_name: Table name to look for.

    Returns:
        True if at least one matching row exists, otherwise False.
    """
    query = "SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ?"
    cursor.execute(query, table_name)
    return cursor.fetchone() is not None

def create_table_if_not_exists(cursor, table_name):
    """Create the transactions table ``table_name`` unless it already exists.

    The DDL is committed on the cursor's own connection, so this function does
    not depend on a module-level ``conn`` variable.

    Args:
        cursor: An open pyodbc cursor (``cursor.connection`` is used to commit).
        table_name: Name of the table to create. SQL identifiers cannot be
            bound as parameters, so it is formatted into the statement; only
            pass trusted names.
    """
    if table_exists(cursor, table_name):
        logging.info(f"Table '{table_name}' already exists.")
        return

    create_query = f"""
    CREATE TABLE {table_name} (
        transaction_id INT PRIMARY KEY,
        city VARCHAR(50),
        transaction_date DATE,
        card_type VARCHAR(20),
        exp_type VARCHAR(20),
        gender CHAR(1),
        amount DECIMAL(10, 2)
    )
    """
    cursor.execute(create_query)
    # Commit on the connection that owns this cursor instead of the global `conn`.
    cursor.connection.commit()
    logging.info(f"Table '{table_name}' created.")

def is_unique(cursor, transaction_id, table_name):
    """Report whether ``transaction_id`` is absent from ``table_name``.

    Args:
        cursor: An open DB-API cursor using ``?`` placeholders.
        transaction_id: ID to look for (bound as a query parameter).
        table_name: Table to search (formatted into the SQL; must be trusted).

    Returns:
        True when no row has that ``transaction_id``, otherwise False.
    """
    sql = f"SELECT COUNT(1) FROM {table_name} WHERE transaction_id = ?"
    cursor.execute(sql, transaction_id)
    existing_count = cursor.fetchone()[0]
    return existing_count == 0

def insert_data(cursor, df, table_name):
    """Insert the rows of ``df`` whose transaction_id is not yet in ``table_name``.

    Existing IDs are fetched with a single query, and new rows are sent with
    one ``executemany``. This replaces one SELECT plus one INSERT round-trip
    per row.

    A row is skipped when its transaction_id already exists in the table or
    appeared earlier in ``df``; the first occurrence wins, as in the
    row-by-row check. Nothing is committed here; the caller commits.

    Args:
        cursor: An open DB-API cursor using ``?`` placeholders (pyodbc here).
        df: DataFrame with columns transaction_id, city, transaction_date,
            card_type, exp_type, gender, amount.
        table_name: Target table (formatted into the SQL; must be trusted).
    """
    columns = ['transaction_id', 'city', 'transaction_date', 'card_type',
               'exp_type', 'gender', 'amount']

    cursor.execute(f"SELECT transaction_id FROM {table_name}")
    existing_ids = {row[0] for row in cursor.fetchall()}

    ids = df['transaction_id']
    is_new = ~ids.isin(existing_ids) & ~ids.duplicated(keep='first')
    # itertuples yields plain Python scalars, which the DB driver can bind.
    new_rows = list(df.loc[is_new, columns].itertuples(index=False, name=None))
    skipped = len(df) - len(new_rows)

    # pyodbc rejects executemany with an empty parameter sequence.
    if new_rows:
        insert_query = f"""
        INSERT INTO {table_name} (transaction_id, city, transaction_date, card_type, exp_type, gender, amount)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """
        cursor.executemany(insert_query, new_rows)

    logging.info(f"Inserted {len(new_rows)} new rows into '{table_name}'; "
                 f"skipped {skipped} rows whose transaction ID already exists.")

# Main process: ensure the table exists, load output.csv (written by the
# extraction cell) and insert the rows that are not in the table yet.
try:
    logging.info("#" * 50)
    logging.info("STEP 2 : Pushing the data to table")
    logging.info("#" * 50)
    create_table_if_not_exists(cursor, table_name)
    # Load the CSV produced by the extraction step.
    df = pd.read_csv('output.csv')
    insert_data(cursor, df, table_name)
    conn.commit()
    logging.info("Data processing and insertion completed successfully.")
except Exception as e:
    logging.exception(f"Error processing data: {str(e)}")
    # Discard partially inserted rows explicitly instead of relying on close().
    try:
        conn.rollback()
    except Exception:
        logging.exception("Rollback failed.")
finally:
    cursor.close()
    conn.close()


Index(['transaction_id', 'city', 'transaction_date', 'card_type', 'exp_type',
       'gender', 'amount'],
      dtype='object')
       transaction_id                   city transaction_date  card_type  \
0                   1           Delhi, India        29-Oct-14       Gold   
1                   2  Greater Mumbai, India        22-Aug-14   Platinum   
2                   3       Bengaluru, India        27-Aug-14     Silver   
3                   4  Greater Mumbai, India        12-Apr-14  Signature   
4                   5       Bengaluru, India         5-May-15       Gold   
...               ...                    ...              ...        ...   
26047           26048         Kolkata, India        22-Jun-14     Silver   
26048           26049            Pune, India         3-Aug-14  Signature   
26049           26050       Hyderabad, India        16-Jan-15     Silver   
26050           26051          Kanpur, India        14-Sep-14     Silver   
26051           26052       Hyderaba