In [34]:
import pandas as pd

## Data: How is the .csv organized?

For each .csv file the following apply:
* each row represents the data of a single vehicle
* the first 10 columns in the 1st row include the columns’ names
* the first 4 columns include information about the trajectory like the unique trackID, the type of vehicle, the distance traveled in meters and the average speed of the vehicle in km/h
* the last 6 columns are then repeated every 6 columns based on the time frequency. For example, column_5 contains the latitude of the vehicle at time column_10, and column­­­_11 contains the latitude of the vehicle at time column_16.
* Speed is in km/h, Longitudinal and Lateral Acceleration in m/sec2 and time in seconds.

In [35]:
# 1. lets blindly try to read the data into a dataframe

# df = pd.read_csv("../data/test_data.csv", sep=";")

The error is caused by the fact that rows do **not** have equal number of columns. 

In [36]:
with open("../data/test_data.csv", 'r') as file:
    lines = file.readlines()

In [37]:
print(f"The number of rows/lines is {len(lines)}")

The number of rows/lines is 853


In [38]:
print(lines[0]) # column names
print(lines[0].strip('\n').strip().strip(';').split(';')) # columns names as a list

track_id; type; traveled_d; avg_speed; lat; lon; speed; lon_acc; lat_acc; time

['track_id', ' type', ' traveled_d', ' avg_speed', ' lat', ' lon', ' speed', ' lon_acc', ' lat_acc', ' time']


In [39]:
lines_as_lists = [line.strip('\n').strip().strip(';').split(';') for line in lines]
len(lines_as_lists)

853

In [40]:
print(f"the number of fields in row 1 is {len(lines_as_lists[1])}, row 2 is {len(lines_as_lists[2])}")

the number of fields in row 1 is 3010, row 2 is 10510


The number of fields in each row is = 4 + 6 * n

* The first 4 unique values are: track_id; type; traveled_d; avg_speed
* The remaining fields are repeated sequences of: 
  * lat_1; lon_1; speed_1; lon_acc_1; lat_acc_1; time_1
  * lat_2; lon_2; speed_2; lon_acc_2; lat_acc_2; time_2
  * ...
  * lat_n; lon_n; speed_n; lon_acc_n; lat_acc_n; time_n

* n depends on the time frequency. It is different for each row.


In [41]:
no_field_max = 0

for row in lines_as_lists:
    if len(row) > no_field_max:
        no_field_max = len(row)

print(f"the maximum number of fields is {no_field_max}")
largest_n = int((no_field_max-4)/6)
print(f"the largest n = {largest_n}")

the maximum number of fields is 98410
the largest n = 16401


We can divide each row into 2 parts:
1.  A list of the first 4 values. 
    - These will correspond to ['track_id', ' type', ' traveled_d', ' avg_speed']
2.  A matrix of size 6*n of the remaining values. 
    - These will correspond to a table with columns [' lat', ' lon', ' speed', ' lon_acc', ' lat_acc', ' time'] and n rows.
    - We will append the 'track_id' value to each row to keep track of the vehicle identity.

In [42]:
cols = lines_as_lists.pop(0)

In [43]:
track_cols = cols[:4]
trajectory_cols = ['track_id'] + cols[4:]

print(track_cols)
print(trajectory_cols)


['track_id', ' type', ' traveled_d', ' avg_speed']
['track_id', ' lat', ' lon', ' speed', ' lon_acc', ' lat_acc', ' time']


In [44]:
track_info = []
trajectory_info = []

for row in lines_as_lists:
    track_id = row[0]

    # add the first 4 values to track_info
    track_info.append(row[:4]) 

    remaining_values = row[4:]
    # reshape the list into a matrix and add track_id
    trajectory_matrix = [ [track_id] + remaining_values[i:i+6] for i in range(0,len(remaining_values),6)]
    # add the matrix rows to trajectory_info
    trajectory_info = trajectory_info + trajectory_matrix


In [45]:
df_track = pd.DataFrame(data= track_info,columns=track_cols)

df_track.head(100)

Unnamed: 0,track_id,type,traveled_d,avg_speed
0,1,Car,134.88,24.278704
1,2,Car,426.97,21.958339
2,3,Car,206.58,8.263246
3,4,Car,261.45,30.361735
4,5,Taxi,264.12,16.979263
...,...,...,...,...
95,96,Car,86.51,12.976159
96,97,Car,212.20,14.042918
97,98,Car,224.82,11.562238
98,99,Motorcycle,191.57,24.630498


