In [1]:
import requests
from pyhive import hive
from datetime import datetime
import pandas as pd

In [2]:
# REST endpoints for the three source datasets, all served under one local API root
api_base_url = 'http://localhost:5000/api'
transactions_api_url = f'{api_base_url}/transactions'
customers_api_url = f'{api_base_url}/customers'
external_data_api_url = f'{api_base_url}/externalData'

In [3]:
# Function to fetch data from API
def fetch_data(api_url, timeout=10):
    """GET ``api_url`` and return its decoded JSON body, or ``None`` on failure.

    Args:
        api_url: Endpoint to request.
        timeout: Seconds to wait for the server (passed to ``requests.get``).
            Without a timeout, ``requests`` can wait indefinitely on an
            unresponsive server and hang the notebook.

    Returns:
        The decoded JSON payload. Returns ``None`` if the request fails
        (connection error, timeout, 4xx/5xx status) or the body is not valid JSON.
        The error is printed rather than raised.
    """
    try:
        response = requests.get(api_url, timeout=timeout)
        response.raise_for_status()  # Raise an error for bad responses (4xx and 5xx)
        return response.json()
    # ValueError is caught explicitly because older requests versions raise a
    # plain ValueError (not a RequestException) for an invalid JSON body.
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error fetching data from {api_url}: {e}")
        return None

In [4]:
# Fetch the three source datasets; fetch_data returns None when a request fails
transactions_data = fetch_data(transactions_api_url)
customers_data = fetch_data(customers_api_url)
external_data = fetch_data(external_data_api_url)

# Every insert cell below iterates over (or calls .get on) these payloads, so stop
# here with a clear message instead of failing later with a TypeError on None.
failed_sources = [
    source_name
    for source_name, payload in (
        ("transactions", transactions_data),
        ("customers", customers_data),
        ("externalData", external_data),
    )
    if payload is None
]
if failed_sources:
    raise RuntimeError(f"Could not fetch data for: {', '.join(failed_sources)}")

In [5]:
# Hive connection settings
hive_host = "localhost"
hive_port = 10000
hive_username = "root"

# Open one connection and one cursor; all later cells reuse this cursor
conn = hive.Connection(host=hive_host, port=hive_port, username=hive_username)
cursor = conn.cursor()

In [6]:
# Ensure the target database exists (IF NOT EXISTS makes re-running this cell safe)
create_database_sql = "CREATE DATABASE IF NOT EXISTS Transactions"
cursor.execute(create_database_sql)

In [7]:
# Make Transactions the current database so unqualified table names resolve there
use_database_sql = "USE Transactions"
cursor.execute(use_database_sql)

In [8]:
# Create the transactions table (no-op if it already exists)
transactions_ddl = """
    CREATE TABLE IF NOT EXISTS transactions (
      transaction_id STRING,
      date_time TIMESTAMP,
      amount DOUBLE,
      currency STRING,
      merchant_details STRING,
      customer_id STRING,
      transaction_type STRING,
      location STRING
    )    

"""
cursor.execute(transactions_ddl)

In [None]:
# Insert data into the Transactions table, one row per API record.
# Values are passed as query parameters so PyHive quotes and escapes them; the
# previous f-string let a quote in e.g. merchant_details or location break (or
# inject into) the SQL statement. Column order matches the CREATE TABLE above.
insert_transaction_sql = """
    INSERT INTO transactions
    VALUES (
      %(transaction_id)s,
      %(date_time)s,
      %(amount)s,
      %(currency)s,
      %(merchant_details)s,
      %(customer_id)s,
      %(transaction_type)s,
      %(location)s
    )
    """

for transaction in transactions_data:
    # Parsing validates the expected format; str() renders 'YYYY-MM-DD HH:MM:SS',
    # the same timestamp literal the original query produced.
    date_time = datetime.strptime(transaction['date_time'], "%Y-%m-%dT%H:%M:%S")

    cursor.execute(insert_transaction_sql, {
        # str() keeps every STRING column quoted even if the API sends a number.
        # NOTE(review): if merchant_details is a nested object, this stores its
        # Python repr — confirm the API's shape.
        'transaction_id': str(transaction['transaction_id']),
        'date_time': str(date_time),
        'amount': float(transaction['amount']),
        'currency': str(transaction['currency']),
        'merchant_details': str(transaction['merchant_details']),
        'customer_id': str(transaction['customer_id']),
        'transaction_type': str(transaction['transaction_type']),
        'location': str(transaction['location']),
    })

