In [1]:
from glob import iglob
from functools import reduce
from itertools import islice

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, array
import matplotlib.pyplot as plt
from sp8tools import SpkHits

Connect to PySpark master

In [2]:
builder = (SparkSession
           .builder
           .appName('PySpark Example')
           .master('spark://carbon.uedalab.local:7077')
           .config("spark.jars.packages", "org.diana-hep:spark-root_2.11:0.1.15")
           .config("spark.cores.max", 1)
           )
spark = builder.getOrCreate()

Read root files

In [3]:
globbed = iglob("/helium/analysis/SPring-8/2014B/06_Ar_15atm_258eV/ResortLi/ResortLess000[123].root")
loaded = (spark.read.format("org.dianahep.sparkroot").load(fn) for fn in globbed)
df = reduce(DataFrame.union, loaded)
df.printSchema()

root
 |-- IonNum: integer (nullable = true)
 |-- IonX0: double (nullable = true)
 |-- IonY0: double (nullable = true)
 |-- IonT0: double (nullable = true)
 |-- IonFlag0: integer (nullable = true)
 |-- IonX1: double (nullable = true)
 |-- IonY1: double (nullable = true)
 |-- IonT1: double (nullable = true)
 |-- IonFlag1: integer (nullable = true)
 |-- IonX2: double (nullable = true)
 |-- IonY2: double (nullable = true)
 |-- IonT2: double (nullable = true)
 |-- IonFlag2: integer (nullable = true)
 |-- IonX3: double (nullable = true)
 |-- IonY3: double (nullable = true)
 |-- IonT3: double (nullable = true)
 |-- IonFlag3: integer (nullable = true)
 |-- ElecNum: integer (nullable = true)
 |-- ElecX0: double (nullable = true)
 |-- ElecY0: double (nullable = true)
 |-- ElecT0: double (nullable = true)
 |-- ElecFlag0: integer (nullable = true)
 |-- ElecX1: double (nullable = true)
 |-- ElecY1: double (nullable = true)
 |-- ElecT1: double (nullable = true)
 |-- ElecFlag1: integer (nullable = tr

Restruct

In [8]:
@udf(SpkHits)
def combine_hits(xarr, yarr, tarr, flagarr, nhits):
    zipped = ({'x': x,
               'y': y,
               't': t,
               'flag': f
               } for x, y, t, f in zip(xarr, yarr, tarr, flagarr))
    return list(islice(zipped, nhits))


imhits = sum(1 for c in df.columns if c.startswith('IonT'))
emhits = sum(1 for c in df.columns if c.startswith('ElecT'))
restructured = (
    df
        .withColumn('itarr', array(*['IonT{}'.format(i) for i in range(imhits)]))
        .withColumn('ixarr', array(*['IonX{}'.format(i) for i in range(imhits)]))
        .withColumn('iyarr', array(*['IonY{}'.format(i) for i in range(imhits)]))
        .withColumn('iflagarr', array(*['IonFlag{}'.format(i) for i in range(imhits)]))
        .withColumnRenamed('IonNum', 'inhits')
        .withColumn('etarr', array(*['ElecT{}'.format(i) for i in range(emhits)]))
        .withColumn('exarr', array(*['ElecX{}'.format(i) for i in range(emhits)]))
        .withColumn('eyarr', array(*['ElecY{}'.format(i) for i in range(emhits)]))
        .withColumn('eflagarr', array(*['ElecFlag{}'.format(i) for i in range(emhits)]))
        .withColumnRenamed('ElecNum', 'enhits')
        .select(combine_hits('itarr', 'ixarr', 'iyarr', 'iflagarr', 'inhits').alias('ihits'),
                combine_hits('etarr', 'exarr', 'eyarr', 'eflagarr', 'enhits').alias('ehits'))
)

root
 |-- tag: long (nullable = false)
 |-- ihits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: double (nullable = false)
 |    |    |-- y: double (nullable = false)
 |    |    |-- t: double (nullable = false)
 |    |    |-- flag: integer (nullable = false)
 |-- ehits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: double (nullable = false)
 |    |    |-- y: double (nullable = false)
 |    |    |-- t: double (nullable = false)
 |    |    |-- flag: integer (nullable = false)



In [10]:
spark.stop()