# Implementation of Self-Organizing Maps with Python for processing COVID 19 databases

Programmer Information

In [None]:
# Author = Darío Sebastián Cabezas Erazo
# Email = dario.cabezas@yachaytech.edu.ec
# LinkedIn =https://www.linkedin.com/in/dario-cabezas/

# LIBRARIES

In [None]:
import gdown
import math 
import numpy as np
import csv
import webbrowser
import timeit
import os

# UTILITY

In [None]:
def dimension(matriz):
    """Return the size of a two-dimensional sequence as ``(rows, columns)``.

    The column count is read from the first row only, so ragged inputs
    report the length of row 0. An empty sequence yields ``(0, 0)``.
    """
    n_rows = len(matriz)
    if n_rows == 0:
        # No first row to measure.
        return 0, 0
    return n_rows, len(matriz[0])

# DATABASE

In [None]:
# gdown expects the direct-download form of a Google Drive link
# (https://drive.google.com/uc?id=<file id>). The share-page form
# (/file/d/<file id>) triggers gdown's "not the correct link to download
# the file" warning, so the file id is passed through the uc endpoint.
url = 'https://drive.google.com/uc?id=1VXq08OkQQBrS_iYe5UmJ38_A5dHjMQ8-'
# Local file name the dataset is stored under; the loading cell reads it back.
output = 'Epidemic-Data-for-Novel-Coronavirus-COVID-19.csv'
gdown.download(url, output, quiet=False)

  .format(url='https://drive.google.com/uc?id={}'.format(file_id))
Downloading...
From: https://drive.google.com/file/d/1VXq08OkQQBrS_iYe5UmJ38_A5dHjMQ8-
To: /content/Epidemic-Data-for-Novel-Coronavirus-COVID-19.csv
66.1kB [00:00, 14.6MB/s]


'Epidemic-Data-for-Novel-Coronavirus-COVID-19.csv'

