In [1]:
import requests
import json
import csv
import json
from pprint import pprint



class ElasticSearchToCassandra:

    def __init__(self) -> None:
        self.files_to_download = [
            "time_series_covid19_confirmed_global",
            "time_series_covid19_recovered_global",
            "time_series_covid19_deaths_global",
         ]


    def download_csv_files(self):
        url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/"
        for endpoint in self.files_to_download:
            response = requests.get(f"{url}{endpoint}.csv")

            response.raise_for_status()
            with open(f"{endpoint}.csv", "w") as f:
                f.write(response.text)


    def convert_csv_to_json(self):
        for filename in self.files_to_download:
            data_list = []
            with open(f"{filename}.csv", encoding = 'utf-8') as csv_file_handler:
                csv_reader = csv.DictReader(csv_file_handler)
        
                for rows in csv_reader:
                    data_list.append(rows)
        
            with open(f"{filename}.json", 'w', encoding = 'utf-8') as json_file_handler:
                json_file_handler.write(json.dumps(data_list, indent = 4))


    def aggregate_json(self):
        year_2020 = 0
        year_2021 = 0
        year_2022 = 0
        year_2023 = 0

        final_dict = {}

        for filename in self.files_to_download:
            with open(f"{filename}.json", "r") as f:
                json_file = f.read()

                for line in json.loads(json_file):
                    line_dict = {}
                    for k, v in line.items():
                        if k.endswith("/20"):
                            year_2020+=int(v)
                        elif k.endswith("/21"):
                            year_2021+=int(v)
                        elif k.endswith("/22"):
                            year_2022+=int(v)
                        elif k.endswith("/23"):
                            year_2023+=int(v)
                        else:
                            line_dict[k] = v
                        
                    line_dict["2020"] = year_2020
                    line_dict["2021"] = year_2021
                    line_dict["2022"] = year_2022
                    line_dict["2023"] = year_2023
                    
                    final_dict.setdefault(filename, []).append(line_dict)

        return final_dict
                
if __name__ == "__main__":

    elasticsearch_to_cassandra_loader = ElasticSearchToCassandra()

    elasticsearch_to_cassandra_loader.download_csv_files()
    elasticsearch_to_cassandra_loader.convert_csv_to_json()
    data =  elasticsearch_to_cassandra_loader.aggregate_json()
    print(data)


