In [29]:
import os
import obspy as obs
from obspy import UTCDateTime
from obspy.core.event import Event, Origin, Magnitude
from obspy.core.event import Pick, WaveformStreamID, Arrival, Amplitude, TimeWindow
from obspy.core.event import EventDescription, CreationInfo

In [42]:
# additional functions

# read earthquake list and make dictionary from it
def make_dict_for_eq(data_list):
    data_dict = {'Origin Time': None, 'Origin Error': None, 'Latitude': None, 'Delta Error': None, 
                 'Longitude': None, 'Depth': None, 'Depth Error': None, 'Magnitude': None}
    for i in data_dict:
        for j in data_list:
            if i in j:
                data_dict[i] = j[j.find(']')+1:].replace(' ', '')
                break
    
    data_dict['Latitude'] = data_dict['Latitude'].replace('N', '')
    data_dict['Longitude'] = data_dict['Longitude'].replace('E', '')
    data_dict['Magnitude'] = data_dict['Magnitude'][data_dict['Magnitude'].find('=')+1:data_dict['Magnitude'].find('(')]
    return data_dict


# make 2 lists: in first - data about origin and magnitude
# in second - data about picks, arrivals and amplitudes
def read_ssd(ssd_file):
    with open(ssd_file) as file:
        earthquake_params = []
        arrivals_params = []

        current_component = []
        for i in file:
            if '#EARTHQUAKE' in i:
                earthquake_params.append(i.replace('\n', '').replace('\t', ' '))
            else:
                if '#CHANNEL' in i and len(current_component) == 0:
                    current_component.append(i.replace('\n', '').replace('\t', ' '))
                elif '#CHANNEL' in i and len(current_component) != 0:
                    arrivals_params.append(current_component)
                    current_component = []
                    current_component.append(i.replace('\n', '').replace('\t', ' '))
                else:
                    current_component.append(i)
        arrivals_params.append(current_component)
        return make_dict_for_eq(earthquake_params), devide_p_s(arrivals_params)

    
# if it see list where exist two phases
# devide it in two lists
def devide_p_s(data_list):
    new_list = []
    pases_number = 0
    p_phrase = []
    for i in data_list:
        for line in i:
            if '#ARRIVAL [Phase]' in line:
                p_phrase.append(line)
                pases_number += 1
        if pases_number <= 1:
            new_list.append(i)
            p_phrase = []
            pases_number = 0
        else:
            new_list.append(i[:i.index(p_phrase[-1])]) 
            new_list.append([i[0]] + i[i.index(p_phrase[-1]):])
            p_phrase = []
            pases_number = 0
    return new_list
        

# if in string contained component return it
# else raise error
def channel_check(string):
    channels = ['HHN', 'HHE', 'HHZ']
    for channel in channels:
        if channel in string:
            return channel
    raise ValueError('No channels in string!')


# check all dictionary and if key is none return false
# else return true
def check_dict(dictionary):
    for i in dictionary:
        if dictionary[i] is None:
            return False
    return True

