In [141]:
from kafka import KafkaConsumer
from json import loads
from clickhouse_driver import Client
import csv
from csv import DictReader
from datetime import datetime
import time

In [142]:
client = Client(host='localhost', port=9000)
client.execute('CREATE DATABASE IF NOT EXISTS satellite')
client.execute('USE satellite')
client.execute("DROP TABLE IF EXISTS satellite.space")
client.execute("DROP TABLE IF EXISTS satellite.list")

#Создание и заполнение таблицы со списком спутников
def iter_csv(filename):
    converters = {
     'INTLDES String,'
     'NORAD_CAT_ID: Int,' 
     'OBJECT_TYPE': String,
     'SATNAME': String,
     'COUNTRY': String,
     'LAUNCH': lambda x: datetime.strptime(x, '%Y-%m-%d'),
     'SITE': String,
     'DECAY': String,
     'PERIOD': Float32,
     'INCLINATION': Float32,
     'APOGEE': Int,
     'PERIGEE': Int,
     'COMMENT': String,
     'COMMENTCODE': Int,
     'RCSVALUE': Int,
     'RCS_SIZE': String,
     'FILE': Int,
     'LAUNCH_YEAR': Int,
     'LAUNCH_NUM': Int,
     'LAUNCH_PIECE': String,
     'CURRENT': String,
     'OBJECT_NAME': String,
     'OBJECT_ID': String,
     'OBJECT_NUMBER': Int
    }
    with open(filename, 'r') as f:
        reader = DictReader(f)
        for line in reader:
            yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}

            '''#Создание и заполнение таблицы со списком спутников
def iter_csv(filename):
    converters = {
     'INTLDES': VARCHAR,
     'NORAD_CAT_ID': Int, 
     'OBJECT_TYPE': String,
     'SATNAME': String,
     'COUNTRY': String,
     'LAUNCH': lambda x: datetime.strptime(x, '%Y-%m-%d'),
     'SITE': String,
     'DECAY': String,
     'PERIOD': Float32,
     'INCLINATION': Float32,
     'APOGEE': Int,
     'PERIGEE': Int,
     'COMMENT': String,
     'COMMENTCODE': Int,
     'RCSVALUE': Int,
     'RCS_SIZE': String,
     'FILE': Int,
     'LAUNCH_YEAR': Int,
     'LAUNCH_NUM': Int,
     'LAUNCH_PIECE': String,
     'CURRENT': String,
     'OBJECT_NAME': String,
     'OBJECT_ID': String,
     'OBJECT_NUMBER': Int
    }
    with open(filename, 'r') as f:
        reader = DictReader(f)
        for line in reader:
            yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}'''
            
client.execute(
    '''CREATE TABLE IF NOT EXISTS satellite.list (
     INTLDES String,
     NORAD_CAT_ID Int, 
     OBJECT_TYPE String,
     SATNAME String,
     COUNTRY String,
     LAUNCH date,
     SITE String,
     DECAY String,
     PERIOD Float32,
     INCLINATION Float32,
     APOGEE Int,
     PERIGEE Int,
     COMMENT String,
     COMMENTCODE String,
     RCSVALUE int,
     RCS_SIZE String,
     FILE Int,
     LAUNCH_YEAR Int,
     LAUNCH_NUM Int,
     LAUNCH_PIECE String,
     CURRENT String,
     OBJECT_NAME String,
     OBJECT_ID String,
     OBJECT_NUMBER Int
    ) ENGINE = MergeTree()
    PRIMARY KEY NORAD_CAT_ID 
    ORDER BY NORAD_CAT_ID;'''
)


[]

