In [1]:
import sys
import os
import time
from datetime import timedelta, datetime, tzinfo
import numpy as np
import matplotlib.pyplot as plt
import h5py
import pickle

from kafka import KafkaConsumer, TopicPartition

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

from SharpWriter import SharpWriter
from SparkSharpRunner import SparkSharpRunner

# make graphics inline
%matplotlib inline

# Defining Experiment-Specific Parameters

In [2]:
# Scan ID; the data paths below are derived from it so they cannot drift apart.
sid = 17554

sharpWriter = SharpWriter()
# pixel size (um), distance (m), wavelength (nm), det_side
sharpWriter.init(55, 0.5, 0.083, 100)
prbfile = '../../data/%d/recon_%d_probe.npy' % (sid, sid)
cxifile = '../../data/%d/hxn%d.kafka.cxi' % (sid, sid)

sharpRunner = SparkSharpRunner()
# sharp-nsls2 command line: '-o 10' is the print interval, followed by the cxi file
args = ['sharp-nsls2', '-o', '10', cxifile]

# Defining the Kafka Consumer Function that Reads Frames and Scan Points

In [3]:
def get_data(consumer, n):
    """Read ``n`` pickled scan messages from ``consumer`` and concatenate them.

    Each message's ``.value`` is unpickled. Elements 1, 2 and 3 of the result
    are extended, in message order, into the frame list and the x / y
    scan-point lists. Element 0 is not used here.

    Parameters
    ----------
    consumer : iterator
        Anything supporting ``next()`` that yields objects with a ``.value``
        attribute holding pickled bytes (e.g. a ``kafka.KafkaConsumer``).
    n : int
        Number of messages to read; ``n <= 0`` reads nothing.

    Returns
    -------
    frames : list
        Concatenated frames from all messages.
    xs, ys : numpy.ndarray
        Concatenated scan-point coordinates, converted with ``np.asarray``.
    """
    frames = []
    xs = []
    ys = []

    for _ in range(n):
        # SECURITY: pickle.loads can execute arbitrary code embedded in the
        # payload -- only consume from a trusted topic/producer.
        value = pickle.loads(next(consumer).value)
        frames.extend(value[1])
        xs.extend(value[2])
        ys.extend(value[3])

    return frames, np.asarray(xs), np.asarray(ys)

# Creating the Kafka Consumer

In [4]:
# Only partition 0 of the scan-data topic is read.
topic = 'topic-d'
partition = TopicPartition(topic, 0)

# Auto-commit is disabled; 'latest' presumably makes the consumer start at the
# end of the partition when no committed offset exists -- confirm with kafka-python docs.
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False,
    auto_offset_reset='latest',
)

# Explicitly assign the single partition instead of subscribing to the topic.
consumer.assign([partition])

# Getting the Scan Data from the Kafka Topic

Select the initial offset

In [5]:
# Optional explicit start offset (an earlier run used 9009). Leave as None to
# keep the consumer's current position.
seek_offset = None
if seek_offset is not None:
    consumer.seek(partition, seek_offset)

start_offset = consumer.position(partition)
print(start_offset)

2002


Read the first message to get the number of scan-data messages that follow

In [6]:
# Element 0 of the first message is the number of messages that get_data()
# reads in the next cell.
# SECURITY: pickle.loads can execute arbitrary code from the payload; only
# consume from a trusted topic.
msg = next(consumer)
value = pickle.loads(msg.value)
n = value[0]
print("%s:%d:%d: n:%d" % (msg.topic,
                          msg.partition,
                          msg.offset,
                          n))

topic-d:0:2002: n:1000


Get frames and scan points

In [7]:
# This cell only reads from Kafka; the cxi file is written in the next cell.
print("getting frames and scan points from Kafka ...")
t1 = datetime.now()
frames, xs, ys = get_data(consumer, n)
t2 = datetime.now()
print("processing time: ", (t2 - t1))

getting frames and scan points from Kafka, updating a file ...
processing time:  0:00:07.726523


# Updating the SHARP-NSLS2 Input File

In [8]:
# Write the probe, frames and scan points into the SHARP-NSLS2 input file, timing the write.
print("update a cxi file ...")
t1 = datetime.now()
sharpWriter.write(cxifile, prbfile, frames, xs, ys)
t2 = datetime.now()
print("processing time: ", t2 - t1)

update a cxi file ...
processing time:  0:00:01.404288


# Running the SHARP-NSLS2 MPI/GPU Application on Four Workers

In [9]:
# Number of Spark partitions passed to run_with_spark.
partitions = 4

print("running sharp-mpi on spark workers...")
t1 = datetime.now()
tsharp = sharpRunner.run_with_spark(args, partitions)
t2 = datetime.now()
print("total processing time: ", t2 - t1)

# tsharp is indexed by partition number; assumes one timing entry per partition.
print("sharp time on each worker: ")
for i in range(partitions):
    print(i, "initialization + reconstruction time: ", tsharp[i])

running sharp-mpi on spark workers...
total processing time:  0:00:31.917395
sharp time on each worker: 
0 initialization + reconstruction time:  0:00:25.322300
1 initialization + reconstruction time:  0:00:25.797465
2 initialization + reconstruction time:  0:00:25.319605
3 initialization + reconstruction time:  0:00:24.772900
