In [13]:
from pyhive import hive

def create_hive_tables(database_name):
    """Create the Hive database (if missing) and the four project tables.

    Runs ``CREATE DATABASE IF NOT EXISTS`` and ``USE`` for ``database_name``,
    then ``CREATE TABLE IF NOT EXISTS`` for ``transactions``, ``customers``,
    ``cust_external_data`` and ``blacklist``, in that order.

    Args:
        database_name: Name of the Hive database; it is interpolated into the
            SQL text as-is.

    Returns:
        None. Progress is reported with ``print``. An exception raised while
        executing a statement is printed instead of propagated, and the cursor
        and connection are closed in every case.
    """
    # One DDL statement per table, listed in creation order.
    table_ddl = (
        # Raw transaction records.
        """
            CREATE TABLE IF NOT EXISTS transactions (
                transaction_id STRING,
                date_time TIMESTAMP,
                amount DOUBLE,
                currency STRING,
                merchant_details STRING,
                customer_id STRING,
                transaction_type STRING,
                location STRING
            )
        """,
        # Flattened customer profiles.
        """
            CREATE TABLE IF NOT EXISTS customers (
                customer_id STRING,
                account_history STRING,
                age INT,
                location STRING,
                behavioral_pattern_avg DOUBLE
            )
        """,
        # Per-customer credit score and fraud report count.
        """
            CREATE TABLE IF NOT EXISTS cust_external_data (
                customer_id STRING,
                credit_score INT,
                fraud_report INT
            )
        """,
        # Blacklist entries, one per row.
        """
            CREATE TABLE IF NOT EXISTS blacklist (
                blacklist_info STRING
            )
        """,
    )

    hive_connection = hive.Connection(host='localhost', port=10000, username='hive')
    print("Connected to Hive.")
    hive_cursor = hive_connection.cursor()

    try:
        # Make sure the database exists, then switch to it.
        hive_cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        hive_cursor.execute(f"USE {database_name}")

        for ddl in table_ddl:
            hive_cursor.execute(ddl)

        print(f"Hive database '{database_name}' and tables created successfully.")

    except Exception as e:
        print(f"Error creating Hive tables: {e}")

    finally:
        hive_cursor.close()
        hive_connection.close()

# Example usage: create the "finaldb" database and its four tables.
create_hive_tables("finaldb")


Connected to Hive.
Hive database 'finaldb' and tables created successfully.


In [1]:
from pyhive import hive

def get_tables_and_schemas(database_name):
    """Return the column names of every table in a Hive database.

    Args:
        database_name: Name of the Hive database to inspect; it is
            interpolated into the ``USE`` statement as-is.

    Returns:
        A dict mapping each table name reported by ``SHOW TABLES`` to the list
        of values found in the first column of ``DESCRIBE <table>``, or
        ``None`` if connecting or any statement fails (the error is printed).
    """
    # Bind both handles before the try block so that ``finally`` can run even
    # when hive.Connection() itself raises. Otherwise the cleanup would fail
    # with UnboundLocalError and hide both the printed error and the
    # ``return None`` below.
    connection = None
    cursor = None

    try:
        # Connect to Hive
        connection = hive.Connection(host='localhost', port=10000, username='hive')
        cursor = connection.cursor()

        # Use the specified database
        cursor.execute(f"USE {database_name}")

        # Get the list of tables in the database
        cursor.execute("SHOW TABLES")
        tables = [row[0] for row in cursor.fetchall()]

        # Get the schema for each table (first DESCRIBE column is the name)
        table_schemas = {}
        for table in tables:
            cursor.execute(f"DESCRIBE {table}")
            table_schemas[table] = [column[0] for column in cursor.fetchall()]

        return table_schemas

    except Exception as e:
        print(f"Error: {e}")
        return None

    finally:
        # Only close what was actually opened.
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

# Example usage: list every table in "finaldb" together with its column names.
database_name = "finaldb"
result = get_tables_and_schemas(database_name)

# `result` is None when the lookup failed and an empty dict when the database
# has no tables; both are falsy, so nothing is printed in either case.
if result:
    for table, schema in result.items():
        print(f"Table: {table}\nSchema: {schema}\n")