In [46]:
df_trajectory = pd.DataFrame(data= trajectory_info,columns=trajectory_cols)

df_trajectory.head(100)

Unnamed: 0,track_id,lat,lon,speed,lon_acc,lat_acc,time
0,1,37.979513,23.736025,35.3204,-0.2996,-0.0175,14.000000
1,1,37.979510,23.736027,35.2990,-0.2788,-0.0188,14.040000
2,1,37.979508,23.736030,35.2800,-0.2656,-0.0200,14.080000
3,1,37.979505,23.736032,35.2624,-0.2589,-0.0213,14.120000
4,1,37.979502,23.736035,35.2460,-0.2460,-0.0225,14.160000
...,...,...,...,...,...,...,...
95,1,37.979214,23.736284,39.7003,-0.8160,0.1721,17.800000
96,1,37.979210,23.736287,39.5834,-0.8087,0.1764,17.840000
97,1,37.979207,23.736290,39.4686,-0.7858,0.1806,17.880000
98,1,37.979204,23.736293,39.3582,-0.7474,0.1846,17.920000


### Strip the whitespaces from the column names

In [47]:
df_trajectory.columns = df_trajectory.columns.str.strip()
df_track.columns = df_track.columns.str.strip()

In [48]:
# Define a dictionary with the correct data types for each column
column_types = {
    'track_id': 'int64',
    'lat': 'float64',
    'lon': 'float64',
    'speed': 'float64',
    'lon_acc': 'float64',
    'lat_acc': 'float64',
    'time': 'float64'
}

# Convert the data types of the DataFrame columns
df_trajectory = df_trajectory.astype(column_types)

In [49]:
print(df_trajectory.columns)


Index(['track_id', 'lat', 'lon', 'speed', 'lon_acc', 'lat_acc', 'time'], dtype='object')


In [50]:
df_trajectory.dtypes

track_id      int64
lat         float64
lon         float64
speed       float64
lon_acc     float64
lat_acc     float64
time        float64
dtype: object

In [28]:
def read_file(file_path):
    """
    This function reads a file and returns a list of lists where each list is a line from the file.
    :param file_path: The path to the file
    :return: A list of lists where each list is a line from the file
    """
    with open(file_path, 'r') as file:
        lines = file.readlines()
    return [line.strip('\n').strip().strip(';').split(';') for line in lines]


def get_max_fields(lines_as_lists):
    """
    This function returns the maximum number of fields in the lines.
    :param lines_as_lists: The lines as lists
    :return: The maximum number of fields
    """
    no_field_max = 0
    for row in lines_as_lists:
        if len(row) > no_field_max:
            no_field_max = len(row)
    return no_field_max


def get_track_and_trajectory_info(lines_as_lists, no_field_max):
    """
    This function returns the track and trajectory information.
    :param lines_as_lists: The lines as lists
    :param no_field_max: The maximum number of fields
    :return: The track and trajectory information
    """
    track_info = []
    trajectory_info = []
    for row in lines_as_lists:
        track_id = row[0]
        track_info.append(row[:4])
        remaining_values = row[4:]
        trajectory_matrix = [[track_id] + remaining_values[i:i + 6] for i in range(0, len(remaining_values), 6)]
        trajectory_info = trajectory_info + trajectory_matrix
    return track_info, trajectory_info


def create_dataframes(track_info, trajectory_info, cols):
    """
    This function creates dataframes from the track and trajectory information.
    :param track_info: The track information
    :param trajectory_info: The trajectory information
    :param cols: The columns
    :return: The dataframes
    """
    track_cols = cols[:4]
    trajectory_cols = ['track_id'] + cols[4:]
    df_vehicles = pd.DataFrame(data=track_info, columns=track_cols)
    df_trajectory = pd.DataFrame(data=trajectory_info, columns=trajectory_cols)
    return df_vehicles, df_trajectory

def create_pg_sqlalchemy_engine(user, password, host, port, db):
    connection_string = f'postgresql://{user}:{password}@{host}:{port}/{db}'
    engine = create_engine(connection_string)
    return engine

