In this Notebook : we implement ways of optimizing the code that generates the matrix during the animation

# existing solution 

In [None]:
arguments = ['0', '59', '1', '-180', '-90', '180', '90', '2023-06-01 00:00:00', '1440', 'SECOND', '/home/ali/matrices/60_1', 'mobilitydb', 'processed_data', 'MMSI', 'traj']
arguments

In [None]:
"""

File used to create the matrices for the time deltas between the begin_frame and end_frame.

The matrices are saved in the /home/ali/matrices/ folder.


            # arguments :[ 
            # 0: begin_frame, 
            # 1: end_frame, 
            # 2: PERCENTAGE_OF_OBJECTS, 
            # 3: x_min, 4: y_min, 5: x_max, 6: y_max, 
            # 7: start_timestamp, 
            # 8: total_frames, 
            # 9: granularity, 
            # 10: matrix_directory_path, 
            # 11: database_name, 
            # 12: table_name, 
            # 13: id_column_name, 
            # 14: tpoint_column_name]

To measure the size of matrices folder : du -sh --block-size=MB matrices            

"""

import numpy as np
from shapely.geometry import Point
from pymeos.db.psycopg import MobilityDB

from pymeos import *
import os
import sys
from datetime import timedelta, datetime
from pymeos import *
import time

logs = ""
now = time.time()


args = arguments
logs += f"Args: {args}\n"
begin_frame = int(args[0])
end_frame = int(args[1])
TIME_DELTA_SIZE = end_frame - begin_frame + 1
PERCENTAGE_OF_OBJECTS = float(args[2])


SRID = 4326


DATABASE_NAME = args[11]
TPOINT_TABLE_NAME = args[12]
TPOINT_ID_COLUMN_NAME = args[13]
TPOINT_COLUMN_NAME = args[14]



class Database_connector:
    """
    Singleton class used to connect to the MobilityDB database.
    """
    
    def __init__(self):
        try: 
            connection_params = {
            "host": "localhost",
            "port": 5432,
            "dbname": DATABASE_NAME,
            "user": "postgres",
            "password": "postgres"
            }
            self.table_name = TPOINT_TABLE_NAME
            self.id_column_name = TPOINT_ID_COLUMN_NAME
            self.tpoint_column_name = TPOINT_COLUMN_NAME               
            self.connection = MobilityDB.connect(**connection_params)

            self.cursor = self.connection.cursor()

            self.cursor.execute(f"SELECT {self.id_column_name} FROM public.{self.table_name};")
            self.ids_list = self.cursor.fetchall()
            self.ids_list = self.ids_list[:int(len(self.ids_list)*PERCENTAGE_OF_OBJECTS)]
        except Exception as e:
            pass

  
    def get_subset_of_tpoints(self, pstart, pend, xmin, ymin, xmax, ymax):
        """
        For each object in the ids_list :
            Fetch the subset of the associated Tpoints between the start and end timestamps
            contained in the STBOX defined by the xmin, ymin, xmax, ymax.
        """
        try:
           
            ids_list = [ f"'{id[0]}'"  for id in self.ids_list]
            ids_str = ', '.join(map(str, ids_list))
          
            query = f"""
                    SELECT 
                        atStbox(
                            a.{self.tpoint_column_name}::tgeompoint,
                            stbox(
                                ST_MakeEnvelope(
                                    {xmin}, {ymin}, -- xmin, ymin
                                    {xmax}, {ymax}, -- xmax, ymax
                                    4326 -- SRID
                                ),
                                tstzspan('[{pstart}, {pend}]')
                            )
                        )
                    FROM public.{self.table_name} as a 
                    WHERE a.{self.id_column_name} in ({ids_str});
                    """
            self.cursor.execute(query)
            # print(query)
            rows = self.cursor.fetchall()
            return rows
        except Exception as e:
            # print(e)
            pass


    def close(self):
        """
        Close the connection to the MobilityDB database.
        """
        self.cursor.close()
        self.connection.close()


MATRIX_DIRECTORY_PATH = "/home/ali/matrices"
file_name = f"{args[10]}/matrix_{begin_frame}.npy"


  
Time_granularities = {
                    # "MILLISECOND" : timedelta(milliseconds=1),
                      "SECOND" : timedelta(seconds=1),
                      "MINUTE" : timedelta(minutes=1),
                    #   "HOUR" : timedelta(hours=1),
                    }



pymeos_initialize()
db = Database_connector()

x_min = float(args[3])
y_min = float(args[4])
x_max = float(args[5])
y_max = float(args[6])

start_date = args[7]
start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')


total_frames = int(args[8])
GRANULARITY = Time_granularities[args[9]]

timestamps = []
for i in range(total_frames): 
    timestamps.append(start_date + i*GRANULARITY)



p_start = timestamps[begin_frame]
p_end = timestamps[end_frame]
# print(p_start, p_end, x_min, y_min, x_max, y_max)
now_db = time.time()
rows = db.get_subset_of_tpoints(p_start, p_end, x_min, y_min, x_max, y_max)    


In [None]:
logs += f"Time to fetch subset of tpoints: {time.time() - now_db} seconds\n"
        
empty_point_wkt = Point().wkt  # "POINT EMPTY"
matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)

time_ranges = timestamps
now = time.time()

for i in range(len(rows)):
    try:
        traj = rows[i][0]
        traj = traj.temporal_precision(GRANULARITY) 
        num_instants = traj.num_instants()
        if num_instants == 0:
            continue
        elif num_instants == 1:
            single_timestamp = traj.timestamps()[0].replace(tzinfo=None)
            index = time_ranges.index(single_timestamp) - begin_frame
            matrix[i][index] = traj.values()[0].wkt
        
        elif num_instants >= 2:
            traj_resampled = traj.temporal_sample(start=time_ranges[0],duration= GRANULARITY)
            
            start_index = time_ranges.index( traj_resampled.start_timestamp().replace(tzinfo=None) ) - begin_frame
            end_index = time_ranges.index( traj_resampled.end_timestamp().replace(tzinfo=None) ) - begin_frame
    
            trajectory_array = np.array([point.wkt for point in traj_resampled.values()])
            matrix[i, start_index:end_index+1] = trajectory_array

    except:
        continue

np.save(file_name, matrix)

