In [None]:
import oracledb
import polars as pl;import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed


# Placeholder Oracle connection settings read by the module-level fetch_data()
# helpers in the cells below (user name, password, DSN, in that order).
# NOTE: replace these with real values locally; do not commit real credentials.
ORACLE_USER, ORACLE_PASSWORD, ORACLE_DSN = ("user", "password", "dsn")

In [None]:
import oracledb
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_parallel_data(oracle_user, oracle_password, oracle_dsn, queries, max_workers=4):
    """
    Fetch data from Oracle Database in parallel using multiple queries and return a combined Pandas DataFrame.

    Every query is submitted to a thread pool and opens its own connection.
    A query that fails with ``oracledb.DatabaseError`` is reported on stdout
    and skipped; a query that returns no rows is skipped as well. Results are
    concatenated in the order in which the queries complete, not the order
    in which they were submitted.

    Parameters:
    - oracle_user (str): Oracle DB username.
    - oracle_password (str): Oracle DB password.
    - oracle_dsn (str): Oracle DSN (Data Source Name).
    - queries (list): A list of SQL queries to be executed in parallel.
    - max_workers (int): Number of parallel workers (default is 4).

    Returns:
    - combined_df (pd.DataFrame): A Pandas DataFrame containing the combined results of all queries.
      An empty DataFrame is returned when no query produced any rows.
    """

    def fetch_data(query):
        """Run one query on a fresh connection; return (rows, column_names) or (None, None) on error."""
        try:
            # The connection is opened inside the ``with`` so it is closed on
            # every path, and a failed connect() is handled by the ``except``
            # below instead of tripping over an unbound name during cleanup.
            with oracledb.connect(
                user=oracle_user, password=oracle_password, dsn=oracle_dsn
            ) as connection:
                with connection.cursor() as cursor:
                    cursor.execute(query)
                    data = cursor.fetchall()
                    columns = [col[0] for col in cursor.description]  # Extract column names
                    return data, columns
        except oracledb.DatabaseError as e:
            print(f"Error fetching data: {e}")
            return None, None

    # Execute multiple queries concurrently using ThreadPoolExecutor
    dataframes = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch_data, query) for query in queries]

        for future in as_completed(futures):
            result, columns = future.result()
            if result and columns:
                # Convert the result to a Pandas DataFrame
                dataframes.append(pd.DataFrame(result, columns=columns))

    # pd.concat() raises ValueError on an empty list, so return an empty frame
    # when there were no queries or none of them produced rows.
    if not dataframes:
        return pd.DataFrame()

    # Combine all dataframes into one for further analysis
    return pd.concat(dataframes, ignore_index=True)

# Usage example
if __name__ == "__main__":
    # Oracle connection details
    ORACLE_USER = 'your_user'
    ORACLE_PASSWORD = 'your_password'
    ORACLE_DSN = 'your_dsn'  # Example: 'localhost:1521/ORCLPDB1'

    # Define SQL queries with Oracle's parallel query feature using 'state'
    queries = [
        "SELECT /*+ PARALLEL(large_table, 4) */ * FROM large_table WHERE state = 'CA'",
        "SELECT /*+ PARALLEL(large_table, 4) */ * FROM large_table WHERE state = 'TX'",
        "SELECT /*+ PARALLEL(large_table, 4) */ * FROM large_table WHERE state = 'NY'",
        "SELECT /*+ PARALLEL(large_table, 4) */ * FROM large_table WHERE state = 'FL'"
    ]

    # Fetch data in parallel and get a combined DataFrame
    combined_df = fetch_parallel_data(ORACLE_USER, ORACLE_PASSWORD, ORACLE_DSN, queries, max_workers=4)

    # Oracle reports unquoted column names in upper case (e.g. 'STATE'), so
    # normalise them to lower case before looking columns up by name.
    combined_df = combined_df.rename(columns=str.lower)

    # Display a preview of the combined DataFrame
    display(combined_df.head())  # Use display for better output in Jupyter

    if 'state' in combined_df.columns:
        # Example of performing some operation on the Pandas DataFrame (e.g., filtering by state)
        filtered_df = combined_df[combined_df['state'] == 'CA']
        display(filtered_df.head())  # Display filtered data for 'CA'

        # Example of saving the result to a CSV file
        filtered_df.to_csv("filtered_results_state_CA.csv", index=False)
    else:
        # Happens when every query failed or returned no rows.
        print("No 'state' column in the combined result; skipping the 'CA' filter and CSV export.")


## PANDAS