Table: blacklist
Schema: ['blacklist_info']

Table: cust_external_data
Schema: ['customer_id', 'credit_score', 'fraud_report']

Table: customers
Schema: ['customer_id', 'account_history', 'age', 'location', 'behavioral_pattern_avg']

Table: transactions
Schema: ['transaction_id', 'date_time', 'amount', 'currency', 'merchant_details', 'customer_id', 'transaction_type', 'location']



In [1]:
from pyhive import hive

def customers_transformation(customers):
    """Flatten nested customer records into flat dictionaries.

    Args:
        customers: Iterable of dicts, each providing ``customer_id``,
            ``account_history`` (an iterable of strings),
            ``demographics`` (with ``age`` and ``location``) and
            ``behavioral_patterns`` (with ``avg_transaction_value``).

    Returns:
        A list with one dict per input record, holding the keys
        ``customer_id``, ``account_history`` (the history joined with commas),
        ``age``, ``location``, ``behavioral_pattern_avg`` and ``fraud_reports``
        (always ``0``).
    """

    def _flatten(record):
        # Collapse the list of account-history entries into one string.
        history = ",".join(record["account_history"])

        # Pull age and location out of the nested demographics mapping.
        demographics = record["demographics"]
        age, location = demographics["age"], demographics["location"]

        # The only behavioral metric carried over is the average value.
        average_value = record["behavioral_patterns"]["avg_transaction_value"]

        return {
            "customer_id": record["customer_id"],
            "account_history": history,
            "age": age,
            "location": location,
            "behavioral_pattern_avg": average_value,
            "fraud_reports": 0,
        }

    return [_flatten(record) for record in customers]

def insert_customers_into_hive(database_name, table_name, customers):
    """Replace the contents of a Hive table with the given customers.

    The records are flattened with ``customers_transformation``, the target
    table is truncated, and each customer is inserted as one row.

    Args:
        database_name: Hive database to ``USE``; interpolated into SQL as-is.
        table_name: Table to truncate and fill; interpolated into SQL as-is.
        customers: Nested customer records accepted by
            ``customers_transformation``.

    Returns:
        None. Progress is reported with ``print``. An exception raised while
        talking to Hive is printed instead of propagated, and the cursor and
        connection are closed in every case.

    NOTE(review): six values are written per row, while the ``customers`` DDL
    in ``create_hive_tables`` declares five columns. Confirm that the target
    table has a sixth ``fraud_reports`` column.
    """
    # Connect to Hive
    connection = hive.Connection(host='localhost', port=10000, username='hive')
    print("Connected to Hive.")
    cursor = connection.cursor()

    try:
        # Use the specified database
        cursor.execute(f"USE {database_name}")

        # Transform before truncating so that a malformed record cannot leave
        # the table empty.
        transformed_customers = customers_transformation(customers)

        # Drop all rows in the specified table
        cursor.execute(f"TRUNCATE TABLE {table_name}")

        # Values are passed as bound parameters so that PyHive escapes them.
        # Splicing them into the SQL text breaks on any string containing a
        # quote. Only the table identifier is interpolated.
        insert_sql = f"INSERT INTO TABLE {table_name} VALUES (%s, %s, %s, %s, %s, %s)"
        for customer in transformed_customers:
            cursor.execute(insert_sql, (
                customer["customer_id"],
                customer["account_history"],
                customer["age"],
                customer["location"],
                customer["behavioral_pattern_avg"],
                customer["fraud_reports"],
            ))

        print(f"Inserted {len(transformed_customers)} customers into Hive table '{table_name}'.")

    except Exception as e:
        print(f"Error inserting customers into Hive table: {e}")

    finally:
        cursor.close()
        connection.close()