db.close()
pymeos_finalize()
total_time = time.time() - now
frames_for_30_fps= 30 * total_time
print(f"================================================================     Matrix {begin_frame} created in {total_time} seconds, {frames_for_30_fps} frames for 30 fps animation.")
logs += f"time to create and fill the matrix {begin_frame}: {total_time} seconds\n"


# Moving the resampling of trajectories in the database query

In [7]:

import numpy as np
from shapely.geometry import Point
from pymeos.db.psycopg import MobilityDB

from pymeos import *
import os
import sys
from datetime import timedelta, datetime
from pymeos import *
import time

logs = ""
now = time.time()

FPS_DEQUEUE_SIZE = 5 # Length of the dequeue to calculate the average FPS
TIME_DELTA_DEQUEUE_SIZE =  10 # Length of the dequeue to keep the keys to keep in the buffer


args = sysargs
logs += f"Args: {args}\n"
begin_frame = int(args[0])
end_frame = int(args[1])
TIME_DELTA_SIZE = end_frame - begin_frame + 1
PERCENTAGE_OF_OBJECTS = float(args[2])


SRID = 4326


DATABASE_NAME = args[11]
TPOINT_TABLE_NAME = args[12]
TPOINT_ID_COLUMN_NAME = args[13]
TPOINT_COLUMN_NAME = args[14]



class Database_connector2:
    """
    Singleton class used to connect to the MobilityDB database.
    """
    
    def __init__(self):
        try: 
            connection_params = {
            "host": "localhost",
            "port": 5432,
            "dbname": DATABASE_NAME,
            "user": "postgres",
            "password": "postgres"
            }
            self.table_name = TPOINT_TABLE_NAME
            self.id_column_name = TPOINT_ID_COLUMN_NAME
            self.tpoint_column_name = TPOINT_COLUMN_NAME               
            self.connection = MobilityDB.connect(**connection_params)

            self.cursor = self.connection.cursor()

            self.cursor.execute(f"SELECT {self.id_column_name} FROM public.{self.table_name};")
            self.ids_list = self.cursor.fetchall()
            self.ids_list = self.ids_list[:int(len(self.ids_list)*PERCENTAGE_OF_OBJECTS)]
        except Exception as e:
            pass

  
    def get_subset_of_tpoints(self, pstart, pend, xmin, ymin, xmax, ymax):
        """
        For each object in the ids_list :
            Fetch the subset of the associated Tpoints between the start and end timestamps
            contained in the STBOX defined by the xmin, ymin, xmax, ymax.
        """
        try:
           
            ids_list = [ f"'{id[0]}'"  for id in self.ids_list]
            ids_str = ', '.join(map(str, ids_list))
          
            query = f"""
                    WITH trajectories as (
                    SELECT 
                        atStbox(
                            a.{self.tpoint_column_name}::tgeompoint,
                            stbox(
                                ST_MakeEnvelope(
                                    {xmin}, {ymin}, -- xmin, ymin
                                    {xmax}, {ymax}, -- xmax, ymax
                                    4326 -- SRID
                                ),
                                tstzspan('[{pstart}, {pend}]')
                            )
                        ) as trajectory
                    FROM public.{self.table_name} as a 
                    WHERE a.{self.id_column_name} in ({ids_str}))

                    SELECT tsample(trajectory, INTERVAL '1 minute', TIMESTAMP '2023-06-01 00:00:00')  AS resampled_trajectory
                        FROM 
                            trajectories ;
 
                    """
            self.cursor.execute(query)
            # print(query)
            rows = self.cursor.fetchall()
            return rows
        except Exception as e:
            # print(e)
            pass


    def get_min_timestamp(self):
        """
        Returns the min timestamp of the tpoints columns.

        """
        try:
            
            self.cursor.execute(f"SELECT MIN(startTimestamp({self.tpoint_column_name})) AS earliest_timestamp FROM public.{self.table_name};")
            return self.cursor.fetchone()[0]
        except Exception as e:
            pass

    def get_max_timestamp(self):
        """
        Returns the max timestamp of the tpoints columns.

        """
        try:
            self.cursor.execute(f"SELECT MAX(endTimestamp({self.tpoint_column_name})) AS latest_timestamp FROM public.{self.table_name};")
            return self.cursor.fetchone()[0]
        except Exception as e:
            pass


    def close(self):
        """
        Close the connection to the MobilityDB database.
        """
        self.cursor.close()
        self.connection.close()


MATRIX_DIRECTORY_PATH = "/home/ali/matrices"
file_name = f"/home/ali/matrices/matrix_{begin_frame}.npy"


  
Time_granularities = {
                    # "MILLISECOND" : timedelta(milliseconds=1),
                      "SECOND" : timedelta(seconds=1),
                      "MINUTE" : timedelta(minutes=1),
                    #   "HOUR" : timedelta(hours=1),
                    }


# check if file does't already exist

pymeos_initialize()
db = Database_connector2()

x_min = float(args[3])
y_min = float(args[4])
x_max = float(args[5])
y_max = float(args[6])

start_date = args[7]
start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')


total_frames = int(args[8])
GRANULARITY = Time_granularities[args[9]]

timestamps = []
for i in range(total_frames): 
    timestamps.append(start_date + i*GRANULARITY)



p_start = timestamps[begin_frame]
p_end = timestamps[end_frame]
# print(p_start, p_end, x_min, y_min, x_max, y_max)
now_db = time.time()
rows = db.get_subset_of_tpoints(p_start, p_end, x_min, y_min, x_max, y_max)    

print(f"Time to fetch tpoints : {time.time() - now_db} s")    
        

Time to fetch tpoints : 1.8734567165374756 s


In [15]:
empty_point_wkt = Point()  # "POINT EMPTY"
matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)

time_ranges = timestamps
now = time.time()

# try:
for i in range(len(rows)):
    if rows[i][0] is not None:
        try:
            traj_resampled = rows[i][0]

            start_index = time_ranges.index( traj_resampled.start_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) - begin_frame
            end_index = time_ranges.index( traj_resampled.end_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) - begin_frame
            matrix[i, start_index:end_index+1] = np.array(traj_resampled.values())
    
        except:
            print(i)
            # continue


# db.close()
# pymeos_finalize()
total_time = time.time() - now
frames_for_30_fps= 30 * total_time
print(f"Matrix {begin_frame} created in {total_time} seconds, {frames_for_30_fps} frames for 30 fps animation.")
# logs += f"time to create and fill the matrix {begin_frame}: {total_time} seconds\n"


