In [1]:
# Load (or reload) the `sql` IPython extension and set its SqlMagic options.
%reload_ext sql
%config SqlMagic.autocommit=False
%config SqlMagic.autolimit=0
# NOTE(review): autopandas=True presumably makes %sql results come back as
# pandas DataFrames - confirm against the installed ipython-sql version.
%config SqlMagic.autopandas=True
%config SqlMagic.displaylimit=200

In [2]:
# Point the %sql magic at the local Trino endpoint (port 9090), `cuebiq` catalog.
%sql trino://localhost:9090/cuebiq/

'Connected: @cuebiq/'

In [3]:
pip install python-geohash

Note: you may need to restart the kernel to use updated packages.


In [4]:
import pandas as pd
import os
import geohash
from datetime import datetime, timedelta
import logging

In [5]:
# SQL engine
from trino.dbapi import connect 
from sqlalchemy import create_engine
import time

class TrinoEngine():
    """
    Small convenience wrapper around a Trino DB-API cursor and a SQLAlchemy engine.

    The cursor (`self.cur`) is used for statements executed directly through
    the DB-API, the engine (`self.engine`) for pandas-based reads.
    """

    def __init__(self, host: str = "localhost", port: int = 9090, catalog: str = "cuebiq"):
        """
        Open the DB-API connection and build the SQLAlchemy engine.

        Args:
            host: Trino host name. Defaults to the value previously hard-coded.
            port: Trino port. Defaults to the value previously hard-coded.
            catalog: Trino catalog used by both the connection and the engine URL.
        """
        # Keep a handle on the connection so it can be closed explicitly later.
        self.conn = connect(
            host=host,
            port=port,
            catalog=catalog
        )
        self.cur = self.conn.cursor()
        # Same URL shape as before: trino://<host>:<port>/<catalog>/
        self.engine = create_engine(f"trino://{host}:{port}/{catalog}/")

    def execute_statement(self, query:str) -> list:
        """
        Create and drop statements.

        Executes `query` on the DB-API cursor and returns everything the
        cursor yields from `fetchall()`.
        """
        self.cur.execute(query)
        return self.cur.fetchall()

    def read_sql(self, query:str) -> pd.DataFrame:
        """
        Select and insert into operations.

        Runs `query` through `pandas.read_sql` on the SQLAlchemy engine and
        returns the resulting DataFrame.
        """
        return pd.read_sql(query, self.engine)

    def close(self) -> None:
        """Close the DB-API connection and dispose of the SQLAlchemy engine."""
        self.conn.close()
        self.engine.dispose()

# Notebook-wide engine instance; later cells pass it to the processing
# functions as their `sql_engine` argument.
sql_engine = TrinoEngine()

In [6]:
# Fully-qualified schema names, keyed by a short alias.
schema_name = dict(cda='cuebiq.paas_cda_pe_v3')
# Fully-qualified name of the uplevelled trajectory table inside the 'cda' schema.
pe_tj_table = ".".join([schema_name['cda'], "trajectory_uplevelled"])

In [10]:
# Version with gateway-error handling: a failure on one day is logged and the loop continues with the next day - working

