<!-- Initialize **API** **managers** -->

In [1]:
import os
from Powerfleet_APIs_Management import PowerFleetAPIsManager as ApiManager
from colorama import Fore, Style
import sys  # For printing caught exceptions
import re
import json
from datetime import datetime


print(Fore.YELLOW + f"MAIN CWD={os.getcwd()}")
print(f"MAIN, LISTDIR={os.listdir()}" + Style.RESET_ALL)

# Define the ApiParameterExtractor class
class ApiParameterExtractor:
    def __init__(self, json_file):
        with open(json_file, 'r') as file:
            self.parameters = json.load(file)

    def extract_parameters(self, api_type):
        # Normalize API type to lowercase
        api_type = api_type.lower()
        
        # Validate API type
        if api_type not in self.parameters:
            raise ValueError(f"Invalid API type: {api_type}. Valid types are 'live_api' or 'snapshot_api'.")
        
        # Return the corresponding dictionary for the given API type
        return self.parameters[api_type]


extractor = ApiParameterExtractor("PARAMETERS.json")

# Extract parameters for live_api and snapshot_api
live_api_params         = extractor.extract_parameters("live_api")
snapshot_api_params     = extractor.extract_parameters("snapshot_api")

# Pass the extracted parameters to PowerFleetAPIsManager
live_api_manager        = ApiManager(live_api_params)
snapshot_api_manager    = ApiManager(snapshot_api_params)