Matrix 0 created in 15.015557765960693 seconds, 450.4667329788208 frames for 30 fps animation.


In [32]:
np.count_nonzero(matrix != 'POINT EMPTY')

2794080

In [26]:
%load_ext line_profiler

In [29]:
def opr(rows):
    empty_point_wkt = Point()  # "POINT EMPTY"
    matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)

    time_ranges = timestamps
    now = time.time()

    for i in range(len(rows)):
        if rows[i][0] is not None:
            try:
                traj_resampled = rows[i][0]
                # num_instants = traj_resampled.num_instants()
                # if num_instants == 1:
                #     # print(f"{i} has one instant")
                #     single_timestamp = traj_resampled.timestamps()[0].replace(tzinfo=None).replace(second=0, microsecond=0)
                #     index = time_ranges.index(single_timestamp) - begin_frame
                #     matrix[i][index] = traj.values()[0].wkt
                
                # elif num_instants >= 2:
                    # traj_resampled = traj.temporal_sample(start=time_ranges[0],duration= GRANULARITY)
                    
                start_index = time_ranges.index( traj_resampled.start_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) - begin_frame
                end_index = time_ranges.index( traj_resampled.end_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) - begin_frame
                vals = traj_resampled.values()
                trajectory_array = np.array(vals)
                matrix[i, start_index:end_index+1] = trajectory_array
        
            except:
                continue


    # db.close()
    # pymeos_finalize()
    total_time = time.time() - now
    frames_for_30_fps= 30 * total_time
    print(f"Matrix {begin_frame} created in {total_time} seconds, {frames_for_30_fps} frames for 30 fps animation.")
    # logs += f"time to create and fill the matrix {begin_frame}: {total_time} seconds\n"

    

In [30]:
%lprun -f opr opr(rows)

Matrix 0 created in 25.74210810661316 seconds, 772.2632431983948 frames for 30 fps animation.


Timer unit: 1e-09 s

Total time: 25.1043 s
File: /tmp/ipykernel_5042/310096134.py
Function: opr at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def opr(rows):
     2         1      54306.0  54306.0      0.0      empty_point_wkt = Point()  # "POINT EMPTY"
     3         1    6041035.0    6e+06      0.0      matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)
     4                                           
     5         1        392.0    392.0      0.0      time_ranges = timestamps
     6         1        716.0    716.0      0.0      now = time.time()
     7                                           
     8      5822    1132901.0    194.6      0.0      for i in range(len(rows)):
     9      5821    1072371.0    184.2      0.0          if rows[i][0] is not None:
    10      4174     365683.0     87.6      0.0              try:
    11      4174     670280.0    160.6      0.0             

In [22]:
now = time.time()

for i in range(len(rows)):
    if rows[i][0] is not None:
        try:
            traj_resampled = rows[i][0]

            start_index = time_ranges.index( traj_resampled.start_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) - begin_frame
            end_index = time_ranges.index( traj_resampled.end_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) - begin_frame
            vals = traj_resampled.values()
            trajectory_array = np.array([point.wkt for point in vals])
            matrix[i, start_index:end_index+1] = trajectory_array

        except:
            continue
        
print(f" Time to fill the matrix : {time.time() - now} s")

 Time to fill the matrix : 19.8060622215271 s


In [24]:
x1 = TGeomPointInst(point=(0, 0), timestamp=timestamps[0])
x2 = TGeomPointInst(point=(1, 1), timestamp=timestamps[TIME_DELTA_SIZE-1])
traj = TGeomPointSeq.from_instants([x1, x2], upper_inc=True)
traj_resampled = traj.temporal_sample(start=timestamps[0],duration= GRANULARITY)
empty_point_wkt = Point().wkt  # "POINT EMPTY"
matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)
trajectory_array = np.array([point.wkt for point in traj_resampled.values()])
trajectory_array

now = time.time()

for i in range(len(rows)):
    if 1 == 1 :
        try:
            start_index = timestamps.index( traj_resampled.start_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) 
            end_index = timestamps.index( traj_resampled.end_timestamp().replace(tzinfo=None).replace(second=0, microsecond=0) ) 

            trajectory_array = np.array([point.wkt for point in traj_resampled.values()])
            matrix[i, start_index:end_index+1] = trajectory_array
        except:
            continue
print(f"Time to fill the matrix : {time.time() - now} s")

Time to fill the matrix : 23.57325553894043 s


In [28]:
%lprun -f opr opr(rows)

Matrix 0 created in 34.37529969215393 seconds, 1031.258990764618 frames for 30 fps animation.


Timer unit: 1e-09 s

Total time: 33.4186 s
File: /tmp/ipykernel_5042/2884772419.py
Function: opr at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
     1                                           def opr(rows):
     2         1      68870.0  68870.0      0.0      empty_point_wkt = Point()  # "POINT EMPTY"
     3         1    6289944.0    6e+06      0.0      matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)
     4                                           
     5         1        410.0    410.0      0.0      time_ranges = timestamps
     6         1        889.0    889.0      0.0      now = time.time()
     7                                           
     8      5822    1172389.0    201.4      0.0      for i in range(len(rows)):
     9      5821    1257898.0    216.1      0.0          if rows[i][0] is not None:
    10      4174     417364.0    100.0      0.0              try:
    11      4174     507546.0    121.6      0.0            

In [12]:
pip install line_profiler

Collecting line_profiler
  Downloading line_profiler-4.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (34 kB)