In [None]:
# Create the customers table; account_history is stored as ARRAY<STRING>
customers_ddl = """
    CREATE TABLE IF NOT EXISTS customers (
        customer_id STRING,
        account_history ARRAY<STRING>,
        age INT,
        location STRING,
        avg_transaction_value DOUBLE
    )
"""
cursor.execute(customers_ddl)

In [None]:
# Insert data into customers table.
# Hive does not accept complex-type literals in INSERT ... VALUES, so each row is
# written with INSERT ... SELECT and an ARRAY(...) constructor for account_history.
for customer in customers_data:
    demographics = customer['demographics']
    # Column is ARRAY<STRING>, so each history entry is sent as a string.
    history = [str(entry) for entry in customer['account_history']]

    # One placeholder per history entry. The previous version interpolated a
    # Python tuple repr, which renders a single entry as "('x',)" — a trailing
    # comma that Hive rejects. An empty history still renders as ARRAY().
    history_placeholders = ", ".join(["%s"] * len(history))
    insert_query = f"""
    INSERT INTO customers
    SELECT
        %s,
        ARRAY({history_placeholders}),
        %s,
        %s,
        %s
    """

    # Positional parameters, escaped by PyHive, in table column order.
    cursor.execute(insert_query, (
        str(customer['customer_id']),
        *history,
        int(demographics['age']),
        str(demographics['location']),
        float(customer['behavioral_patterns']['avg_transaction_value']),
    ))

In [None]:
# Create the single-column blacklist table of merchant names
blacklist_ddl = """
    CREATE TABLE IF NOT EXISTS blacklist (
        merchant_name STRING
    )
"""
cursor.execute(blacklist_ddl)

In [None]:
# Extract blacklist_info from external_data (an empty list if the key is absent)
blacklist_info = external_data.get("blacklist_info", [])

# Insert one row per blacklisted merchant. The name is passed as a query
# parameter so PyHive escapes it — e.g. a name containing an apostrophe no
# longer breaks the statement.
insert_blacklist_sql = "INSERT INTO blacklist (merchant_name) VALUES (%(merchant_name)s)"
for merchant_name in blacklist_info:
    cursor.execute(insert_blacklist_sql, {'merchant_name': str(merchant_name)})

In [None]:
# Create the per-customer external info table.
# Column order matters: inserts into this table supply values positionally.
external_info_ddl = """
    CREATE TABLE IF NOT EXISTS external_info (
        customer_id STRING,
        fraud_reports INT,
        credit_scores FLOAT
    )
"""
cursor.execute(external_info_ddl)

In [None]:
# Retrieve the per-customer dictionaries from the external data source
credit_scores = external_data.get("credit_scores", {})
fraud_reports = external_data.get("fraud_reports", {})

# One row per customer present in BOTH dictionaries (pd.merge defaults to an inner join)
df_credit_scores = pd.DataFrame(list(credit_scores.items()), columns=['customer_id', 'credit_scores'])
df_fraud_reports = pd.DataFrame(list(fraud_reports.items()), columns=['customer_id', 'fraud_reports'])
df_external_info = pd.merge(df_fraud_reports, df_credit_scores, on='customer_id')

# The SELECT list must follow the table's column order:
# (customer_id STRING, fraud_reports INT, credit_scores FLOAT).
# The previous query listed credit_scores before fraud_reports, so each metric
# was written into the other's column.
insert_external_info_sql = """
    INSERT INTO transactions.external_info
    SELECT
      %(customer_id)s,
      %(fraud_reports)s,
      %(credit_scores)s
    """

# Loop uses its own names so the source dicts above are not overwritten.
for row in df_external_info.itertuples(index=False):
    cursor.execute(insert_external_info_sql, {
        'customer_id': str(row.customer_id),
        # Converted to built-in int/float so PyHive's parameter escaper accepts
        # them even if pandas hands back NumPy scalars.
        'fraud_reports': int(row.fraud_reports),
        'credit_scores': float(row.credit_scores),
    })

In [None]:
# Commit, then release the cursor before the connection that owns it.
# NOTE(review): PyHive documents Connection.commit() as a no-op (Hive has no
# transactions); kept for DB-API symmetry — confirm for the installed version.
conn.commit()
cursor.close()
conn.close()