# Prepare training and test data


In [2]:
import json

def flatten_json(json_obj, prefix=''):
    """
    Collapse a nested JSON value into a single-level dictionary.

    Dict keys are joined with '.', list positions are appended as '[i]', and
    every value that is neither a dict nor a list becomes one entry keyed by
    its full path. Empty dicts and empty lists contribute no entries.

    :param json_obj: The JSON value to flatten (dict, list, or primitive).
    :param prefix: Path accumulated so far; prepended to every generated key.
    :return: A flat dictionary mapping each leaf path to its value.
    """
    def _leaves(node, path):
        # Yield (path, value) pairs for every leaf below `node`, depth-first.
        if isinstance(node, dict):
            for name, child in node.items():
                yield from _leaves(child, f"{path}.{name}" if path else name)
        elif isinstance(node, list):
            for position, child in enumerate(node):
                yield from _leaves(child, f"{path}[{position}]")
        else:
            yield path, node

    return dict(_leaves(json_obj, prefix))

def extract_paths(json_obj, current_path=""):
    """
    List the path of every leaf value in a nested JSON value.

    Paths use the same notation as flatten_json: dict keys joined with '.',
    list positions written as '[i]'. Leaves are reported in depth-first,
    document order; empty dicts and lists produce no path.

    :param json_obj: The JSON value to walk (dict, list, or primitive).
    :param current_path: Path of json_obj itself; prepended to every result.
    :return: A list of leaf paths (duplicates are kept).
    """
    leaf_paths = []
    pending = [(json_obj, current_path)]

    while pending:
        node, path = pending.pop()

        if isinstance(node, dict):
            children = [
                (child, f"{path}.{name}" if path else name)
                for name, child in node.items()
            ]
        elif isinstance(node, list):
            children = [
                (child, f"{path}[{position}]")
                for position, child in enumerate(node)
            ]
        else:
            leaf_paths.append(path)
            continue

        # Push in reverse so the first child is popped (and reported) first.
        pending.extend(reversed(children))

    return leaf_paths

# Demo: flatten a small document and list its leaf paths
if __name__ == "__main__":
    # Hand-written sample covering nested objects and a list of objects
    json_data = {
        "user": {
            "profile": {"name": "John Doe", "age": 30},
            "settings": {"theme": "dark"},
        },
        "posts": [
            {"id": 1, "content": "Post 1"},
            {"id": 2, "content": "Post 2"},
        ],
    }

    # Leaf paths only, then the full path -> value mapping
    semantic_paths = extract_paths(json_data)
    fl = flatten_json(json_data)

    # Show the flattened mapping first ...
    print(fl)

    # ... followed by one leaf path per line
    for path in semantic_paths:
        print(path)


{'user.profile.name': 'John Doe', 'user.profile.age': 30, 'user.settings.theme': 'dark', 'posts[0].id': 1, 'posts[0].content': 'Post 1', 'posts[1].id': 2, 'posts[1].content': 'Post 2'}
user.profile.name
user.profile.age
user.settings.theme
posts[0].id
posts[0].content
posts[1].id
posts[1].content


In [3]:
import json


class Group:
    """A label group: an id, a name, key words and example sample names."""

    def __init__(self, group_id, name, key_words, samples):
        """
        Store the group's fields.

        :param group_id: Identifier of the group; exposed as ``id``.
        :param name: Name of the group.
        :param key_words: Key words associated with the group.
        :param samples: Sample strings belonging to the group.
        """
        self.id, self.name = group_id, name
        self.key_words, self.samples = key_words, samples
        # Placeholder; nothing in this class fills it in.
        self.vectorized_samples = None

    def __repr__(self):
        """Return a short description with id, name and samples (key_words are omitted)."""
        return "Group(id={}, name={}, samples={})".format(
            self.id, self.name, self.samples
        )