Downloading line_profiler-4.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (720 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m720.6/720.6 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: line_profiler
Successfully installed line_profiler-4.1.3
Note: you may need to restart the kernel to use updated packages.


In [9]:
np.count_nonzero(matrix != 'POINT EMPTY')

865790

# Moving Resampling AND the operations of finding the indexes in database


In [2]:
arguments = ['0', '59', '1', '-180', '-90', '180', '90', '2023-06-01 00:00:00', '1440', 'SECOND', '/home/ali/matrices/60_1', 'mobilitydb', 'processed_data', 'MMSI', 'traj']
arguments

['0',
 '59',
 '1',
 '-180',
 '-90',
 '180',
 '90',
 '2023-06-01 00:00:00',
 '1440',
 'SECOND',
 '/home/ali/matrices/60_1',
 'mobilitydb',
 'processed_data',
 'MMSI',
 'traj']

In [4]:

import numpy as np
from shapely.geometry import Point
from pymeos.db.psycopg import MobilityDB

from pymeos import *
import os
import sys
from datetime import timedelta, datetime
from pymeos import *
import time

logs = ""
now = time.time()

FPS_DEQUEUE_SIZE = 5 # Length of the dequeue to calculate the average FPS
TIME_DELTA_DEQUEUE_SIZE =  10 # Length of the dequeue to keep the keys to keep in the buffer


args = arguments
logs += f"Args: {args}\n"
begin_frame = int(args[0])
end_frame = int(args[1])
TIME_DELTA_SIZE = end_frame - begin_frame + 1
PERCENTAGE_OF_OBJECTS = float(args[2])


SRID = 4326


DATABASE_NAME = args[11]
TPOINT_TABLE_NAME = args[12]
TPOINT_ID_COLUMN_NAME = args[13]
TPOINT_COLUMN_NAME = args[14]



class Database_connector3:
    """
    Singleton class used to connect to the MobilityDB database.
    """
    
    def __init__(self):
        try: 
            connection_params = {
            "host": "localhost",
            "port": 5432,
            "dbname": DATABASE_NAME,
            "user": "postgres",
            "password": "postgres"
            }
            self.table_name = TPOINT_TABLE_NAME
            self.id_column_name = TPOINT_ID_COLUMN_NAME
            self.tpoint_column_name = TPOINT_COLUMN_NAME               
            self.connection = MobilityDB.connect(**connection_params)

            self.cursor = self.connection.cursor()

            self.cursor.execute(f"SELECT {self.id_column_name} FROM public.{self.table_name};")
            self.ids_list = self.cursor.fetchall()
            self.ids_list = self.ids_list[:int(len(self.ids_list)*PERCENTAGE_OF_OBJECTS)]
        except Exception as e:
            pass

  
    def get_subset_of_tpoints(self, pstart, pend, xmin, ymin, xmax, ymax, begin_frame):
        """
        For each object in the ids_list :
            Fetch the subset of the associated Tpoints between the start and end timestamps
            contained in the STBOX defined by the xmin, ymin, xmax, ymax.
        """
        try:
           
            ids_list = [ f"'{id[0]}'"  for id in self.ids_list]
            ids_str = ', '.join(map(str, ids_list))
          
            query = f"""
                    WITH trajectories as (
                    SELECT 
                        atStbox(
                            a.{self.tpoint_column_name}::tgeompoint,
                            stbox(
                                ST_MakeEnvelope(
                                    {xmin}, {ymin}, -- xmin, ymin
                                    {xmax}, {ymax}, -- xmax, ymax
                                    4326 -- SRID
                                ),
                                tstzspan('[{pstart}, {pend}]')
                            )
                        ) as trajectory
                    FROM public.{self.table_name} as a 
                    WHERE a.{self.id_column_name} in ({ids_str})),

                    resampled as (

                    SELECT tsample(traj.trajectory, INTERVAL '1 minute', TIMESTAMP '2023-06-01 00:00:00')  AS resampled_trajectory
                        FROM 
                            trajectories as traj)
				
                    SELECT
                            ( EXTRACT(EPOCH FROM (startTimestamp(rs.resampled_trajectory) - '2023-06-01 00:00:00'::timestamp))::integer / 60 ) - {begin_frame} AS start_index ,
                            ( EXTRACT(EPOCH FROM (endTimestamp(rs.resampled_trajectory) - '2023-06-01 00:00:00'::timestamp))::integer / 60 ) - {begin_frame} AS end_index,
                            rs.resampled_trajectory
                    FROM resampled as rs ;
 
                    """
            self.cursor.execute(query)
            # print(query)
            rows = self.cursor.fetchall()
            return rows
        except Exception as e:
            # print(e)
            pass


    def get_min_timestamp(self):
        """
        Returns the min timestamp of the tpoints columns.

        """
        try:
            
            self.cursor.execute(f"SELECT MIN(startTimestamp({self.tpoint_column_name})) AS earliest_timestamp FROM public.{self.table_name};")
            return self.cursor.fetchone()[0]
        except Exception as e:
            pass

    def get_max_timestamp(self):
        """
        Returns the max timestamp of the tpoints columns.

        """
        try:
            self.cursor.execute(f"SELECT MAX(endTimestamp({self.tpoint_column_name})) AS latest_timestamp FROM public.{self.table_name};")
            return self.cursor.fetchone()[0]
        except Exception as e:
            pass


    def close(self):
        """
        Close the connection to the MobilityDB database.
        """
        self.cursor.close()
        self.connection.close()


MATRIX_DIRECTORY_PATH = "/home/ali/matrices"
file_name = f"/home/ali/matrices/matrix_{begin_frame}.npy"


  
Time_granularities = {
                    # "MILLISECOND" : timedelta(milliseconds=1),
                      "SECOND" : timedelta(seconds=1),
                      "MINUTE" : timedelta(minutes=1),
                    #   "HOUR" : timedelta(hours=1),
                    }


# check if file does't already exist

pymeos_initialize()
db = Database_connector3()

x_min = float(args[3])
y_min = float(args[4])
x_max = float(args[5])
y_max = float(args[6])

start_date = args[7]
start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')


total_frames = int(args[8])
GRANULARITY = Time_granularities[args[9]]

timestamps = []
for i in range(total_frames): 
    timestamps.append(start_date + i*GRANULARITY)



p_start = timestamps[begin_frame]
p_end = timestamps[end_frame]
# print(p_start, p_end, x_min, y_min, x_max, y_max)
now_db = time.time()
rows = db.get_subset_of_tpoints(p_start, p_end, x_min, y_min, x_max, y_max, begin_frame)    

print(f"Time to fetch tpoints : {time.time() - now_db} s")    
        

Time to fetch tpoints : 2.164764165878296 s


In [5]:
empty_point_wkt = Point()  # "POINT EMPTY"
matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)

time_ranges = timestamps
now = time.time()

# try:
for i in range(len(rows)):
    if rows[i][2] is not None:
        try:
            traj_resampled = rows[i][2]

            start_index = rows[i][0] 
            end_index = rows[i][1]
            matrix[i, start_index:end_index+1] = np.array(traj_resampled.values())
    
        except:
            print(i)
            # continue