# Set up logging: INFO level, each record rendered as "<time> - <level> - <message>".
# NOTE(review): logging.basicConfig is documented as doing nothing when the root
# logger already has handlers, so re-running this in a live kernel may have no effect.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Function to process data for a single day
def process_day(event_date, country_code, sql_engine):
    """
    Build the geohash-5 origin-destination aggregate for a single day.

    Reads the day's trajectories that both start and end in `country_code`,
    encodes the start/end coordinates as precision-5 geohashes, aggregates the
    trips per (start, end) geohash pair and attaches the per-cell user counts
    read from `dedicated.pop_density.pd_{country_code}_{event_date}_agg5`.
    Pairs whose start or end cell has no user count are dropped.

    Args:
        event_date: Day to process; interpolated verbatim into the SQL text
            and the population-density table name (the callers in this
            notebook pass a 'YYYYMMDD' string).
        country_code: Country code matched against `start_country` and
            `end_country`, also used in the population-density table name.
        sql_engine: Object exposing `read_sql(query)` returning a DataFrame.

    Returns:
        A DataFrame with one row per retained geohash pair. When the day has
        no trajectories, or when any step fails (the error is logged, not
        raised), an empty DataFrame carrying the same columns is returned so
        callers can still select or cast columns without a KeyError.
    """
    agg_columns = [
        'start_geohash5', 'end_geohash5', 'trip_count',
        'm_duration_min', 'mdn_duration_min', 'sd_duration_min',
        'm_length_m', 'mdn_length_m', 'sd_length_m',
    ]
    output_columns = agg_columns + ['start_geohash_user', 'end_geohash_user']

    try:
        # Log before issuing the query so the timestamp marks when it starts.
        logging.info(f"Executing SQL query for date {event_date}")

        # Read data from the SQL table
        pe_tj_df = sql_engine.read_sql(
            f"""
            SELECT 
                cuebiq_id,
                start_lat,
                start_lng,
                end_lat,
                end_lng,
                duration_minutes,
                length_meters
            FROM cuebiq.paas_cda_pe_v3.trajectory_uplevelled
            WHERE 
                event_date = {event_date}
                AND end_country = '{country_code}' 
                AND start_country = '{country_code}' 
            """
        )

        # Nothing to aggregate: skip the geohash / groupby work entirely.
        if pe_tj_df.empty:
            logging.info(f"No trajectories found for date {event_date}")
            return pd.DataFrame(columns=output_columns)

        # Encode geohashes. Iterating the coordinate columns directly avoids the
        # per-row Series construction of DataFrame.apply(axis=1).
        pe_tj_df['start_geohash5'] = [
            geohash.encode(lat, lng, precision=5)
            for lat, lng in zip(pe_tj_df['start_lat'], pe_tj_df['start_lng'])
        ]
        pe_tj_df['end_geohash5'] = [
            geohash.encode(lat, lng, precision=5)
            for lat, lng in zip(pe_tj_df['end_lat'], pe_tj_df['end_lng'])
        ]

        # Load cell lists from SQL. This is best-effort: on failure the lookup
        # is empty, so every pair is filtered out below.
        try:
            celllist5 = sql_engine.read_sql(f"SELECT geohash5 AS geohash, no_of_unique_users FROM dedicated.pop_density.pd_{country_code}_{event_date}_agg5")
            geohash_dict5 = celllist5.set_index('geohash')['no_of_unique_users'].to_dict()
        except Exception as e:
            logging.warning(f"Failed to load geohash5 data for date {event_date}: {e}")
            geohash_dict5 = {}

        # Aggregate trips per (start, end) geohash pair
        aggregated_df5 = pe_tj_df.groupby(['start_geohash5', 'end_geohash5']).agg({
            'cuebiq_id': 'count',
            'duration_minutes': ['mean', 'median', 'std'],
            'length_meters': ['mean', 'median', 'std']
        }).reset_index()
        # Flatten the two-level column index produced by the multi-function agg.
        aggregated_df5.columns = agg_columns

        # Add user numbers to the aggregated data
        aggregated_df5['start_geohash_user'] = aggregated_df5['start_geohash5'].map(geohash_dict5)
        aggregated_df5['end_geohash_user'] = aggregated_df5['end_geohash5'].map(geohash_dict5)

        # Keep only pairs where both cells have a user count. Return an
        # independent copy because callers add columns to the result.
        filtered_df5 = aggregated_df5.dropna(subset=['start_geohash_user', 'end_geohash_user']).copy()

        return filtered_df5

    except Exception as e:
        logging.error(f"Error processing data for date {event_date}: {e}")
        return pd.DataFrame(columns=output_columns)

