# Unsupervised grouping application of XAS spectra using pyspark, scikit-learn, and dash

### Motivation
My advisor at UC Santa Cruz has decades worth of *manually processed* XAS data, specifically EXAFS data. As I cranked through material research learning tips from my advisor, I dreamt of the potential of his decades of work reducing data as building a training set for a highly technical analysis. Before I got too far ahead of myself, I wanted to devise a realistic goal that will be part of the foundation for my loftier ideas. 

Enter unsupervised clustering. I thought it would be useful for anyone, from XAS experts to undergraduates, to have an automatic classifier that could detect a "new" spectrum that differs from previously collected data. This detection could relate to a new material, an experimental error, novel behavior, a phase transition, etc. 

To do this with a subset of the Bridges Historical EXAFS Database I wanted to approach with scalability in mind for future use. This is why I opted to delve into using Apache Spark, specifically pyspark. While the use of pyspark for this small dataframe (and loading the normalized mfcc dataframe) is not necessary, it serves as a starting point for scaling, as well as educational for people working on similar frameworks. 

To get pyspark running in a Colab environment
I am following https://towardsdatascience.com/pyspark-in-google-colab-6821c2faf41c except for the fact I am wget'ing an archvie spark, see link difference . . .




In [0]:
!wget http://archive.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz

--2020-02-26 22:47:48--  http://archive.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 163.172.17.199
Connecting to archive.apache.org (archive.apache.org)|163.172.17.199|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 226027370 (216M) [application/x-gzip]
Saving to: ‘spark-2.3.3-bin-hadoop2.7.tgz.2’


2020-02-26 22:48:16 (8.07 MB/s) - ‘spark-2.3.3-bin-hadoop2.7.tgz.2’ saved [226027370/226027370]



In [0]:
# Get java installed
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xvf spark-2.3.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.3-bin-hadoop2.7"


In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Then we load the mini hEXAFS database from my github:

In [0]:
!wget "http://exafs.ucsc.edu/bridges/data_agg_mini.csv"

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import pandas as pd

dataset = spark.read.load("data_agg_mini.csv",format="csv", sep=",", inferSchema="true", header="true",parserLib="univocity",multiLine=True)

general dataframe overview, make sure multiline worked for e,mu arrays

In [0]:
dataset.show()



```
# This is formatted as code
```

now strip all the "\n's" from the array


In [0]:
from pyspark.sql.functions import *
ele_selector="Fe"
df=dataset.na.drop(subset=["5"])

df=df.withColumn('5', regexp_replace('5', '[\n\[\]]', ' '))
df=df.withColumn('6', regexp_replace('6', '[\n\[\]]', ' '))
df2=df.withColumn("5", split("5", "\s"))
df2=df2.withColumn("6", split("6", "\s"))

## cleaning begins

We start with cutting to region of interest for each spectra, and casting the string array as an array of floats, which we can operate on




In [0]:
from pyspark.sql.column import _to_java_column, _to_seq, Column
from pyspark import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, FloatType,IntegerType
from pyspark.sql.functions import array
import numpy as np

#cast string array as an array of floats
df2=df2.withColumn("5", df2["5"].cast('array<float>'))
df2=df2.withColumn("6", df2["6"].cast('array<float>'))

#useful for checking the lengths match between e,mu after processing
slen = udf(lambda s: len(s), IntegerType())

# Removes 'none' entries in lists of e, mu
nonone = udf(lambda s: [s for s in s if s is not None], 'array<float>')

# Cuts out the pre-edge region
def edgecutter(t,r):
  try:
    diff = [j-i for i, j in zip(t[:-1], t[1:])]
    cl=diff.index(np.max(diff))
    if cl + 50 < len(diff):
      cl=diff.index(np.max(diff))+50
    else:
      cl=diff.index(np.max(diff))

  except:
    cl=0
  return r[cl:]

# apply functinos to arrays in e,mu columns
adf = df2.withColumn("5", nonone(df2["5"]))
adf = adf.withColumn("6", nonone(adf["6"]))
adf = adf.withColumn("ecnt", slen(adf["5"]))
adf = adf.withColumn("mucnt", slen(adf["6"]))

# Check mu and e are still the same length
bdf = adf.filter(adf["mucnt"]==adf["ecnt"])

# Init and apply pyspark udf for cutting of pre-edge
edgecut_udf=udf(edgecutter, 'array<float>')
cdf=bdf.withColumn("ecut", edgecut_udf(bdf["6"],bdf["5"]))
cdf=cdf.withColumn("mucut", edgecut_udf(cdf["6"],cdf["6"]))

Now we zero the energy and subtract the mean of mu

In [0]:
from pyspark.sql.functions import pandas_udf,PandasUDFType
from pyspark.sql.types import *
import pandas as pd
import numpy as np

# This is the df->RDD->df wash cycle
# Washes off the scalar udf stench ...
# this issue needs to be adressed as PySpark development improves VectorUDF functionality

rdd = cdf.rdd.map(list)
zdf=rdd.toDF()

@pandas_udf('array<float>')
def min_off(v):
    return pd.Series(v).apply(lambda x: x-np.min(x))

@pandas_udf('array<float>')
def mean_off(v):
    return pd.Series(v).apply(lambda x: x-np.mean(x))

