In [1]:
import socket
import numpy as np
import pandas as pd
import time
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

UDP_IP = "192.168.1.14"
UDP_PORT = 5555
CHUNKSIZE=20000
sock = socket.socket(socket.AF_INET, # Internet
                    socket.SOCK_DGRAM) # UDP

sock.bind((UDP_IP, UDP_PORT))

def divide_chunks(l, n):    
    # looping til length l
    for i in range(0, len(l), n):
        yield l[i:i + n] #How many elements each list should have

def chunk_to_df(total, msgpack=False):
    received = "".join(total).split(",")[1:]
    chunks = list(divide_chunks(received, 4))
    measurements = np.vstack(chunks)
    df = pd.DataFrame(measurements, columns=["sensor", "x", "y", "z"])
    if msgpack:
        df = df.to_msgpack()
    return df

def prep_df(df):
    for c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    if df.shape[0]>CHUNKSIZE:
        df = df.iloc[:CHUNKSIZE]
    else:
        df = df.append(pd.DataFrame(index=range(CHUNKSIZE-len(df)), columns=df.columns), ignore_index=True)
        df.fillna(method="pad", inplace=True)
    return df.values

def prep_df(df, msgpack=False):
    for c in df.columns:
        if c=="sensor":
            df[c] = pd.to_numeric(df[c], errors="coerce", downcast="unsigned")
        else:
            df[c] = pd.to_numeric(df[c], errors="coerce", downcast="float")
    #df["sensor"] = df["sensor"].astype(np.int8)
    if df.shape[0]>CHUNKSIZE:
        df = df.iloc[:CHUNKSIZE]
    #else:
    #    df = df.append(pd.DataFrame(index=range(CHUNKSIZE-len(df)), columns=df.columns), ignore_index=True)
    #    df.fillna(method="pad", inplace=True)
    if msgpack:
        df = df.to_msgpack()
        return df
    else:
        return df

chunks = []
try:
    while True:
        reset = False
        print("Waiting for new stream.")
        while not reset:
            total = []
            reset_time = time.time()
            data, addr = sock.recvfrom(256) 
            if time.time()>reset_time+3:
                print("Stream reset")
                reset = True
        print("Starting stream in 3 seconds")
        time.sleep(5)
        print("Starting stream...")
        start = time.time()
        while time.time() < start+300:
            data, addr = sock.recvfrom(256) 
            total.append(data.decode("UTF-8"))
        print("Stream finished.")
        df = chunk_to_df(total, False)
        df = prep_df(df)
        break
        #chunks.append(chunk)
        #channel.basic_publish(exchange='', routing_key='hello', body=df)  

except KeyboardInterrupt:
    print("Closing connection.")
    sock.close()
    connection.close()

Waiting for new stream.
Stream reset
Starting stream in 3 seconds
Starting stream...
Stream finished.


In [2]:
df.shape

(9643, 4)

In [3]:
df

Unnamed: 0,sensor,x,y,z
0,3,-0.268,3.620000,8.811
1,5,-8.375,-20.875000,
2,3,-0.622,3.840000,9.290
3,5,-8.313,-20.188000,
4,3,-0.680,3.965000,7.748
5,4,-0.048,-0.189000,0.007
6,5,-9.063,-19.563000,
7,3,0.910,3.333000,9.740
8,4,-0.412,-1.142000,0.036
9,5,-9.063,-20.938000,


In [4]:
for n, g in df.groupby("sensor"):
    g.to_pickle("data/positive_v2_"+str(n))

In [5]:
for i in range(3,6):
    df = pd.read_pickle("data/positive_v2_"+str(i))
    print(df.head())
    print(df.shape)

    sensor      x      y      z
0        3 -0.268  3.620  8.811
2        3 -0.622  3.840  9.290
4        3 -0.680  3.965  7.748
7        3  0.910  3.333  9.740
10       3  0.632  3.400  8.447
(3464, 4)
    sensor      x      y      z
5        4 -0.048 -0.189  0.007
8        4 -0.412 -1.142  0.036
11       4 -0.059 -1.140  0.161
14       4 -0.240  0.118  0.746
17       4 -0.070 -0.321  0.131
(3072, 4)
    sensor       x       y   z
1        5  -8.375 -20.875 NaN
3        5  -8.313 -20.188 NaN
6        5  -9.063 -19.563 NaN
9        5  -9.063 -20.938 NaN
12       5 -13.250 -17.563 NaN
(3107, 4)


In [6]:
dfs = []
for i in range(3,6):
    df = pd.read_pickle("data/positive_v2_"+str(i))
    df.drop("sensor", axis=1, inplace=True)
    df.reset_index(drop=True, inplace=True)
    dfs.append(df)

train = pd.concat(dfs, ignore_index=True,  axis=1)
train.columns = [sensor+m for sensor in ["acc_", "gyro_", "mag_"] for m in ["x", "y", "z"]]
train.drop("mag_z", axis=1, inplace=True)
train.reset_index(drop=True, inplace=True)

In [7]:
train

Unnamed: 0,acc_x,acc_y,acc_z,gyro_x,gyro_y,gyro_z,mag_x,mag_y
0,-0.268,3.620,8.811,-0.048,-0.189,0.007,-8.375,-20.875
1,-0.622,3.840,9.290,-0.412,-1.142,0.036,-8.313,-20.188
2,-0.680,3.965,7.748,-0.059,-1.140,0.161,-9.063,-19.563
3,0.910,3.333,9.740,-0.240,0.118,0.746,-9.063,-20.938
4,0.632,3.400,8.447,-0.070,-0.321,0.131,-13.250,-17.563
5,2.002,2.892,9.261,-0.204,-0.573,-0.208,-20.313,-14.938
6,1.590,3.160,8.341,-0.060,-0.712,-0.088,-25.063,-6.000
7,1.437,3.007,9.021,-1.689,-0.302,1.523,-30.563,-0.563
8,-0.230,2.567,4.693,-2.348,-3.184,2.032,-34.063,2.125
9,0.326,1.111,7.470,-3.761,0.645,0.757,-38.188,5.500


In [9]:
train.dropna().to_pickle("data/df_positive_v2")