def parse_labels(label_file: str) -> list[Group]:
    """
    Load label groups from a JSON file.

    The file is expected to hold a JSON array whose first element is an object
    with a "labels" list. Each label must provide "id" and "name" and may
    provide "key_words" and "samples" (each defaults to an empty list).

    :param label_file: Path of the JSON label file.
    :return: One Group per entry of the first element's "labels" list
             (an empty list when that key is absent).
    :raises KeyError: If a label entry lacks "id" or "name".
    """
    # JSON text is UTF-8 by specification; do not rely on the platform's
    # locale encoding, which would garble non-ASCII sample names.
    with open(label_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    groups = [
        Group(
            group_id=label["id"],
            name=label["name"],
            key_words=label.get("key_words", []),
            samples=label.get("samples", [])
        )
        for label in data[0].get("labels", [])
    ]
    return groups

In [5]:
# Load the label groups from labels.json (path is relative to the working directory).
groups = parse_labels('labels.json')

In [6]:
# Dump every parsed group via Group.__repr__ (id, name and samples).
print(groups)

[Group(id=0, name=battery_voltage, samples=['batV', 'batt_v', 'batt_mV', 'bat_mV', 'Bat_V', 'Bat_mV', 'batteryVoltage', 'vbat', 'vBat', 'batVoltage', 'battery_vol', 'battVoltage', 'Vbat', 'BatV', 'BAT_V', 'batteryLevel', 'battery_level', 'batLevel', '[03] Voltage Battery/Input [V]']), Group(id=1, name=battery_voltage_mean_24h, samples=['battery_voltage_mean_24h']), Group(id=2, name=battery_percent, samples=['batteryPerc', 'battPerc', 'battery_capacity_percentage', 'batteryCapacity', 'battery_health', 'Battery']), Group(id=3, name=battery_status, samples=['Bat_status']), Group(id=4, name=outdoor_temperature, samples=['temperatures', 'temperature', 'temp_SOIL', 'TEMP_SOIL', 'air_temperature', 'Air Temperature', 'ambientTemp', 'ambient_temperature', 'Ambient_Temperature', 'barometer_temperature', 'temperature_head', 'head_temperature', 'surface_temperature', 'tempC_DS18B20', 'TempC_DS18B20', 'tempC_SHT', 'TempC_SHT', 'TempPH', 'TempNTU', 'SoildTemp5TE', 'TempLDO', 'temp_Channel', 'Temp_Ch

In [6]:
# train data: x holds every sample string, y the id of the group it came from
# (key_words are currently not added to the training set)
x = [sample for group in groups for sample in group.samples]
y = [group.id for group in groups for _ in group.samples]


In [7]:
# get test set: parse the test JSON document and collect its leaf paths.
# The context manager closes the file even if parsing fails, and the explicit
# encoding avoids depending on the platform's locale.
with open('adeunis--arf8123aa.json', 'r', encoding='utf-8') as f:
    json_data = json.load(f)

x_test = extract_paths(json_data)

In [8]:
# Show the leaf paths extracted from the test document.
print(x_test)

['deduplicationId', 'time', 'deviceInfo.tenantId', 'deviceInfo.tenantName', 'deviceInfo.applicationId', 'deviceInfo.applicationName', 'deviceInfo.deviceProfileId', 'deviceInfo.deviceProfileName', 'deviceInfo.deviceName', 'deviceInfo.devEui', 'deviceInfo.deviceClassEnabled', 'devAddr', 'adr', 'dr', 'fCnt', 'fPort', 'confirmed', 'data', 'object.dl_counter', 'object.hdop', 'object.rssi_dl', 'object.temperature', 'object.gps_quality', 'object.battery_level', 'object.lati_hemisphere', 'object.ul_counter', 'object.snr_dl', 'object.sats', 'object.koterberg', 'object.longitude', 'object.payload', 'object.long_hemisphere', 'object.latitude', 'rxInfo[0].gatewayId', 'rxInfo[0].uplinkId', 'rxInfo[0].time', 'rxInfo[0].timeSinceGpsEpoch', 'rxInfo[0].fineTimeSinceGpsEpoch', 'rxInfo[0].rssi', 'rxInfo[0].snr', 'rxInfo[0].channel', 'rxInfo[0].rfChain', 'rxInfo[0].location.latitude', 'rxInfo[0].location.longitude', 'rxInfo[0].location.altitude', 'rxInfo[0].context', 'rxInfo[0].metadata.region_common_name

# Vectorization & classification (SVC and random forest classifier)


In [9]:
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer, HashingVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.svm import SVC


# Count vectorization for paths: character n-grams of length 3 to 4.
vectorizer = CountVectorizer(analyzer='char', ngram_range=(3, 4))
# Alternative vectorizers, currently disabled:
# vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 5))
# vectorizer = HashingVectorizer(analyzer='char', ngram_range=(3, 5), n_features=1000)

# Learn the n-gram vocabulary from the training names and encode them.
X = vectorizer.fit_transform(x)  # sparse matrix

# Linear-kernel SVC; it is only constructed here (no fit in this cell).
classifier = SVC(kernel='linear')

# Hold-out evaluation with a random forest, currently disabled:
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# classifier = RandomForestClassifier()
# rf_classifier = RandomForestClassifier(n_estimators=100)
# rf_classifier.fit(X_train, y_train)
# rf_classifier.fit(X, y)
# y_pred = rf_classifier.predict(X_test)
# from sklearn.metrics import accuracy_score
# accuracy = accuracy_score(y_test, y_pred)
# print(accuracy)


In [10]:
import numpy as np

In [11]:
# 5-fold cross-validation of `classifier` on the vectorized training names.
# NOTE(review): the label dump shows groups with a single sample
# (e.g. battery_voltage_mean_24h), which cannot be present in every fold —
# confirm the scores are meaningful for such classes.
scores = cross_val_score(classifier, X, y, cv=5, verbose=2)
print("Cross-validation scores:", scores)
print("Mean cross-validation score:", np.mean(scores))



[CV] END .................................................... total time=   0.1s
[CV] END .................................................... total time=   0.1s
[CV] END .................................................... total time=   0.1s
[CV] END .................................................... total time=   0.1s
[CV] END .................................................... total time=   0.1s
Cross-validation scores: [0.55172414 0.56896552 0.39655172 0.51724138 0.38596491]
Mean cross-validation score: 0.4840895341802782


In [12]:
# train on all available data

# NOTE: this rebinds `vectorizer` and `X`, now with character n-grams of
# length 3 to 6; cells executed after this one see these versions.
vectorizer = CountVectorizer(analyzer='char', ngram_range=(3, 6))

X = vectorizer.fit_transform(x)  # sparse matrix

# Fixed random_state so the fitted forest is reproducible between runs.
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
rf_classifier.fit(X, y)

In [78]:

from gensim.models import Word2Vec
import re

# Split names on '_' and '.' into word tokens
# (presumably as input for the Word2Vec import above — TODO confirm).
delimiters = ['_', '.']
pattern = '|'.join(map(re.escape, delimiters))

# Tokenize every training name, dropping the empty strings that re.split
# produces for leading, trailing or doubled delimiters.
x_split = [[word for word in re.split(pattern, e) if word] for e in x]
print(x_split)


[['batV'], ['batt_v'], ['batt_mV'], ['bat_mV'], ['Bat_V'], ['Bat_mV'], ['batteryVoltage'], ['vbat'], ['vBat'], ['batVoltage'], ['battery_vol'], ['battVoltage'], ['Vbat'], ['BatV'], ['BAT_V'], ['batteryLevel'], ['battery_level'], ['batLevel'], ['[03] Voltage Battery/Input [V]'], ['battery_voltage_mean_24h'], ['batteryPerc'], ['battPerc'], ['battery_capacity_percentage'], ['batteryCapacity'], ['battery_health'], ['Battery'], ['Bat_status'], ['temperatures'], ['temperature'], ['temp_SOIL'], ['TEMP_SOIL'], ['air_temperature'], ['Air Temperature'], ['ambientTemp'], ['ambient_temperature'], ['Ambient_Temperature'], ['barometer_temperature'], ['temperature_head'], ['head_temperature'], ['surface_temperature'], ['tempC_DS18B20'], ['TempC_DS18B20'], ['tempC_SHT'], ['TempC_SHT'], ['TempPH'], ['TempNTU'], ['SoildTemp5TE'], ['TempLDO'], ['temp_Channel'], ['Temp_Channel'], ['Leaf_Temperature'], ['Flow_Temperature'], ['soil_temperature'], ['temperature_pt1000'], ['soil_temperature_at_depth'], ['wate

In [13]:

# Encode the test paths with the vocabulary learned from the training names.
# Only transform() is used: fitting on the test paths would replace that vocabulary.
x_test_vec = vectorizer.transform(x_test)
y_pred = rf_classifier.predict(x_test_vec)

# Build the id -> name lookup once instead of scanning `groups` per prediction;
# setdefault keeps the first group when an id occurs more than once.
name_by_id = {}
for group in groups:
  name_by_id.setdefault(group.id, group.name)

for path, label_id in zip(x_test, y_pred):
  classname = name_by_id[label_id]
  print(f'{path} classified as {classname}')

deduplicationId classified as voltage
time classified as timestamp
deviceInfo.tenantId classified as voltage
deviceInfo.tenantName classified as voltage
deviceInfo.applicationId classified as motion_detected
deviceInfo.applicationName classified as motion_detected
deviceInfo.deviceProfileId classified as voltage
deviceInfo.deviceProfileName classified as voltage
deviceInfo.deviceName classified as voltage
deviceInfo.devEui classified as voltage
deviceInfo.deviceClassEnabled classified as illuminance_lx
devAddr classified as voltage
adr classified as voltage
dr classified as voltage
fCnt classified as voltage
fPort classified as voltage
confirmed classified as fire_alarm
data classified as voltage
object.dl_counter classified as common_error_count
object.hdop classified as horizontal_dilution_precision
object.rssi_dl classified as rssi_dBm
object.temperature classified as outdoor_temperature
object.gps_quality classified as voltage
object.battery_level classified as battery_voltage
obje

In [None]:
# Collect the predictions in a dict while printing them.
res = {}

for i in range(len(y_pred)):
  # get the leaf: the last dot-separated segment of the path
  leaf = x_test[i].rsplit('.', 1)[-1]
  classname = [group.name for group in groups if group.id == y_pred[i]][0]
  # NOTE(review): the statement here used to be the unfinished `res[]`, which
  # does not parse. Keying by the full path is an assumption: leaf names repeat
  # in the test paths (e.g. `time` and `rxInfo[0].time`), so a leaf key would
  # overwrite entries. Confirm the intended shape of `res`.
  res[x_test[i]] = {'leaf': leaf, 'classname': classname}
  print(f'{x_test[i]} classified as {classname}')