# TapEth

## Project Goal

The main goal of this project is to statistically analyze incoming _pending transactions_ from the **Ethereum** main network and to estimate how long each one waits before being mined into a block.


## Developer

I'm **Alessandro Catalano** (you can find me everywhere as **Wornairz**) and I'm a CS student at UniCT. <br>
I'm interested in Software Engineering, Distributed Computing and Cloud Computing.

## Which APIs are used

| Service | Description |  |  |
| :------- | :----------- | ----- | ----- |
| **Infura**  | provides free access to a **geth** node, used for Ethereum's JSON-RPC API | <img src="./img/infura.svg" width="100" height="60"/> | <img src="./img/geth.png" width="50" height="60"/> |
| **ETH Gas Station** | provides a **prediction table** based on the last 200 transactions | <img src="./img/ethgasstation.png" width="60" height="70"/> | |


## geth / Infura JSON-RPC API example

```python
# Jupyter already runs an event loop; nest_asyncio lets asyncio code run inside it
import nest_asyncio
nest_asyncio.apply()
```

```python
import asyncio
import websockets

# JSON-RPC request subscribing to the hashes of new pending transactions
new_pending_transactions_request = '{"jsonrpc":"2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}'

wss_uri = input("Insert your infura wss uri: ")

async def newPendingTransactions():
    async with websockets.connect(wss_uri) as websocket:
        print("Connected successfully")
        await websocket.send(new_pending_transactions_request)

        # The first message is the subscription confirmation (its "result" is the
        # subscription ID); each following one carries a pending transaction hash
        for _ in range(5):
            message = await websocket.recv()
            print(f"< {message}")

# asyncio.run() also works inside Jupyter thanks to nest_asyncio.apply()
asyncio.run(newPendingTransactions())
```

```text
Insert your infura wss uri: wss://mainnet.infura.io/ws/v3/<YOUR_PROJECT_ID>
Connected successfully
< {"jsonrpc":"2.0","id":1,"result":"0x49f3773c497b685e6eb1ebbf6b1a4beb"}
< {"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x49f3773c497b685e6eb1ebbf6b1a4beb","result":"0x182b4176962b26c816f1a5df2936e9574129c44222f165c9a7d7c37d8ccadf24"}}
< {"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x49f3773c497b685e6eb1ebbf6b1a4beb","result":"0x12c0ea814322320f50e00ee5ae790634bde1a4c7766fcd401088e9bfa35697c4"}}
< {"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x49f3773c497b685e6eb1ebbf6b1a4beb","result":"0xb0a95ed78d3d5f7cfd755fd8037904dc1ab0f45be018b0fe0b547ebf67963cb1"}}
< {"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x49f3773c497b685e6eb1ebbf6b1a4beb","result":"0x30a00a4e31b01e07753a2a53e0dda445f86a86caa58fc800d244862794c7e39b"}}
```
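The `newPendingTransactions` subscription only delivers transaction hashes, so details such as the gas price have to be fetched separately, for example with the `eth_getTransactionByHash` JSON-RPC method. The minimal sketch below shows one way to pull the hash out of each notification and build that follow-up request; the helper names are hypothetical, and this is not necessarily how TapEth's connector does it.

```python
import json

def extract_pending_tx_hash(message: str):
    """Return the transaction hash carried by an eth_subscription notification,
    or None for other messages (e.g. the initial subscription confirmation)."""
    payload = json.loads(message)
    if payload.get("method") != "eth_subscription":
        return None
    return payload["params"]["result"]

def get_transaction_request(tx_hash: str, request_id: int = 2) -> str:
    """Build a JSON-RPC eth_getTransactionByHash request for one hash
    (hypothetical helper)."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_getTransactionByHash",
        "params": [tx_hash],
    })

# Messages taken from the output above
confirmation = '{"jsonrpc":"2.0","id":1,"result":"0x49f3773c497b685e6eb1ebbf6b1a4beb"}'
notification = '{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x49f3773c497b685e6eb1ebbf6b1a4beb","result":"0x182b4176962b26c816f1a5df2936e9574129c44222f165c9a7d7c37d8ccadf24"}}'

print(extract_pending_tx_hash(confirmation))  # None
tx_hash = extract_pending_tx_hash(notification)
print(get_transaction_request(tx_hash))
```

Inside the `async` loop, the request string would simply be sent back over the same websocket with `await websocket.send(...)`.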



```python
import asyncio
import websockets

# JSON-RPC request subscribing to the headers of newly mined blocks
new_blocks_request = '{"jsonrpc":"2.0", "id": 1, "method": "eth_subscribe", "params": ["newHeads"]}'

wss_uri = input("Insert your infura wss uri: ")

async def newBlocks():
    async with websockets.connect(wss_uri) as websocket:
        print("Connected successfully")
        await websocket.send(new_blocks_request)

        # The first message confirms the subscription; each following one
        # carries the header of a newly mined block
        for _ in range(5):
            new_block = await websocket.recv()
            print(f"< {new_block}")

asyncio.run(newBlocks())
```
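Each `newHeads` notification carries a block header whose quantities (such as `number` and `timestamp`) are hex-encoded strings. The sketch below assumes that a transaction's waiting time is measured as the gap between the moment it was first seen pending and the timestamp of the block that included it; finding that block (e.g. via `eth_getTransactionReceipt`) is left out. The helper names and the abridged sample header are hypothetical.

```python
import json

def parse_new_head(message: str):
    """Return (block_number, block_timestamp) from a newHeads notification,
    or None for other messages such as the subscription confirmation."""
    payload = json.loads(message)
    if payload.get("method") != "eth_subscription":
        return None
    header = payload["params"]["result"]
    # Quantities are hex-encoded strings in Ethereum's JSON-RPC API
    return int(header["number"], 16), int(header["timestamp"], 16)

def waiting_time(first_seen_unix: int, block_timestamp: int) -> int:
    """Seconds between first seeing a pending transaction and the timestamp
    of the block that included it (hypothetical metric, clamped at zero)."""
    return max(0, block_timestamp - first_seen_unix)

# Hypothetical, abridged notification (real headers carry many more fields)
sample = json.dumps({
    "jsonrpc": "2.0",
    "method": "eth_subscription",
    "params": {
        "subscription": "0xabc",
        "result": {"number": "0xa", "timestamp": "0x5f5e1000"},
    },
})

number, timestamp = parse_new_head(sample)
print(number, timestamp)  # 10 1600000000
print(waiting_time(timestamp - 45, timestamp))  # 45
```

The clamp at zero is there because block timestamps are set by miners and the local clock may not agree with them, so the raw difference can come out slightly negative.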

## ETH Gas Station API example

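```python
import requests

# Fetch the ETH Gas Station prediction table (JSON); fail loudly on HTTP errors
data = requests.get("https://ethgasstation.info/api/predictTable.json", timeout=10)
data.raise_for_status()
print(data.text)
```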
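To use the prediction table as training data, each row can be reduced to a (gas price, expected waiting time) pair. The key names `gasprice` and `expectedTime` below are assumptions about the format of `predictTable.json`, and the sample is made up; check both against a real response before relying on this sketch.

```python
import json

def to_training_points(table_json: str, x_key: str = "gasprice", y_key: str = "expectedTime"):
    """Turn the prediction table into (gas price, expected time) pairs.
    The default key names are ASSUMPTIONS about the table's format."""
    rows = json.loads(table_json)
    points = []
    for row in rows:
        # Skip rows that lack either value
        if row.get(x_key) is not None and row.get(y_key) is not None:
            points.append((float(row[x_key]), float(row[y_key])))
    # Sort by gas price so the points are easy to inspect or plot
    return sorted(points)

# Hypothetical sample shaped like the assumed format
sample = '[{"gasprice": 20, "expectedTime": 1.5}, {"gasprice": 5, "expectedTime": 12.0}, {"gasprice": 10}]'
print(to_training_points(sample))  # [(5.0, 12.0), (20.0, 1.5)]
```

With the request above, `to_training_points(data.text)` would produce the pairs, provided the assumed keys are present.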

## Technologies used

<img src="./img/technologies.png">

<img src="./meme/tom_tech.jpg"/>

<img src="./meme/soldiers.jpg" align="center"/>

## Data source

| | | |
|-|-|-|
|<img src="./img/ethereum.png" width="130" height="200">|<img src="./img/plus.png" width="80" height="80">|<img src="./img/infura.svg" width="250" height="150">|

- Ethereum is a decentralized, open-source blockchain featuring smart contract functionality. It's the second largest cryptocurrency platform by market capitalization, behind Bitcoin.
- Infura is a service that provides free (with limitations) access to a geth node.

# Kafka

Apache Kafka is an open-source stream-processing software platform developed by the Apache Software Foundation, written in Scala and Java.
<img src="./img/Kafka.png" width="400" height="300"/>
Apache Kafka aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. <br>
Kafka is also scalable and fault-tolerant.

## Kafka Connect

Kafka Connect, an open source component of Apache Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. <br>
In TapEth, Kafka Connect is used for **data ingestion**: it reads incoming pending transactions from Infura through a WebSocket. <br>
A custom Kafka Connector was developed in **Java** for this purpose.
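The connector itself is written in Java; the Python sketch below only illustrates the kind of mapping such a connector performs, turning an `eth_getTransactionByHash` response into a keyed record. The `Record` type, the topic name `pending-transactions` and the chosen fields are hypothetical; `hash`, `gasPrice` and `gas` are standard fields of the JSON-RPC transaction object and are assumed to be present.

```python
import json
import time
from typing import NamedTuple, Optional

class Record(NamedTuple):
    """Hypothetical stand-in for a Kafka record (topic, key, value)."""
    topic: str
    key: str
    value: str

def transaction_to_record(response: str, topic: str = "pending-transactions",
                          seen_at: Optional[float] = None) -> Optional[Record]:
    """Map an eth_getTransactionByHash JSON-RPC response to a record.
    Returns None when the node no longer knows the transaction (null result)."""
    tx = json.loads(response).get("result")
    if tx is None:
        return None
    value = {
        "hash": tx["hash"],
        # Hex quantities converted to numbers; gas price expressed in gwei
        "gasPriceGwei": int(tx["gasPrice"], 16) / 1e9,
        "gas": int(tx["gas"], 16),
        # Local time at which the transaction was observed while pending
        "seenAt": time.time() if seen_at is None else seen_at,
    }
    # The transaction hash is used as the record key
    return Record(topic, tx["hash"], json.dumps(value))

# Hypothetical, abridged response: 20 gwei gas price, 21000 gas limit
sample = json.dumps({"jsonrpc": "2.0", "id": 2, "result": {
    "hash": "0x182b4176962b26c816f1a5df2936e9574129c44222f165c9a7d7c37d8ccadf24",
    "gasPrice": "0x4a817c800",
    "gas": "0x5208",
}})
print(transaction_to_record(sample, seen_at=100.0))
```

Using the transaction hash as the record key means that, with Kafka's default partitioner, all records about the same transaction land in the same partition.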

<img src="./meme/flume joke.jpg"/>

After reading the pending transaction details, the connector writes the data into a Kafka **topic**, which is later consumed by the Spark application.
<img src="./img/kafka topic.png"/>

# Spark

Apache Spark is an open-source, distributed, general-purpose cluster-computing framework; it's also fault-tolerant.<br> It's a top-level Apache project.
<img src="./img/spark.png" width="550" height="350"/>
Spark is often used within the Hadoop ecosystem, where it has largely replaced Hadoop's MapReduce for many workloads: it is much faster because it keeps intermediate data in memory instead of writing it to disk between steps.

### The Spark application was written entirely in **Java**.
<img src="./meme/python no power.jpg"/>

<img src="./meme/nicotra vs java.jpg"/>

# Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.<br>
Data can be ingested from many sources; in TapEth it is read from the Kafka topic. <br>
<img src="./img/spark streaming.png"/>
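Spark Streaming processes a live stream as a sequence of small batches, one per batch interval. The toy model below (plain Python, not Spark's API; the events are hypothetical) shows that discretization applied to timestamped transactions.

```python
from collections import defaultdict

def micro_batches(events, batch_interval):
    """Group (timestamp, payload) events into consecutive fixed-length
    intervals, mimicking how a stream is cut into micro-batches."""
    batches = defaultdict(list)
    for ts, payload in events:
        # Index of the interval the event falls into
        batches[int(ts // batch_interval)].append(payload)
    # Return batches in time order as (batch start time, payloads)
    return [(index * batch_interval, batches[index]) for index in sorted(batches)]

events = [(0.4, "tx1"), (1.2, "tx2"), (1.9, "tx3"), (3.1, "tx4")]
print(micro_batches(events, 1.0))
# [(0.0, ['tx1']), (1.0, ['tx2', 'tx3']), (3.0, ['tx4'])]
```

Unlike a real streaming engine, this toy version simply skips intervals that received no events.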

# Spark MLlib

MLlib is Apache Spark's scalable machine learning library.
<img src="./img/spark mllib.png" width="500" height="200" style="margin: 0"/>

## Machine Learning Model
A `LinearRegressionModel` is used to estimate the waiting time of each transaction. The model is trained on the **ETH Gas Station** prediction table.
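MLlib's `LinearRegression` estimator fits a line to the training data at scale; the plain-Python ordinary least squares fit below only shows what is being computed, on a few hypothetical (gas price, waiting time) points. It also shows a pitfall of a straight line for this problem: extrapolated far enough, it predicts negative waiting times. Clamping at zero is one possible fix and an assumption here, not necessarily what TapEth does.

```python
def fit_linear_regression(points):
    """Ordinary least squares fit of y = intercept + slope * x."""
    n = len(points)
    if n < 2:
        raise ValueError("need at least two points")
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    if sxx == 0:
        raise ValueError("x values must not all be equal")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope

def predict_wait(model, gas_price):
    intercept, slope = model
    # A straight line can go below zero for high gas prices; clamp it
    return max(0.0, intercept + slope * gas_price)

# Hypothetical points: higher gas price, shorter wait
points = [(1.0, 10.0), (2.0, 8.0), (3.0, 6.0)]
model = fit_linear_regression(points)
print(model)  # (12.0, -2.0)
print(predict_wait(model, 10.0))  # 0.0 (the raw line would give -8.0)
```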

<img  src="./meme/zero.jpg"/>

<img src="./meme/negative values.jpg"/>

# Elastic Search

Elasticsearch is a search engine based on the Apache Lucene library. It provides a distributed full-text search engine with an HTTP web interface and schema-free JSON documents.
Elasticsearch can be used to search all kinds of documents; it scales horizontally and offers near real-time search.
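Documents are commonly written to Elasticsearch in batches through its `_bulk` REST endpoint, whose body is newline-delimited JSON: an action line followed by the document itself. The sketch below builds such a body; the index name `tapeth-transactions` and the document fields are hypothetical, and the source does not say how TapEth's Spark job writes to Elasticsearch (the Elasticsearch-Hadoop connector is a common choice for Spark).

```python
import json

def bulk_body(index, docs, id_key="hash"):
    """Build a newline-delimited JSON body for Elasticsearch's _bulk API:
    one action line followed by one source line per document."""
    lines = []
    for doc in docs:
        # Action line: index the document under the given index and id
        lines.append(json.dumps({"index": {"_index": index, "_id": doc[id_key]}}))
        # Source line: the document itself
        lines.append(json.dumps(doc))
    # The bulk API requires the body to end with a newline
    return "\n".join(lines) + "\n"

# Hypothetical document produced by the Spark job
docs = [{"hash": "0x182b", "gasPriceGwei": 20.0, "estimatedWait": 45.0}]
print(bulk_body("tapeth-transactions", docs), end="")
```

The resulting string would be sent as a `POST` to `http://localhost:9200/_bulk` with the `Content-Type: application/x-ndjson` header.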

<img src="./img/elasticsearch.svg"/>


# Kibana

Kibana is an open source data visualization dashboard for Elasticsearch. It provides visualization capabilities on top of the content indexed on an Elasticsearch cluster. <br>
Users can create bar, line and scatter plots, or pie charts and maps on top of large volumes of data.

<img src="./img/kibana.png" width="500" height="300" />

Demo [here](http://localhost:5601)

<img src="./meme/shame java.jpg"/>

# Thank you for your attention

I hope you enjoyed the presentation!

# Contacts

| | | |
|-|-|-|
|<img src="./img/telegram.png" width="50" height="50"> | Telegram | @Wornairz |
|<img src="./img/email.png" width="50" height="50"> | Email | catalano.alessandro@studium.unict.it |
|<img src="./img/linkedin.png" width="50" height="50"> | LinkedIn | Alessandro Catalano |