# Example usage: database and table targeted by the customer load in this cell.
# NOTE(review): create_hive_tables was run against "finaldb", while this cell
# targets "testdbb" — confirm which database is intended.
database_name = "testdbb"
table_name = "customers"
customers_data = [{'account_history': ['Ta28ac0af60cdc1e048d9b5853eda7864', 'T1acc99d5ed63955e56875612de1fd8d2', 'T22e1677bcec03fd3687829daf7fc1c77', 'T43e6e00f918630453dce83832afe124d', 'Tc1d387fc632ea6b5d7f81967ed3b0ed5', 'T0807a4c653c7495f3162f37c10b6f8b1'], 'behavioral_patterns': {'avg_transaction_value': 274.60671470605416}, 'customer_id': 'C000', 'demographics': {'age': 63, 'location': 
'City6'}}, {'account_history': ['T911a9a90494ef1bc9378dc6d34010db1', 'T802ced7f141214f3aabedb93a0b3aaa3'], 'behavioral_patterns': {'avg_transaction_value': 366.775058148578}, 'customer_id': 'C001', 'demographics': {'age': 35, 'location': 'City10'}}, {'account_history': ['Tc8b66b9ea9edea6a58f40c98988ef924', 'T0356e444f030154c20fa4a78adba39e1', 'Tc092ed9dbf1d071db31365ac130939fc'], 'behavioral_patterns': {'avg_transaction_value': 459.47208162510833}, 'customer_id': 'C002', 'demographics': {'age': 60, 
'location': 'City6'}}, {'account_history': ['T15098ec7fff229dc1079635bbc807c82', 'Tbaa9c89eb3f76df11a4f05d4c8deed3b', 'T59fe23f029424783ba5750011c2f9d94', 'T79546c938dddaff5a09ce0b0d96ab158', 'T46fccb3b04b7c7d6c9e1363fcfd20d35', 'T577025443c5addedf13125c3000e2fe2', 'Tc52e54d39f1b3f5f76cb804ed9dd5510', 'Te3f1a03dd1bc50190b59fed51f6ede8e', 'Tef8a7a0e4c03b9c85f8003f7286a356a', 'T949df9dfb9b16ce08eb39b3ff896e2fc', 'T7a7d82677fbb08385232d46abf13b07c', 'T158763c019607900cd4788c9837b766c', 'T9d92254531e97b345850fe3692093fd7', 'Tca786a868fd9b2aea24050e62b45d1a4', 'Ta6774aeb57ebf9e09556203b6ee5ba76', 'T1fd44194f6f47d2f0c0d3179cb5439f4', 'T46a2fb7672009a5d9c123bc40b6e4d30', 'T05eed83f9c7a999a7ee9f0f681b3aef0', 'T47f013689d60551faa0be819593ae4b9', 'T9dc8b0bbe47e537e9d537e6b63103b43', 'Tf7dfec3e31ebc59ed78167c2f250c0a9', 'Tcf040bd039585a9864982d00acb1218c', 'Te974429a9789722094988d54279538a2', 'T3168d423efe6de180498b52f8f5cff88', 'Tf66c5892075ae064f0a643b096ba4851', 'Td27eb7ee8726550d09969f7496609f1d', 'T61afac1cd0852f1513f07a2397855830', 'Te1ad35ca6ba0c7f06d8fde9597bce4ea'], 'behavioral_patterns': {'avg_transaction_value': 83.14797852297723}, 'customer_id': 'C003', 'demographics': {'age': 46, 'location': 'City1'}}, {'account_history': ['T69c69bbdcc3e0c01926d49f8fecec92b', 'T35c3d5da80e8778ebae9351cc6986079', 'T7b7582e338ff180decfb079123b1b043', 'T91ecd686fb616737055012eb2a0b4f4c', 'T9d3862de4d3c4f36cc99a8cf4cdc9617', 'T24df6a0e6e9f6d5abb7d1beb6ce8a155', 'T4328dd201b3e1d2e1357b1aa07648ff6', 'T96ca3c0009f96fd5327e5b6727c7d3d2', 'T02fcf8d6c6b4f6b6044b1d635569dbd6', 'T80f97aff44241a49afc1e90b6cb7a21a', 'T6914c01cd3a49d3e73b24136184cc5ab', 'Td82b35737f7ae7ea9a24451074799cdc', 'T9502d33c5dcae61eea6ee9871afde30a'], 'behavioral_patterns': {'avg_transaction_value': 409.75196188049614}, 'customer_id': 'C004', 'demographics': {'age': 35, 'location': 'City7'}}, {'account_history': ['T9a203c7b05e5ff6070fc682ee8660ae2', 'Te1e99502c5eea34f8fedc643e4af5ebb', 
'Tc920c23eefd5767fe092265261219f2a', 'T5e69d1c3ef92c0fa6f42d56a40a057c0', 'Tb2f406bcebfe3eff57e74f791a7426b6'], 'behavioral_patterns': {'avg_transaction_value': 184.95718127783917}, 'customer_id': 'C005', 'demographics': {'age': 61, 'location': 'City8'}}, {'account_history': ['Ta0fdfd334a83f8b4264fe9aec37cee83', 'Tdde157a0019eaf9eb7dad6566ac00901'], 'behavioral_patterns': {'avg_transaction_value': 433.6111930248852}, 'customer_id': 'C006', 'demographics': {'age': 19, 'location': 'City8'}}, {'account_history': [], 'behavioral_patterns': {'avg_transaction_value': 358.48844848741913}, 'customer_id': 'C007', 'demographics': {'age': 47, 'location': 'City2'}}, {'account_history': ['T0207e1e0803dc53d6327f429e6c33d59', 'T904533c039458f9fc638630eb527ad03', 'T122966743f0559d361d6b48d9d4b86a6'], 'behavioral_patterns': {'avg_transaction_value': 364.9904570722303}, 'customer_id': 'C008', 'demographics': {'age': 61, 'location': 'City4'}}, {'account_history': ['T002567d2eca6c1d42d5226b9e7baf3d5', 'Tbdb9343cfa2fb73421203c2a4bee9401', 'Tdf094c83f139f8a795d00b505f2e3c91', 'T3d4b2931f72a8b052aa1c36d45189ce8', 'Ta29f136384dae470736aac47d597473a', 'T1fd7af1c54391280d7041ac860429acc', 'T8029e3972b77c58cc5a973381aa2d14e', 'T4445957f282098fd386712c87c2366e9', 'Te275540cfd25356d165aa7e79ac3f083', 'T6f978fbddc6f980f14d1fbd735e84655', 'Ta049c32b3720a22f48fae36a470cba59', 'T8824d32858cc2c63571f97872d516593'], 'behavioral_patterns': {'avg_transaction_value': 235.03987068990392}, 'customer_id': 'C009', 'demographics': {'age': 32, 'location': 'City10'}}, {'account_history': [], 'behavioral_patterns': {'avg_transaction_value': 201.03263751962157}, 'customer_id': 'C010', 'demographics': {'age': 44, 'location': 'City1'}}, {'account_history': ['T5b17dc68d19b130e7d4d80e28c804dcb'], 'behavioral_patterns': {'avg_transaction_value': 73.88798283004277}, 'customer_id': 'C011', 'demographics': {'age': 60, 'location': 'City5'}}, {'account_history': 
['T01f2591d8163330432055a70aec024c4'], 'behavioral_patterns': {'avg_transaction_value': 99.78173290976856}, 'customer_id': 'C012', 'demographics': {'age': 43, 'location': 'City6'}}, {'account_history': ['T9ce407fe16be82fb9d7b58a660b11a70', 'T86265da0ad08593c7aa6c71e0be53880'], 'behavioral_patterns': {'avg_transaction_value': 457.80291769514827}, 'customer_id': 'C013', 'demographics': {'age': 49, 'location': 'City10'}}, {'account_history': [], 'behavioral_patterns': {'avg_transaction_value': 308.57791060021117}, 'customer_id': 'C014', 'demographics': {'age': 25, 'location': 'City4'}}, {'account_history': ['T7ee20fc68604d957dda7477ec1bce6d1'], 'behavioral_patterns': {'avg_transaction_value': 459.2197685308178}, 'customer_id': 'C015', 'demographics': {'age': 26, 'location': 'City2'}}, {'account_history': ['T8fd5e744c975f45137bd171f07bc2946'], 'behavioral_patterns': {'avg_transaction_value': 412.3752153553793}, 'customer_id': 'C016', 'demographics': {'age': 34, 'location': 'City9'}}, {'account_history': [], 'behavioral_patterns': {'avg_transaction_value': 373.42784419404524}, 'customer_id': 'C017', 'demographics': {'age': 30, 'location': 'City9'}}, {'account_history': [], 'behavioral_patterns': {'avg_transaction_value': 122.41279014654701}, 'customer_id': 'C018', 'demographics': {'age': 27, 'location': 'City10'}}, {'account_history': [], 'behavioral_patterns': {'avg_transaction_value': 400.6920567595947}, 'customer_id': 'C019', 'demographics': {'age': 23, 'location': 'City7'}}]