# db.close()
# pymeos_finalize()
total_time = time.time() - now
frames_for_30_fps= 30 * total_time
print(f"Matrix {begin_frame} created in {total_time} seconds, {frames_for_30_fps} frames for 30 fps animation.")
# logs += f"time to create and fill the matrix {begin_frame}: {total_time} seconds\n"


Matrix 0 created in 0.14333271980285645 seconds, 4.299981594085693 frames for 30 fps animation.


In [15]:
np.count_nonzero(matrix != 'POINT EMPTY')

2794080

# Measuring nditer vs for loop for feature generation

Python For loops are expensives, numpy offers the nditer tool to iterate over its arrays, but on small operations this might not be worth the hassle to implement

In [29]:
%%timeit

empty_point_wkt = Point().wkt  # "POINT EMPTY"
# create a numpy array of size len(ids_list) with empty_point_wkt
starting_points = np.full((1, 5821), empty_point_wkt, dtype=object)

qgis_fields_list = []

for wkt in np.nditer(starting_points, flags=['refs_ok']):
    feat = ["vlayer_fields"]
    feat.append("datetime_obj")  # Set its attributes
    # Create geometry from WKT string
    feat.append(wkt.item())
    qgis_fields_list.append(feat)
        

962 µs ± 39.9 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [31]:
%%timeit

empty_point_wkt = Point().wkt  # "POINT EMPTY"

qgis_fields_list = []

for i in range(5821):
    feat = ["vlayer_fields"]
    feat.append("datetime_obj")  # Set its attributes
    # Create geometry from WKT string
    feat.append(empty_point_wkt)
    qgis_fields_list.append(feat)

378 µs ± 2.94 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


# Annex code to build the threading solution

In [4]:
# TODO : Include the PYQGIS imports for the plugin
from pymeos.db.psycopg import MobilityDB
from pymeos import *
from datetime import datetime, timedelta
import time
from collections import deque
from pympler import asizeof
import gc
from enum import Enum
import numpy as np
from shapely.geometry import Point
import math
import subprocess
import shutil
import os
import sys

pymeos_initialize()
DATABASE_NAME = "mobilitydb"
TPOINT_TABLE_NAME = "PyMEOS_demo"
TPOINT_ID_COLUMN_NAME = "MMSI"
TPOINT_COLUMN_NAME = "trajectory"

SRID = 4326




In [5]:
Time_granularities = {
                # "MILLISECOND" : timedelta(milliseconds=1),
                    "SECOND" : timedelta(seconds=1),
                    "MINUTE" : timedelta(minutes=1),
                #   "HOUR" : timedelta(hours=1),
                }

In [6]:
class Database_connector:
    """
    Singleton class used to connect to the MobilityDB database.
    """
    
    def __init__(self):
        try: 
            connection_params = {
            "host": "localhost",
            "port": 5432,
            "dbname": DATABASE_NAME,
            "user": "postgres",
            "password": "postgres"
            }
            self.table_name = TPOINT_TABLE_NAME
            self.id_column_name = TPOINT_ID_COLUMN_NAME
            self.tpoint_column_name = TPOINT_COLUMN_NAME               
            self.connection = MobilityDB.connect(**connection_params)

            self.cursor = self.connection.cursor()

            self.cursor.execute(f"SELECT {self.id_column_name} FROM public.{self.table_name};")
            self.ids_list = self.cursor.fetchall()
            self.ids_list = self.ids_list[:int(len(self.ids_list)*PERCENTAGE_OF_OBJECTS)]
            
        except Exception as e:
            pass

    
    def get_min_timestamp(self):
        """
        Returns the min timestamp of the tpoints columns.

        """
        try:
            
            self.cursor.execute(f"SELECT MIN(startTimestamp({self.tpoint_column_name})) AS earliest_timestamp FROM public.{self.table_name};")
            return self.cursor.fetchone()[0]
        except Exception as e:
            pass

    def get_max_timestamp(self):
        """
        Returns the max timestamp of the tpoints columns.

        """
        try:
            self.cursor.execute(f"SELECT MAX(endTimestamp({self.tpoint_column_name})) AS latest_timestamp FROM public.{self.table_name};")
            return self.cursor.fetchone()[0]
        except Exception as e:
            pass
        

    def get_subset_of_tpoints(self, pstart, pend, xmin, ymin, xmax, ymax, time_granularity, start_date):
        """
        For each object in the ids_list :
            Fetch the subset of the associated Tpoints between the start and end timestamps
            contained in the STBOX defined by the xmin, ymin, xmax, ymax.
        """
        
        try:
            ids_list = [ f"'{id[0]}'"  for id in self.ids_list]
            ids_str = ', '.join(map(str, ids_list))

            if time_granularity == "SECOND":
                time_value = 1
            elif time_granularity == "MINUTE":
                time_value = 60
            
            # return [self.tpoint_column_name, self.id_column_name, ids_str, xmin, ymin, xmax, ymax, pstart, pend, time_granularity, start_date, time_value]
            query = f"""WITH trajectories as (
                    SELECT 
                        atStbox(
                            a.{self.tpoint_column_name}::tgeompoint,
                            stbox(
                                ST_MakeEnvelope(
                                    {xmin}, {ymin}, -- xmin, ymin
                                    {xmax}, {ymax}, -- xmax, ymax
                                    4326 -- SRID
                                ),
                                tstzspan('[{pstart}, {pend}]')
                            )
                        ) as trajectory
                    FROM public.{self.table_name} as a 
                    WHERE a.{self.id_column_name} in ({ids_str})),

                    resampled as (

                    SELECT tsample(traj.trajectory, INTERVAL '1 {time_granularity}', TIMESTAMP '{start_date}')  AS resampled_trajectory
                        FROM 
                            trajectories as traj)
				
                    SELECT
                            EXTRACT(EPOCH FROM (startTimestamp(rs.resampled_trajectory) - '{start_date}'::timestamp))::integer / {time_value} AS start_index ,
                            EXTRACT(EPOCH FROM (endTimestamp(rs.resampled_trajectory) - '{start_date}'::timestamp))::integer / {time_value} AS end_index,
                            rs.resampled_trajectory
                    FROM resampled as rs ;"""
        
            self.cursor.execute(query)
       
            rows = self.cursor.fetchall()
            return rows
        except Exception as e:
            print(query)
            print(e)


    def close(self):
        """
        Close the connection to the MobilityDB database.
        """
        self.cursor.close()
        self.connection.close()