[33mMAIN CWD=/home/georger/WorkDocuments/Sxoli/Ptyxiaki/Ptyxiaki-Repository/Python_Scripts/Data_Analysis
MAIN, LISTDIR=['Powerfleet_APIs_Management.py', 'PARAMETERS.json', '__pycache__', 'data_analysis.ipynb'][0m


(Optional) Update mongoDB for visualizing coordinates

In [2]:
from Powerfleet_APIs_Management import MongoDBConnector as DBConnector


connector = DBConnector()
if connector.check_connection():
    print("Proceed with operations.")
else:
    print("Fix connection issues before proceeding.")

[32mConnection to MongoDB is successful![0m
Proceed with operations.


1. Convert **JSON** to **CSV** so that **pandas** be able to use it
2. Delete the <u>JSON</u> files
3. Return the <u>vehicleId</u> with the most lines for data Analisys



In [3]:
import json
import pandas as pd
import os
from pathlib import Path
import csv

# Get the Live API data
live_data = live_api_manager.get_live_data()

# Prepare and get Snapshot API data
vehicleId       = snapshot_api_params["vehicleId"]
startDate       = snapshot_api_params["startDate"]
endDate         = snapshot_api_params["endDate"]
snapshot_data   = snapshot_api_manager.get_snapshot_data(vehicleId, startDate, endDate)

#! Parse JSON strings into dictionaries or lists
try:
    # Attempt to decode Live API data
    live_data_dict = json.loads(live_data) if isinstance(live_data, str) else live_data
except json.JSONDecodeError as e:
    print(f"Error decoding Live JSON: {e}")
    live_data_dict = {}

try:
    # Attempt to decode Snapshot API data
    snapshot_data_dict = json.loads(snapshot_data) if isinstance(snapshot_data, str) else snapshot_data
except json.JSONDecodeError as e:
    print(f"Error decoding Snapshot JSON: {e}")
    snapshot_data_dict = {}

#! Handle Live Data: Convert to a list of dictionaries
if isinstance(live_data_dict, dict):
    live_data_list = [value for key, value in live_data_dict.items()]
elif isinstance(live_data_dict, list):
    live_data_list = live_data_dict  # Already a list of dictionaries
else:
    live_data_list = []  #* Fallback to empty list if structure is unexpected

#! Handle Snapshot Data: Convert to a list of dictionaries
if isinstance(snapshot_data_dict, dict):
    snapshot_data_list = [value for key, value in snapshot_data_dict.items()]
elif isinstance(snapshot_data_dict, list):
    snapshot_data_list = snapshot_data_dict  # Already a list of dictionaries
else:
    snapshot_data_list = []  # Fallback to empty list if structure is unexpected

#! Create DataFrames from the lists
live_df = pd.DataFrame(live_data_list)
snapshot_df = pd.DataFrame(snapshot_data_list)


#* Save DataFrames to CSV files
output_dir = "../../DataSets"

# Ensure the directory exists
os.makedirs(output_dir, exist_ok=True)

# Define CSV file paths
live_csv_path = Path(output_dir) / "Live_API_Response_data_set.csv"
snapshot_csv_path = Path(output_dir) / "Snapshot_API_Response_data_set.csv"

#* Save data to CSV files
live_df.to_csv(live_csv_path, index=False)
snapshot_df.to_csv(snapshot_csv_path, index=False)

print(f"Live data saved to {live_csv_path}")
print(f"Snapshot data saved to {snapshot_csv_path}")

# # === Solution to find the vehicleId with the most lines ===
# max_lines = 0
# max_vehicleId_lines = 1  

# for i in range(1, 21):
#     if isinstance(snapshot_data_dict, list):
#         for obj in snapshot_data_dict:
#             if "vehicleId" in obj:  # Ensure "vehicleId" exists in the object
#                 obj["vehicleId"] = str(i)
#     elif isinstance(snapshot_data_dict, dict):
#         snapshot_data_dict["vehicleId"] = str(i)
    
#     # Convert the modified JSON to a pretty-printed string
#     json_string = json.dumps(snapshot_data_dict, indent=4)
    
#     # Count the number of lines in the JSON string
#     num_lines = json_string.count('\n') + 1  # Count '\n' characters and add 1 for the last line
    
    
#     if num_lines > max_lines:
#         max_lines = num_lines
#         max_vehicleId_lines = i

# print(f"VehicleId with the most lines: {max_vehicleId_lines}")


**Live API**
[32mLive API Request Successful![0m
**Snapshot API**
[32mSnapshot API Request Successful![0m
Live data saved to ../../DataSets/Live_API_Response_data_set.csv
Snapshot data saved to ../../DataSets/Snapshot_API_Response_data_set.csv


3. <u>Sort</u> **Live data CSV** by <u>vehicleId</u>

In [4]:
#! Sort live_df by "vehicleId" in ascending order
if "vehicleId" in live_df.columns:
    try:
        # Convert "vehicleId" to numeric, handling non-numeric values gracefully
        live_df["vehicleId"] = pd.to_numeric(live_df["vehicleId"], errors="coerce")
        #! Drop rows where vehicleId couldn't be converted to a number (optional)
        live_df = live_df.dropna(subset=["vehicleId"])
        live_df = live_df.sort_values(by="vehicleId", ascending=True)
    except Exception as e:
        print(f"Error while sorting by 'vehicleId': {e}")

# Save DataFrames to CSV files
output_dir = "../../DataSets/API_Responses/"

# Ensure the directory exists
os.makedirs(output_dir, exist_ok=True)

# Define CSV file paths
live_csv_path = Path(output_dir) / "Live_API_DataSets.csv"
snapshot_csv_path = Path(output_dir) / "Snapshot_API_Response_data_set.csv"

# Save data to CSV files
live_df.to_csv(live_csv_path, index=False)
snapshot_df.to_csv(snapshot_csv_path, index=False)

print(f"Live data saved to {live_csv_path}")
print(f"Snapshot data saved to {snapshot_csv_path}")


Live data saved to ../../DataSets/API_Responses/Live_API_DataSets.csv
Snapshot data saved to ../../DataSets/API_Responses/Snapshot_API_Response_data_set.csv


Create data sets for each vehicle from: **2024-01-01 00:00:00** to: **2024-12-01 20:59:59**

In [5]:
target_file = "../../DataSets/API_Responses/Every_vehicle_resposne.csv"
# Read headers from the source file
with open(snapshot_csv_path, mode="r") as infile:
    reader = csv.reader(infile)
    headers = next(reader)  # Get the first row (headers)


# Write headers to the target file
with open(target_file, mode="w", newline="") as outfile:
    writer = csv.writer(outfile)
    writer.writerow(headers)  # Write the headers to the new file


with open("PARAMETERS.json", "r") as file:
    PARAMETERS = json.load(file)  #* Parse the JSON content into a dictionary
    
    
_cnt = 1
while (snapshot_data != None):
    with open(target_file, mode="r") as all_data:
        #* Read for every vehicle that ca be found
        PARAMETERS["snapshot_api"]["vehicleId"] = str(_cnt)      
        vehicleId                               = snapshot_api_params["vehicleId"]
        snapshot_data                           = snapshot_api_manager.get_snapshot_data(vehicleId, startDate, endDate)   
        print(snapshot_data)
        if _cnt == 3: break
        all_data                                = writer = csv.writer(snapshot_data)
        
        
    _cnt+=1
    print(_cnt)
    

**Snapshot API**
[32mSnapshot API Request Successful![0m
[
    {
        "vehicleId": 20,
        "lat": 38.0346416,
        "lng": 23.7481,
        "dateStored": 1716383693000,
        "velocity": 0.0,
        "odometer": 0.0,
        "engineVoltage": 0.0
    },
    {
        "vehicleId": 20,
        "lat": 38.0345733,
        "lng": 23.7482316,
        "dateStored": 1716383700000,
        "velocity": 10.0,
        "odometer": 0.0,
        "engineVoltage": 0.0
    },
    {
        "vehicleId": 20,
        "lat": 38.034535,
        "lng": 23.748225,
        "dateStored": 1716383706000,
        "velocity": 0.0,
        "odometer": 0.0,
        "engineVoltage": 0.0
    },
    {
        "vehicleId": 20,
        "lat": 38.0344333,
        "lng": 23.74818,
        "dateStored": 1716383761000,
        "velocity": 6.0,
        "odometer": 0.0,
        "engineVoltage": 0.0
    },
    {
        "vehicleId": 20,
        "lat": 38.0344083,
        "lng": 23.7482366,
        "dateStored": 171638

TypeError: argument 1 must have a "write" method

Create **Velocity Dictionary** from Multiple CSV Files

In [None]:
import pandas as pd
import os

# Define the CSV file path
snapshot_file_path = "../../DataSets/API_Responses/Snapshot_API_Response_data_set.csv"  # Replace with your actual file path

# Check if the file exists
if os.path.exists(snapshot_file_path) and os.path.getsize(snapshot_file_path) > 0:
    try:
        # Read the CSV file into a DataFrame
        snapshot_df = pd.read_csv(snapshot_file_path)

        # Ensure the columns exist
        if "lat" in snapshot_df.columns and "lng" in snapshot_df.columns and "velocity" in snapshot_df.columns:
            #* Create a dictionary to store coordinate pairs and corresponding velocities
            coordinates_dict = {}
        
            for index, row in snapshot_df.iterrows():
                #! Get the coordinate pair and velocity
                lat = round(row["lat"], 4)  #! Round to 4 decimals
                lng = round(row["lng"], 4)  #! Round to 4 decimals
                velocity = row["velocity"]

                #! Create a key for the coordinate pair
                coordinate_pair = (lat, lng)

                #* Add the velocity to the list of velocities for the coordinate pair
                if coordinate_pair not in coordinates_dict:
                    coordinates_dict[coordinate_pair] = [velocity]
                else:
                    coordinates_dict[coordinate_pair].append(velocity)

            print("Coordinates dictionary created successfully!")

        else:
            print("Error: CSV file is missing required columns ('lat', 'lng', 'velocity')")
    
    except Exception as e:
        print(f"Unexpected error: {e}")
else:
    print(f"Error: File is empty or does not exist at path {snapshot_file_path}")
    


print("First 5 coordinate pairs and velocities:")
for index, (coords, velocities) in enumerate(coordinates_dict.items()):
    if index >= 5:  
        break
    print(f"Coordinates: {coords}, Velocities: {velocities}")
print(f"\nTotal unique coordinate pairs: {len(coordinates_dict)}")



Visualize data **dispersion**