# Truncate the target table and load the sample customers defined above.
insert_customers_into_hive(database_name, table_name, customers_data)


Connected to Hive.
Inserted 20 customers into Hive table 'customers'.


In [2]:
# Sanity check: show the Python type of the sample customer payload.
type(customers_data)

list

In [5]:
from pyhive import hive

def retrieve_customers_from_hive(database_name, table_name):
    """Print every row of a Hive table.

    Args:
        database_name: Hive database to ``USE``; interpolated into SQL as-is.
        table_name: Table to read with ``SELECT *``; interpolated into SQL
            as-is.

    Returns:
        None. Rows are printed one per line, or a "no customers" message is
        printed when the table is empty. An exception raised while querying is
        printed instead of propagated, and the cursor and connection are
        closed in every case.
    """
    hive_connection = hive.Connection(host='localhost', port=10000, username='hive')
    print("Connected to Hive.")
    hive_cursor = hive_connection.cursor()

    try:
        # Switch to the requested database, then read the whole table.
        hive_cursor.execute(f"USE {database_name}")
        hive_cursor.execute(f"SELECT * FROM {table_name}")
        rows = hive_cursor.fetchall()

        if not rows:
            print(f"No customers found in Hive table '{table_name}'.")
        else:
            print("Retrieved customers from Hive:")
            for row in rows:
                print(row)

    except Exception as e:
        print(f"Error retrieving customers from Hive table: {e}")

    finally:
        hive_cursor.close()
        hive_connection.close()