# Function to insert data in chunks
def insert_data_in_chunks(df, table_name, engine, chunk_size=5000):
    """
    Append `df` to `table_name` in consecutive slices of at most `chunk_size` rows.

    Each slice is written with its own `to_sql` call (`if_exists='append'`,
    `method='multi'`, index not written). An empty `df` results in no calls.

    Args:
        df: DataFrame to write.
        table_name: Target table name passed to `to_sql`.
        engine: Connection/engine object passed to `to_sql`.
        chunk_size: Maximum number of rows per `to_sql` call.

    Raises:
        ValueError: If `chunk_size` is smaller than 1. Without this check a
            negative value would silently insert nothing.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    for start in range(0, len(df), chunk_size):
        df.iloc[start:start + chunk_size].to_sql(
            table_name, engine, index=False, if_exists='append', method='multi'
        )

# Main processing loop
def process_date_range(start_date, end_date, country_code, sql_engine):
    """
    Process every day from `start_date` to `end_date` (inclusive) and store the results.

    For each day the geohash-5 OD aggregate is computed with `process_day`,
    the per-day table `od_{country_code.lower()}_{YYYYMMDD}_agg5` is created
    in `dedicated.od_matrix` if it does not exist yet, and the rows are
    appended in chunks. A failure on one day is logged and the loop carries
    on with the next day. The total wall-clock time is logged at the end.

    Args:
        start_date: First day to process; must support `strftime` and the
            addition of a `timedelta` (e.g. `datetime`).
        end_date: Last day to process, inclusive.
        country_code: Country code forwarded to `process_day`; its lower-case
            form is part of the output table name.
        sql_engine: Engine object forwarded to `process_day`.
    """
    start_time = time.time()  # Record start time before processing loop

    # Rows per partition key value and per insert chunk.
    rows_per_partition = 5000

    # The output engine does not depend on the date, so build it once instead
    # of once per day, and release it when the run is over.
    output_schema_name = "od_matrix"
    con = create_engine(f"trino://localhost:9090/dedicated/{output_schema_name}")

    try:
        current_date = start_date
        while current_date <= end_date:
            event_date = current_date.strftime('%Y%m%d')
            try:
                filtered_df5 = process_day(event_date, country_code, sql_engine)

                final_table_5 = f"od_{country_code.lower()}_{event_date}_agg5"

                # Create the SQL table with the correct name for 5-level geohash
                create_table_query_5 = f"""
                CREATE TABLE IF NOT EXISTS {final_table_5} (
                    start_geohash5 varchar,
                    start_geohash_user bigint,
                    end_geohash5 varchar,
                    end_geohash_user bigint,
                    trip_count bigint,
                    m_duration_min double,
                    mdn_duration_min double,
                    sd_duration_min double,
                    m_length_m double,
                    mdn_length_m double,
                    sd_length_m double,
                    partition_key bigint
                )
                WITH (
                  partitioned_by = ARRAY['partition_key'],
                  bucketed_by = ARRAY['end_geohash5'],
                  bucket_count = 5
                )
                """

                with con.connect() as connection:
                    connection.execute(create_table_query_5)

                # Check for "nothing to insert" before touching any column:
                # an empty result may not carry the columns cast below.
                if filtered_df5.empty:
                    logging.info(f"No data to insert for {final_table_5} for 5-level geohash")
                else:
                    # Add the partition key and force 64-bit integers to match
                    # the bigint columns. assign() builds a new frame, so the
                    # frame returned by process_day is left untouched.
                    # The key is derived from the frame's index labels.
                    prepared_df5 = filtered_df5.assign(
                        partition_key=(filtered_df5.index // rows_per_partition) + 1
                    ).astype({
                        'start_geohash_user': 'int64',
                        'end_geohash_user': 'int64',
                        'trip_count': 'int64',
                        'partition_key': 'int64'
                    })

                    # Insert data into the table with the correct name
                    insert_data_in_chunks(prepared_df5, final_table_5, con, rows_per_partition)
                    logging.info(f"Data inserted into {final_table_5}")
            except Exception as e:
                logging.error(f"Failed to process data for date {event_date}: {e}")

            # Move to the next day
            current_date += timedelta(days=1)
    finally:
        con.dispose()

    end_time = time.time()  # Record end time after processing loop
    total_time = end_time - start_time
    logging.info(f"Total processing time: {total_time:.2f} seconds")
    
# Run the whole range 2019-11-01 .. 2019-12-31 (inclusive) for country code 'ID'.
# NOTE(review): the log output saved under this cell lists 201905 dates, so it
# appears to come from an earlier invocation with different arguments.
process_date_range(datetime(2019, 11, 1), datetime(2019, 12, 31), 'ID', sql_engine)

2024-06-07 02:19:23,243 - INFO - Executing SQL query for date 20190528
2024-06-07 02:21:40,539 - INFO - Data inserted into od_id_20190528_agg5
2024-06-07 02:21:53,705 - INFO - Executing SQL query for date 20190529
2024-06-07 02:24:17,945 - INFO - Data inserted into od_id_20190529_agg5
2024-06-07 02:24:30,915 - INFO - Executing SQL query for date 20190530
2024-06-07 02:27:00,669 - INFO - Data inserted into od_id_20190530_agg5
2024-06-07 02:27:14,478 - INFO - Executing SQL query for date 20190531
2024-06-07 02:29:50,836 - INFO - Data inserted into od_id_20190531_agg5
2024-06-07 02:29:50,836 - INFO - Total processing time: 651.57 seconds


In [7]:
# For filling in dates with missing data

import pandas as pd
import geohash
from datetime import datetime
import logging
from sqlalchemy import create_engine
import time

# Set up logging: INFO level, each record rendered as "<time> - <level> - <message>".
# NOTE(review): logging.basicConfig is documented as doing nothing when the root
# logger already has handlers, so re-running this in a live kernel may have no effect.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Function to process data for a single day
def process_day(event_date, country_code, sql_engine):
    """
    Build the geohash-5 origin-destination aggregate for a single day.

    Reads the day's trajectories that both start and end in `country_code`,
    encodes the start/end coordinates as precision-5 geohashes, aggregates the
    trips per (start, end) geohash pair and attaches the per-cell user counts
    read from `dedicated.pop_density.pd_{country_code}_{event_date}_agg5`.
    Pairs whose start or end cell has no user count are dropped.

    Args:
        event_date: Day to process; interpolated verbatim into the SQL text
            and the population-density table name (the callers in this
            notebook pass a 'YYYYMMDD' string).
        country_code: Country code matched against `start_country` and
            `end_country`, also used in the population-density table name.
        sql_engine: Object exposing `read_sql(query)` returning a DataFrame.

    Returns:
        A DataFrame with one row per retained geohash pair. When the day has
        no trajectories, or when any step fails (the error is logged, not
        raised), an empty DataFrame carrying the same columns is returned so
        callers can still select or cast columns without a KeyError.
    """
    agg_columns = [
        'start_geohash5', 'end_geohash5', 'trip_count',
        'm_duration_min', 'mdn_duration_min', 'sd_duration_min',
        'm_length_m', 'mdn_length_m', 'sd_length_m',
    ]
    output_columns = agg_columns + ['start_geohash_user', 'end_geohash_user']

    try:
        # Log before issuing the query so the timestamp marks when it starts.
        logging.info(f"Executing SQL query for date {event_date}")

        # Read data from the SQL table
        pe_tj_df = sql_engine.read_sql(
            f"""
            SELECT 
                cuebiq_id,
                start_lat,
                start_lng,
                end_lat,
                end_lng,
                duration_minutes,
                length_meters
            FROM cuebiq.paas_cda_pe_v3.trajectory_uplevelled
            WHERE 
                event_date = {event_date}
                AND end_country = '{country_code}' 
                AND start_country = '{country_code}' 
            """
        )

        # Nothing to aggregate: skip the geohash / groupby work entirely.
        if pe_tj_df.empty:
            logging.info(f"No trajectories found for date {event_date}")
            return pd.DataFrame(columns=output_columns)

        # Encode geohashes. Iterating the coordinate columns directly avoids the
        # per-row Series construction of DataFrame.apply(axis=1).
        pe_tj_df['start_geohash5'] = [
            geohash.encode(lat, lng, precision=5)
            for lat, lng in zip(pe_tj_df['start_lat'], pe_tj_df['start_lng'])
        ]
        pe_tj_df['end_geohash5'] = [
            geohash.encode(lat, lng, precision=5)
            for lat, lng in zip(pe_tj_df['end_lat'], pe_tj_df['end_lng'])
        ]

        # Load cell lists from SQL. This is best-effort: on failure the lookup
        # is empty, so every pair is filtered out below.
        try:
            celllist5 = sql_engine.read_sql(f"SELECT geohash5 AS geohash, no_of_unique_users FROM dedicated.pop_density.pd_{country_code}_{event_date}_agg5")
            geohash_dict5 = celllist5.set_index('geohash')['no_of_unique_users'].to_dict()
        except Exception as e:
            logging.warning(f"Failed to load geohash5 data for date {event_date}: {e}")
            geohash_dict5 = {}

        # Aggregate trips per (start, end) geohash pair
        aggregated_df5 = pe_tj_df.groupby(['start_geohash5', 'end_geohash5']).agg({
            'cuebiq_id': 'count',
            'duration_minutes': ['mean', 'median', 'std'],
            'length_meters': ['mean', 'median', 'std']
        }).reset_index()
        # Flatten the two-level column index produced by the multi-function agg.
        aggregated_df5.columns = agg_columns

        # Add user numbers to the aggregated data
        aggregated_df5['start_geohash_user'] = aggregated_df5['start_geohash5'].map(geohash_dict5)
        aggregated_df5['end_geohash_user'] = aggregated_df5['end_geohash5'].map(geohash_dict5)

        # Keep only pairs where both cells have a user count. Return an
        # independent copy because callers add columns to the result.
        filtered_df5 = aggregated_df5.dropna(subset=['start_geohash_user', 'end_geohash_user']).copy()

        return filtered_df5

    except Exception as e:
        logging.error(f"Error processing data for date {event_date}: {e}")
        return pd.DataFrame(columns=output_columns)

# Function to insert data in chunks
def insert_data_in_chunks(df, table_name, engine, chunk_size=5000):
    """
    Append `df` to `table_name` in consecutive slices of at most `chunk_size` rows.

    Each slice is written with its own `to_sql` call (`if_exists='append'`,
    `method='multi'`, index not written). An empty `df` results in no calls.

    Args:
        df: DataFrame to write.
        table_name: Target table name passed to `to_sql`.
        engine: Connection/engine object passed to `to_sql`.
        chunk_size: Maximum number of rows per `to_sql` call.

    Raises:
        ValueError: If `chunk_size` is smaller than 1. Without this check a
            negative value would silently insert nothing.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    for start in range(0, len(df), chunk_size):
        df.iloc[start:start + chunk_size].to_sql(
            table_name, engine, index=False, if_exists='append', method='multi'
        )

