In [6]:
import trino


In [5]:
def basic_trino_connection_and_query(query: str):
    """Run ``query`` on the local Trino server and return every row.

    Connects to Trino at ``localhost:8080`` as user ``admin`` using the
    ``postgresql`` catalog and the ``public`` schema, executes the query,
    prints the fetched rows and returns them.

    Args:
        query: SQL text to execute.

    Returns:
        The rows produced by ``cursor.fetchall()``.
    """
    conn = trino.dbapi.connect(
        host='localhost',
        port=8080,
        user='admin',
        catalog='postgresql',
        schema='public',
    )

    try:
        cur = conn.cursor()
        cur.execute(query)
        result = cur.fetchall()
    finally:
        # Always release the connection, including when the query fails.
        conn.close()

    print(f"Query result for '{query}':", result)

    return result
    
# Smoke test: fetch the first five rows of the employees table.
basic_trino_connection_and_query("SELECT * FROM employees LIMIT 5")


Query result for 'SELECT * FROM employees LIMIT 5': [[1, 'Alice Johnson', 'Engineering', 95000, datetime.date(2022, 1, 15)], [2, 'Bob Smith', 'Marketing', 75000, datetime.date(2022, 3, 20)], [3, 'Carol Davis', 'Engineering', 105000, datetime.date(2021, 11, 10)], [4, 'David Brown', 'Sales', 65000, datetime.date(2022, 2, 28)], [5, 'Eve Wilson', 'Engineering', 88000, datetime.date(2022, 4, 5)]]


[[1, 'Alice Johnson', 'Engineering', 95000, datetime.date(2022, 1, 15)],
 [2, 'Bob Smith', 'Marketing', 75000, datetime.date(2022, 3, 20)],
 [3, 'Carol Davis', 'Engineering', 105000, datetime.date(2021, 11, 10)],
 [4, 'David Brown', 'Sales', 65000, datetime.date(2022, 2, 28)],
 [5, 'Eve Wilson', 'Engineering', 88000, datetime.date(2022, 4, 5)]]

In [None]:
# docker exec -it postgres-db psql -U postgres -d testdb

# Use the command above to open psql and check the tables.

In [None]:
#!/usr/bin/env python3
"""
Advanced Trino configuration and connection management
"""
import trino
import ssl
from typing import Optional, Dict, Any
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