In [143]:
schema = {
    'INTLDES': str,
    'NORAD_CAT_ID': int, 
    'OBJECT_TYPE': str,
    'SATNAME': str,
    'COUNTRY': str,
    'LAUNCH': lambda x: datetime.strptime(x, '%Y-%m-%d'),
    'SITE': str,
    'DECAY': str,
    'PERIOD': float,
    'INCLINATION': float,
    'APOGEE': int,
    'PERIGEE': int,
    'COMMENT': str,
    'COMMENTCODE': str,
    'RCSVALUE': int,
    'RCS_SIZE': str,
    'FILE': int,
    'LAUNCH_YEAR': int,
    'LAUNCH_NUM': int,
    'LAUNCH_PIECE': str,
    'CURRENT': str,
    'OBJECT_NAME': str,
    'OBJECT_ID': str,
    'OBJECT_NUMBER': int
}
bypass = lambda x: x

with open('../data_in/satellite_list.csv') as f:
    #print (f)
    gen = ({k: schema.get(k, bypass)(v) for k, v in row.items()} for row in csv.DictReader(f))
    print(gen)
    client.execute('INSERT INTO satellite.list VALUES', gen)


<generator object <genexpr> at 0x7f9d13aaacd0>


In [144]:
d=client.execute( '''select * from satellite.list limit 5;''')
print (d)