{'time_series_covid19_confirmed_global': [{'Province/State': '', 'Country/Region': 'Afghanistan', 'Lat': '33.93911', 'Long': '67.709953', '2020': 8501751, '2021': 39518380, '2022': 67783564, '2023': 14184774}, {'Province/State': '', 'Country/Region': 'Albania', 'Lat': '41.1533', 'Long': '20.1683', '2020': 12229295, '2021': 90624521, '2022': 175788238, '2023': 36909069}, {'Province/State': '', 'Country/Region': 'Algeria', 'Lat': '28.0339', 'Long': '1.6596', '2020': 22812670, '2021': 147797509, '2022': 272320311, '2023': 55362283}, {'Province/State': '', 'Country/Region': 'Andorra', 'Lat': '42.5063', 'Long': '1.5218', '2020': 23495492, '2021': 152881839, '2022': 287848049, '2023': 58614918}, {'Province/State': '', 'Country/Region': 'Angola', 'Lat': '-11.2027', 'Long': '17.8739', '2020': 24838902, '2021': 167678984, '2022': 324581126, '2023': 65766489}, {'Province/State': '', 'Country/Region': 'Antarctica', 'Lat': '-71.9499', 'Long': '23.347', '2020': 24838902, '2021': 167679182, '2022': 

In [2]:
# get all data form elastic as curl 
import requests #curl
from requests.auth import HTTPBasicAuth 
from pprint import pprint
from elasticsearch import Elasticsearch,helpers
import json
import uuid

'''
generator to push bulk data from a JSON
file into an Elasticsearch index
'''

def bulk_json_data(json_list, _index, doc_type):
    for doc in json_list:
        # use a `yield` generator so that the data
        # isn't loaded into memory
        pprint(doc)
        if '{"index"' not in doc:
            yield {
                "_index": _index,
                "_type": doc_type,
                "_id": uuid.uuid4(),
                "_source": doc
            }

def connect_elastic():
    elastic = Elasticsearch('http://localhost:9200', http_auth=('elastic', 'secret'), timeout=30)
    return elastic

elastic = connect_elastic()

#elastic.delete_by_query(index='_all', body={"query": {"match_all": {}}})

def json_to_elastic(data_list):
    """
    covid_db_recovered_global
    covid_db_deaths_global
    covid_db_confirmed_global
    """
    for key in data_list.keys():
        doc_type = '_'.join(key.split('_')[-2:])
        try:
            response = helpers.bulk(elastic, bulk_json_data(data_list[key], f"covid_db_{doc_type}", doc_type), chunk_size=1000)
            print ("\nRESPONSE:", response)
        except Exception as e:
            print("\nERROR:", e)
            
json_to_elastic(data)

{'2020': 8501751,
 '2021': 39518380,
 '2022': 67783564,
 '2023': 14184774,
 'Country/Region': 'Afghanistan',
 'Lat': '33.93911',
 'Long': '67.709953',
 'Province/State': ''}
{'2020': 12229295,
 '2021': 90624521,
 '2022': 175788238,
 '2023': 36909069,
 'Country/Region': 'Albania',
 'Lat': '41.1533',
 'Long': '20.1683',
 'Province/State': ''}
{'2020': 22812670,
 '2021': 147797509,
 '2022': 272320311,
 '2023': 55362283,
 'Country/Region': 'Algeria',
 'Lat': '28.0339',
 'Long': '1.6596',
 'Province/State': ''}
{'2020': 23495492,
 '2021': 152881839,
 '2022': 287848049,
 '2023': 58614918,
 'Country/Region': 'Andorra',
 'Lat': '42.5063',
 'Long': '1.5218',
 'Province/State': ''}
{'2020': 24838902,
 '2021': 167678984,
 '2022': 324581126,
 '2023': 65766489,
 'Country/Region': 'Angola',
 'Lat': '-11.2027',
 'Long': '17.8739',
 'Province/State': ''}
{'2020': 24838902,
 '2021': 167679182,
 '2022': 324585141,
 '2023': 65767237,
 'Country/Region': 'Antarctica',
 'Lat': '-71.9499',
 'Long': '23.347',

In [86]:
url = 'http://elastic:secret@localhost:9200/covid_db_deaths_global/_search'

In [16]:
#elastic.delete_by_query(index='_all', body={"query": {"match_all": {}}})

In [3]:
 
elastic.indices.refresh(index="covid_db_recovered_global")

res = elastic.search(index="covid_db_recovered_global", body={"from" : 0, "size" : 1000,"query": {"match_all": {}}})
print(len(res['hits']['hits']))
print(res)

274
{'took': 9, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'skipped': 0, 'failed': 0}, 'hits': {'total': 274, 'max_score': 1.0, 'hits': [{'_index': 'covid_db_recovered_global', '_type': 'recovered_global', '_id': '9a23263a-996c-449c-8535-01c80f86fff9', '_score': 1.0, '_source': {'Province/State': '', 'Country/Region': 'Albania', 'Lat': '41.1533', 'Long': '20.1683', '2020': 7650563688, '2021': 67164662438, '2022': 196529309166, '2023': 45606703559}}, {'_index': 'covid_db_recovered_global', '_type': 'recovered_global', '_id': '3b2a3c08-2d36-4a1f-bb6f-baf1327ba1f9', '_score': 1.0, '_source': {'Province/State': '', 'Country/Region': 'Angola', 'Lat': '-11.2027', 'Long': '17.8739', '2020': 7658908868, '2021': 67190968940, '2022': 196529309166, '2023': 45606703559}}, {'_index': 'covid_db_recovered_global', '_type': 'recovered_global', '_id': 'caae5081-b678-4815-b1c2-e31dd29ced03', '_score': 1.0, '_source': {'Province/State': 'South Australia', 'Country/Region': 'Australia', 

# elastic to cassandra

In [19]:
#!pip3 install cassandra-driver



In [4]:
print(data)

{'time_series_covid19_confirmed_global': [{'Province/State': '', 'Country/Region': 'Afghanistan', 'Lat': '33.93911', 'Long': '67.709953', '2020': 8501751, '2021': 39518380, '2022': 67783564, '2023': 14184774}, {'Province/State': '', 'Country/Region': 'Albania', 'Lat': '41.1533', 'Long': '20.1683', '2020': 12229295, '2021': 90624521, '2022': 175788238, '2023': 36909069}, {'Province/State': '', 'Country/Region': 'Algeria', 'Lat': '28.0339', 'Long': '1.6596', '2020': 22812670, '2021': 147797509, '2022': 272320311, '2023': 55362283}, {'Province/State': '', 'Country/Region': 'Andorra', 'Lat': '42.5063', 'Long': '1.5218', '2020': 23495492, '2021': 152881839, '2022': 287848049, '2023': 58614918}, {'Province/State': '', 'Country/Region': 'Angola', 'Lat': '-11.2027', 'Long': '17.8739', '2020': 24838902, '2021': 167678984, '2022': 324581126, '2023': 65766489}, {'Province/State': '', 'Country/Region': 'Antarctica', 'Lat': '-71.9499', 'Long': '23.347', '2020': 24838902, '2021': 167679182, '2022': 

In [63]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

def get_elastic_hits(elastic, index):  
    elastic.indices.refresh(index)
    res = elastic.search(index, body={"query": {"match_all": {}}})
    hits = res['hits']['hits']
    return hits

covid_index_names = []
indices=elastic.indices.get_alias().keys()
for indice in indices:
    if indice.startswith("covid") and indice not in covid_index_names:
        covid_index_names.append(indice)
#print(covid_index_names)

# Set up authentication credentials (if needed)
auth_provider = PlainTextAuthProvider(username='elastic', password='secret')

# Set up a Cluster object with the contact points (IP addresses or hostnames) of the Cassandra nodes
cluster = Cluster(['localhost'])

# Connect to the cluster and create a session
session = cluster.connect()

#create keySpace covid
keyspace_stmt = "CREATE KEYSPACE IF NOT EXISTS covid19 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"
session.execute(keyspace_stmt)

#create table
def create_covid_table(table_name):
        query = f"CREATE TABLE IF NOT EXISTS covid19.{table_name} (id varchar PRIMARY KEY, type text, score float, province text, country text, lat text, long text, year_2020 float, year_2021 float, year_2022 float, year_2023 float)"
        session.execute(query)
        
#insert into table
def insert_data_in_cassandra(table,hits):
    # Insert the data into Cassandra Datastax
    for hit in hits:
        cql_query = f"INSERT INTO covid19.{table}(id, type, score, province, country, lat, long , year_2020, year_2021, year_2022, year_2023) VALUES('{hit['_id']}', '{hit['_type']}',{hit['_score']},'{hit['_source']['Province/State']}','{hit['_source']['Country/Region']}', '{hit['_source']['Lat']}', '{hit['_source']['Long']}', {hit['_source']['2020']},{hit['_source']['2021']},{hit['_source']['2022']}, {hit['_source']['2023']})"
       # print(cql_query)
        session.execute(cql_query)         
    return 0



# Execution des 3 indexs

indexes=[
    "covid_db_recovered_global",
    "covid_db_deaths_global",
   "covid_db_confirmed_global"
    ]
for indice in indexes:
    hits=get_elastic_hits(elastic, indice)
    create_covid_table(indice)
    insert_data_in_cassandra(indice,hits)
    


# result=(session.execute('SELECT * FROM covid19.covid_db_recovered_global;') )
# print(result.__dict__)
# print(result.__dict__['_current_rows'])
# #hits = get_elastic_hits(elastic, "covid_db_deaths_global")



# fermer les sessions cassandra 
session.shutdown()
cluster.shutdown()



# Visualisation Kibana

In [66]:
covid_index_names = []
indices=elastic.indices.get_alias().keys()
for indice in indices:
    if indice.startswith("covid") and indice not in covid_index_names:
        covid_index_names.append(indice)
print(covid_index_names)

['covid_db_confirmed_global', 'covid_db_deaths_global', 'covid_db_recovered_global']


In [82]:

import json, requests
from requests.auth import HTTPBasicAuth

id=0
for i in covid_index_names:
    print(i)
    id=id+1
    payload={
        "attributes":{
        "title":i+"*"
        }
    }
    ret = requests.post("http://localhost:5601/api/saved_objects/index-pattern/"+str(id), headers={"kbn-xsrf":"true", "Content-Type": "application/json"}, auth=HTTPBasicAuth('elastic', 'secret'), json=payload)
    print(ret.text)
print(covid_index_names)

covid_db_deaths_global
{"id":"1","type":"index-pattern","version":3,"attributes":{"title":"covid_db_deaths_global*"}}
covid_db_recovered_global
{"id":"2","type":"index-pattern","version":3,"attributes":{"title":"covid_db_recovered_global*"}}
covid_db_confirmed_global
{"id":"3","type":"index-pattern","version":3,"attributes":{"title":"covid_db_confirmed_global*"}}
['covid_db_deaths_global', 'covid_db_recovered_global', 'covid_db_confirmed_global']