In [None]:
# Connect to Oracle Database
def fetch_data(query):
    """Fetch data from Oracle based on a query.

    Opens a new connection using the module-level ORACLE_USER, ORACLE_PASSWORD
    and ORACLE_DSN settings, runs ``query`` and fetches every row.

    Parameters:
    - query (str): SQL statement to execute.

    Returns:
    - (data, columns): the list of fetched rows and the list of column names,
      or (None, None) when an ``oracledb.DatabaseError`` occurs (the error is
      printed to stdout).
    """
    try:
        # Open the connection inside the ``with`` so it is closed on every
        # path, and so a failed connect() is handled by the ``except`` below
        # rather than raising on an unbound name during cleanup.
        with oracledb.connect(
            user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN
        ) as connection:
            # Execute query and fetch results
            with connection.cursor() as cursor:
                cursor.execute(query)
                data = cursor.fetchall()
                columns = [col[0] for col in cursor.description]  # Extract column names
                return data, columns
    except oracledb.DatabaseError as e:
        print(f"Error fetching data: {e}")
        return None, None

# One SELECT per state, each carrying Oracle's PARALLEL hint for large_table;
# the state value is what splits the work across the worker threads.
queries = [
    f"SELECT /*+ PARALLEL(large_table, 4) */ * FROM large_table WHERE state = '{state}'"
    for state in ("CA", "TX", "NY", "FL")
]

# Function to execute multiple queries concurrently using ThreadPoolExecutor
def run_queries_in_parallel(queries, max_workers=4):
    """Run several queries concurrently and combine the results into one Pandas DataFrame.

    Each query is passed to ``fetch_data`` on a thread pool. Queries for which
    ``fetch_data`` returns no rows (or ``(None, None)``) are skipped. Results
    are concatenated in completion order, not submission order.

    Parameters:
    - queries (list): SQL statements to execute.
    - max_workers (int): Size of the thread pool (default is 4).

    Returns:
    - pd.DataFrame: the concatenated rows of all queries, or an empty
      DataFrame when no query produced any rows.
    """
    dataframes = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit queries for parallel execution
        futures = [executor.submit(fetch_data, query) for query in queries]

        for future in as_completed(futures):
            result, columns = future.result()
            if result and columns:
                # Convert the result to a Pandas DataFrame
                dataframes.append(pd.DataFrame(result, columns=columns))

    # pd.concat() raises ValueError on an empty list, so return an empty frame
    # when there were no queries or none of them produced rows.
    if not dataframes:
        return pd.DataFrame()

    # Combine all dataframes into one for further analysis
    return pd.concat(dataframes, ignore_index=True)

# Running the queries and processing the results
if __name__ == "__main__":
    # Fetch data in parallel and convert to Pandas DataFrame
    combined_df = run_queries_in_parallel(queries)

    # Oracle reports unquoted column names in upper case (e.g. 'STATE'), so
    # normalise them to lower case before looking columns up by name.
    combined_df = combined_df.rename(columns=str.lower)

    # Display a preview of the combined DataFrame
    display(combined_df.head())  # Use display for better output in Jupyter

    if 'state' in combined_df.columns:
        # Example of performing some operation on the Pandas DataFrame (e.g., filtering by state)
        filtered_df = combined_df[combined_df['state'] == 'CA']
        display(filtered_df.head())  # Display filtered data for 'CA'

        # Example of saving the result to a CSV file
        filtered_df.to_csv("filtered_results_state_CA.csv", index=False)
    else:
        # Happens when every query failed or returned no rows.
        print("No 'state' column in the combined result; skipping the 'CA' filter and CSV export.")


In [None]:
def fetch_data(query):
    """Fetch data from Oracle based on a query.

    Opens a new connection using the module-level ORACLE_USER, ORACLE_PASSWORD
    and ORACLE_DSN settings, runs ``query`` and fetches every row.

    Parameters:
    - query (str): SQL statement to execute.

    Returns:
    - (data, columns): the list of fetched rows and the list of column names,
      or (None, None) when an ``oracledb.DatabaseError`` occurs (the error is
      printed to stdout).
    """
    try:
        # Open the connection inside the ``with`` so it is closed on every
        # path, and so a failed connect() is handled by the ``except`` below
        # rather than raising on an unbound name during cleanup.
        with oracledb.connect(
            user=ORACLE_USER, password=ORACLE_PASSWORD, dsn=ORACLE_DSN
        ) as connection:
            # Execute query and fetch results
            with connection.cursor() as cursor:
                cursor.execute(query)
                data = cursor.fetchall()
                columns = [col[0] for col in cursor.description]  # Extract column names
                return data, columns
    except oracledb.DatabaseError as e:
        print(f"Error fetching data: {e}")
        return None, None

