## Notebook for etl on train and test csvs for cern competition
Using pyspark sql libraries
Filename is loaded into abs_tz_weighting_process and csv read in.  
Junk data removed, abs_tz calculated, transverse momentum calculated,
and weighted transverse momentum last.

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import abs, sqrt, udf
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import DoubleType

In [2]:
spark = SparkSession.builder.appName("abs_tz").getOrCreate()

In [27]:
def interpolate_weight_pt(pT):
    if pT < 0.5:
        weight_pt = 0.2
    elif pT > 3:
        weight_pt = 1.0
    else:
        weight_pt = 0.32*(pT - 0.5) + 0.2
    return weight_pt

interpolate_weight_function = udf(interpolate_weight_pt, DoubleType())

def abs_tz_weighting_process(fn_csv, local_spark_session, interpolate_weight_function):
    fn_full_path = fn_csv.split('/')
    fn = fn_full_path[-1]
    print('fn: ', fn)
    
    # Import file
    df = spark.read.csv(fn_csv, inferSchema=True, header=True)
    
    # Remove junk data
    df = df.where(df['particle_id'] != 0)
    
    # Add abs_tz column to new df abs_tz
    abs_tz = df.withColumn('abs_tz', abs(df['tz']))

    # Add weight_pt 
    # (from whitepaper: The high energy particles (large transverse momentum pT ) are the most
    #     interesting ones. As the bulk of the tracks have low pT , we have to explicitly favor
    #     high pT . weight pt is 0.2 if pT < 0.5GeV and 1. for pT > 3GeV, with a linear
    #     interpolation in between. Note that the lower the pT , the larger the geometrical
    #     curvature; at large pT tracks appear as straight lines.)
    pT = abs_tz.withColumn('pT', sqrt(abs_tz['tpx']**2+ abs_tz['tpy']**2))
    weight_pt = pT.withColumn('weight_pt', interpolate_weight_function(pT['pT']))
    
    # Final sorted df
    data = weight_pt.sortWithinPartitions( ['particle_id','abs_tz'] ) 
    # data.write.option("header", True).csv( ('/home/ec2-user/SageMaker/efs/weighted' + '/weighted-' + fn) )
    data.coalesce(1).write.option("header", True).csv( ('/home/ec2-user/SageMaker/efs/weighted' + '/weighted-' + fn) )
    print('finished writing ', ('weighted-' + fn) )

In [37]:
url = '/home/ec2-user/SageMaker/efs/dataset/train/event000001000-truth.csv'
df = spark.read.csv(url, inferSchema=True, header=True)
df = df.where(df['particle_id'] != 0)
print('Original file minus particle_id == 0: ', df.count())

Original file minus particle_id == 0:  103305


In [28]:
abs_tz_weighting_process(url, spark, interpolate_weight_function)

fn:  event000001000-truth.csv
finished writing  weighted-event000001000-truth.csv


In [29]:
df = spark.read.csv('/home/ec2-user/SageMaker/efs/weighted/weighted-event000001000-truth.csv', inferSchema=True, header=True)

In [30]:
df.printSchema()

root
 |-- hit_id: integer (nullable = true)
 |-- particle_id: long (nullable = true)
 |-- tx: double (nullable = true)
 |-- ty: double (nullable = true)
 |-- tz: double (nullable = true)
 |-- tpx: double (nullable = true)
 |-- tpy: double (nullable = true)
 |-- tpz: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- abs_tz: double (nullable = true)
 |-- pT: double (nullable = true)
 |-- weight_pt: double (nullable = true)



In [31]:
df.show()

+------+----------------+--------+--------+--------+----------+---------+---------+----------+-------+-------------------+-------------------+
|hit_id|     particle_id|      tx|      ty|      tz|       tpx|      tpy|      tpz|    weight| abs_tz|                 pT|          weight_pt|
+------+----------------+--------+--------+--------+----------+---------+---------+----------+-------+-------------------+-------------------+
| 20880|4503668346847232|-6.37305| 31.6019|  -20.08|-0.0748728| 0.319162|-0.203232|1.81814E-5|  20.08| 0.3278266591109393|                0.2|
| 29323|4503668346847232|-16.9621| 71.0005|-45.6381|-0.0964283| 0.311619|-0.204427|1.40257E-5|45.6381|0.32619751409520276|                0.2|
| 35621|4503668346847232|-31.1884| 112.046|-73.2019|  -0.11823| 0.302181|-0.206065|1.03894E-5|73.2019| 0.3244868096872352|                0.2|
| 42238|4503668346847232|-54.5593| 162.826|-108.475| -0.151918|  0.28737|-0.204184|7.53231E-6|108.475| 0.3250547578854984|                0.2|

In [39]:
print('Count of final file for validation: ', df.count())

Count of final file for validation:  103305


## Iterative functions over full filelist of train/test

In [35]:
# Train folder

PATH = '/home/ec2-user/SageMaker/efs/dataset/train/'
for filename in os.listdir(PATH):
    print("Evaluating ", filename)
    url = os.path.join(PATH, filename)
    abs_tz_weighting_process(url, spark, interpolate_weight_function)

event000005168-cells.csv
event000001690-truth.csv
event000006162-hits.csv
event000003454-particles.csv
event000001158-hits.csv
event000004704-hits.csv
event000002725-particles.csv
event000009443-cells.csv
event000004183-particles.csv
event000005433-hits.csv
event000004439-cells.csv
event000008800-truth.csv
event000005254-truth.csv
event000007018-particles.csv
event000004525-truth.csv
event000005236-particles.csv
event000003472-truth.csv
event000009511-particles.csv
event000002743-truth.csv
event000007215-hits.csv
event000001210-cells.csv
event000004507-particles.csv
event000003651-hits.csv
event000002657-cells.csv
event000007661-cells.csv
event000007036-truth.csv
event000004380-hits.csv
event000006932-cells.csv
event000003386-cells.csv
event000006307-truth.csv
event000001928-cells.csv
event000008390-cells.csv
event000001672-particles.csv
event000002922-hits.csv
event000005074-particles.csv
event000008620-particles.csv
event000001852-truth.csv
event000006324-hits.csv
event000003616-part