In [7]:
db = Database_connector()
GRANULARITY = Time_granularities["MINUTE"]
start_date = db.get_min_timestamp()
end_date = db.get_max_timestamp()
total_frames = math.ceil( (end_date - start_date) // GRANULARITY )

timestamps = [start_date + i * GRANULARITY for i in range(total_frames)]
timestamps = [dt.replace(tzinfo=None) for dt in timestamps]
timestamps_strings = [dt.strftime('%Y-%m-%d %H:%M:%S') for dt in timestamps]

In [8]:
len(timestamps)

1439

# Python Threading

In order to get rid of Subprocess without having the dips in framerate when changing the time delta, we are experimenting with the threading library

In [41]:
import numpy as np
import threading
from shapely.geometry import Point
from queue import Queue

def create_matrix(result_queue, arg2):
    empty_point_wkt = Point().wkt  # "POINT EMPTY"
    matrix = np.full((6000, 1440), empty_point_wkt)  # Creates a matrix filled with 'POINT EMPTY'
    print("Matrix shape:", matrix.shape)
    print(arg2)
    result_queue.put(matrix)  # Put the result in the queue

# Create a queue to hold the result
result_queue = Queue()

# Creating and starting a new thread to generate the matrix
thread = threading.Thread(target=create_matrix, args=(result_queue,"hello"))
thread.start()
thread.join()  # Wait for the thread to complete

# Retrieve the result from the queue
result_matrix = result_queue.get()
print("Retrieved matrix shape:", result_matrix.shape)


Matrix shape: (6000, 1440)
hello
Retrieved matrix shape: (6000, 1440)


In [10]:
result_matrix.shape

(6000, 1440)

In [13]:
args = [0, 59, 60, 0.1, -180, -90, 180, 90, timestamps, 'SECOND',db]
len(args)

11

In [19]:
import numpy as np
import threading
from shapely.geometry import Point
from queue import Queue

def create_matrix(result_queue, begin_frame, end_frame, TIME_DELTA_SIZE, PERCENTAGE_OF_OBJECTS, x_min, y_min, x_max, y_max, timestamps, total_frames, GRANULARITY, db):
    # a  =f"all parameters : {begin_frame, end_frame, TIME_DELTA_SIZE, PERCENTAGE_OF_OBJECTS, x_min, y_min, x_max, y_max, len(timestamps), total_frames, GRANULARITY, db}"
    # result_queue.put(a)  
    p_start = timestamps[begin_frame]
    p_end = timestamps[end_frame]
    # print(p_start, p_end, x_min, y_min, x_max, y_max)
    # now_db = time.time()
    rows = db.get_subset_of_tpoints(p_start, p_end, x_min, y_min, x_max, y_max, GRANULARITY, start_date)    
    # print(f"Time to fetch subset of tpoints: {time.time() - now_db} seconds\n")
            
    empty_point_wkt = Point().wkt  # "POINT EMPTY"
    matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)
    

    # now = time.time()

    for i in range(len(rows)):
        if rows[i][2] is not None:
            try:
                traj_resampled = rows[i][2]

                start_index = rows[i][0] - begin_frame
                end_index = rows[i][1] - begin_frame
                values = np.array([point.wkt for point in traj_resampled.values()])
                matrix[i, start_index:end_index+1] = values
        
            except:
                continue
        
    result_queue.put(matrix)
    
















    # empty_point_wkt = Point().wkt  # "POINT EMPTY"
    # matrix = np.full((6000, 1440), empty_point_wkt)  # Creates a matrix filled with 'POINT EMPTY'
    # print("Matrix shape:", matrix.shape)
    # print(arg2)
    # result_queue.put(matrix)  # Put the result in the queue

# Create a queue to hold the result
print("ok")
result_queue = Queue()
# Creating and starting a new thread to generate the matrix
thread = threading.Thread(target=create_matrix, args=(result_queue, 0, 59, 60, 0.1, -180, -90, 180, 90, timestamps,len(timestamps), 'SECOND',db))
thread.start()
print("ok2")
thread.join()  # Wait for the thread to complete
print("ok3")
# Retrieve the result from the queue
result_matrix = result_queue.get()
print("Retrieved matrix shape:", result_matrix)


ok
ok2
ok3
Retrieved matrix shape: [['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']
 ['POINT (8.42333 55.4718)' 'POINT (8.42333 55.4718)'
  'POINT (8.42333 55.4718)' ... 'POINT (8.42335 55.4718)'
  'POINT (8.42335 55.4718)' 'POINT (8.42335 55.4718)']
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']
 ...
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']]


In [None]:
import numpy as np
import threading
from shapely.geometry import Point
from queue import Queue

def create_matrix(result_queue, begin_frame, end_frame, TIME_DELTA_SIZE, PERCENTAGE_OF_OBJECTS, x_min, y_min, x_max, y_max, timestamps, total_frames, GRANULARITY, ids_list):
    # a  =f"all parameters : {begin_frame, end_frame, TIME_DELTA_SIZE, PERCENTAGE_OF_OBJECTS, x_min, y_min, x_max, y_max, len(timestamps), total_frames, GRANULARITY, db}"
    # result_queue.put(a)  
    p_start = timestamps[begin_frame]
    p_end = timestamps[end_frame]
    start_date = timestamps[0]
    connection_params = {
            "host": "localhost",
            "port": 5432,
            "dbname": "mobilitydb",
            "user": "postgres",
            "password": "postgres"
            }
    table_name = "pymeos_demo"
    id_column_name = "mmsi"
    tpoint_column_name = "trajectory"               
    connection = MobilityDB.connect(**connection_params)
    
    cursor = connection.cursor()

    ids_list = [ f"'{id[0]}'"  for id in ids_list]
    ids_str = ', '.join(map(str, ids_list))

    if GRANULARITY == "SECOND":
        time_value = 1
    elif GRANULARITY == "MINUTE":
        time_value = 60
            
    # return [self.tpoint_column_name, self.id_column_name, ids_str, xmin, ymin, xmax, ymax, pstart, pend, time_granularity, start_date, time_value]
    query = f"""WITH trajectories as (
            SELECT 
                atStbox(
                    a.{tpoint_column_name}::tgeompoint,
                    stbox(
                        ST_MakeEnvelope(
                            {x_min}, {y_min}, -- xmin, ymin
                            {x_max}, {y_max}, -- xmax, ymax
                            4326 -- SRID
                        ),
                        tstzspan('[{p_start}, {p_end}]')
                    )
                ) as trajectory
            FROM public.{table_name} as a 
            WHERE a.{id_column_name} in ({ids_str})),

            resampled as (

            SELECT tsample(traj.trajectory, INTERVAL '1 {GRANULARITY}', TIMESTAMP '{start_date}')  AS resampled_trajectory
                FROM 
                    trajectories as traj)
        
            SELECT
                    EXTRACT(EPOCH FROM (startTimestamp(rs.resampled_trajectory) - '{start_date}'::timestamp))::integer / {time_value} AS start_index ,
                    EXTRACT(EPOCH FROM (endTimestamp(rs.resampled_trajectory) - '{start_date}'::timestamp))::integer / {time_value} AS end_index,
                    rs.resampled_trajectory
            FROM resampled as rs ;"""

    cursor.execute(query)

    rows = cursor.fetchall()
    cursor.close()
    connection.close()
       
    empty_point_wkt = Point().wkt  # "POINT EMPTY"
    matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)
    

    # now = time.time()

    for i in range(len(rows)):
        if rows[i][2] is not None:
            try:
                traj_resampled = rows[i][2]

                start_index = rows[i][0] - begin_frame
                end_index = rows[i][1] - begin_frame
                values = np.array([point.wkt for point in traj_resampled.values()])
                matrix[i, start_index:end_index+1] = values
        
            except:
                continue
        
    result_queue.put(matrix)
    
















    # empty_point_wkt = Point().wkt  # "POINT EMPTY"
    # matrix = np.full((6000, 1440), empty_point_wkt)  # Creates a matrix filled with 'POINT EMPTY'
    # print("Matrix shape:", matrix.shape)
    # print(arg2)
    # result_queue.put(matrix)  # Put the result in the queue