zdf1=zdf.withColumn("e_zset", min_off(zdf["_14"]))
zdf2=zdf1.withColumn("mu_zset", mean_off(zdf1["_15"]))


Fit background spline, and subtract it from resampled spectra. 
We save the normalized data to be used in the Dash app plot of categorized, normalized EXAFS scans

In [0]:

## Trying to work spline fitting into a pysparkable udf, or maybe pandas udf. 2 arrays in, returns just m
from scipy.interpolate import splrep, splev, LSQUnivariateSpline, UnivariateSpline, interp1d
import numpy as np
from scipy.signal import find_peaks
from pyspark.sql.functions import udf, col

# set number of points in resample of all spectra
pt_res= 800

def spliner(v,u):
  
  R=.5 # 0.5 ang. is a good example of our low-cut for R-space background to avoid in EXAFS 
  num_kts=int(1+(2.*R/(3.14159265359))*(0.512393)*np.sqrt(u[-1]))  #define number of knots based similar approach as xray-larch 
  #num_kts=9  ### possible alternative to hard-code number of spline knots
  try:
    # Could be used to define peaks that would bound the background spline fit  
    #peaks, _=find_peaks(v,prominence=.033)
    #pk_pts=[v[i] for i in peaks]
    #xpk_pts=[u[i] for i in peaks]

    # Define knot array for bkg
    maxe=u[-1]
    kts=[(maxe/num_kts)*i+1 for i in range(1,num_kts-1)]
    u.insert(0,0.0)
    v.insert(0,0.0)

    # Slow oscillating background with <15 splines
    bkg=LSQUnivariateSpline(x=u, y=v,t=kts)
    pt_res= 800

    # Interpolation function of mu,e for resampling
    inter=interp1d(x=u,y=v)
    grid=[(maxe/pt_res)*nn for nn in range(pt_res)]

    # Uncomment and return subsline to see the bkg-spline that is taken off
    #subspline=[float(j-bkg(i)) for i,j in zip(u,v)]

    # Result of subtracting slow-moving bkg from resampled mu
    bkg_grid=[float(inter(i)-bkg(i)) for i in grid]

    output=bkg_grid

  except:
    output=[22.,33.]
    # a dummy output for when knot-definer fails

   
  return output

# init and apply above defined UDF
uspliner=udf(spliner,'array<float>')
ndf=zdf2.withColumn("norm_e", uspliner(zdf2["mu_zset"], zdf2["e_zset"]))

#Filter out the failures
from pyspark.sql.functions import size
ndf1=ndf.filter(size('norm_e') > 4)

# Convert to pandas dataframe and save, zip, download
ndf_out=ndf.toPandas()
ndf_out.to_csv("data_agg_mini_cleanest.csv")
!zip data_agg_mini_cleanest.zip data_agg_mini_cleanest.csv
import time

# May fail to download, as sleeptime is used to wait for zip file to be recognized in colab drive
time.sleep(10)
from google.colab import files
files.download("data_agg_mini_cleanest.zip")


Finally, we can use the process the normalized (and homogenously sampled) XAS spectra. In clustering regions of audio signals, [MFCC](https://en.wikipedia.org/wiki/Mel-frequency_cepstrum) arrays are useful. In following established techniques, I found this transformation very useful in conjunction with DBSCAN.

In [0]:
import numpy as np
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType,FloatType
import librosa
from librosa import display
import time
import matplotlib.pyplot as plt

#########
# Constants tweaked to work
pt_res=800
nfft=180
hopl=nfft+1
sri=800
##########

def mffcer(u):
  try:
    # reshape and weight mu by e (like in k*chi(k)), then transform list -> array
    bb=np.transpose(np.array(u))
    bb=[float(i*bb[i]) for i in range(len(u))]
    cc=np.array(bb)

    # Librosa package contains wealth of audio signal processing functions
    # We use it to stft -> mel spectrogram -> mfcc
    spec=librosa.stft(y=cc,n_fft=nfft)
    melspec=librosa.feature.melspectrogram(y=cc,S=spec)
    mffc_ex=librosa.feature.mfcc(y=cc, sr=sri, S=melspec, n_mfcc=50000)

    # Unpack the 2D array into 1D array, then assure they are floats
    flatmf=np.absolute(mffc_ex).flatten()
    flatmf2=[float(i) for i in flatmf]
    output=flatmf2

  except:
    output=[[22.,33.]]

   
  return output

# Init and apply MFCC UDF
spectralize=udf(mffcer, ArrayType(FloatType()))
mfcc_df=ndf1.withColumn("mfcc", spectralize(ndf1["norm_e"]))

#mfcc_df.select("mfcc").show()
# Reduce dataframe to the only needed columns
df=mfcc_df.select([c for c in mfcc_df.columns if c  in {'_2','_5','mfcc','4'}])

## Saving, zipping and downloading the stuff
df_out=df.toPandas()
df_out.to_csv("mfcc_edge_2col_gmini.csv")
!zip mfcc_gmini.zip mfcc_edge_2col_gmini.csv
time.sleep(7)
from google.colab import files
!ls -lht
files.download("mfcc_gmini.zip")
