# FitFile2DB

```markdown
Author:         Maik 'Schrottie' Bischoff
Description:    Parse Garmin .fit files and write the data to a CSV file or database.
Version:        0.6
Date:           17.04.2023
Requires:       [dtcooper/python-fitparse](https://github.com/dtcooper/python-fitparse)
```
### Change-/Versionlog:

<table>
    <tr>
        <td>
            <span style="font-size: 85%;font-family: monospace">0.6:</span>
        </td>
        <td>
            <ul style="font-size: 85%;font-family: monospace">
                <li>Added (simple) error logging function.</li>
                <li>Extended the database with an 'id' field (autoincrement, not null, primary key) and adjusted the write function, because this field does not exist in the dataframe.</li>
            </ul>
        </td>
    </tr>
    <tr>
        <td>
            <span style="font-size: 85%;font-family: monospace">0.5:</span>
        </td>
        <td>
            <ul style="font-size: 85%;font-family: monospace">
                <li>Added function to use PostgreSQL instead of SQLite3.</li>
                <li>Renamed project to FITFILE2DB.</li>
            </ul>
        </td>
    </tr>
    <tr>
        <td>
            <span style="font-size: 85%;font-family: monospace">0.4:</span>
        </td>
        <td>
            <ul style="font-size: 85%;font-family: monospace">
                <li>Added function to read activity type</li>
                <li>Added function to read the totals of an activity and write them into a table. If a new .fit file contains fields that are missing from the table, they are added to it.</li>
            </ul>
        </td>
    </tr>
    <tr>
        <td>
            <span style="font-size: 85%;font-family: monospace">0.3:</span>
        </td>
        <td>
            <ul style="font-size: 85%;font-family: monospace">
                <li>Added function to convert mph to kph</li>
                <li>Added a function to write data into the database.
                    <ul>
                        <li>convert some fields to correct datatype</li>
                        <li>check whether all fields from the current .fit-file exist in an existing table, if necessary updating the table with the new fields</li>
                    </ul>
                </li>
            </ul>
        </td>
    </tr>
    <tr>
        <td>
            <span style="font-size: 85%;font-family: monospace">0.2:</span>
        </td>
        <td>
            <ul style="font-size: 85%;font-family: monospace">
                <li>Added a function to recursively search a directory for .fit files to allow processing multiple files at the same time.</li>
                <li>Adjusting the field label for the longitude (position_long --> position_lon) so that the field is recognized properly when the data is processed further (e.g. in ArcGIS Pro)</li>
            </ul>
        </td>
    </tr>
    <tr>
        <td>
            <span style="font-size: 85%;font-family: monospace">0.1:</span>
        </td>
        <td>
            <ul style="font-size: 85%;font-family: monospace">
                <li>Basic function for processing a .fit file.</li>
                <li>Converting the crude Garmin coordinate format (semicircles) into 'real' coordinates.</li>
            </ul>
        </td>
    </tr>
</table>

## Imports

In [None]:
import os
import fitparse
import pandas as pd
import datetime
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import Session
import sqlite3
import numpy as np
import psycopg2
import logging

## Variables

In [None]:
today = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
fit_path = os.path.join(os.getcwd(), 'testdata')
csv_path = os.path.join(os.getcwd(), 'testdata')
sqlite_db = os.path.join(os.getcwd(), 'fitfile.db')
err_log = os.path.join(os.getcwd(), 'error.log')
use_db = True # If True, all data is written to the database and no CSV is generated
db_type = "SQLITE" # Possible types are 'PGSQL' for PostgreSQL and 'SQLITE' for SQLite3.

## Error Logger

In [None]:

# Initialize error logger
log_format = '%(asctime)s - %(levelname)s - %(message)s'
date_format = '%d-%b-%y %H:%M:%S'
logging.basicConfig(filename=err_log, format=log_format, level=logging.DEBUG, datefmt=date_format)

# Configure stream handler for console
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.ERROR)
console_handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))

# log, log, log ... ;)
logger = logging.getLogger(__name__)
logger.addHandler(console_handler)
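
To illustrate how the file and console outputs divide the work, here is a minimal, self-contained sketch. It is not the notebook's setup: it uses an explicit `FileHandler` instead of `logging.basicConfig` so that it leaves the root logger alone, and the temporary log file and the logger name `fitfile2db_demo` are assumptions made for the demo. Every message reaches the file, while only `ERROR` and above reach the console.

```python
import logging
import os
import tempfile

# Hypothetical log location, used for this demo only
demo_log = os.path.join(tempfile.mkdtemp(), 'demo_error.log')

demo_logger = logging.getLogger('fitfile2db_demo')  # hypothetical logger name
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False  # keep the demo independent of the root logger

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s',
                              datefmt='%d-%b-%y %H:%M:%S')

# File handler: records everything from DEBUG upwards
file_handler = logging.FileHandler(demo_log)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
demo_logger.addHandler(file_handler)

# Console handler: only shows errors
demo_console = logging.StreamHandler()
demo_console.setLevel(logging.ERROR)
demo_console.setFormatter(formatter)
demo_logger.addHandler(demo_console)

demo_logger.info('parsed 3 files')        # goes to the file only
demo_logger.error('could not open file')  # goes to the file and the console

file_handler.flush()
with open(demo_log) as fh:
    log_lines = fh.read().splitlines()
```

After running this, `log_lines` holds both messages, although only the error was printed to the console.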

## Functions
### find_fit_files

```markdown
Recursively walk through a directory and collect all .fit files.
```

In [None]:
def find_fit_files(directory):
    try:
        fit_files = []
        for root, dirs, files in os.walk(directory):
            for file in files:
                if is_fit_file(file):
                    fit_files.append(os.path.join(root, file))
        return fit_files

    except Exception as e:
        logger.error(str(e))
        print(f"Error in find_fit_files: {e}")
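
A quick way to check the recursive search is to run it against a throwaway directory tree. The sketch below is self-contained; the file names are invented, and `find_fit_files_demo` is a hypothetical name that simply restates the walk without the logging.

```python
import os
import tempfile

def find_fit_files_demo(directory):
    """Return the paths of all files below `directory` that end in .fit."""
    found = []
    for root, _dirs, files in os.walk(directory):
        for name in files:
            # Case-insensitive match, so 'B.FIT' counts as well
            if name.lower().endswith('.fit'):
                found.append(os.path.join(root, name))
    return found

with tempfile.TemporaryDirectory() as tmp:
    # Build a small tree with .fit files on three levels plus one other file
    os.makedirs(os.path.join(tmp, 'rides', '2023'))
    for rel in ('a.fit',
                os.path.join('rides', 'B.FIT'),
                os.path.join('rides', '2023', 'c.fit'),
                'notes.txt'):
        open(os.path.join(tmp, rel), 'w').close()

    result = sorted(os.path.relpath(p, tmp) for p in find_fit_files_demo(tmp))
```

`result` then contains the three .fit files from all directory levels, and `notes.txt` is left out.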

### is_fit_file

```markdown
Only if it ends with fit, it is a fit! ;)
```

In [None]:

def is_fit_file(filename):
    try:
        return filename.lower().endswith('.fit')

    except Exception as e:
        logger.error(str(e))
        print(f"Error in is_fit_file: {e}")

### semicircles_to_degree

```markdown
Convert crappy Garmin semicircles to useful degrees
```

In [None]:

def semicircles_to_degree(semicircles):
    try:
        return semicircles * (180 / 2 ** 31)

    except Exception as e:
        logger.error(str(e))
        print(f"Error in semicircles_to_degree: {e}")
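
The formula maps 2^31 semicircles to 180 degrees, so half of that range is a quarter turn. A short worked example with made-up sample values shows the scale of the conversion (the name `semicircles_to_degree_demo` is only for illustration):

```python
def semicircles_to_degree_demo(semicircles):
    # Same formula as above: 2**31 semicircles span 180 degrees
    return semicircles * (180 / 2 ** 31)

quarter_turn = semicircles_to_degree_demo(2 ** 30)   # half of 2**31
negative = semicircles_to_degree_demo(-2 ** 29)      # southern or western values are negative

print(quarter_turn, negative)  # prints: 90.0 -45.0
```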


### mph_to_kph

```markdown
Convert miles per hour to kilometers per hour
```

In [None]:

def mph_to_kph(speeds):
    try:
        if speeds is None:
            return None
        # Convert a single speed value to a list
        if isinstance(speeds, (int, float)):
            speeds = [speeds]
        # Filtering the list to retain only valid speed values
        speeds = [s for s in speeds if s is not None and not np.isnan(s)]
        # Convert speeds to kph
        speeds = [s * 1.609344 for s in speeds]
        # Return the first value if it was originally a single value, otherwise return the list
        return speeds[0] if len(speeds) == 1 else speeds

    except Exception as e:
        logger.error(str(e))
        print(f"Error in mph_to_kph: {e}")
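
The return type of `mph_to_kph` depends on how many valid values remain after filtering, which is easy to overlook. The sketch below restates the logic with `math.isnan` instead of NumPy so that it runs standalone; `mph_to_kph_demo` is a hypothetical name and the input values are invented.

```python
import math

def mph_to_kph_demo(speeds):
    if speeds is None:
        return None
    # Wrap a single value so the rest of the code can treat it as a list
    if isinstance(speeds, (int, float)):
        speeds = [speeds]
    # Drop None and NaN entries, then convert to kph
    valid = [s for s in speeds if s is not None and not math.isnan(s)]
    kph = [s * 1.609344 for s in valid]
    # Exactly one remaining value is returned as a scalar, anything else as a list
    return kph[0] if len(kph) == 1 else kph

single = mph_to_kph_demo(10)               # scalar in, scalar out
several = mph_to_kph_demo([10, None, 20])  # None is dropped, list of two values out
collapsed = mph_to_kph_demo([10, None])    # one valid value left, so a scalar comes back
empty = mph_to_kph_demo([None])            # nothing valid, so an empty list comes back
```

Code that consumes the result should therefore be prepared for a scalar, a list, or an empty list.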

### load_env_variables

```markdown
Load environment variables needed for the db connection
```

In [None]:
from dotenv import load_dotenv  # provided by the python-dotenv package

def load_env_variables():
    try:
        # Check whether the .env file is available
        if not os.path.isfile('.env'):
            raise FileNotFoundError("The .env file was not found")

        # Load the connection settings from .env
        load_dotenv()
        host = os.getenv('DB_HOST')
        database = os.getenv('DB_NAME')
        user = os.getenv('DB_USER')
        password = os.getenv('USER_PASSWD')

        # Check that all variables are available
        if not all([host, database, user, password]):
            raise ValueError("One or more environment variables are missing!")

        return host, database, user, password

    except Exception as e:
        logger.error(str(e))
        print(f"Error in load_env_variables: {e}")

### write_to_database

```markdown
Function to write a dataframe into the database (SQLite3 or PostgreSQL)
```

In [None]:

def write_to_database(df, table_name):
    try:
        # Establish database connection
        if db_type == "PGSQL":
            host, database, user, password = load_env_variables()
            # psycopg2.connect expects keyword arguments (or a single DSN string)
            conn = psycopg2.connect(host=host, dbname=database, user=user, password=password)
        elif db_type == "SQLITE":
            conn = sqlite3.connect(sqlite_db)
        else:
            return

        cursor = conn.cursor()

        # Get the columns of the table (PRAGMA exists only in SQLite)
        if db_type == "PGSQL":
            cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name = %s ORDER BY ordinal_position", (table_name,))
            table_columns = [tup[0] for tup in cursor.fetchall()]
        else:
            cursor.execute(f"PRAGMA table_info({table_name})")
            table_columns = [tup[1] for tup in cursor.fetchall()]

        # Add missing columns to the table
        for col in df.columns:
            if col != 'id' and col not in table_columns:
                cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {col} TEXT")
                conn.commit()
                table_columns.append(col)

        # Build the INSERT statement once; 'id' is filled in by the database
        insert_columns = [col for col in table_columns if col != 'id']
        # sqlite3 uses '?' as placeholder, psycopg2 uses '%s'
        placeholder = '%s' if db_type == "PGSQL" else '?'
        insert_sql = f"INSERT INTO {table_name} ({','.join(insert_columns)}) VALUES ({','.join([placeholder] * len(insert_columns))})"

        # Write DataFrame to database
        for row in df.itertuples(index=False):
            # Create a dictionary mapping column names to their values for the current row
            row_dict = {col: getattr(row, col) for col in df.columns}
            # Create a list of values in the same order as the columns in the table
            values = [row_dict.get(col, '') for col in insert_columns]
            cursor.execute(insert_sql, tuple(values))
            conn.commit()

        # Close database connection
        conn.close()

        print(f"Data successfully written to table {table_name} in database.")

    except Exception as e:
        logger.error(str(e))
        print(f"Error in write_to_database: {e}")
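
The "add missing columns, then insert" step can be tried in isolation against an in-memory SQLite database. This is a minimal sketch, not the notebook's function: the table name `demo_data`, its columns, the sample values, and the helper `add_missing_columns` are all hypothetical.

```python
import sqlite3

def add_missing_columns(conn, table_name, columns):
    """Add every column in `columns` that the table lacks (as TEXT)."""
    cursor = conn.cursor()
    cursor.execute(f"PRAGMA table_info({table_name})")
    existing = [row[1] for row in cursor.fetchall()]  # row[1] is the column name
    for col in columns:
        if col not in existing:
            cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {col} TEXT")
            existing.append(col)
    conn.commit()
    return existing

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE demo_data (id INTEGER PRIMARY KEY AUTOINCREMENT, heart_rate TEXT)")

# A new file brings a field ('cadence') the table has not seen yet
row = {'heart_rate': '142', 'cadence': '85'}
table_columns = add_missing_columns(conn, 'demo_data', row.keys())

# Insert without 'id' so that SQLite assigns it automatically
insert_cols = [c for c in table_columns if c != 'id']
placeholders = ','.join(['?'] * len(insert_cols))
conn.execute(
    f"INSERT INTO demo_data ({','.join(insert_cols)}) VALUES ({placeholders})",
    tuple(row.get(c, '') for c in insert_cols),
)
conn.commit()

stored = conn.execute("SELECT id, heart_rate, cadence FROM demo_data").fetchall()
conn.close()
```

Because table and column names are interpolated directly into the SQL text, this pattern is only safe when those names come from a trusted source; the values themselves go through placeholders.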

### read_from_database

```markdown
Function to read the result of a query from the database (SQLite3 or PostgreSQL) into a dataframe
```

In [None]:
def read_from_database(query):
    try:
        # Establish database connection
        if db_type == "PGSQL":
            host, database, user, password = load_env_variables()
            # psycopg2.connect expects keyword arguments (or a single DSN string)
            conn = psycopg2.connect(host=host, dbname=database, user=user, password=password)
        elif db_type == "SQLITE":
            conn = sqlite3.connect(sqlite_db)
        else:
            return

        # Run the query and collect the result
        requested_data = pd.read_sql_query(query, conn)

        # Close connection
        conn.close()

        # Return requested data as a Pandas DataFrame
        return requested_data

    except Exception as e:
        logger.error(str(e))
        print(f"Error in read_from_database: {e}")

### get_activity_type

```markdown
Which type of activity is recorded in the current file?
```

In [None]:
def get_activity_type(fit_file):
    try:
        activity_type = None

        # Open the file
        with fitparse.FitFile(fit_file) as fitfile:

            # The "sport" field is read from the "session" messages (as in get_totals)
            for record in fitfile.get_messages("session"):

                # Read data field "sport" to get the activity type
                sport = record.get_value("sport")
                if sport is not None:
                    activity_type = sport

        return activity_type

    except Exception as e:
        logger.error(str(e))
        print(f"Error in get_activity_type: {e}")

### get_totals

```markdown
Get the activity type and all total_* fields from the session messages of a fit file
```

In [None]:

def get_totals(filename):
    try:
        # Open the fit file
        fitfile = fitparse.FitFile(filename)
        
        # Create an empty list to hold the totals data
        totals_data = []

        # Loop over all the messages in the FIT file
        for record in fitfile.get_messages():
            # Check if this is a "session" message
            if record.name == "session":
                # Get the activity type
                activity_type = record.get_value("sport")

                # Create an empty dictionary to hold the totals fields for this message
                totals_fields = {"activity_type": activity_type}

                # Loop over all the fields in this message
                for field in record:
                    # Check if this field is a "totals" field
                    if field.name.startswith("total_"):
                        # If so, add it to the dictionary
                        totals_fields[field.name] = field.value

                # Add the totals fields for this message to the list
                totals_data.append(totals_fields)

        # Create a pandas dataframe from the totals data
        df_totals = pd.DataFrame(totals_data)

        # Get the base filename without path
        filename = os.path.basename(filename)
        # Extract the first part of the filename before the first underscore
        name_part = filename.split('_')[0]
        # Add a new column with the name part of the file name
        df_totals.insert(0, "activity_number", name_part)

        return df_totals

    except Exception as e:
        logger.error(str(e))
        print(f"Error in get_totals: {e}")

### read_fit_file

```markdown
Read all data from a single .fit file and write result into pandas dataframe.
```

In [None]:
def read_fit_file(file_path):
    try:
        fitfile = fitparse.FitFile(file_path)

        data = []
        for record in fitfile.get_messages('record'):
            # Create an empty dictionary to hold the data for this record
            record_data = {}
            for data_point in record:
                # Convert Garmin-Semicircles to Degree
                if data_point.name == 'position_lat' or data_point.name == 'position_long':
                    record_data[data_point.name] = semicircles_to_degree(data_point.value)
                elif data_point.name == 'radar_speeds':
                    record_data[data_point.name] = mph_to_kph(data_point.value)
                elif data_point.name == 'passing_speedabs':
                    # Include the passing speed in kph directly in the record_data dictionary
                    record_data['passing_speed_kph'] = mph_to_kph(data_point.value)
                else:
                    record_data[data_point.name] = data_point.value

            data.append(record_data)

        # Convert the list of data to a Pandas DataFrame
        df = pd.DataFrame(data)
        # Give the longitude a proper field name
        df = df.rename(columns={'position_long': 'position_lon'})

        return df

    except Exception as e:
        logger.error(str(e))
        print(f"Error in read_fit_file: {e}")

## Main-Function: run_fitfile2db

```markdown
Main function to do all the funny things.
```

In [None]:
def run_fitfile2db():
    try:
        # Set the directory to search for .fit files
        if fit_path:
            directory = fit_path
        else:
            directory = os.getcwd()

        # Find all .fit files in the directory
        fit_files = find_fit_files(directory)

        # Store filenames in dataframe
        df_fn = pd.DataFrame({'filename': fit_files})
        fieldname = df_fn.columns[0]
        df_fn = df_fn.rename(columns={fieldname: 'filename'})

        # Only if db flag is given:
        if use_db:
            # Check whether the filenames are already known and build a dataframe with new files only
            querystring = 'select * from known_fitfiles'
            df_known = read_from_database(querystring)
            df_fn = df_fn[~df_fn['filename'].isin(df_known['filename'])]
            # Exit if there is no new file
            if df_fn.empty:
                print('No new files to process!')
                return
        
        # Read the data from each (new) .fit file and combine it into a single DataFrame
        dfs = []
        for fit_file in df_fn['filename']:
            df = read_fit_file(fit_file)
            dfs.append(df)
        combined_df = pd.concat(dfs)

        # Convert all fields, except position data, to text
        combined_df = combined_df.astype({col: str for col in combined_df.columns if col not in ['position_lat', 'position_lon']})

        # Convert position fields to real
        if 'position_lat' in combined_df.columns:
            combined_df['position_lat'] = pd.to_numeric(combined_df['position_lat'], errors='coerce')
        if 'position_lon' in combined_df.columns:
            combined_df['position_lon'] = pd.to_numeric(combined_df['position_lon'], errors='coerce')
        if 'passing_speed_kph' in combined_df.columns:
            combined_df['passing_speed_kph'] = pd.to_numeric(combined_df['passing_speed_kph'], errors='coerce')

        # Read the totals from each (new) .fit file and combine it into a single DataFrame
        dftotals = []
        for fit_file in df_fn['filename']:
            dft = get_totals(fit_file)
            dftotals.append(dft)
        combined_df_totals = pd.concat(dftotals)

        # Write the combined DataFrames into database or to a CSV file
        if use_db:
            write_to_database(combined_df, 'fitfile_data')
            write_to_database(combined_df_totals, 'fitfile_totals')
            write_to_database(df_fn, 'known_fitfiles')
        elif csv_path:
            combined_df.to_csv(f'{csv_path}/{today}_output.csv', index=False)
            combined_df_totals.to_csv(f'{csv_path}/{today}_output_totals.csv', index=False)
        else:
            combined_df.to_csv('output.csv', index=False)
            combined_df_totals.to_csv('output_totals.csv', index=False)

    except Exception as e:
        logger.error(str(e))
        print(f"Error in run_fitfile2db: {e}")

## Go!

In [None]:
run_fitfile2db()