# Create a queue to hold the result
print("ok")
result_queue = Queue()
# Creating and starting a new thread to generate the matrix
thread = threading.Thread(target=create_matrix, args=(result_queue, 60, 119, 60, 0.1, -180, -90, 180, 90, timestamps,len(timestamps), 'MINUTE',db.ids_list))
thread.start()
print("ok2")
thread.join()  # Wait for the thread to complete
print("ok3")
# Retrieve the result from the queue
result_matrix = result_queue.get()
print("Retrieved matrix shape:", result_matrix)


# Python Multiprocess

In [12]:
import numpy as np
import multiprocessing
from shapely.geometry import Point

def create_matrix(result_queue, hello):
    print(hello)
    empty_point_wkt = Point().wkt  # "POINT EMPTY"
    # Creates a matrix filled with 'POINT EMPTY'
    matrix = np.full((6000, 1440), empty_point_wkt, dtype=object)
    # print("Matrix shape:", matrix.shape)
    result_queue.put(matrix)  # Put the result in the queue

if __name__ == '__main__':
    # Using Manager's queue
    # manager = multiprocessing.Manager()
    result_queue = multiprocessing.Queue()

    # Creating and starting a new process to generate the matrix
    process = multiprocessing.Process(target=create_matrix, args=(result_queue,"hello"))
    process.start()
    # while(True):
    a = result_queue.get()
    print(a)
    print(type(a))
    print(len(a))
    print(a.shape)
        
    process.join()  # Wait for the process to complete
    
    print("Process ended")
    # # Retrieve the result from the queue
    # result_matrix = result_queue.get()
    # print("Retrieved matrix shape:", result_matrix)


Process Process-4:
Traceback (most recent call last):
  File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
TypeError: create_matrix() takes 2 positional arguments but 13 were given


KeyboardInterrupt: 

In [13]:
import numpy as np
import multiprocessing
from shapely.geometry import Point




def create_matrix(result_queue, begin_frame, end_frame, TIME_DELTA_SIZE, PERCENTAGE_OF_OBJECTS, x_min, y_min, x_max, y_max, timestamps, total_frames, GRANULARITY, ids_list):
    p_start = timestamps[begin_frame]
    p_end = timestamps[end_frame]
    start_date = timestamps[0]
    print("ok")
    connection_params = {
            "host": "localhost",
            "port": 5432,
            "dbname": "mobilitydb",
            "user": "postgres",
            "password": "postgres"
            }
    table_name = "pymeos_demo"
    id_column_name = "mmsi"
    tpoint_column_name = "trajectory"               
    connection = MobilityDB.connect(**connection_params)
    
    cursor = connection.cursor()

    ids_list = [ f"'{id[0]}'"  for id in ids_list]
    ids_str = ', '.join(map(str, ids_list))

    if GRANULARITY == "SECOND":
        time_value = 1
    elif GRANULARITY == "MINUTE":
        time_value = 60
    print("ok2")
    # return [self.tpoint_column_name, self.id_column_name, ids_str, xmin, ymin, xmax, ymax, pstart, pend, time_granularity, start_date, time_value]
    query = f"""WITH trajectories as (
            SELECT 
                atStbox(
                    a.{tpoint_column_name}::tgeompoint,
                    stbox(
                        ST_MakeEnvelope(
                            {x_min}, {y_min}, -- xmin, ymin
                            {x_max}, {y_max}, -- xmax, ymax
                            4326 -- SRID
                        ),
                        tstzspan('[{p_start}, {p_end}]')
                    )
                ) as trajectory
            FROM public.{table_name} as a 
            WHERE a.{id_column_name} in ({ids_str})),

            resampled as (

            SELECT tsample(traj.trajectory, INTERVAL '1 {GRANULARITY}', TIMESTAMP '{start_date}')  AS resampled_trajectory
                FROM 
                    trajectories as traj)
        
            SELECT
                    EXTRACT(EPOCH FROM (startTimestamp(rs.resampled_trajectory) - '{start_date}'::timestamp))::integer / {time_value} AS start_index ,
                    EXTRACT(EPOCH FROM (endTimestamp(rs.resampled_trajectory) - '{start_date}'::timestamp))::integer / {time_value} AS end_index,
                    rs.resampled_trajectory
            FROM resampled as rs ;"""

    cursor.execute(query)
    print("ok3 rows")
    rows = cursor.fetchall()
    cursor.close()
    connection.close()
       
    empty_point_wkt = Point().wkt  # "POINT EMPTY"
    matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)
    

    # now = time.time()
    print("ok4 matrix")
    for i in range(len(rows)):
        if rows[i][2] is not None:
            try:
                traj_resampled = rows[i][2]

                start_index = rows[i][0] - begin_frame
                end_index = rows[i][1] - begin_frame
                values = np.array([point.wkt for point in traj_resampled.values()])
                matrix[i, start_index:end_index+1] = values
        
            except:
                continue
        
    result_queue.put(matrix)

    