class TrinoConnection:
    """Thin wrapper around one Trino DB-API connection and one cursor.

    The constructor only stores settings; no network traffic happens until
    :meth:`connect` is called. Instances can also be used as context
    managers (``with TrinoConnection(...) as conn:``), which connects on
    entry and closes on exit.
    """

    def __init__(
        self,
        host: str = 'localhost',
        port: int = 8080,
        user: str = 'admin',
        catalog: Optional[str] = None,
        schema: Optional[str] = None,
        auth: Optional[Any] = None,
        use_ssl: bool = False,
        ssl_verify: bool = True,
        properties: Optional[Dict[str, str]] = None
    ):
        """Store the connection settings.

        Args:
            host: Trino coordinator host name.
            port: Trino coordinator port.
            user: User name sent to Trino.
            catalog: Default catalog for the session, if any.
            schema: Default schema for the session, if any.
            auth: Authentication object passed through to the client.
            use_ssl: Connect over HTTPS when true.
            ssl_verify: When ``use_ssl`` is true, set to ``False`` to skip
                certificate verification.
            properties: Session properties to set on the connection.
        """
        self.host = host
        self.port = port
        self.user = user
        self.catalog = catalog
        self.schema = schema
        self.auth = auth
        self.use_ssl = use_ssl
        self.ssl_verify = ssl_verify
        self.properties = properties or {}

        self.connection = None
        self.cursor = None

    def __enter__(self):
        """Connect and return this wrapper for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block propagate."""
        self.close()
        return False

    def connect(self):
        """Establish connection to Trino.

        Returns:
            The underlying DB-API connection object.
        """
        # The isolation-level enum lives in trino.transaction; imported here
        # so the block does not depend on the file-level import list.
        from trino.transaction import IsolationLevel

        connection_params = {
            'host': self.host,
            'port': self.port,
            'user': self.user,
            'catalog': self.catalog,
            'schema': self.schema,
            'source': 'python-client',
            'isolation_level': IsolationLevel.READ_COMMITTED,
        }

        # Add authentication if provided
        if self.auth:
            connection_params['auth'] = self.auth

        # Add SSL configuration
        if self.use_ssl:
            connection_params['http_scheme'] = 'https'
            if not self.ssl_verify:
                connection_params['verify'] = False

        # Add session properties
        if self.properties:
            connection_params['session_properties'] = self.properties

        self.connection = trino.dbapi.connect(**connection_params)
        self.cursor = self.connection.cursor()

        print(f"Connected to Trino at {self.host}:{self.port}")
        return self.connection

    def execute_query(self, query: str, parameters: Optional[tuple] = None):
        """Execute a query and return all result rows.

        Args:
            query: SQL text to run.
            parameters: Optional positional parameters for the query.

        Returns:
            The rows produced by ``cursor.fetchall()``.

        Raises:
            RuntimeError: If there is no active connection.
            Exception: Any error raised by the client is printed and
                re-raised unchanged.
        """
        if self.cursor is None:
            raise RuntimeError("No active connection. Call connect() first.")

        try:
            if parameters:
                self.cursor.execute(query, parameters)
            else:
                self.cursor.execute(query)

            return self.cursor.fetchall()
        except Exception as e:
            print(f"Query execution error: {e}")
            raise

    def get_query_info(self):
        """Return the cursor's ``stats`` for the last query, or ``None`` when not connected."""
        if self.cursor is not None:
            return self.cursor.stats
        return None

    def close(self):
        """Close the connection and forget the cursor.

        Safe to call when no connection is open. The stored connection and
        cursor are reset so that a later ``execute_query`` reports the missing
        connection instead of using a closed cursor.
        """
        if self.connection:
            try:
                self.connection.close()
            finally:
                self.connection = None
                self.cursor = None
            print("Connection closed")

# Example usage with different configurations
def example_connections():
    """Demonstrate two TrinoConnection configurations.

    First runs a constant SELECT against the ``memory`` catalog, then reads
    five rows from ``employees`` through the ``postgresql`` catalog with two
    session properties set. Each connection is closed even if its query
    raises.
    """
    # Basic connection
    basic_conn = TrinoConnection(
        catalog='memory',
        schema='default'
    )
    try:
        basic_conn.connect()

        # Test memory catalog
        result = basic_conn.execute_query("SELECT 'Hello Trino!' as message")
        print("Memory catalog test:", result)
    finally:
        # close() is a no-op when connect() never produced a connection.
        basic_conn.close()

    # PostgreSQL connection
    pg_conn = TrinoConnection(
        catalog='postgresql',
        schema='public',
        properties={
            'query_max_execution_time': '1h',
            'query_max_memory': '1GB'
        }
    )
    try:
        pg_conn.connect()

        # Query PostgreSQL through Trino
        employees = pg_conn.execute_query("SELECT * FROM employees LIMIT 5")
        print("\nEmployees from PostgreSQL:")
        for emp in employees:
            print(f"  {emp}")
    finally:
        pg_conn.close()

# Run the connection demos only when this code is executed as a script.
if __name__ == "__main__":
    example_connections()

In [None]:
#!/usr/bin/env python3
"""
Trino integration with Pandas for data analysis
"""
import trino
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
import plotly.express as px
import plotly.graph_objects as go