In [53]:
def create_qakeml_from_ssd(network, location, path_in, path_out):
    
    # some constants which don't have any matter in forming quakeml
    resource_id = 'smi:ru.ipgg.seislab'
    event_type = 'earthquake'
    event_type_certainty = 'known'
    origin_depth_type = 'from location'
    magnitude_type = 'Mb'
    pick_polarity = 'undecidable'
    pick_evaluation_mode = 'manual'
    pick_evaluation_status = 'final'
    amplitude_type = 'END'
    
    # reading ssd data
    eq_data, picks_data = read_ssd(path_in)
    # forming str time constant which using like resource_id for event
    time = UTCDateTime.strptime(eq_data['Origin Time'], '%Y.%m.%d%H:%M:%S.%f')
    
    # class object which provide information about autors and organisation
    creation_info = CreationInfo(agency_id='ipgg', author='ponasenkosn', creation_time=UTCDateTime.now())
    
    # create main event object
    event = Event()
    event.resource_id = resource_id + '/' + network + '/event' + '/' + time.strftime('%Y%m%d%H%M%S')
    event.event_type = event_type
    event.event_type_certainty = event_type_certainty
    event.creation_info = creation_info
    
    # create origin object
    origin = Origin()
    origin.resource_id = str(event.resource_id) + '/origin'
    origin.time = time
    origin.time_errors = float(eq_data['Origin Error'])
    origin.longitude = float(eq_data['Longitude'])
    origin.longitude_errors = float(eq_data['Delta Error'])
    origin.latitude = float(eq_data['Latitude'])
    origin.latitude_errors = float(eq_data['Delta Error'])
    origin.depth = float(eq_data['Depth'])
    origin.depth_errors = float(eq_data['Depth Error'])
    origin.depth_type = origin_depth_type
    origin.creation_info = creation_info
    
    # create magnitude object
    magnitude = Magnitude()
    magnitude.resource_id = str(event.resource_id) + '/magnitude'
    # calculate Mb magnitude from energy class
    magnitude.mag = 5.49 + 0.427 * (float(eq_data['Magnitude']) - 14)
    magnitude.magnitude_type = magnitude_type
    magnitude.origin_id = origin.resource_id
    
    # create picks, arrivals and amplitudes lists
    picks = []
    arrivals = []
    amplitudes = []
    # print(picks_data)
    for pick_list in picks_data:
        station = None
        channel = None
        pick_dict = {'Phase': None, 'Time': None, 'Dist-Az': None}
        amp_dict = {'Time': None, 'Amplitude': None, 'Period': None}
        for line in pick_list:
            # in this single string i get names of station and channel
            if '#CHANNEL' in line:
                channel = channel_check(line)
                station = line[line.find(' ') + 1:line.find(' ') + 1 + line[line.find(' ') + 1:].find(' ')]
            # here i get pick information and pack this into dictionary
            elif '#ARRIVAL' in line:
                for i in pick_dict:
                    if i in line:
                        pick_dict[i] = line[line.find('\t') + 1: line.find('\n')]
            elif '#AMPLITUDE' in line:
                for i in amp_dict:
                    if i in line:
                        amp_dict[i] = line[line.find('\t') + 1: line.find('\n')]
        
        # if obtained dictionary update data, i create pick and arrival class objects
        if check_dict(pick_dict):
            for chan in ['HHZ', 'HHE', 'HHN']:
                pick = Pick()
                pick.resource_id = str(event.resource_id) + '/pick' + '/' + pick_dict['Phase'] + '/' + station + '/' + chan
                pick.time = UTCDateTime.strptime(pick_dict['Time'], '%Y.%m.%d %H:%M:%S.%f')
                pick.phase_hint = pick_dict['Phase']
                pick.polarity = pick_polarity
                pick.evaluation_mode = pick_evaluation_mode
                pick.evaluation_status = pick_evaluation_status
                pick.waveform_id = WaveformStreamID(network, station, location, chan)
                pick.creation_info = creation_info
                picks.append(pick)

                arrival = Arrival()
                arrival.resource_id = str(event.resource_id) + '/arrival' + '/' + pick_dict['Phase'] + '/' + station + '/' + chan
                arrival.pick_id = str(pick.resource_id)
                arrival.phase = pick_dict['Phase']
                arrival.azimuth = pick_dict['Dist-Az'].replace('\t', '')[pick_dict['Dist-Az'].replace('\t', '').find(';') + 1:]
                arrival.distance = pick_dict['Dist-Az'].replace('\t', '')[:pick_dict['Dist-Az'].replace('\t', '').find(';')]
                arrival.creation_info = creation_info
                arrivals.append(arrival)
                
        if check_dict(amp_dict):
            amplitude = Amplitude()
            amplitude.resource_id = str(event.resource_id) + '/amplitude' +  '/' + station + '/' + channel
            amplitude.generic_amplitude = float(amp_dict['Amplitude'][:amp_dict['Amplitude'].find(' ')])
            amplitude.type = amplitude_type
            if amp_dict['Amplitude'][amp_dict['Amplitude'].find(' ') + 1:] == '[microns:second]':
                amplitude.unit = 'm/s'
            amplitude.period = float(amp_dict['Period'])
            amplitude.time_window = TimeWindow(reference=UTCDateTime.strptime(amp_dict['Time'], '%Y.%m.%d %H:%M:%S.%f'))
            amplitude.waveform_id = WaveformStreamID(network, station, location, channel)
            amplitude.magnitude_hint = magnitude_type
            amplitudes.append(amplitude)
    
    # in last part i put all lists in the relevant places
    origin.arrivals = arrivals
    event.origins = [origin]
    event.magnitudes = [magnitude]
    event.picks = picks
    event.amplitudes = amplitudes

    event.write(os.path.join(path_out, 'quake_' + time.strftime('%Y%m%d%H%M%S') + '.xml'), format='SC3ML') # format='QUAKEML')

In [56]:
# apply function for creating data 2021-2023

network = 'TE'
location= '00'
path = r'D:\Temp\Work\Seismology\__Tengiz\SSD'
path_out = r'D:\Temp\Work\Seismology\__Tengiz\QuakeMl'

for filename in os.listdir(path):
    if '.ssd' in filename:
        create_qakeml_from_ssd(network, location, os.path.join(path, filename), path_out)