# One SELECT per state, each carrying Oracle's PARALLEL hint for large_table;
# the state value is the divisor that splits the work across threads.
queries = [
    f"SELECT /*+ PARALLEL(large_table, 4) */ * FROM large_table WHERE state = '{state}'"
    for state in ("ID", "WA", "OR", "FL")
]

#  multiple queries concurrently using ThreadPoolExecutor
def run_queries_in_parallel(queries, max_workers=4):
    """Run several queries concurrently and combine the results into one Polars DataFrame.

    Each query is passed to ``fetch_data`` on a thread pool. Queries for which
    ``fetch_data`` returns no rows (or ``(None, None)``) are skipped. Results
    are concatenated in completion order, not submission order.

    Parameters:
    - queries (list): SQL statements to execute.
    - max_workers (int): Size of the thread pool (default is 4).

    Returns:
    - pl.DataFrame: the concatenated rows of all queries, or an empty
      DataFrame when no query produced any rows.
    """
    dataframes = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit queries for parallel execution
        futures = [executor.submit(fetch_data, query) for query in queries]

        for future in as_completed(futures):
            result, columns = future.result()
            if result and columns:
                # fetchall() yields one tuple per row; state the orientation
                # explicitly so Polars does not have to guess between
                # row-wise and column-wise data.
                dataframes.append(pl.DataFrame(result, schema=columns, orient="row"))

    # pl.concat() raises on an empty list, so return an empty frame when
    # there were no queries or none of them produced rows.
    if not dataframes:
        return pl.DataFrame()

    # Combine all dataframes into one for further analysis
    return pl.concat(dataframes)

# Running the queries and processing the results
if __name__ == "__main__":
    # Fetch data in parallel and convert to Polars DataFrame
    combined_df = run_queries_in_parallel(queries)

    # Oracle reports unquoted column names in upper case (e.g. 'STATE'), so
    # normalise them to lower case before referring to columns by name.
    combined_df = combined_df.rename({name: name.lower() for name in combined_df.columns})

    # Display a preview of the combined DataFrame. Inside an ``if`` block a
    # bare expression is not echoed by Jupyter, so display() is called explicitly.
    display(combined_df.head().to_pandas())  # Using .to_pandas() for better display in Jupyter

    if "state" in combined_df.columns:
        # Example of performing some operation on the Polars DataFrame (e.g., filtering by state)
        # NOTE(review): the queries in this cell select ID, WA, OR and FL only,
        # so a filter on 'CA' yields no rows — confirm the intended state.
        filtered_df = combined_df.filter(pl.col("state") == "CA")
        display(filtered_df.head().to_pandas())  # Display filtered data for 'CA'
    else:
        # Happens when every query failed or returned no rows.
        print("No 'state' column in the combined result; skipping the 'CA' filter.")


In [None]:
# Per state/quarter/year summary: row count, distinct ID/UI/EIN counts and
# total and average wage. The select list must not end with a comma before
# FROM, the table name takes no trailing comma, and the averaging aggregate
# is AVG (not AVE).
query = """SELECT STATE_CODE, QTR, YR,
          COUNT(*) AS total_records,
          COUNT(DISTINCT ID) AS id_count,
          COUNT(DISTINCT UI) AS ui_count,
          COUNT(DISTINCT EIN) AS ein_count,
          SUM(WAGE) AS total_wage,
          AVG(WAGE) AS avg_wage
          FROM oracleDB4
          GROUP BY STATE_CODE, QTR, YR"""

In [None]:
def convertfips2state(state_code):
    """Return the postal abbreviation for a state FIPS code, or None if it is unknown.

    The code is looked up with ``us.states.lookup``. Numeric codes are
    converted to a zero-padded two-character string first (6 -> "06"),
    because the lookup is given a string and a single digit would not be
    treated as a FIPS code.

    NOTE(review): this relies on the ``us`` package being imported elsewhere;
    no import for it is visible in this part of the file.

    Parameters:
    - state_code: FIPS code as an int or str (e.g. 6, "6" or "06").

    Returns:
    - str or None: the value of the matched state's ``abbr`` attribute, or
      None when ``state_code`` is None or no state matches.
    """
    if state_code is None:
        return None

    code = str(state_code).strip()
    if code.isdigit():
        code = code.zfill(2)

    state = us.states.lookup(code)
    # lookup() yields None for an unknown value; avoid AttributeError on .abbr.
    return state.abbr if state is not None else None

# Add a 'state_abbr' column by applying convertfips2state to every value of df['state_code'].
# NOTE(review): `df` is not defined in the visible part of this file — presumably it
# holds the result of the aggregate query; confirm, and verify the column-name casing.
df['state_abbr'] = df['state_code'].apply(convertfips2state)