# Example usage: print every row of the "customers" table in "testdbb".
database_name = "testdbb"
table_name = "customers"

retrieve_customers_from_hive(database_name, table_name)

Connected to Hive.
Retrieved customers from Hive:
('C006', 'AH0061,AH0062', 25, 'US', 1000.0, 0)
('C007', 'AH0071,AH0072', 30, 'EU', 2000.0, 0)
('C008', 'AH0081,AH0082', 35, 'AS', 3000.0, 0)
('C009', 'AH0091,AH0092', 40, 'AF', 4000.0, 0)
('C010', 'AH0101,AH0102', 45, 'OC', 5000.0, 0)


In [10]:
from pyhive import hive
from datetime import datetime

def transaction_transformations(transactions):
    """Normalise raw transactions for loading into Hive.

    For every transaction the amount is rounded to two decimals, the ISO
    ``date_time`` string is reformatted as ``YYYY-MM-DD HH:MM:SS``, and EUR or
    GBP amounts are converted to USD with fixed rates (1.2 and 1.4).

    The input dictionaries are never modified. The function can therefore be
    called repeatedly on the same data (for example when a notebook cell is
    re-run) and always produces the same result.

    Args:
        transactions: Iterable of dicts with the keys ``transaction_id``,
            ``date_time``, ``amount``, ``currency``, ``merchant_details``,
            ``customer_id``, ``transaction_type`` and ``location``.

    Returns:
        A list of new dicts with the same keys and the normalised values.

    Raises:
        ValueError: If ``date_time`` is not ``YYYY-MM-DDTHH:MM:SS`` with an
            optional fractional-seconds part.
        KeyError: If a transaction lacks one of the expected keys.
    """
    # Fixed conversion rates to USD; any other currency is left untouched.
    usd_rates = {"EUR": 1.2, "GBP": 1.4}
    # Accepted input layouts: with and without fractional seconds.
    timestamp_formats = ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S")

    transformed_data = []

    for transaction in transactions:
        # Round the amount
        amount = round(transaction["amount"], 2)

        # Format date_time to be compatible with the Hive timestamp format
        date_time_str = transaction["date_time"]
        parsed = None
        for timestamp_format in timestamp_formats:
            try:
                parsed = datetime.strptime(date_time_str, timestamp_format)
                break
            except ValueError:
                continue
        if parsed is None:
            raise ValueError(f"Unsupported date_time format: {date_time_str!r}")
        formatted_date_time = parsed.strftime("%Y-%m-%d %H:%M:%S")

        # Unify EUR/GBP to USD. The converted currency is kept in a local
        # variable; writing it back to the input dict would make a second
        # call skip the conversion while still labelling the amount as USD.
        currency = transaction["currency"]
        rate = usd_rates.get(currency)
        if rate is not None:
            amount = round(amount * rate, 2)
            currency = "USD"

        # Create a dictionary with the transformed data
        transformed_data.append({
            "transaction_id": transaction["transaction_id"],
            "date_time": formatted_date_time,
            "amount": amount,
            "currency": currency,
            "merchant_details": transaction["merchant_details"],
            "customer_id": transaction["customer_id"],
            "transaction_type": transaction["transaction_type"],
            "location": transaction["location"],
        })

    return transformed_data