class TrinoPandas:
    """Helpers for moving data between Trino and pandas DataFrames.

    Reads go through a SQLAlchemy engine (``pd.read_sql_query``); the demo
    upload path uses a plain Trino DB-API connection and literal SQL.
    """

    # Number of rows sent per INSERT statement by dataframe_to_trino.
    _INSERT_BATCH_SIZE = 100

    def __init__(self, host='localhost', port=8080, user='admin',
                 catalog='postgresql', schema='public'):
        """Store connection details and build the SQLAlchemy engine.

        Args:
            host: Trino coordinator host name.
            port: Trino coordinator port.
            user: User name sent to Trino.
            catalog: Default catalog for queries run through the engine.
            schema: Default schema for queries run through the engine.
        """
        self.host = host
        self.port = port
        self.user = user

        # Create SQLAlchemy engine for pandas integration.
        # The Trino dialect takes catalog and schema from the database part
        # of the URL: trino://user@host:port/<catalog>/<schema>.
        self.engine = create_engine(
            URL.create(
                drivername="trino",
                host=host,
                port=port,
                username=user,
                database=f"{catalog}/{schema}",
            )
        )

    def query_to_dataframe(self, query: str) -> pd.DataFrame:
        """Execute query and return results as pandas DataFrame.

        Errors are printed and then re-raised unchanged.
        """
        try:
            df = pd.read_sql_query(query, self.engine)
            return df
        except Exception as e:
            print(f"Error executing query: {e}")
            raise

    @staticmethod
    def _quote_identifier(name) -> str:
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _trino_type(dtype) -> str:
        """Map a pandas dtype to the Trino column type used for uploads."""
        # Check bool first so boolean columns are never treated as integers.
        if pd.api.types.is_bool_dtype(dtype):
            return 'BOOLEAN'
        if pd.api.types.is_integer_dtype(dtype):
            return 'BIGINT'
        if pd.api.types.is_float_dtype(dtype):
            return 'DOUBLE'
        return 'VARCHAR'

    @staticmethod
    def _sql_literal(val, as_text: bool = False) -> str:
        """Render one cell value as a SQL literal.

        Args:
            val: The scalar value to render.
            as_text: Force a quoted string literal (used for VARCHAR
                columns, whatever Python type the cell holds).
        """
        if pd.isna(val):
            return 'NULL'
        if not as_text:
            if isinstance(val, (bool, np.bool_)):
                return 'TRUE' if val else 'FALSE'
            if isinstance(val, (int, float, np.integer, np.floating)):
                return str(val)
        # Single quotes are escaped by doubling them.
        escaped = str(val).replace("'", "''")
        return f"'{escaped}'"

    def dataframe_to_trino(self, df: pd.DataFrame, table_name: str,
                          catalog: str = 'memory', schema: str = 'default',
                          max_rows: int = 100):
        """Upload DataFrame to Trino (memory catalog for testing).

        Creates ``table_name`` if it does not exist and inserts the first
        ``max_rows`` rows of ``df`` using literal SQL. This is a simplified
        example - in production, use proper ETL.

        Args:
            df: Data to upload; must have at least one column.
            table_name: Target table name, interpolated into the SQL as-is,
                so it must come from trusted code.
            catalog: Catalog used for the connection.
            schema: Schema used for the connection.
            max_rows: Maximum number of rows to insert; ``None`` inserts all.

        Raises:
            ValueError: If ``df`` has no columns.
        """
        if len(df.columns) == 0:
            raise ValueError("Cannot create a table from a DataFrame without columns")

        frame = df if max_rows is None else df.head(max_rows)

        # Determine Trino data type based on pandas dtype
        column_types = [self._trino_type(dtype) for dtype in frame.dtypes]
        column_defs = [
            f"{self._quote_identifier(name)} {trino_type}"
            for name, trino_type in zip(frame.columns, column_types)
        ]

        create_query = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            {', '.join(column_defs)}
        )
        """

        # itertuples keeps each column's own type; iterrows would upcast a
        # mixed int/float row to float and turn 1 into 1.0.
        value_rows = [
            "(" + ", ".join(
                self._sql_literal(value, as_text=(trino_type == 'VARCHAR'))
                for value, trino_type in zip(row, column_types)
            ) + ")"
            for row in frame.itertuples(index=False, name=None)
        ]

        conn = trino.dbapi.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            catalog=catalog,
            schema=schema
        )
        try:
            cur = conn.cursor()

            cur.execute(create_query)
            # Drain the result so the statement has finished before moving on
            # (assumed necessary on some client versions - harmless otherwise).
            cur.fetchall()

            for start in range(0, len(value_rows), self._INSERT_BATCH_SIZE):
                batch = value_rows[start:start + self._INSERT_BATCH_SIZE]
                insert_query = f"""
                INSERT INTO {table_name} VALUES {', '.join(batch)}
                """
                cur.execute(insert_query)
                cur.fetchall()
        finally:
            conn.close()

        print(f"DataFrame uploaded to {catalog}.{schema}.{table_name}")

def data_analysis_examples():
    """Run the demo analyses and print their results.

    Loads employees, sales joined with employees, per-category sales totals
    and per-employee sales performance through ``TrinoPandas``, prints each
    result, then hands the first three frames to ``create_visualizations``.
    """
    employee_sql = """
        SELECT 
            name,
            department,
            salary,
            hire_date,
            EXTRACT(YEAR FROM hire_date) as hire_year
        FROM employees
        ORDER BY salary DESC
    """

    sales_sql = """
        SELECT 
            s.product_name,
            s.category,
            s.amount,
            s.sale_date,
            e.name as employee_name,
            e.department as employee_department
        FROM sales s
        JOIN employees e ON s.employee_id = e.id
        ORDER BY s.amount DESC
    """

    category_sql = """
        SELECT 
            s.category,
            COUNT(*) as number_of_sales,
            SUM(s.amount) as total_sales,
            AVG(s.amount) as average_sale,
            MIN(s.amount) as min_sale,
            MAX(s.amount) as max_sale
        FROM sales s
        GROUP BY s.category
        ORDER BY total_sales DESC
    """

    performance_sql = """
        WITH monthly_sales AS (
            SELECT 
                EXTRACT(MONTH FROM sale_date) as month,
                EXTRACT(YEAR FROM sale_date) as year,
                category,
                SUM(amount) as monthly_total
            FROM sales
            GROUP BY EXTRACT(YEAR FROM sale_date), EXTRACT(MONTH FROM sale_date), category
        ),
        employee_performance AS (
            SELECT 
                e.name,
                e.department,
                COUNT(s.id) as sales_count,
                SUM(s.amount) as total_sales,
                AVG(s.amount) as avg_sale
            FROM employees e
            LEFT JOIN sales s ON e.id = s.employee_id
            GROUP BY e.name, e.department
        )
        SELECT * FROM employee_performance
        WHERE sales_count > 0
        ORDER BY total_sales DESC
    """

    client = TrinoPandas()

    # --- Employees ---------------------------------------------------------
    print("=== Employee Analysis ===")
    staff = client.query_to_dataframe(employee_sql)

    print("Employee DataFrame shape:", staff.shape)
    print("\nFirst 5 employees:")
    print(staff.head())

    print("\nSalary statistics by department:")
    salaries_by_department = staff.groupby('department')['salary']
    print(salaries_by_department.agg(['mean', 'min', 'max', 'count']))

    # --- Sales joined with the selling employee ----------------------------
    print("\n=== Sales Analysis ===")
    sales_with_staff = client.query_to_dataframe(sales_sql)

    print("Sales with employee info:")
    print(sales_with_staff.head())

    # --- Per-category aggregates -------------------------------------------
    per_category = client.query_to_dataframe(category_sql)

    print("\nSales by category:")
    print(per_category)

    # --- Per-employee performance (CTE query) ------------------------------
    print("\n=== Advanced Analytics ===")
    performance = client.query_to_dataframe(performance_sql)

    print("Employee performance:")
    print(performance)

    # Charts are drawn from the frames fetched above.
    create_visualizations(staff, sales_with_staff, per_category)

def create_visualizations(employees_df, sales_analysis, category_sales):
    """Show three Plotly charts built from the analysis frames.

    Args:
        employees_df: Frame with ``department`` and ``salary`` columns.
        sales_analysis: Frame with ``sale_date`` and ``amount`` columns.
        category_sales: Frame with ``category`` and ``total_sales`` columns.
    """
    # Box plot: salary spread within each department.
    salary_box = px.box(
        employees_df,
        x='department',
        y='salary',
        title='Salary Distribution by Department',
    )
    salary_box.show()

    # Bar chart: total sales per category.
    category_bars = px.bar(
        category_sales,
        x='category',
        y='total_sales',
        title='Total Sales by Category',
    )
    category_bars.show()

    # Line chart: summed sale amounts per sale date.
    amount_by_date = sales_analysis.groupby('sale_date')['amount'].sum()
    daily_totals = amount_by_date.reset_index()
    timeline = px.line(
        daily_totals,
        x='sale_date',
        y='amount',
        title='Sales Timeline',
    )
    timeline.show()

# Run the pandas analysis demo only when this code is executed as a script.
if __name__ == "__main__":
    data_analysis_examples()

In [None]:
 #!/usr/bin/env python3
"""
Federated queries across multiple data sources
"""
import trino
import pandas as pd
import json

class TrinoFederatedQueries:
    """Demo queries run through Trino: joins, window functions, complex types.

    Each public example opens its own connection, runs one query, closes the
    connection (also on failure) and prints a short report.
    """

    def __init__(self, host='localhost', port=8080, user='admin'):
        """Store the coordinator host, port and user for later connections."""
        self.host = host
        self.port = port
        self.user = user

    def get_connection(self, catalog=None, schema=None):
        """Get Trino connection for specific catalog/schema"""
        return trino.dbapi.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            catalog=catalog,
            schema=schema
        )

    def _fetch_all(self, query, catalog=None, schema=None):
        """Run ``query`` on a fresh connection and return all rows.

        The connection is closed whether or not the query succeeds.
        """
        conn = self.get_connection(catalog=catalog, schema=schema)
        try:
            cur = conn.cursor()
            cur.execute(query)
            return cur.fetchall()
        finally:
            conn.close()

    def cross_catalog_query(self):
        """Join a fully-qualified PostgreSQL table with inline bonus data.

        No session catalog is set; the table is addressed as
        ``postgresql.public.employees``.
        """
        # Query combining the PostgreSQL table with an inline VALUES relation
        federated_query = """
        SELECT 
            pg_emp.name,
            pg_emp.department,
            pg_emp.salary,
            mem_data.bonus_percentage
        FROM postgresql.public.employees pg_emp
        CROSS JOIN (
            VALUES 
                ('Engineering', 0.15),
                ('Marketing', 0.12),
                ('Sales', 0.10)
        ) AS mem_data(department, bonus_percentage)
        WHERE pg_emp.department = mem_data.department
        ORDER BY pg_emp.salary DESC
        """

        results = self._fetch_all(federated_query)

        print("Cross-catalog federated query results:")
        for name, dept, salary, bonus in results:
            bonus_amount = salary * bonus
            print(f"{name} ({dept}): ${salary:,} + ${bonus_amount:,.2f} bonus")

    def window_functions_example(self):
        """Advanced analytics with window functions"""
        window_query = """
        SELECT 
            name,
            department,
            salary,
            RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_salary_rank,
            DENSE_RANK() OVER (ORDER BY salary DESC) as overall_salary_rank,
            LAG(salary) OVER (PARTITION BY department ORDER BY salary) as prev_salary_in_dept,
            AVG(salary) OVER (PARTITION BY department) as dept_avg_salary,
            salary - AVG(salary) OVER (PARTITION BY department) as salary_vs_dept_avg
        FROM employees
        ORDER BY department, salary DESC
        """

        results = self._fetch_all(window_query, catalog='postgresql', schema='public')

        print("\nWindow functions analysis:")
        for row in results:
            # Columns by position: 0 name, 3 dept rank, 4 overall rank,
            # 7 salary minus department average.
            print(f"{row[0]} - Dept Rank: {row[3]}, Overall Rank: {row[4]}, "
                  f"vs Dept Avg: ${row[7]:.2f}")

    def array_and_json_operations(self):
        """Working with complex data types"""
        # Inline rows with ARRAY and JSON columns. In Trino the VALUES list
        # must be parenthesised and aliased inside the CTE body.
        complex_query = """
        WITH sample_data AS (
            SELECT * FROM (VALUES
            (1, 'John', ARRAY['python', 'sql', 'java'], 
             JSON '{"experience": 5, "certifications": ["aws", "gcp"]}'),
            (2, 'Jane', ARRAY['python', 'r', 'scala'], 
             JSON '{"experience": 7, "certifications": ["azure", "databricks"]}'),
            (3, 'Bob', ARRAY['java', 'javascript', 'go'], 
             JSON '{"experience": 3, "certifications": ["kubernetes"]}')
            ) AS t(id, name, skills, profile)
        )
        
        SELECT 
            name,
            CARDINALITY(skills) as num_skills,
            ARRAY_JOIN(skills, ', ') as skills_list,
            JSON_EXTRACT_SCALAR(profile, '$.experience') as experience_years,
            JSON_EXTRACT(profile, '$.certifications') as certifications
        FROM sample_data
        WHERE CONTAINS(skills, 'python')
        """

        results = self._fetch_all(complex_query, catalog='memory', schema='default')

        print("\nComplex data types analysis:")
        for row in results:
            print(f"{row[0]}: {row[2]} ({row[1]} skills), "
                  f"{row[3]} years experience")

def performance_monitoring():
    """Run a per-department salary aggregate and print the query statistics.

    Connects to the local Trino server (``postgresql.public``), executes the
    aggregate, prints selected entries of the cursor's ``stats`` mapping when
    it is non-empty, then prints one line per department. The connection is
    closed even if the query fails.
    """
    conn = trino.dbapi.connect(
        host='localhost',
        port=8080,
        user='admin',
        catalog='postgresql',
        schema='public'
    )

    # Execute a sample query
    query = """
    SELECT 
        department,
        COUNT(*) as employee_count,
        AVG(salary) as avg_salary,
        STDDEV(salary) as salary_stddev
    FROM employees
    GROUP BY department
    ORDER BY avg_salary DESC
    """

    try:
        cur = conn.cursor()
        cur.execute(query)
        results = cur.fetchall()

        # Get query statistics (read while the connection is still open)
        stats = cur.stats
    finally:
        conn.close()

    if stats:
        print("Query Performance Statistics:")
        print(f"  Query ID: {stats.get('queryId', 'N/A')}")
        print(f"  Elapsed Time: {stats.get('elapsedTimeMillis', 0)} ms")
        print(f"  CPU Time: {stats.get('cpuTimeMillis', 0)} ms")
        print(f"  Processed Rows: {stats.get('processedRows', 0)}")
        print(f"  Processed Bytes: {stats.get('processedBytes', 0)}")
        print(f"  Peak Memory: {stats.get('peakMemoryBytes', 0)} bytes")

    print("\nQuery Results:")
    for dept, count, avg_sal, stddev in results:
        # An aggregate can come back as NULL (None); STDDEV presumably does so
        # for a department with a single employee. Formatting None with
        # ".2f" would raise, so show a placeholder instead.
        avg_text = 'N/A' if avg_sal is None else f"${avg_sal:.2f}"
        stddev_text = 'N/A' if stddev is None else f"${stddev:.2f}"
        print(f"  {dept}: {count} employees, "
              f"avg {avg_text}, stddev {stddev_text}")

if __name__ == "__main__":
    tfq = TrinoFederatedQueries()

    # Each entry pairs the banner to print with the demo to run after it.
    demo_steps = (
        ("=== Cross-Catalog Queries ===", tfq.cross_catalog_query),
        ("\n=== Window Functions ===", tfq.window_functions_example),
        ("\n=== Complex Data Types ===", tfq.array_and_json_operations),
        ("\n=== Performance Monitoring ===", performance_monitoring),
    )
    for banner, run_demo in demo_steps:
        print(banner)
        run_demo()

In [None]:
#!/usr/bin/env python3
"""
ETL pipeline using Python and Trino
"""
import trino
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import csv
from io import StringIO
import os

class TrinoETL:
    def __init__(self, host='localhost', port=8080, user='admin'):
        self.host = host
        self.port = port
        self.user = user
    
    def get_connection(self, catalog='memory', schema='default'):
        """Open a new Trino DB-API connection for the given catalog and schema."""
        connect_kwargs = dict(
            host=self.host,
            port=self.port,
            user=self.user,
            catalog=catalog,
            schema=schema,
        )
        return trino.dbapi.connect(**connect_kwargs)
    
    def create_sample_data(self, num_customers=1000, num_orders=5000,
                           seed=None, output_dir='.'):
        """Generate sample data files for ETL demonstration.

        Writes ``customers.json`` (one JSON object per line) and
        ``orders.csv`` into ``output_dir``. Every order references a customer
        id between 1 and ``num_customers``.

        Args:
            num_customers: Number of customer records to generate.
            num_orders: Number of order records to generate.
            seed: Optional seed for reproducible output. When ``None`` the
                global ``np.random`` state is used, as before.
            output_dir: Existing directory that receives both files.

        Raises:
            ValueError: If a count is negative, or orders are requested
                without any customers to reference.
        """
        if num_customers < 0 or num_orders < 0:
            raise ValueError("num_customers and num_orders must be non-negative")
        if num_orders and not num_customers:
            raise ValueError("num_orders requires at least one customer")

        rng = np.random if seed is None else np.random.RandomState(seed)
        # Take "now" once so every generated date is relative to the same instant.
        now = datetime.now()

        # Generate customer data. Random draws are converted to builtin
        # types so timedelta and json never see numpy scalars.
        customers = []
        for customer_id in range(1, num_customers + 1):
            registered = now - timedelta(days=int(rng.randint(1, 365)))
            customers.append({
                'customer_id': customer_id,
                'first_name': f'Customer{customer_id}',
                'last_name': f'LastName{customer_id}',
                'email': f'customer{customer_id}@example.com',
                'registration_date': registered.strftime('%Y-%m-%d'),
                'country': str(rng.choice(['US', 'UK', 'CA', 'DE', 'FR'], p=[0.4, 0.2, 0.15, 0.15, 0.1])),
                'age': int(rng.randint(18, 80)),
            })

        # Save as JSON (one object per line)
        with open(os.path.join(output_dir, 'customers.json'), 'w', encoding='utf-8') as f:
            f.writelines(json.dumps(customer) + '\n' for customer in customers)

        # Generate orders data as CSV
        products = [
            'Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones',
            'Tablet', 'Phone', 'Speaker', 'Camera', 'Printer'
        ]
        orders = []
        for order_id in range(1, num_orders + 1):
            order = {
                'order_id': order_id,
                'customer_id': int(rng.randint(1, num_customers + 1)),
                'product_name': str(rng.choice(products)),
                'quantity': int(rng.randint(1, 5)),
                'unit_price': round(float(rng.uniform(10, 1000)), 2),
                'order_date': (now - timedelta(days=int(rng.randint(1, 90)))).strftime('%Y-%m-%d'),
                'status': str(rng.choice(['completed', 'pending', 'cancelled'], p=[0.8, 0.15, 0.05])),
            }
            order['total_amount'] = round(order['quantity'] * order['unit_price'], 2)
            orders.append(order)

        # Save as CSV; explicit columns keep the header even with zero orders.
        order_columns = [
            'order_id', 'customer_id', 'product_name', 'quantity',
            'unit_price', 'order_date', 'status', 'total_amount',
        ]
        df_orders = pd.DataFrame(orders, columns=order_columns)
        df_orders.to_csv(os.path.join(output_dir, 'orders.csv'), index=False)

        print("Sample data files created: customers.json, orders.csv")
    
    def load_json_data(self):
        """Load JSON data into Trino memory catalog"""
        conn = self.get_connection()
        cur = conn.cursor()
        
        # Create customers table
        cur.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                customer_id BIGINT,
                first_name VARCHAR,
                last_name VARCHAR,
                email VARCHAR,
                registration_date DATE,
                country VARCHAR,
                age INTEGER
            )
        """)
        
        # Load JSON data
        with open