In [1]:
from pprint import pprint
from itertools import chain, product
from functools import reduce, partial
from glob import iglob

from yaml import safe_load
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql import SparkSession, DataFrame, functions as f
from dltools import load_combiner
from dltools.sacla import restructure, load_analyzer

In [2]:
# %% Load config file
# Parse the YAML run configuration. The progress message is printed only once
# the file has been opened successfully.
with open("aq035--aq036; Config.yaml", "r") as config_file:
    print("Loading config file...")
    config = safe_load(config_file)

# %% Load momentum model
# Hand `load_analyzer` a shallow copy of the "momentum_analyzer" section so that
# top-level changes it might make do not leak back into `config`
# (nested values are still shared).
print("Loading momentum model...")
analyzer_settings = config["momentum_analyzer"].copy()
analyzer = load_analyzer(analyzer_settings)
print("Done!")

Loading config file...
Loading momentum model...
Done!


In [3]:
# %% Load PySpark
# Maven coordinates (group:artifact:version) for `spark.jars.packages`.
# They are joined with "," explicitly so the setting never ends in a dangling
# separator, which would be an empty coordinate, and so entries can be
# added or removed without touching commas by hand.
SPARK_PACKAGES = [
    "org.apache.hadoop:hadoop-aws:2.7.0",
    "org.mongodb.spark:mongo-spark-connector_2.11:2.3.1",
    "org.diana-hep:spark-root_2.11:0.1.15",
]

builder = (
    SparkSession
    .builder
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.jars.packages", ",".join(SPARK_PACKAGES))
)

print("Loading PySpark...")
spark = builder.getOrCreate()
print("Done!")

Loading PySpark...
Done!


In [4]:
# %% Load data
from pyspark.sql.types import (
    ArrayType, BooleanType, LongType, StructField, StructType, DoubleType,
)
from dltools import SpkHits


# Explicit read schema: only `tag` and `hits` are decoded from each document.
# Every other stored field is left out of the resulting DataFrame.
resorted_schema = StructType([
    StructField("tag", LongType()),
    StructField("hits", SpkHits),
])

# Server-side aggregation pipeline: keep only documents whose `aq` is 35 or 36.
aq_pipeline = """[
        {
            $match: {
                aq: {$in: [35, 36]},
            },
        },
    ]"""

print("Loading data...")
# `df` is not cached, so each Spark action on it re-runs the MongoDB read.
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .options(
        uri="mongodb://mongodb/sacla_2017b8065.resorted",
        pipeline=aq_pipeline,
    )
    .schema(resorted_schema)
    .load()
)
df.printSchema()
print("Done!")

Loading data...
root
 |-- tag: long (nullable = true)
 |-- hits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- t: double (nullable = false)
 |    |    |-- x: double (nullable = false)
 |    |    |-- y: double (nullable = false)
 |    |    |-- as_: map (nullable = false)
 |    |    |    |-- key: string
 |    |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |    |-- pz: double (nullable = false)
 |    |    |    |    |-- px: double (nullable = false)
 |    |    |    |    |-- py: double (nullable = false)
 |    |    |    |    |-- ke: double (nullable = false)
 |    |    |-- flag: integer (nullable = true)

Done!


In [5]:
# %% Analyze the data
print("Analyzing the data...")
# Replace the raw `hits` column with the analyzer's output for those hits.
analyzed = df.withColumn("hits", analyzer(f.col("hits")))
analyzed.printSchema()

# Preview: one row per (hit, `as_` key) pair, with the four fields stored
# in each `as_` map value flattened into top-level columns.
momentum_fields = ("pz", "px", "py", "ke")
exploded_hits = analyzed.select(f.explode("hits").alias("h"))
hit_assignments = exploded_hits.select(f.explode("h.as_").alias("as_", "h"))
hit_assignments.select(
    "as_",
    *(f.col("h." + field_name).alias(field_name) for field_name in momentum_fields),
).show()

Analyzing the data...
root
 |-- tag: long (nullable = true)
 |-- hits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- t: double (nullable = false)
 |    |    |-- x: double (nullable = false)
 |    |    |-- y: double (nullable = false)
 |    |    |-- as_: map (nullable = false)
 |    |    |    |-- key: string
 |    |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |    |-- pz: double (nullable = false)
 |    |    |    |    |-- px: double (nullable = false)
 |    |    |    |    |-- py: double (nullable = false)
 |    |    |    |    |-- ke: double (nullable = false)
 |    |    |-- flag: integer (nullable = true)

+---+-------------------+-------------------+-------------------+------------------+
|as_|                 pz|                 px|                 py|                ke|
+---+-------------------+-------------------+-------------------+------------------+
|H1p| -94.64085870354376| -37.85135724690025| -22.51806040189454| 

In [6]:
# %% Insert data to MongoDB
# Write the analyzed hits back to the collection they were read from, in
# "append" mode with replaceDocument=false and shardKey {tag: ...}.
# NOTE(review): with these options the connector is expected to match existing
# documents on `tag` and $set the remaining fields rather than replace whole
# documents — confirm against the connector version in use.
print("Updating data...")
mongo_writer = analyzed.write.format("com.mongodb.spark.sql.DefaultSource")
(
    mongo_writer
    .options(
        uri="mongodb://mongodb/sacla_2017b8065.resorted",
        replaceDocument="false",
        shardKey="{tag: true}",
    )
    .mode("append")
    .save()
)
print("Done!")

Updating data...
Done!