def insert_transactions_into_hive(database_name, table_name, transactions):
    """Normalise transactions and insert them into a Hive table.

    The records are passed through ``transaction_transformations`` and then
    inserted one row at a time with the eight values in table column order:
    transaction_id, date_time, amount, currency, merchant_details,
    customer_id, transaction_type, location.

    Args:
        database_name: Hive database to ``USE``; interpolated into SQL as-is.
        table_name: Table to insert into; interpolated into SQL as-is.
        transactions: Raw transaction dicts accepted by
            ``transaction_transformations``.

    Returns:
        None. Progress is reported with ``print``. An exception raised while
        transforming or inserting is printed instead of propagated, and the
        cursor and connection are closed in every case.
    """
    # Column order of the target table; one bound parameter per column.
    columns = (
        "transaction_id",
        "date_time",
        "amount",
        "currency",
        "merchant_details",
        "customer_id",
        "transaction_type",
        "location",
    )

    # Connect to Hive
    connection = hive.Connection(host='localhost', port=10000, username='hive')
    print("Connected to Hive.")
    cursor = connection.cursor()

    try:
        # Use the specified database
        cursor.execute(f"USE {database_name}")

        # Transform everything up front so that a malformed record is
        # reported before any row has been written.
        transformed_transactions = transaction_transformations(transactions)

        # The previous statement referenced a ninth value that does not exist
        # (IndexError on the first row) and left customer_id unquoted. Binding
        # exactly one parameter per column fixes both, and lets PyHive escape
        # the string values.
        placeholders = ", ".join(["%s"] * len(columns))
        insert_sql = f"INSERT INTO TABLE {table_name} VALUES ({placeholders})"

        for transformed_transaction in transformed_transactions:
            cursor.execute(
                insert_sql,
                tuple(transformed_transaction[column] for column in columns),
            )

        print(f"Inserted {len(transformed_transactions)} transactions into Hive table '{table_name}'.")

    except Exception as e:
        print(f"Error inserting transactions into Hive table: {e}")

    finally:
        cursor.close()
        connection.close()

# Example usage: database and table targeted by the transaction load in this cell.
database_name = "finaldb"
table_name = "transactions"
transactions_data = [
    {
        "transaction_id": "T001",
        "date_time": "2023-11-30T12:30:00.000",
        "amount": 150.25,
        "currency": "USD",
        "merchant_details": "Merchant1",
        "customer_id": "C001",
        "transaction_type": "purchase",
        "location": "City1"
    },
    {
        "transaction_id": "T002",
        "date_time": "2023-11-30T13:30:00.000",
        "amount": 100.50,
        "currency": "USD",
        "merchant_details": "Merchant2",
        "customer_id": "C002",
        "transaction_type": "purchase",
        "location": "City2"
    },
    {
        "transaction_id": "T003",
        "date_time": "2023-11-30T14:30:00.000",
        "amount": 250.75,
        "currency": "USD",
        "merchant_details": "Merchant3",
        "customer_id": "C003",
        "transaction_type": "purchase",
        "location": "City3"
    },
    {
        "transaction_id": "T004",
        "date_time": "2023-11-30T15:30:00.000",
        "amount": 500.25,
        "currency": "USD",
        "merchant_details": "Merchant4",
        "customer_id": "C004",
        "transaction_type": "purchase",
        "location": "City4"
    },
    {
        "transaction_id": "T005",
        "date_time": "2023-11-30T16:30:00.000",
        "amount": 750.00,
        "currency": "USD",
        "merchant_details": "Merchant5",
        "customer_id": "C005",
        "transaction_type": "purchase",
        "location": "City5"
    }
]