# Main processing loop for a list of dates
def process_date_list(date_list, country_code, sql_engine):
    """
    Process an explicit list of days and store the results.

    For each entry the geohash-5 OD aggregate is computed with `process_day`,
    the per-day table `od_{country_code.lower()}_{event_date}_agg5` is created
    in `dedicated.od_matrix` if it does not exist yet, and the rows are
    appended in chunks. A failure on one day is logged and the loop carries
    on with the next entry. The total wall-clock time is logged at the end.

    Args:
        date_list: Iterable of dates; each entry is interpolated verbatim
            into the table names (the example in this notebook passes
            'YYYYMMDD' strings).
        country_code: Country code forwarded to `process_day`; its lower-case
            form is part of the output table name.
        sql_engine: Engine object forwarded to `process_day`.
    """
    start_time = time.time()  # Record start time before processing loop

    # Rows per partition key value and per insert chunk.
    rows_per_partition = 5000

    # The output engine does not depend on the date, so build it once instead
    # of once per day, and release it when the run is over.
    output_schema_name = "od_matrix"
    con = create_engine(f"trino://localhost:9090/dedicated/{output_schema_name}")

    try:
        for event_date in date_list:
            try:
                filtered_df5 = process_day(event_date, country_code, sql_engine)

                final_table_5 = f"od_{country_code.lower()}_{event_date}_agg5"

                # Create the SQL table with the correct name for 5-level geohash
                create_table_query_5 = f"""
                CREATE TABLE IF NOT EXISTS {final_table_5} (
                    start_geohash5 varchar,
                    start_geohash_user bigint,
                    end_geohash5 varchar,
                    end_geohash_user bigint,
                    trip_count bigint,
                    m_duration_min double,
                    mdn_duration_min double,
                    sd_duration_min double,
                    m_length_m double,
                    mdn_length_m double,
                    sd_length_m double,
                    partition_key bigint
                )
                WITH (
                  partitioned_by = ARRAY['partition_key'],
                  bucketed_by = ARRAY['end_geohash5'],
                  bucket_count = 5
                )
                """

                with con.connect() as connection:
                    connection.execute(create_table_query_5)

                # Check for "nothing to insert" before touching any column:
                # an empty result may not carry the columns cast below.
                if filtered_df5.empty:
                    logging.info(f"No data to insert for {final_table_5} for 5-level geohash")
                else:
                    # Add the partition key and force 64-bit integers to match
                    # the bigint columns. assign() builds a new frame, so the
                    # frame returned by process_day is left untouched.
                    # The key is derived from the frame's index labels.
                    prepared_df5 = filtered_df5.assign(
                        partition_key=(filtered_df5.index // rows_per_partition) + 1
                    ).astype({
                        'start_geohash_user': 'int64',
                        'end_geohash_user': 'int64',
                        'trip_count': 'int64',
                        'partition_key': 'int64'
                    })

                    # Insert data into the table with the correct name
                    insert_data_in_chunks(prepared_df5, final_table_5, con, rows_per_partition)
                    logging.info(f"Data inserted into {final_table_5}")
            except Exception as e:
                logging.error(f"Failed to process data for date {event_date}: {e}")
    finally:
        con.dispose()

    end_time = time.time()  # Record end time after processing loop
    total_time = end_time - start_time
    logging.info(f"Total processing time: {total_time:.2f} seconds")

# Example usage with a list of specific dates (YYYYMMDD strings):
date_list = ['20191109', '20191120', '20191208', '20191230']

process_date_list(date_list, 'ID', sql_engine)


2024-06-08 16:05:59,959 - INFO - Executing SQL query for date 20191109
2024-06-08 16:09:19,968 - INFO - Data inserted into od_id_20191109_agg5
2024-06-08 16:09:33,519 - INFO - Executing SQL query for date 20191120
2024-06-08 16:12:28,127 - INFO - Data inserted into od_id_20191120_agg5
2024-06-08 16:12:40,973 - INFO - Executing SQL query for date 20191208
2024-06-08 16:13:56,404 - INFO - Data inserted into od_id_20191208_agg5
2024-06-08 16:14:21,484 - INFO - Executing SQL query for date 20191230
2024-06-08 16:15:52,949 - INFO - Data inserted into od_id_20191230_agg5
2024-06-08 16:15:52,950 - INFO - Total processing time: 607.66 seconds


In [13]:
# # The normal, working version (no per-day error handling inside the loop); kept commented out for reference.

# # Set up logging
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# # Function to process data for a single day
# def process_day(event_date, country_code, sql_engine):
#     try:
#         # Read data from the SQL table
#         pe_tj_df = sql_engine.read_sql(
#             f"""
#             SELECT 
#                 cuebiq_id,
#                 start_lat,
#                 start_lng,
#                 end_lat,
#                 end_lng,
#                 duration_minutes,
#                 length_meters
#             FROM cuebiq.paas_cda_pe_v3.trajectory_uplevelled
#             WHERE 
#                 event_date = {event_date}
#                 AND end_country = '{country_code}' 
#                 AND start_country = '{country_code}' 
#             """
#         )
#         logging.info(f"Executing SQL query for date {event_date}")
        
#         # Encode geohashes
#         pe_tj_df['start_geohash5'] = pe_tj_df.apply(
#             lambda x: geohash.encode(x['start_lat'], x['start_lng'], precision=5), axis=1)
#         pe_tj_df['end_geohash5'] = pe_tj_df.apply(
#             lambda x: geohash.encode(x['end_lat'], x['end_lng'], precision=5), axis=1)

#         # Load cell lists from SQL
#         try:
#             celllist5 = sql_engine.read_sql(f"SELECT geohash5 AS geohash, no_of_unique_users FROM dedicated.pop_density.pd_{country_code}_{event_date}_agg5")
#             geohash_dict5 = celllist5.set_index('geohash')['no_of_unique_users'].to_dict()
#         except Exception as e:
#             logging.warning(f"Failed to load geohash5 data for date {event_date}: {e}")
#             geohash_dict5 = {}

#         # Add user numbers to the aggregated data
#         aggregated_df5 = pe_tj_df.groupby(['start_geohash5', 'end_geohash5']).agg({
#             'cuebiq_id': 'count',
#             'duration_minutes': ['mean', 'median', 'std'],
#             'length_meters': ['mean', 'median', 'std']
#         }).reset_index()
#         aggregated_df5.columns = ['start_geohash5', 'end_geohash5', 'trip_count', 'm_duration_min', 'mdn_duration_min', 'sd_duration_min', 'm_length_m', 'mdn_length_m', 'sd_length_m']

#         # Define the columns before mapping
#         aggregated_df5['start_geohash_user'] = aggregated_df5['start_geohash5'].map(geohash_dict5)
#         aggregated_df5['end_geohash_user'] = aggregated_df5['end_geohash5'].map(geohash_dict5)

#         # Filter aggregated data
#         filtered_df5 = aggregated_df5.dropna(subset=['start_geohash_user', 'end_geohash_user'])
        
#         return filtered_df5

#     except Exception as e:
#         logging.error(f"Error processing data for date {event_date}: {e}")
#         return pd.DataFrame()

# # Function to insert data in chunks
# def insert_data_in_chunks(df, table_name, engine, chunk_size=5000):
#     for start in range(0, len(df), chunk_size):
#         chunk = df.iloc[start:start + chunk_size]
#         chunk.to_sql(table_name, engine, index=False, if_exists='append', method='multi')

# # Main processing loop
# def process_date_range(start_date, end_date, country_code, sql_engine):
#     start_time = time.time()  # Record start time before processing loop
        
#     current_date = start_date
#     while current_date <= end_date:
#         event_date = current_date.strftime('%Y%m%d')
        
#         filtered_df5 = process_day(event_date, country_code, sql_engine)

#         # Create the SQL engine
#         output_schema_name = "od_matrix"
#         final_table_5 = f"od_{country_code.lower()}_{event_date}_agg5"
#         con = create_engine(f"trino://localhost:9090/dedicated/{output_schema_name}")

#         # Create the SQL table with the correct name for 5-level geohash
#         create_table_query_5 = f"""
#         CREATE TABLE IF NOT EXISTS {final_table_5} (
#             start_geohash5 varchar,
#             start_geohash_user bigint,
#             end_geohash5 varchar,
#             end_geohash_user bigint,
#             trip_count bigint,
#             m_duration_min double,
#             mdn_duration_min double,
#             sd_duration_min double,
#             m_length_m double,
#             mdn_length_m double,
#             sd_length_m double,
#             partition_key bigint
#         )
#         WITH (
#           partitioned_by = ARRAY['partition_key'],
#           bucketed_by = ARRAY['end_geohash5'],
#           bucket_count = 5
#         )
#         """

#         with con.connect() as connection:
#             connection.execute(create_table_query_5)

#         # Add partition key and ensure correct data types for 5-level geohash
#         filtered_df5['partition_key'] = (filtered_df5.index // 5000) + 1
#         filtered_df5 = filtered_df5.astype({
#             'start_geohash_user': 'int',
#             'end_geohash_user': 'int',
#             'trip_count': 'int',
#             'partition_key': 'int'
#         })

#         # Insert data into the table with the correct name
#         if not filtered_df5.empty:
#             insert_data_in_chunks(filtered_df5, final_table_5, con, 5000)
#             logging.info(f"Data inserted into {final_table_5}")
#         else:
#             logging.info(f"No data to insert for {final_table_5} for 5-level geohash")

#         # Move to the next day
#         current_date += timedelta(days=1)
    
#     end_time = time.time()  # Record end time after processing loop
#     total_time = end_time - start_time
#     logging.info(f"Total processing time: {total_time:.2f} seconds")

# # Example usage:
# process_date_range(datetime(2019, 5, 28), datetime(2019, 5, 31), 'ID', sql_engine)


2024-06-06 22:08:45,701 - INFO - Executing SQL query for date 20190515
2024-06-06 22:11:16,100 - INFO - Data inserted into od_id_20190515_agg5
2024-06-06 22:11:28,346 - INFO - Executing SQL query for date 20190516
2024-06-06 22:13:52,708 - INFO - Data inserted into od_id_20190516_agg5
2024-06-06 22:14:09,505 - INFO - Executing SQL query for date 20190517
2024-06-06 22:16:36,709 - INFO - Data inserted into od_id_20190517_agg5
2024-06-06 22:16:48,650 - INFO - Executing SQL query for date 20190518
2024-06-06 22:19:15,876 - INFO - Data inserted into od_id_20190518_agg5
2024-06-06 22:19:26,551 - INFO - Executing SQL query for date 20190519
2024-06-06 22:21:42,681 - INFO - Data inserted into od_id_20190519_agg5
2024-06-06 22:21:54,795 - INFO - Executing SQL query for date 20190520
2024-06-06 22:24:24,868 - INFO - Data inserted into od_id_20190520_agg5
2024-06-06 22:24:24,869 - INFO - Total processing time: 956.96 seconds