In [None]:
# Read the whole CSV into memory; each row becomes a dict keyed by the
# header names. The file is closed as soon as the rows have been read.
with open('Epidemic-Data-for-Novel-Coronavirus-COVID-19.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    data = list(reader)

# Saving Countries: one [Country, GeoPosition] pair per row that has a
# "Country" column.
countries = [[v["Country"], v["GeoPosition"]] for v in data if "Country" in v]

# Saving Raw Data for working with: one
# [ConfirmedCases, RecoveredCases, Deaths] triple per row that has a
# "ConfirmedCases" column.
data_to_work = [
    [v["ConfirmedCases"], v["RecoveredCases"], v["Deaths"]]
    for v in data
    if "ConfirmedCases" in v
]
# Partitioning Raw Data 
def clean_countries():
    """Clean the country entries so they can be compared as plain strings.

    Reads the module-level ``countries`` list. For every entry the first
    field is split on commas and the entry's position in ``countries`` is
    appended. If the second piece is text, all ``]``, ``"`` and space
    characters are removed from it. The first piece is then discarded.

    Returns:
        A list with one list per entry: the cleaned second piece, any
        further comma-separated pieces, and finally the entry's index.
        An entry whose first field has no comma yields just ``[index]``.
    """
    unwanted = (']', '"', ' ')
    cleaned = []
    for position, entry in enumerate(countries):
        pieces = entry[0].split(",")
        pieces.append(position)
        candidate = pieces[1]
        if isinstance(candidate, str):
            pieces[1] = ''.join(ch for ch in candidate if ch not in unwanted)
        # Everything except the leading piece is kept.
        cleaned.append(pieces[1:])
    return cleaned
def search_country(string):
    """Look up a country and return the indices of its records.

    Args:
        string: Country name to match exactly against the cleaned names
            produced by ``clean_countries()``.

    Returns:
        A list holding the second element of every cleaned entry whose
        first element equals ``string``; empty when nothing matches.
    """
    return [entry[1] for entry in clean_countries() if entry[0] == string]
def raw_data_manager(n):
    """Parse record ``n`` of ``data_to_work`` into a numeric matrix.

    Every field of the record is expected to contain a comma-separated
    list of integers between the first ``{{{`` and the following ``}}``.
    Entries equal to ``Missing["NotAvailable"]`` (ignoring surrounding
    whitespace) are replaced by 0.

    Args:
        n: Index of the record inside the module-level ``data_to_work``.

    Returns:
        A float ``numpy.ndarray`` of shape ``(samples, series)`` where
        column ``i`` holds the values parsed from field ``i`` of the
        record. The number of samples is taken from the first field.

    Raises:
        IndexError: If ``n`` is out of range, a field lacks the ``{{{``
            marker, or a later field is shorter than the first one.
        ValueError: If an entry is neither an integer nor the missing
            marker.
    """
    # The field count comes from the first record, as every record is
    # built with the same fields.
    n_series = len(data_to_work[0]) if len(data_to_work) > 0 else 0
    series = [
        data_to_work[n][i].split("{{{")[1].split("}}")[0].split(",")
        for i in range(n_series)
    ]
    n_samples = len(series[0]) if series else 0

    # Zero-initialised, so missing entries need no explicit assignment.
    matrix = np.zeros((n_samples, len(series)))
    for i, values in enumerate(series):
        for j in range(n_samples):
            token = values[j].strip()
            if token != 'Missing["NotAvailable"]':
                matrix[j][i] = int(token)
    return matrix

# MOST IMPORTANT FUNCTIONS

EUCLIDEAN DISTANCE

In [None]:
def eu_distance(A,B):
    """Return the Euclidean distance between ``A`` and ``B``.

    The operands are subtracted element-wise, and the result is the square
    root of the sum of all squared differences.
    """
    difference = A - B
    squared_total = np.sum(difference ** 2)
    return np.sqrt(squared_total)

WEIGHT UPDATE FORMULA

In [None]:
def learning(init_learning_rate,i,n_iter):
    """Return the learning rate at iteration ``i``.

    Computed as ``init_learning_rate * exp(-i / n_iter)``.
    """
    decay = np.exp(-i / n_iter)
    return init_learning_rate * decay
def topological_neighborhood(distance,radius):
    """Return the neighbourhood influence ``exp(-distance / (2 * radius**2))``.

    The value is 1 at ``distance == 0`` and decreases as ``distance`` grows.
    """
    spread = 2 * (radius ** 2)
    return np.exp(-distance / spread)
def neighborhood_size(init_radius,i,time_constant):
    """Return the neighbourhood radius at iteration ``i``.

    Computed as ``init_radius * exp(-i / time_constant)``.
    """
    shrink = np.exp(-i / time_constant)
    return init_radius * shrink

BEST MATCHING UNIT

In [None]:
def find_bmu(t,net):
    """Locate the best matching unit (BMU) for sample ``t``.

    The BMU is the lattice unit whose weight vector has the smallest
    Euclidean distance to ``t``. No upper bound is placed on that distance,
    so a unit is always found, even when the sample lies far from every
    weight vector.

    Args:
        t: Input sample (``numpy.ndarray``); flattened before comparison.
        net: Weight lattice (``numpy.ndarray``) indexed as ``net[x, y]``;
            every unit is flattened before comparison.

    Returns:
        A tuple ``(bmu, bmu_idx)``: ``bmu`` is ``net[x, y]`` of the winning
        unit and ``bmu_idx`` is ``numpy.array([x, y])``. On ties the first
        unit in row-major order wins.

    Raises:
        ValueError: If the lattice contains no units.
    """
    rows, cols = net.shape[0], net.shape[1]
    if rows == 0 or cols == 0:
        raise ValueError("net must contain at least one unit")

    # One distance per lattice position, computed in a single pass.
    units = net.reshape(rows, cols, -1)
    sample = t.reshape(1, 1, -1)
    distances = np.sqrt(np.sum((units - sample) ** 2, axis=2))

    # argmin returns the first minimum of the flattened (row-major) array.
    x, y = np.unravel_index(np.argmin(distances), distances.shape)
    bmu_idx = np.array([x, y])
    return net[x, y], bmu_idx

# SOM MAIN FUNCTION

In [None]:
def SOM_COVID():
    """Train a self-organizing map on one COVID-19 record and save it.

    Interactive workflow:
      1. Ask for a country name and print the record indices returned by
         ``search_country``.
      2. Ask which record index to use and load it with
         ``raw_data_manager``.
      3. Train a ``K x 2`` lattice of neurons for a fixed number of
         iterations, drawing one random sample per iteration.
      4. Write one text file per lattice row into the directory
         ``"<country> Processed Data"`` (created if it does not exist)
         and print the elapsed time.

    Raises:
        ValueError: If the record index typed by the user is not an
            integer.
    """
    # User input data
    country = input("What country do you want to analyze?\n")
    print(search_country(country))
    n = int(input("Which country database do you want to analyze?\n"))

    # Initialization parameters & benchmark (timing starts after the prompts)
    start = timeit.default_timer()
    data = raw_data_manager(n)
    iterations = 1000
    learning_rate = 0.1
    row, columns = data.shape[0], data.shape[1]
    # K: number of lattice rows, chosen by the author as 5 * sqrt(samples).
    K = int(5 * math.sqrt(row))
    N = 2
    network_dim = np.array([K, N])
    # Neural network 'net': one random weight vector per lattice position.
    net = np.random.random((network_dim[0], network_dim[1], columns))

    init_radius = max(network_dim[0], network_dim[1]) / 2
    time_constant = iterations / np.log(init_radius)

    # TRAINING NEURAL NETWORK by iterations
    print("\nProcessing COVID 19 databases...")
    for i in range(iterations):
        t = data[np.random.randint(0, row), :]
        r = neighborhood_size(init_radius, i, time_constant)
        l = learning(learning_rate, i, iterations)
        # Best matching unit for the drawn sample
        _, bmu_idx = find_bmu(t, net)
        sample = t.reshape(1, -1)
        bmu_position = bmu_idx.reshape(1, 2)
        for x in range(net.shape[0]):
            for y in range(net.shape[1]):
                w = net[x, y].reshape(1, columns)
                # Lattice distance between this unit and the BMU; only
                # units inside the current radius are updated.
                w_dist = eu_distance(np.array([[x, y]]), bmu_position)
                if w_dist <= r:
                    # NOTE(review): the plain (unsquared) lattice distance
                    # is passed here, while the textbook Gaussian kernel
                    # uses the squared distance. Confirm which is intended.
                    influence = topological_neighborhood(w_dist, r)
                    new_w = w + (l * influence * (sample - w))
                    # Shape follows the data instead of a hard-coded 3.
                    net[x, y] = new_w.reshape(columns)
    print("\nCOVID 19 databases processed. Saving...")

    # OPERATING SYSTEM: save the trained neurons, one file per lattice row.
    # The directory may already exist from an earlier run, and files are
    # written through their full path so the working directory is never
    # changed.
    out_dir = str(country) + " Processed Data"
    os.makedirs(out_dir, mode=0o777, exist_ok=True)
    for i, lattice_row in enumerate(net):
        file_name = str(country) + str(i) + str(".txt")
        np.savetxt(os.path.join(out_dir, file_name), lattice_row)

    # USEFUL DATA: elapsed time of loading, training and saving
    end = timeit.default_timer()
    print("\nSOM", end - start, "seconds")
# Start the interactive workflow only when this file is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    SOM_COVID()