def _load_data(df, table_name):
    # create a connection to the database using sqlalchemy
    engine = create_pg_sqlalchemy_engine('airflow', 'airflow', 'postgres', '5432', 'postgres')

    # Try to establish a connection and execute a simple SQL query
    try:
        with engine.connect() as connection:
            result = connection.execute("SELECT 1")
            print("Connection successful. Result: ", result.scalar())
    except Exception as e:
        print("Failed to connect to the database. Error: ", e)

    # Load data from a CSV file into a pandas DataFrame
    # data = pd.read_csv('/opt/airflow/data/test_data.csv')

    # Write dataFrame to the database
    # TODO: convert the data types to the correct ones before writing to the database
    df.to_sql(table_name, con=engine, if_exists='replace', index=False)

def convert_columns_to_correct_data_types(list_of_dataframes, column_types=None):
    # Convert the data types of the DataFrame columns, loop through the list of DataFrames
    for df in list_of_dataframes:
        for column, data_type in column_types.items():
            try:
                df[column] = df[column].astype(data_type)
            except KeyError:
                print(f"Column {column} does not exist in the DataFrame.")
            except ValueError:
                print(f"Cannot convert data in column {column} to {data_type}.")

def generate_df(file_path: str):
    # Read the file
    lines_as_lists = read_file(file_path)
    # Get the maximum number of fields
    no_field_max = get_max_fields(lines_as_lists)
    print(f"the maximum number of fields is {no_field_max}")
    largest_n = int((no_field_max - 4) / 6)
    print(f"the largest n = {largest_n}")
    cols = lines_as_lists.pop(0)
    # Get the track and trajectory information
    track_info, trajectory_info = get_track_and_trajectory_info(lines_as_lists, no_field_max)
    # Create the dataframes
    df_vehicles, df_trajectory = create_dataframes(track_info, trajectory_info, cols)
    
    # Define a dictionary with the correct data types for each column
    column_types = {
        'track_id': 'int64',
        'lat': 'float64',
        'lon': 'float64',
        'speed': 'float64',
        'lon_acc': 'float64',
        'lat_acc': 'float64',
        'time': 'float64'
    }
    
    # Convert the columns to the correct data types
    convert_columns_to_correct_data_types([df_vehicles, df_trajectory], column_types)
    
    print(df_vehicles.head(20))
    print(df_trajectory.head())

    return df_vehicles, df_trajectory

In [29]:
df_vehicles, df_trajectory = generate_df("../data/test_data.csv")

the maximum number of fields is 98410
the largest n = 16401
Column lat does not exist in the DataFrame.
Column lon does not exist in the DataFrame.
Column speed does not exist in the DataFrame.
Column lon_acc does not exist in the DataFrame.
Column lat_acc does not exist in the DataFrame.
Column time does not exist in the DataFrame.
Column lat does not exist in the DataFrame.
Column lon does not exist in the DataFrame.
Column speed does not exist in the DataFrame.
Column lon_acc does not exist in the DataFrame.
Column lat_acc does not exist in the DataFrame.
Column time does not exist in the DataFrame.
    track_id         type  traveled_d   avg_speed
0          1          Car      134.88   24.278704
1          2          Car      426.97   21.958339
2          3          Car      206.58    8.263246
3          4          Car      261.45   30.361735
4          5         Taxi      264.12   16.979263
5          6          Car      251.90   28.516707
6          7          Car      245.85   

In [30]:
df_vehicles.head()

Unnamed: 0,track_id,type,traveled_d,avg_speed
0,1,Car,134.88,24.278704
1,2,Car,426.97,21.958339
2,3,Car,206.58,8.263246
3,4,Car,261.45,30.361735
4,5,Taxi,264.12,16.979263


In [31]:
df_vehicles.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 852 entries, 0 to 851
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   track_id     852 non-null    int64 
 1    type        852 non-null    object
 2    traveled_d  852 non-null    object
 3    avg_speed   852 non-null    object
dtypes: int64(1), object(3)
memory usage: 26.8+ KB


In [32]:
# check the columns types
df_trajectory.columns

Index(['track_id', ' lat', ' lon', ' speed', ' lon_acc', ' lat_acc', ' time'], dtype='object')

In [33]:
df_trajectory.dtypes

track_id     int64
 lat        object
 lon        object
 speed      object
 lon_acc    object
 lat_acc    object
 time       object
dtype: object