<h1 align="center">TMDA (Transportation Mode Detector and Analyzer)</h1>

<h2 align="center">Introduction</h2>

Transportation mode detection (TMD) is a well-known sub-task of Human Activity Recognition (HAR), which aims to understand what activities a user is performing from the data produced during those activities.

In this project, only the classes bus, car, still, train and walking are considered transportation modes. The data used to train the classifier come from accelerometer and gyroscope sensors, sampled at 20 Hz.

To give more context about each detection, location data (latitude and longitude) are also included, but they are not used in the training phase.

<h2 align="center">Workflow</h2>

<img src="./workflow/WorkFlow.drawio.png" width="800" />

<h2 align="center">Data Acquisition</h2>

<img src="./img/phyphox.png" width="100" />

Data are collected via the mobile app **Phyphox**, which lets you select the sensors to read from and the sampling frequency (in this case 20 Hz).

Once a recording is started, sensor data are read and, when it ends, can be exported as a zip file containing one CSV for each sensor, plus metadata (such as the time of recording and the device used).

A Python script watches a folder containing these zip files and, when there is something to read, extracts the sensor CSVs and merges them into a single CSV called **sensors.csv**.
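The extraction step can be sketched roughly as follows. This is only an illustration: the helper name `read_sensor_rows`, the `sensor` column, the comma delimiter and the idea that sensor CSVs sit at the top level of the archive (with anything in a subfolder treated as metadata) are all assumptions, not the project's actual code.

```python
import csv
import io
import zipfile


def read_sensor_rows(zip_path):
    """Yield one dict per CSV row, tagged with the CSV file it came from.

    Hypothetical helper: assumes comma-separated CSVs with a header row,
    stored at the top level of the archive.
    """
    with zipfile.ZipFile(zip_path) as archive:
        for name in sorted(archive.namelist()):
            # skip subfolders (assumed to hold metadata) and non-CSV members
            if "/" in name or not name.lower().endswith(".csv"):
                continue
            sensor = name[:-4]  # file name without the ".csv" suffix
            with archive.open(name) as raw:
                text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
                for row in csv.DictReader(text):
                    yield {"sensor": sensor, **row}
```

The rows produced this way could then be appended to sensors.csv with `csv.DictWriter`; how the real script lays out the merged file is not shown here.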

**update_sensors_csv()** also filters out sensor data recorded more than 5 seconds after the start of the recording, so that only the first 5 seconds of readings for each sensor are kept.
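A minimal sketch of such a filter, applied to the rows of one sensor, might look like this. The function name `keep_first_seconds` and the `time` column (in seconds) are assumptions made for the example; the real logic lives inside `update_sensors_csv()` and may differ.

```python
def keep_first_seconds(rows, max_seconds=5.0, time_key="time"):
    """Return the rows whose timestamp lies within max_seconds of the earliest one."""
    if not rows:
        return []
    # the earliest timestamp marks the start of the recording
    start = min(float(row[time_key]) for row in rows)
    return [row for row in rows if float(row[time_key]) - start <= max_seconds]


readings = [
    {"time": "0.00", "x": "0.1"},
    {"time": "2.50", "x": "0.2"},
    {"time": "5.00", "x": "0.3"},
    {"time": "5.05", "x": "0.4"},  # past the 5-second mark, dropped
]
kept = keep_first_seconds(readings)
```

Here `kept` holds the first three readings only.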

In the future, this filtering could be built into the recording app.

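    import glob
    from time import sleep

    # get_earliest_zipfile() and update_sensors_csv() are defined elsewhere in the script
    while True:
        # list the zip files currently in the watched folder
        zip_list = glob.glob('*.zip')
        # if there is at least one archive, merge the earliest into sensors.csv
        if zip_list:
            zf = get_earliest_zipfile(zip_list)
            update_sensors_csv(zf)
        # wait 5 seconds before polling again
        sleep(5)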
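The body of `get_earliest_zipfile()` is not shown here. One plausible version, assuming "earliest" means the archive with the oldest modification time, is sketched below; the real function may use a different criterion, such as a timestamp in the file name.

```python
import os


def get_earliest_zipfile(zip_list):
    """Return the path with the oldest modification time (assumed criterion)."""
    return min(zip_list, key=os.path.getmtime)
```

Processing the oldest archive first keeps the recordings in sensors.csv in the order in which they were exported.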

<h2 align="center"> Data Ingestion</h2>

<img src="./img/logstash_logo.svg" width="100" />

Ingestion is handled by Logstash, which continuously reads new lines appended to sensors.csv and sends them to a Kafka topic called **sensors-raw**.

That's it, thank you Logstash <3.

<p align="center">
  <img src="./memes/logstash_heaven.jpg"/>
</p>

<h2 align="center"> Stream Processing</h2>

<img src="./img/stream_processing.png" width="250" />

A Kafka cluster is responsible for handling the streams coming from Logstash and storing them efficiently in specific topics.

Data are also replicated across different Kafka brokers, so if one of them goes down (temporarily or permanently) we don't lose any data.

There are two main topics: sensors-raw (for data coming from Logstash) and sensors (for data cleaned by a Spark container).

<h2 align="center">Data Cleaning</h2>

<img src="./img/spark_logo.png" width="150" />

Before making any prediction, data must be cleaned in order to extract features like **mean**, **min**, **max** and **stddev** over a 5-second time window.
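To make the feature set concrete, here is a small plain-Python sketch of what is computed for one window of readings from one sensor axis. It is not the Spark job itself: the function name `window_features` is made up for the example, and the standard deviation is assumed to be the sample standard deviation.

```python
from statistics import mean, stdev


def window_features(values):
    """Summarise one window of readings as mean, min, max and sample stddev."""
    return {
        "mean": mean(values),
        "min": min(values),
        "max": max(values),
        # stdev() needs at least two values; fall back to 0.0 for a single reading
        "stddev": stdev(values) if len(values) > 1 else 0.0,
    }


features = window_features([1.0, 2.0, 3.0, 4.0])
```

At 20 Hz a full 5-second window holds 100 readings per sensor axis, so each window is reduced from 100 values to these four numbers.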

Session windows were used to build these time windows.

<img src="./img/session_windows.png" width="800" />

A session window opens with a single event and extends whenever the next event arrives within the gap duration.
It closes once no further event arrives within the gap duration after the last accepted one.

Note that we can't control the size of a window, which is determined by the events themselves.
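The behaviour described above can be illustrated with a few lines of plain Python. This is a simplified sketch, not Spark's implementation: the function name `session_windows`, the `(user, timestamp)` event shape and the gap value are assumptions chosen for the example.

```python
from collections import defaultdict


def session_windows(events, gap):
    """Group (user, timestamp) events into per-user sessions.

    A session grows while the next event of the same user arrives less
    than `gap` after the previous one; otherwise a new session starts.
    """
    by_user = defaultdict(list)
    for user, ts in events:
        by_user[user].append(ts)

    sessions = {}
    for user, stamps in by_user.items():
        stamps.sort()
        user_sessions = []
        current = [stamps[0]]
        for ts in stamps[1:]:
            if ts - current[-1] < gap:
                current.append(ts)      # still inside the gap: extend the session
            else:
                user_sessions.append(current)
                current = [ts]          # gap exceeded: open a new session
        user_sessions.append(current)
        sessions[user] = user_sessions
    return sessions


events = [("user1", 0), ("user1", 1), ("user1", 2), ("user1", 10),
          ("user2", 1), ("user2", 2)]
result = session_windows(events, gap=5)
```

With a gap of 5, user1 ends up with two sessions (`[0, 1, 2]` and `[10]`) and user2 with one (`[1, 2]`), so the window boundaries follow each user's own events.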

<p align="center">
  <img src="./memes/session_window_5sec.jpg"/>
</p>

Remember: during data acquisition, a function that keeps only the first 5 seconds of data for each sensor was already applied.

<p align="center">
  <img src="./memes/normal_windows_why.jpg"/>
</p>

Because if user1 starts recording at time **t** and user2 at time **t+1**, a tumbling window covers \[**t**, **t+5**\].
When that window closes at time **t+5**, the grouping leaves out the last second of user2's data.
In the end, the window holds 5 seconds of data for user1 but only 4 seconds for user2.

<u>We need to use session windows to have dedicated windows for each user</u>, beginning when events for that particular user occur.
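The user1/user2 example can be checked with a short calculation. The sketch below assumes tumbling windows aligned at t = 0, a 20 Hz sampling rate (one sample every 50 ms) and 100 samples per recording; the function name `tumbling_counts` is made up for the example.

```python
from collections import Counter

SAMPLE_PERIOD_MS = 50   # 20 Hz
WINDOW_MS = 5000        # 5-second tumbling window


def tumbling_counts(start_ms, n_samples=100):
    """Count how many samples of one recording land in each tumbling window."""
    stamps = (start_ms + i * SAMPLE_PERIOD_MS for i in range(n_samples))
    # integer division maps each timestamp to the index of its window
    return Counter(ts // WINDOW_MS for ts in stamps)


user1 = tumbling_counts(start_ms=0)      # starts at t
user2 = tumbling_counts(start_ms=1000)   # starts at t+1
```

All 100 samples of user1 fall in window 0, while user2 is split into 80 samples in window 0 and 20 in window 1, which is exactly the 5 seconds versus 4 seconds imbalance described above.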

<p align="center">
  <img src="./memes/confusion.jpg"/>
</p>

If you didn't understand, don't worry and take a look at this article: https://towardsdatascience.com/spark-3-2-session-windowing-feature-for-streaming-data-e404d92e267

<h2 align="center">Value Extraction</h2>

<img src="./img/spark_logo.png" width="150" />

Using the model created at the end of the training phase, we enrich our data with predictions of the transportation mode, given a set of input features (extracted earlier).

<p align="center">
  <img src="./memes/tuning.jpg"/>
</p>

<h2 align="center"> Data Indexing</h2>

<img src="./img/elastic_search_logo.svg" width="80" />

Data indexing allows us to search and query our data at the speed of light, because: "Elasticsearch is fast. Really, really fast."

<h2 align="center"> Data Visualization</h2>

<img src="./img/kibana_logo.svg" width="80" />

Finally, we can visualize the results computed on the streaming data through charts built with Kibana.

<p align="center">
  <img src="./img/charts.jpg"/>
</p>

<p align="center">
  <img src="./memes/we_did_it.gif"/>
</p>

# Thanks