# Normalise the sample transactions defined above and insert them into the table.
insert_transactions_into_hive(database_name, table_name, transactions_data)


Connected to Hive.
Inserted 5 transactions into Hive table 'transactions'.


In [9]:
from pyhive import hive

def update_hive_tables_with_external_data(database_name, external_data):
    """Apply external credit, fraud and blacklist data with SQL ``UPDATE``.

    Issues one ``UPDATE customers`` per entry of ``credit_scores`` and
    ``fraud_reports``, and one ``UPDATE transactions`` per distinct merchant
    in ``blacklist_info``.

    Hive only accepts ``UPDATE`` when a transaction manager that supports it
    is configured. The run recorded in this notebook was rejected with
    ``SemanticException [Error 10294]: Attempt to do update or delete using
    transaction manager that does not support these operations``.

    NOTE(review): the ``customers`` and ``transactions`` DDL in
    ``create_hive_tables`` declares no ``credit_score`` or
    ``blacklist_merchant`` column. Confirm that the target tables have them
    before relying on this function.

    Args:
        database_name: Hive database to ``USE``; interpolated into SQL as-is.
        external_data: Dict with ``credit_scores`` and ``fraud_reports``
            (both mapping customer id to a number) and ``blacklist_info``
            (an iterable of merchant names).

    Returns:
        None. Progress is reported with ``print``. An exception raised while
        updating is printed instead of propagated, and the cursor and
        connection are closed in every case.
    """
    # Connect to Hive
    connection = hive.Connection(host='localhost', port=10000, username='hive')
    print("Connected to Hive.")
    cursor = connection.cursor()

    try:
        # Use the specified database
        cursor.execute(f"USE {database_name}")

        # Update credit scores and fraud reports in the customers table.
        # Values are bound as parameters so that PyHive escapes them.
        for customer_id, credit_score in external_data["credit_scores"].items():
            cursor.execute("""
                UPDATE customers
                SET credit_score = %s
                WHERE customer_id = %s
            """, (credit_score, customer_id))

        for customer_id, fraud_reports in external_data["fraud_reports"].items():
            cursor.execute("""
                UPDATE customers
                SET fraud_reports = %s
                WHERE customer_id = %s
            """, (fraud_reports, customer_id))

        # Update blacklist merchant in the transactions table. The feed may
        # list a merchant several times; dict.fromkeys keeps the first
        # occurrence of each so that every merchant is updated only once.
        for merchant in dict.fromkeys(external_data["blacklist_info"]):
            cursor.execute("""
                UPDATE transactions
                SET blacklist_merchant = true
                WHERE merchant_details = %s
            """, (merchant,))

        print("Updated Hive tables with external data successfully.")

    except Exception as e:
        print(f"Error updating Hive tables with external data: {e}")

    finally:
        cursor.close()
        connection.close()

# Example usage: apply sample external data to the tables in "testdbb".
# The recorded output below shows Hive rejecting the UPDATE statements
# (Error 10294).
database_name = "testdbb"
external_data = {
    "blacklist_info": ["Merchant26", "Merchant22", "Merchant27", "Merchant28", "Merchant28", "Merchant30", "Merchant27", "Merchant26", "Merchant25", "Merchant29"],
    "credit_scores": {"C000": 388, "C001": 431, "C002": 719, "C003": 471, "C004": 726},
    "fraud_reports": {"C000": 4, "C001": 0, "C002": 1, "C003": 0, "C004": 4}
}

update_hive_tables_with_external_data(database_name, external_data)