# Using Manager's queue
# manager = multiprocessing.Manager()
result_queue = multiprocessing.Queue()

# Creating and starting a new process to generate the matrix
process = multiprocessing.Process(target=create_matrix, args=(result_queue, 60, 119, 60, 0.1, -180, -90, 180, 90, timestamps,len(timestamps), 'MINUTE',db.ids_list))
process.start()
# while(True):
a = result_queue.get()
print(a)
print(type(a))
print(len(a))
print(a.shape)
    
process.join()  # Wait for the process to complete

print("Process ended")


ok


ok2
ok3 rows
ok4 matrix
[['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT (12.6058 55.6843)'
  'POINT (12.6058 55.6843)' 'POINT (12.6058 55.6843)']
 ['POINT (8.42334 55.4718)' 'POINT (8.42334 55.4718)'
  'POINT (8.423373333333334 55.4718)' ... 'POINT (8.42334 55.4718)'
  'POINT (8.42334 55.4718)' 'POINT (8.42335 55.4718)']
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']
 ...
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']
 ['POINT EMPTY' 'POINT EMPTY' 'POINT EMPTY' ... 'POINT EMPTY'
  'POINT EMPTY' 'POINT EMPTY']]
<class 'numpy.ndarray'>
5821
(5821, 60)
Process ended


In [10]:
import numpy as np
import multiprocessing
from shapely.geometry import Point
def create_matrix(result_queue, begin_frame, end_frame, TIME_DELTA_SIZE, PERCENTAGE_OF_OBJECTS, x_min, y_min, x_max, y_max, timestamps, total_frames, GRANULARITY, db):
    # a  =f"all parameters : {begin_frame, end_frame, TIME_DELTA_SIZE, PERCENTAGE_OF_OBJECTS, x_min, y_min, x_max, y_max, len(timestamps), total_frames, GRANULARITY, db}"
    # result_queue.put(a)  
    p_start = timestamps[begin_frame]
    p_end = timestamps[end_frame]
    # print(p_start, p_end, x_min, y_min, x_max, y_max)
    # now_db = time.time()
    


    rows = db.get_subset_of_tpoints(p_start, p_end, x_min, y_min, x_max, y_max, GRANULARITY, start_date)    
    # print(f"Time to fetch subset of tpoints: {time.time() - now_db} seconds\n")
            
    empty_point_wkt = Point().wkt  # "POINT EMPTY"
    matrix = np.full((len(rows), TIME_DELTA_SIZE), empty_point_wkt, dtype=object)
    

    # now = time.time()

    for i in range(len(rows)):
        if rows[i][2] is not None:
            try:
                traj_resampled = rows[i][2]

                start_index = rows[i][0] - begin_frame
                end_index = rows[i][1] - begin_frame
                values = np.array([point.wkt for point in traj_resampled.values()])
                matrix[i, start_index:end_index+1] = values
        
            except:
                continue
        
    result_queue.put(matrix)
    

# Using Manager's queue
# manager = multiprocessing.Manager()
result_queue = multiprocessing.Queue()

# Creating and starting a new process to generate the matrix
process = multiprocessing.Process(target=create_matrix, args=(result_queue, 0, 59, 60, 0.1, -180, -90, 180, 90, timestamps,len(timestamps), 'SECOND',db))
process.start()
# while(True):
a = result_queue.get()
print(a)
print(type(a))
print(len(a))
print(a.shape)
    
process.join()  # Wait for the process to complete

print("Process ended")


KeyboardInterrupt: 

In [4]:
a[0]

array(['POINT EMPTY', 'POINT EMPTY', 'POINT EMPTY', ..., 'POINT EMPTY',
       'POINT EMPTY', 'POINT EMPTY'], dtype=object)

In [None]:
import numpy as np
import multiprocessing

def process_row(row):
    # Example operation: Increment each element in the row
    return row + 1

def create_and_process_matrix():
    # Step 2: Create a large matrix
    rows, cols = 6000, 1440  # Example size
    matrix = np.random.rand(rows, cols)
    
    # Step 3: Create a pool of workers and map process_row to each row
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        # Apply process_row to each row of the matrix
        # Note: pool.map expects a function that takes a single argument,
        # so you might need to adapt if your function requires more.
        result = pool.map(process_row, matrix)
    
    return np.array(result)

if __name__ == '__main__':
    result_matrix = create_and_process_matrix()
    print("Processed matrix shape:", result_matrix.shape)


In [20]:
import numpy as np
import multiprocessing
from multiprocessing import shared_memory
from shapely.geometry import Point

def create_matrix(shm_name, shape):
    # Access the existing shared memory block
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    
    # Create a NumPy array backed by shared memory
    matrix = np.ndarray(shape, dtype=np.str_, buffer=existing_shm.buf)
    
    # Perform the operation to fill the matrix
    empty_point_wkt = Point().wkt  # "POINT EMPTY"
    matrix.fill(empty_point_wkt)
    print("Matrix shape:", matrix.shape)
    
    # Clean up (memory will still exist, just detach from this process)
    existing_shm.close()


shape = (6000, 1440)

# Calculate the size of the array and create shared memory
dtype = np.str_
itemsize = np.dtype(dtype).itemsize
size = itemsize * shape[0] * shape[1]
shm = shared_memory.SharedMemory(create=True, size=size)

# Create a NumPy array backed by shared memory
matrix = np.ndarray(shape, dtype=difference_type, buffer=shm.buf)
matrix.fill('')  # Initialize with empty strings, can also use the actual initialization you need

# Start the process that will modify the shared matrix
process = multiprocessing.Process(target=create_matrix, args=(shm.name, shape))
process.start()
process.join()

# Access the matrix from shared memory after the process completes
result_matrix = np.ndarray(shape, dtype=difference_type, buffer=shm.buf)
print("Processed matrix shape:", result_matrix.shape)

# Clean up shared memory
shm.close()
shm.unlink()  # This removes the memory, ensure all processes are done using it



ValueError: 'size' must be a positive number different from zero