### <center>QuakeMatch </center>
### <center>Luca Longo, Università degli Studi di Catania </center>
<img src="https://user-images.githubusercontent.com/50534107/256270970-f0de1c54-ba7f-46a0-ab32-c4ad00e6b26e.svg" alt="QuakeMatch" />

## Table of contents:
* [Introduction](#Introduction)
* [Quake Match Pipeline](#QuakeMatchPipeline)
* [Data Ingestion](#dataIngestion)
    * [Python Script](#pythonScript)
    * [Logstash](#logstash)
* [Streaming with Kafka](#kafka)
* [Processing with Apache Spark](#spark)
* [Data indexing with Elasticsearch](#elasticsearch)
* [Data visualization with Kibana](#kibana)
* [Requirements](#requirements)
* [Usage](#usage)

# Introduction <a class="anchor" id="Introduction"></a>

<p>
QuakeMatch is a tool for detecting antipodal earthquakes: it uses seismic data to look for matches between two or more events that occur at roughly opposite points of the globe. It relies on Logstash, Kafka with ZooKeeper, Apache Spark, Elasticsearch and Kibana for data ingestion, filtering, processing and visualization of the results.
</p>

## Quake Match Pipeline<a class="anchor" id="QuakeMatchPipeline"></a>
<p>
    <img src="https://user-images.githubusercontent.com/50534107/256268865-21fcd7c9-1762-45da-8a8e-aed7e15b7468.png" alt="Pipeline" />
</p>


# Data Ingestion <a class="anchor" id="dataIngestion"></a>

## Python Script <a class="anchor" id="pythonScript"></a>
The following script generates a file containing all the URLs that Logstash will request from the Seismic Portal API, one per day:

```python
import datetime

# FDSN event query on Seismic Portal; the {} placeholders receive the start and end of each day
base_url = "https://www.seismicportal.eu/fdsnws/event/1/query?limit=7000&start={}&end={}"

start_date = datetime.datetime(1998, 7, 19)
end_date = datetime.datetime(2023, 7, 19)

current_date = start_date
links = []

# Build one URL per day, covering 00:00:00 to 23:59:59
while current_date <= end_date:
    start_time = current_date.strftime("%Y-%m-%dT%H:%M:%S.0")
    end_time = (current_date + datetime.timedelta(days=1) - datetime.timedelta(seconds=1)).strftime("%Y-%m-%dT%H:%M:%S.0")
    links.append(base_url.format(start_time, end_time))
    current_date += datetime.timedelta(days=1)

# Write the URLs to the file read by Logstash, one per line
with open("./logstash/seismic_portal_links.txt", "w") as file:
    for link in links:
        file.write(link + "\n")
```

You can choose the date range by modifying the `start_date` and `end_date` values.
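If you would rather pass the date range as arguments than edit the script, the sketch below is a hypothetical variant (`daily_query_urls` and `write_links` are not part of the repository) that produces the same one-URL-per-day list, building the query string with `urllib.parse.urlencode`:

```python
import datetime
from typing import Iterator
from urllib.parse import urlencode

BASE_URL = "https://www.seismicportal.eu/fdsnws/event/1/query"
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.0"  # same timestamp format as the script above


def daily_query_urls(start: datetime.date, end: datetime.date, limit: int = 7000) -> Iterator[str]:
    """Yield one query URL per day from `start` to `end`, both inclusive."""
    one_day = datetime.timedelta(days=1)
    current = datetime.datetime.combine(start, datetime.time())  # midnight of the first day
    last = datetime.datetime.combine(end, datetime.time())
    while current <= last:
        day_end = current + one_day - datetime.timedelta(seconds=1)  # 23:59:59 of the same day
        query = {
            "limit": limit,
            "start": current.strftime(TIME_FORMAT),
            "end": day_end.strftime(TIME_FORMAT),
        }
        # safe=":" keeps the colons as-is instead of encoding them as %3A
        yield f"{BASE_URL}?{urlencode(query, safe=':')}"
        current += one_day


def write_links(path: str, start: datetime.date, end: datetime.date) -> int:
    """Write the URLs to `path`, one per line, and return how many were written."""
    count = 0
    with open(path, "w") as file:
        for url in daily_query_urls(start, end):
            file.write(url + "\n")
            count += 1
    return count
```

For example, `write_links("./logstash/seismic_portal_links.txt", datetime.date(2010, 1, 1), datetime.date(2010, 12, 31))` would write 365 URLs covering 2010.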

## Logstash <a class="anchor" id="logstash"></a>

Logstash reads the file generated by the Python script, sends the API requests to Seismic Portal, filters the data and publishes the resulting messages to Kafka.
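The Logstash pipeline itself is defined in `logstash.conf`, which is not reproduced here. As a rough, plain-Python illustration of the extraction it performs, the sketch below assumes each API response is QuakeML 1.2 (the default format of FDSN event services) and collects the five per-event lists described in the Kafka section. The function name and dictionary keys are hypothetical, and it simply takes the first origin and magnitude of each event:

```python
import xml.etree.ElementTree as ET
from typing import Dict, List

# Default namespace of QuakeML 1.2 event data (assumed response format)
NS = {"bed": "http://quakeml.org/xmlns/bed/1.2"}


def extract_event_lists(quakeml: str) -> Dict[str, List]:
    """Collect timestamps, regions, coordinates and magnitudes into five parallel lists."""
    root = ET.fromstring(quakeml)
    lists: Dict[str, List] = {
        "timestamps": [], "regions": [], "latitudes": [], "longitudes": [], "magnitudes": [],
    }
    for event in root.iterfind(".//bed:event", NS):
        # First origin/magnitude only; a fuller version would follow preferredOriginID
        timestamp = event.findtext("bed:origin/bed:time/bed:value", namespaces=NS)
        lat = event.findtext("bed:origin/bed:latitude/bed:value", namespaces=NS)
        lon = event.findtext("bed:origin/bed:longitude/bed:value", namespaces=NS)
        mag = event.findtext("bed:magnitude/bed:mag/bed:value", namespaces=NS)
        if None in (timestamp, lat, lon, mag):
            continue  # skip events without a location, time or magnitude
        lists["timestamps"].append(timestamp)
        lists["regions"].append(event.findtext("bed:description/bed:text", default="", namespaces=NS))
        lists["latitudes"].append(float(lat))
        lists["longitudes"].append(float(lon))
        lists["magnitudes"].append(float(mag))
    return lists
```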

# Streaming with Kafka <a class="anchor" id="kafka"></a>

Logstash publishes its messages to the Kafka topic "earthquakes". Each message contains five lists holding the timestamps, regions, latitudes, longitudes and magnitudes of all its events.
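Downstream, these five parallel lists have to be turned back into individual events. A minimal sketch of that step, assuming each message is JSON with the hypothetical keys `timestamps`, `regions`, `latitudes`, `longitudes` and `magnitudes` (the real field names are set in the Logstash configuration):

```python
import json
from typing import Any, Dict, List

# Hypothetical key names: the message is only known to carry five parallel lists
LIST_KEYS = ("timestamps", "regions", "latitudes", "longitudes", "magnitudes")
RECORD_KEYS = ("timestamp", "region", "latitude", "longitude", "magnitude")


def explode_message(raw: str) -> List[Dict[str, Any]]:
    """Turn one Kafka message with five parallel lists into one dict per event."""
    message = json.loads(raw)
    columns = [message[key] for key in LIST_KEYS]
    lengths = {len(column) for column in columns}
    if len(lengths) != 1:
        # Mismatched lists would silently misalign events, so fail loudly instead
        raise ValueError(f"parallel lists differ in length: {sorted(lengths)}")
    return [dict(zip(RECORD_KEYS, values)) for values in zip(*columns)]
```

Checking the lengths up front matters because `zip` would otherwise truncate to the shortest list and pair values from different events without any error.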

# Processing with Apache Spark <a class="anchor" id="spark"></a>

Spark will take care of the following tasks:
<ul>
    <li>Consuming messages from the Kafka topic "earthquakes"</li>
    <li>Filtering by magnitude ≥ 5.5</li>
    <li>Antipode calculation</li>
    <li>Matching each event with one or more events whose latitude and longitude are within ± 30° of the calculated antipode and which occurred within the following three days</li>
    <li>Creating a CSV file with the obtained data</li>
    <li>Creating an index on Elasticsearch</li>
    <li>Data indexing</li>
</ul>
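The filtering, antipode and matching steps can be sketched in plain Python, outside Spark, to make the geometry explicit. This is not the repository's Spark job: the `Quake` record and the function names are hypothetical, the time window is taken as (0, 3 days] after each event, and the ± 30° check is applied per coordinate, with longitude differences wrapped around the ±180° meridian (an assumption about how the date line should be handled):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Tuple

MIN_MAGNITUDE = 5.5              # magnitude threshold from the task list
WINDOW_DEGREES = 30.0            # ± tolerance around the antipode
WINDOW_TIME = timedelta(days=3)  # matches must follow within three days


@dataclass(frozen=True)
class Quake:
    event_id: str  # hypothetical identifier field
    time: datetime
    lat: float
    lon: float
    mag: float


def antipode(lat: float, lon: float) -> Tuple[float, float]:
    """Opposite point on the globe: negate latitude, shift longitude by 180 degrees."""
    return -lat, (lon - 180.0 if lon > 0 else lon + 180.0)


def longitude_gap(a: float, b: float) -> float:
    """Smallest angular difference between two longitudes, across the ±180° seam."""
    diff = abs(a - b) % 360.0
    return min(diff, 360.0 - diff)


def find_antipodal_matches(quakes: Iterable[Quake]) -> Dict[str, List[str]]:
    """Map each strong event to the strong events near its antipode in the next three days."""
    strong = sorted((q for q in quakes if q.mag >= MIN_MAGNITUDE), key=lambda q: q.time)
    matches: Dict[str, List[str]] = {}
    for i, quake in enumerate(strong):
        anti_lat, anti_lon = antipode(quake.lat, quake.lon)
        hits = []
        for later in strong[i + 1:]:
            if later.time - quake.time > WINDOW_TIME:
                break  # the list is time-sorted, so every remaining event is too late
            if (later.time > quake.time
                    and abs(later.lat - anti_lat) <= WINDOW_DEGREES
                    and longitude_gap(later.lon, anti_lon) <= WINDOW_DEGREES):
                hits.append(later.event_id)
        if hits:
            matches[quake.event_id] = hits
    return matches
```

Sorting by time lets the inner loop stop at the first event outside the three-day window instead of scanning the whole catalogue for every event.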

# Data indexing with Elasticsearch <a class="anchor" id="elasticsearch"></a>

The data will be indexed in Elasticsearch under the index "earthquakes" and mapping "earthquake_mapping". Each item will have the following fields:
<ul>
    <li>unique id</li>
    <li>timestamp</li>
    <li>region</li>
    <li>latitude</li>
    <li>longitude</li>
    <li>latitude_antipode</li>
    <li>longitude_antipode</li>
    <li>matches</li>
</ul>
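The field list above can be expressed as an Elasticsearch 7 index body plus a document builder. In this sketch the field types are assumptions (the README names the fields but not their types), "unique id" is stored under a hypothetical `id` key, and `matches` is assumed to hold the ids of the matching events:

```python
from typing import Any, Dict, List

# Hypothetical body for PUT /earthquakes: field names from the list above, types assumed
EARTHQUAKE_MAPPING: Dict[str, Any] = {
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            "timestamp": {"type": "date"},
            "region": {"type": "keyword"},
            "latitude": {"type": "float"},
            "longitude": {"type": "float"},
            "latitude_antipode": {"type": "float"},
            "longitude_antipode": {"type": "float"},
            "matches": {"type": "keyword"},  # assumed: list of matching event ids
        }
    }
}


def build_document(event_id: str, timestamp: str, region: str,
                   latitude: float, longitude: float, matches: List[str]) -> Dict[str, Any]:
    """Assemble one document with the fields listed above, computing the antipode."""
    return {
        "id": event_id,
        "timestamp": timestamp,
        "region": region,
        "latitude": latitude,
        "longitude": longitude,
        "latitude_antipode": -latitude,
        "longitude_antipode": longitude - 180.0 if longitude > 0 else longitude + 180.0,
        "matches": matches,
    }
```

The mapping could be sent once with `PUT /earthquakes`, and each document with `PUT /earthquakes/_doc/<id>`.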

# Data visualization with Kibana <a class="anchor" id="kibana"></a>

<img src="https://user-images.githubusercontent.com/50534107/256273468-eec68964-ff00-4630-9f45-3998e94b6037.png" alt="kibana" />

The Kibana dashboard shows five lenses: the first shows the number of matches QuakeMatch detected, followed by a table describing the matches; the remaining three show data on the earthquakes that occurred in Italy, the distribution of earthquakes by magnitude and the distribution of earthquakes in the European area.

# Requirements <a class="anchor" id="requirements"></a>

<ul>
    <li>Docker</li>
    <li>Python</li>
    <li>wget</li>
    <li>A machine with at least 16 GB of RAM</li>
</ul>

# Usage <a class="anchor" id="usage"></a>

<li>Install Docker on your system, then run the following command to create a Docker network:</li>

```bash
docker network create kafka-network
```

<li>Create a ZooKeeper container:</li>

```bash
docker run -d \
  --name zookeeper \
  --network kafka-network \
  -p 2181:2181 \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  confluentinc/cp-zookeeper:latest
```

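Rather than guessing how long ZooKeeper takes to start, you can poll its published client port. A minimal standard-library sketch; `wait_for_port` is a hypothetical helper, and an open port only shows that ZooKeeper accepts TCP connections, not that it has finished initializing:

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True as soon as host:port accepts a TCP connection, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True  # connection accepted; it is closed again on leaving the block
        except OSError:  # refused, unreachable or timed out
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For instance, `wait_for_port("localhost", 2181)` before starting Kafka; the same check can be pointed at port 5601 for Kibana.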

<li>Wait for ZooKeeper to start up completely, then create a Kafka container:</li>

```bash
docker run -d \
  --name kafka \
  --network kafka-network \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:latest
```


<li>And a Kafka UI container, which will be reachable at http://localhost:8080:</li>

```bash
docker run -d \
  --name kafka-ui \
  --network kafka-network \
  -p 8080:8080 \
  -e KAFKA_CLUSTERS_0_NAME=local \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 \
  -e KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181 \
  -e KAFKA_CLUSTERS_0_ENABLESR=false \
  -e KAFKA_CLUSTERS_0_SASLMECHANISM= \
  -e KAFKA_CLUSTERS_0_SASLPLAIN_USERNAME= \
  -e KAFKA_CLUSTERS_0_SASLPLAIN_PASSWORD= \
  -e KAFKA_CLUSTERS_0_SASLPLAIN_PASSWORD_FILE= \
  -e KAFKA_CLUSTERS_0_TRUSTEDCERTS= \
  -e KAFKA_CLUSTERS_0_CLIENTCERT= \
  -e KAFKA_CLUSTERS_0_CLIENTKEY= \
  -e KAFKA_CLUSTERS_0_CLIENTKEYPASSWORD= \
  -e KAFKA_CLUSTERS_0_CONSUMERCONFIGS= \
  -e KAFKA_CLUSTERS_0_ADMINCONFIGS= \
  provectuslabs/kafka-ui:latest
```


<li>Build the Elasticsearch Docker image (based on Elasticsearch 7.17.0):</li>

```bash
docker build -t elastic-image ./elasticsearch
```

<li>Then run Elasticsearch:</li>

```bash
docker run -d --name elasticsearch --network kafka-network -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elastic-image
```

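Elasticsearch can take a while to become available after `docker run` returns. The sketch below polls the cluster health API (`GET /_cluster/health`) on the port published above, using only the Python standard library. The helper names are hypothetical, and "yellow" is treated as ready because a single-node cluster cannot allocate replica shards:

```python
import json
import time
import urllib.error
import urllib.request
from typing import Any, Dict

# Cluster health colours from worst to best
STATUS_RANK = {"red": 0, "yellow": 1, "green": 2}


def fetch_cluster_health(base_url: str = "http://localhost:9200", wait_seconds: int = 30) -> Dict[str, Any]:
    """Call GET /_cluster/health, asking Elasticsearch to wait up to `wait_seconds` for yellow."""
    url = f"{base_url}/_cluster/health?wait_for_status=yellow&timeout={wait_seconds}s"
    try:
        with urllib.request.urlopen(url, timeout=wait_seconds + 5) as response:
            return json.load(response)
    except urllib.error.HTTPError as error:
        # If the wait times out, Elasticsearch replies 408 but still returns the health JSON
        return json.load(error)


def is_cluster_ready(health: Dict[str, Any], minimum: str = "yellow") -> bool:
    """True when the reported status is at least `minimum`."""
    return STATUS_RANK.get(health.get("status"), -1) >= STATUS_RANK[minimum]


def wait_for_elasticsearch(base_url: str = "http://localhost:9200", attempts: int = 20) -> bool:
    """Poll until the cluster is at least yellow; False if it never gets there."""
    for _ in range(attempts):
        try:
            if is_cluster_ready(fetch_cluster_health(base_url)):
                return True
        except (OSError, ValueError):
            pass  # container still starting: connection refused, empty reply, ...
        time.sleep(3)
    return False
```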

<li>Create a Kibana container:</li>

```bash
docker run -d --name kibana -p 5601:5601 --network kafka-network -e "ELASTICSEARCH_HOSTS=http://elasticsearch:9200" docker.elastic.co/kibana/kibana:7.15.1
```


<li>Run the Python script that generates seismic_portal_links.txt:</li>

```bash
python3 ./logstash/urls_dates.py
```

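<li>Build the Logstash Docker image (based on Logstash 8.8.1). The build copies logstash.conf and seismic_portal_links.txt into the image, so run the Python script first:</li>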

```bash
docker build -t logstash-image ./logstash
```

<li>Then run Logstash:</li>

```bash
docker run -d --name logstash-container --network kafka-network logstash-image
```


<li>Download the elasticsearch-spark connector into the spark folder:</li>

```bash
wget https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.12/7.15.1/elasticsearch-spark-20_2.12-7.15.1.jar -P ./spark
```

<li>And the spark-sql-kafka connector:</li>

```bash
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar -P ./spark
```

<li>Build the Apache Spark Docker image (based on bitnami/spark:latest):</li>

```bash
docker build -t spark-earthquakes ./spark
```

<li>Wait until Logstash has sent all messages to the Kafka "earthquakes" topic (you can follow its progress in Kafka UI), then run the Apache Spark container:</li>

```bash
docker run -d --network kafka-network --name spark-earthquakes_analyzer spark-earthquakes
```

<li>In your browser, open "http://localhost:5601", go to "Kibana / Saved Objects", click "Import" and select "export.ndjson" from the kibana folder.</li>

<li>Open the left menu in Kibana, select Dashboard, then select the imported dashboard to see all the lenses.</li>