Connected to Hive.
Error updating Hive tables with external data: TExecuteStatementResp(status=TStatus(statusCode=3, infoMessages=['*org.apache.hive.service.cli.HiveSQLException:Error while compiling statement: FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.:17:16', 'org.apache.hive.service.cli.operation.Operation:toSQLException:Operation.java:380', 'org.apache.hive.service.cli.operation.SQLOperation:prepare:SQLOperation.java:206', 'org.apache.hive.service.cli.operation.SQLOperation:runInternal:SQLOperation.java:290', 'org.apache.hive.service.cli.operation.Operation:run:Operation.java:320', 'org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:530', 'org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:HiveSessionImpl.java:506', 'org.apache.hive.service.cli.CLIService:executeStatement:CLIService.java:280', 'org.apache.hive.service.cli.th

In [15]:
from pyhive import hive

def insert_external_data_into_hive(database_name, cust_external_data_table, blacklist_table, external_data):
    """Replace the external-data and blacklist tables with fresh content.

    ``cust_external_data_table`` receives one row
    ``(customer_id, credit_score, fraud_report)`` per customer listed in
    ``credit_scores``. ``blacklist_table`` receives one row per distinct
    entry of ``blacklist_info``.

    All input is read and validated before either table is truncated, so
    malformed ``external_data`` (for example a customer with a credit score
    but no fraud report) is reported without wiping the existing rows.

    Args:
        database_name: Hive database to ``USE``; interpolated into SQL as-is.
        cust_external_data_table: Table for credit/fraud rows; interpolated
            into SQL as-is.
        blacklist_table: Table for blacklist rows; interpolated into SQL
            as-is.
        external_data: Dict with ``credit_scores`` and ``fraud_reports``
            (both mapping customer id to a number) and ``blacklist_info``
            (an iterable of blacklist entries).

    Returns:
        None. Progress is reported with ``print``. An exception raised while
        preparing or inserting is printed instead of propagated, and the
        cursor and connection are closed in every case.
    """
    # Connect to Hive
    connection = hive.Connection(host='localhost', port=10000, username='hive')
    print("Connected to Hive.")
    cursor = connection.cursor()

    try:
        # Use the specified database
        cursor.execute(f"USE {database_name}")

        # Build every row before touching the tables: a KeyError raised here
        # must not happen after a TRUNCATE has already discarded the data.
        credit_scores = external_data['credit_scores']
        fraud_reports = external_data['fraud_reports']
        credit_fraud_rows = [
            (customer_id, credit_score, fraud_reports[customer_id])
            for customer_id, credit_score in credit_scores.items()
        ]
        # Keep the first occurrence of each blacklist entry; duplicate rows
        # would multiply matches in any later join against this table.
        blacklist_rows = list(dict.fromkeys(external_data['blacklist_info']))

        # Replace the content of the cust_external_data table. Values are
        # bound as parameters so that PyHive escapes them.
        cursor.execute(f"TRUNCATE TABLE {cust_external_data_table}")
        credit_fraud_sql = f"INSERT INTO TABLE {cust_external_data_table} VALUES (%s, %s, %s)"
        for row in credit_fraud_rows:
            cursor.execute(credit_fraud_sql, row)

        # Replace the content of the blacklist table
        cursor.execute(f"TRUNCATE TABLE {blacklist_table}")
        blacklist_sql = f"INSERT INTO TABLE {blacklist_table} VALUES (%s)"
        for merchant_info in blacklist_rows:
            cursor.execute(blacklist_sql, (merchant_info,))

        print(f"Inserted external data into Hive tables '{cust_external_data_table}' and '{blacklist_table}'.")

    except Exception as e:
        print(f"Error inserting external data into Hive tables: {e}")

    finally:
        cursor.close()
        connection.close()

# Example usage: reload the external-data and blacklist tables in "finaldb"
# from a small sample payload.
database_name = "finaldb"
cust_external_data_table = "cust_external_data"
blacklist_table = "blacklist"
external_data = {
    "blacklist_info": ["Merchant25", "Merchant24", "Merchant21"],
    "credit_scores": {"C000": 405, "C001": 817, "C002": 614},
    "fraud_reports": {"C000": 2, "C001": 12, "C002": 5}
}

insert_external_data_into_hive(database_name, cust_external_data_table, blacklist_table, external_data)

Connected to Hive.
Inserted external data into Hive tables 'cust_external_data' and 'blacklist'.