[('1959-001A', 11, 'PAYLOAD', 'VANGUARD 2', 'US', datetime.date(1970, 1, 1), 'AFETR', '', 121.44999694824219, 32.880001068115234, 2939, 552, '', '', 0, 'MEDIUM', 7336, 1959, 1, 'A', 'Y', 'VANGUARD 2', '1959-001A', 11), ('1959-001B', 12, 'ROCKET BODY', 'VANGUARD R/B', 'US', datetime.date(1970, 1, 1), 'AFETR', '', 125.83999633789062, 32.900001525878906, 3324, 556, '', '', 0, 'MEDIUM', 7346, 1959, 1, 'B', 'Y', 'VANGUARD R/B', '1959-001B', 12), ('1959-007A', 20, 'PAYLOAD', 'VANGUARD 3', 'US', datetime.date(1970, 1, 1), 'AFETR', '', 124.5999984741211, 33.349998474121094, 3262, 509, '', '', 0, 'MEDIUM', 7345, 1959, 7, 'A', 'Y', 'VANGUARD 3', '1959-007A', 20), ('1959-009A', 22, 'PAYLOAD', 'EXPLORER 7', 'US', datetime.date(1970, 1, 1), 'AFETR', '', 96.30999755859375, 50.290000915527344, 679, 485, '', '', 0, 'MEDIUM', 7345, 1959, 9, 'A', 'Y', 'EXPLORER 7', '1959-009A', 22), ('1960-002B', 29, 'PAYLOAD', 'TIROS 1', 'US', datetime.date(1970, 1, 1), 'AFETR', '', 97.66999816894531, 48.38000106811523

In [152]:
client.execute('SHOW TABLES')

[('list',), ('space',)]

In [146]:
client.execute(
     '''CREATE TABLE IF NOT EXISTS satellite.space (
     SAT_ID Int,
     I Int,
     UTCDATETIME DateTime,
     LAT Float32,
     LON Float32
    ) ENGINE = MergeTree()
    PRIMARY KEY SAT_ID 
    ORDER BY SAT_ID;'''
)


[]

In [147]:
consumer = KafkaConsumer(
    'satellite_msg',
    auto_offset_reset='earliest',
    #reset_offset_on_start=False, #new 
    enable_auto_commit=True,
    group_id='my-group-1',
    value_deserializer=lambda m: loads(m.decode('utf-8')),
    bootstrap_servers='localhost:9092')

"\nfor m in consumer:\n    client.execute('INSERT INTO satellite.space VALUES  (%s)' % ', '.join(list(map(str, m.value.values()))))\n"

In [156]:
d=client.execute('SHOW TABLES')
print (d)
d=client.execute( '''select count(1) from satellite.list;''')
print (d)
d=client.execute( '''select count(1) from satellite.space;''')
print (d)
d=client.execute( '''select distinct(l.NORAD_CAT_ID), l.SATNAME
 from satellite.space s left join satellite.list l on l.NORAD_CAT_ID = s.SAT_ID;''')
print ('Список спутников с данными:')
print (d)

[('list',), ('space',)]
[(15369,)]
[(864,)]
Список спутников с данными
[(35243, 'FENGYUN 1C DEB'), (5149, 'THORAD AGENA D DEB'), (20580, 'HST'), (25544, 'ISS (ZARYA)'), (32181, 'FENGYUN 1C DEB'), (32218, 'FENGYUN 1C DEB')]


In [151]:
# ГЛАВНАЯ вставка данных о координатах проекций
msg_counter = 0
for msg in consumer:
    msg_counter +=1
    #print (msg.value)
    print ('-------------------NEW_MSG----------------------------------------------')
    l = msg.value.split('\n')
    tuples_space = []
    for row in l:
        tuples_space.append('(' + row + ')')
    values_for_insert = ','.join(tuples_space)
    #print (values_for_insert)
    sql = '''INSERT INTO satellite.space VALUES {};'''.format(values_for_insert)
    print (sql)
    d=client.execute(sql)
    print ('------------------Count_msg = ' + str(msg_counter))
    print ('------------------sleep 1')
    time.sleep(1)
    #client.execute('INSERT INTO satellite.space VALUES  (%s)' % ', '.join(list(map(str, m.value.values()))))

---------------------------NEW_MSG----------------------------------------------
INSERT INTO satellite.space VALUES (20580,0,'2020-06-28 00:00:00',24.116039647991453,28.544480541107013),(20580,1,'2020-06-28 00:10:00',28.407691185322264,68.06944671402),(20580,2,'2020-06-28 00:20:00',20.071998247557154,106.17426576805475),(20580,3,'2020-06-28 00:30:00',3.78728099593445,138.72618681524756),(20580,4,'2020-06-28 00:40:00',-13.82197268413227,169.9691296296501),(20580,5,'2020-06-28 00:50:00',-26.295358896774047,-154.41362478097935),(20580,6,'2020-06-28 01:00:00',-27.477201623782474,-114.47694936204898),(20580,7,'2020-06-28 01:10:00',-16.659050782392224,-77.79684778704552),(20580,8,'2020-06-28 01:20:00',0.49678610989595834,-46.15314206985387),(20580,9,'2020-06-28 01:30:00',17.487204270541447,-14.319963257267899),(20580,10,'2020-06-28 01:40:00',27.765007050972578,22.74549498783464),(20580,11,'2020-06-28 01:50:00',25.828857969084584,62.68642990873874),(20580,12,'2020-06-28 02:00:00',12.848670699

---------------------------NEW_MSG----------------------------------------------
INSERT INTO satellite.space VALUES (32218,0,'2020-06-28 00:00:00',81.34774924537444,-68.49106922320114),(32218,1,'2020-06-28 00:10:00',54.60903340885593,-154.1525736877301),(32218,2,'2020-06-28 00:20:00',19.628376799932834,-165.86660013890375),(32218,3,'2020-06-28 00:30:00',-16.077494732369985,-173.96505094221388),(32218,4,'2020-06-28 00:40:00',-51.732365750734324,174.96708210860834),(32218,5,'2020-06-28 00:50:00',-81.3084409931282,101.32134135861155),(32218,6,'2020-06-28 01:00:00',-54.01103384832115,13.07505482717412),(32218,7,'2020-06-28 01:10:00',-18.408010803157538,1.4269851073727011),(32218,8,'2020-06-28 01:20:00',17.309050805596712,-6.667887383186519),(32218,9,'2020-06-28 01:30:00',52.35918789129807,-17.78882323991831),(32218,10,'2020-06-28 01:40:00',81.25960679326805,-89.10851285566186),(32218,11,'2020-06-28 01:50:00',55.25755096431966,-178.8462286403758),(32218,12,'2020-06-28 02:00:00',20.299166997

---------------------------NEW_MSG----------------------------------------------
INSERT INTO satellite.space VALUES (5149,0,'2020-06-28 00:00:00',60.01635911663098,-77.1258420043091),(5149,1,'2020-06-28 00:10:00',25.209661253099974,-92.12503931056312),(5149,2,'2020-06-28 00:20:00',-10.128086593604483,-100.93266265110378),(5149,3,'2020-06-28 00:30:00',-44.94948325750878,-111.43410857947238),(5149,4,'2020-06-28 00:40:00',-76.69078077111638,-149.93594851166645),(5149,5,'2020-06-28 00:50:00',-62.870912416436376,92.6121057170082),(5149,6,'2020-06-28 01:00:00',-29.157443610877444,76.22775459776827),(5149,7,'2020-06-28 01:10:00',5.556707706814573,67.36050941740682),(5149,8,'2020-06-28 01:20:00',40.4218354070651,57.50948974952196),(5149,9,'2020-06-28 01:30:00',73.61649663059634,28.12248106707942),(5149,10,'2020-06-28 01:40:00',65.79736094275243,-97.05175287499922),(5149,11,'2020-06-28 01:50:00',31.345234007919974,-115.76996366843744),(5149,12,'2020-06-28 02:00:00',-3.993228914543774,-124.86928

---------------------------NEW_MSG----------------------------------------------
INSERT INTO satellite.space VALUES (32181,0,'2020-06-28 00:00:00',-34.36237746613956,110.1407563230008),(32181,1,'2020-06-28 00:10:00',0.5950316057373574,100.99116612186975),(32181,2,'2020-06-28 00:20:00',35.63124083140693,91.7225473666968),(32181,3,'2020-06-28 00:30:00',69.63364312369492,69.38295627087466),(32181,4,'2020-06-28 00:40:00',69.86598456674992,-59.33577925227295),(32181,5,'2020-06-28 00:50:00',35.12329185335538,-82.16621059544569),(32181,6,'2020-06-28 01:00:00',-1.1744340288802684,-91.60107898448598),(32181,7,'2020-06-28 01:10:00',-37.343364645639966,-101.21792343948322),(32181,8,'2020-06-28 01:20:00',-71.59002847697215,-126.4883809157321),(32181,9,'2020-06-28 01:30:00',-68.1438019331238,105.67926625433542),(32181,10,'2020-06-28 01:40:00',-34.224703007991025,85.1157524262504),(32181,11,'2020-06-28 01:50:00',0.7335976633153599,75.97691432153074),(32181,12,'2020-06-28 02:00:00',35.77004348828736,

---------------------------NEW_MSG----------------------------------------------
INSERT INTO satellite.space VALUES (35243,0,'2020-06-28 00:00:00',-64.28743743959289,-170.85707327129944),(35243,1,'2020-06-28 00:10:00',-29.153843147971788,172.79630353734157),(35243,2,'2020-06-28 00:20:00',6.315147576535997,164.34100712445357),(35243,3,'2020-06-28 00:30:00',41.349629730109655,154.96913647122005),(35243,4,'2020-06-28 00:40:00',74.55132925781793,126.12680193365553),(35243,5,'2020-06-28 00:50:00',65.6709652889765,-2.0787625273104333),(35243,6,'2020-06-28 01:00:00',30.695783386051858,-19.38143417573052),(35243,7,'2020-06-28 01:10:00',-5.683751804953188,-28.057441958744388),(35243,8,'2020-06-28 01:20:00',-42.260248454465504,-37.784529002869384),(35243,9,'2020-06-28 01:30:00',-76.5253836502623,-72.57827869091528),(35243,10,'2020-06-28 01:40:00',-62.37986873019327,162.59849712043479),(35243,11,'2020-06-28 01:50:00',-27.172417537893267,147.40342611334196),(35243,12,'2020-06-28 02:00:00',8.282109

KeyboardInterrupt: 