# LIGO - Gravitational Waves simple classification - Part 2
Here we take the parquet dataframe stored, we load up in memory and we try to classify it is describen in *"LIGO - Loading of the training dataset.ipynb"* 

> Find the proper file to run this all in /code/gw/training_linear_svc.py and /code/gw/linear_svc_test.py

In [78]:
#Initialize the spark context and tools for processing the stored rows.
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

#Going into spark context to load it all from a parquet format.
spark = SparkSession.builder.appName("pyspark-gw").getOrCreate()

In [81]:
# This tests if we can re load it the first few rows... 
parquet_df = spark.read.parquet('/dataset/gw_gravity_spy_dataframe_test')
parquet_df.head()

Row(event_time=1127316933.86035, ifo='H1', peak_time=1127316933, peak_time_ns=860351085, start_time=1127316933, start_time_ns=836914062, duration=0.6152300238609311, search='Omicron', process_id=0, event_id=214, peak_frequency=1181.22387695312, central_freq=1242.05822753906, bandwidth=2405.51831054688, channel='GDS-CALIB_STRAIN', amplitude=1.8502000469038799e-22, snr=11.189049720764197, confidence=0, chisq=0, chisq_dof=0, param_one_name='phase', param_one_value=-0.22053000000000003, url1='https://panoptes-uploads.zooniverse.org/production/subject_location/5b5bc81b-feba-4da6-94b0-97f24fe2d167.png', url2='https://panoptes-uploads.zooniverse.org/production/subject_location/285db5f3-5a63-4f47-a958-1c453760e358.png', url3='https://panoptes-uploads.zooniverse.org/production/subject_location/e657905e-b86a-4c28-bad3-73d219c379f9.png', url4='https://panoptes-uploads.zooniverse.org/production/subject_location/f2f9ad17-803b-446d-9a9c-d913c90cf8de.png', png=DenseVector([0.095, 0.1211, 0.1757, 0.09

In [80]:
# initialise sparkContext
spark = SparkSession.builder \
    .master('local') \
    .appName('myAppName') \
    .config('spark.executor.memory', '12gb') \
    .config("spark.cores.max", "2") \
    .getOrCreate()

sc = spark.sparkContext

# using SQLContext to read parquet file
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [56]:
# to read parquet file (full) # TODO - Research a way to read ONLY a given partition.
#parquet_df = spark.read.parquet('/dataset/gw_gravity_spy_dataframe')
parquet_df = spark.read.parquet('/dataset/gw_gravity_spy_dataframe')


In [82]:
#filter the training set
train_set= parquet_df.where(parquet_df['sample_type']=='train')

In [83]:
train_set.head()

Row(event_time=1127316933.86035, ifo='H1', peak_time=1127316933, peak_time_ns=860351085, start_time=1127316933, start_time_ns=836914062, duration=0.6152300238609311, search='Omicron', process_id=0, event_id=214, peak_frequency=1181.22387695312, central_freq=1242.05822753906, bandwidth=2405.51831054688, channel='GDS-CALIB_STRAIN', amplitude=1.8502000469038799e-22, snr=11.189049720764197, confidence=0, chisq=0, chisq_dof=0, param_one_name='phase', param_one_value=-0.22053000000000003, url1='https://panoptes-uploads.zooniverse.org/production/subject_location/5b5bc81b-feba-4da6-94b0-97f24fe2d167.png', url2='https://panoptes-uploads.zooniverse.org/production/subject_location/285db5f3-5a63-4f47-a958-1c453760e358.png', url3='https://panoptes-uploads.zooniverse.org/production/subject_location/e657905e-b86a-4c28-bad3-73d219c379f9.png', url4='https://panoptes-uploads.zooniverse.org/production/subject_location/f2f9ad17-803b-446d-9a9c-d913c90cf8de.png', png=DenseVector([0.095, 0.1211, 0.1757, 0.09

In [84]:
train_set.schema

StructType(List(StructField(event_time,DoubleType,true),StructField(ifo,StringType,true),StructField(peak_time,LongType,true),StructField(peak_time_ns,LongType,true),StructField(start_time,LongType,true),StructField(start_time_ns,LongType,true),StructField(duration,DoubleType,true),StructField(search,StringType,true),StructField(process_id,LongType,true),StructField(event_id,LongType,true),StructField(peak_frequency,DoubleType,true),StructField(central_freq,DoubleType,true),StructField(bandwidth,DoubleType,true),StructField(channel,StringType,true),StructField(amplitude,DoubleType,true),StructField(snr,DoubleType,true),StructField(confidence,LongType,true),StructField(chisq,LongType,true),StructField(chisq_dof,LongType,true),StructField(param_one_name,StringType,true),StructField(param_one_value,DoubleType,true),StructField(url1,StringType,true),StructField(url2,StringType,true),StructField(url3,StringType,true),StructField(url4,StringType,true),StructField(png,VectorUDT,true),StructFi

In [15]:
#Verify that the sample type is train (just the head...)
train_set.select("sample_type").show()

+-----------+
|sample_type|
+-----------+
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
|      train|
+-----------+
only showing top 20 rows



In [63]:
train_set.count()

5587

In [123]:
# Extract and prepare the df to be processed with just the due columns. In this case, we want just the image.
# (and a way to associate it with the rest of the columns after classification)
import pyspark.sql.functions as F

train = train_set.select("label","png")

print(train.columns)

#Reshaping the labels as "Chrip" = "True" and all the others as "False"; Chrips are Gravitational Waves.
result = train.where(train.label == "Chirp")
print("Gravitational Waves: {0}".format(result.count()))

train = train.withColumn('gw', (train.label == "Chirp"))
train = train.drop("label")

train = train.withColumn('features', train.png)
train = train.withColumn('label', train.gw)
import pyspark.sql.functions as sf

train = train.drop("png")
train = train.drop("gw")
train = train.withColumn('label', (F.col('label') == True).cast('integer'))

print(train.columns)

result = train.where(train.label == 1)
print("Gravitational Waves (after): {0}".format(result.count()))


['label', 'png']
Gravitational Waves: 1
['features', 'label']
Gravitational Waves (after): 1


In [124]:
train.head()


Row(features=DenseVector([0.095, 0.1211, 0.1757, 0.095, 0.1686, 0.1557, 0.0986, 0.1445, 0.0961, 0.1174, 0.0984, 0.0928, 0.1094, 0.1321, 0.1598, 0.1936, 0.2042, 0.1485, 0.1378, 0.1832, 0.146, 0.1305, 0.1228, 0.0866, 0.0961, 0.086, 0.0832, 0.1614, 0.2007, 0.1734, 0.1181, 0.1218, 0.1957, 0.2773, 0.327, 0.2758, 0.1474, 0.133, 0.101, 0.104, 0.1582, 0.1582, 0.2004, 0.1553, 0.1133, 0.134, 0.1485, 0.1693, 0.1126, 0.1805, 0.2966, 0.2617, 0.1582, 0.1348, 0.1038, 0.107, 0.1372, 0.1383, 0.0961, 0.0832, 0.0858, 0.0939, 0.0917, 0.0978, 0.1274, 0.1259, 0.1023, 0.1452, 0.1697, 0.0989, 0.0838, 0.0994, 0.131, 0.1323, 0.1007, 0.1511, 0.1773, 0.1498, 0.1529, 0.1182, 0.0982, 0.1018, 0.0855, 0.1132, 0.2176, 0.2422, 0.1516, 0.1614, 0.1599, 0.1531, 0.1605, 0.1269, 0.1388, 0.3067, 0.1936, 0.0972, 0.0994, 0.0967, 0.1158, 0.0838, 0.1326, 0.1711, 0.1122, 0.1228, 0.1733, 0.1179, 0.1241, 0.1211, 0.1625, 0.1068, 0.0832, 0.1056, 0.1432, 0.155, 0.1496, 0.1507, 0.1462, 0.1583, 0.1228, 0.0933, 0.0907, 0.0832, 0.1343, 0.

# Training a classification model
We try our training with LinearSVC because there is no conventional SVM alone. 

In [125]:
# Preparing the data... (yes, AGAIN...)
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
import numpy as np

In [126]:
# Linear Support Vector Machine

from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(train)

# Print the coefficients and intercept for linearsSVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))

Coefficients: [-0.0023354687604890275,-0.0027308564457817455,5.8452966695701415e-08,1.4937760172191883e-08,-6.115490972629929e-08,-0.0023135021875544787,-8.417003808340165e-08,2.8089096775116955e-07,-2.7472537577382946e-05,-3.1434046505285955e-06,-0.0021066878355851556,-0.0025167029208009247,9.918369632358402e-08,-1.0135775736715158e-07,-5.2077461883415965e-08,-0.0022117830085003547,-8.94753382230278e-08,5.857248336166786e-07,-8.876923846334899e-08,-0.0024551342130007714,-0.002593656202710736,-7.907970574636907e-08,1.406761136374017e-07,-0.0014081549539250258,-7.620344749998906e-08,-0.0021653633898972716,-0.002682971982335405,1.0940997509975455e-07,-0.0021474024528110452,-0.002663335290764835,6.874195893717143e-08,-5.9813331981471e-08,-0.0008672846820915103,-0.0014449900279040414,-5.4097005557367276e-08,-7.779458459694491e-08,-6.502321713717691e-08,-0.0011886870993511232,1.837415146684483e-07,-1.0507579464751092e-07,-0.0032023557450198805,-0.0013353513227570449,-3.886948234438308e-08,-

# Try and predict from the test set.

In [129]:
#filter the test set
test_set= parquet_df.where(parquet_df['sample_type']=='test')
test_set.count()

86

In [128]:
import pyspark.sql.functions as F

test = test_set.select("label","png")

print(test.columns)

#Reshaping the labels as "Chrip" = "True" and all the others as "False"; Chrips are Gravitational Waves.
result = test.where(test.label == "Chirp")
print("Gravitational Waves: {0}".format(result.count()))

test = test.withColumn('gw', (test.label == "Chirp"))
test = test.drop("label")

test = test.withColumn('features', test.png)
test = test.withColumn('label', test.gw)
import pyspark.sql.functions as sf

test = test.drop("png")
test = test.drop("gw")
test = test.withColumn('label', (F.col('label') == True).cast('integer'))

print(test.columns)

result = test.where(test.label == 1)
print("Gravitational Waves (after): {0}".format(result.count()))

['label', 'png']
Gravitational Waves: 0
['features', 'label']
Gravitational Waves (after): 0


In [136]:
test_dumb = test.drop('label')
type(test_dumb.first())


pyspark.sql.types.Row

In [137]:
test_dumb.first()[0]

DenseVector([0.0939, 0.1216, 0.1268, 0.0967, 0.1084, 0.1403, 0.1413, 0.1274, 0.0899, 0.1169, 0.1346, 0.1706, 0.1656, 0.1096, 0.0832, 0.0857, 0.0877, 0.1132, 0.104, 0.1068, 0.162, 0.1946, 0.1573, 0.1126, 0.1101, 0.1094, 0.2053, 0.3363, 0.2737, 0.1206, 0.0896, 0.0962, 0.1127, 0.1258, 0.2295, 0.2423, 0.0871, 0.1624, 0.183, 0.1451, 0.1174, 0.1054, 0.131, 0.1727, 0.2714, 0.1997, 0.0924, 0.089, 0.1537, 0.1992, 0.1148, 0.1383, 0.152, 0.097, 0.1405, 0.1301, 0.0933, 0.0832, 0.1046, 0.095, 0.1261, 0.1895, 0.1477, 0.1619, 0.1414, 0.0854, 0.101, 0.164, 0.1383, 0.1666, 0.1462, 0.1234, 0.1812, 0.1659, 0.132, 0.1492, 0.2241, 0.2113, 0.147, 0.1101, 0.1034, 0.1767, 0.1694, 0.1007, 0.1237, 0.1124, 0.2371, 0.2481, 0.2268, 0.2195, 0.2236, 0.2037, 0.1915, 0.1481, 0.1926, 0.17, 0.1269, 0.1426, 0.131, 0.0933, 0.1083, 0.1635, 0.1205, 0.0894, 0.0952, 0.1274, 0.1528, 0.1556, 0.2944, 0.3866, 0.3186, 0.1831, 0.1521, 0.1638, 0.1702, 0.1309, 0.0832, 0.0945, 0.1631, 0.1431, 0.0925, 0.0832, 0.1307, 0.2724, 0.2283, 0.

In [145]:
result = lsvcModel.transform(test_dumb)


In [153]:
result.count()

86

In [160]:
#We present the prediction for the first element from TEST.
result.select('prediction').collect()[0]

Row(prediction=0.0)

# Trying it with Pandas...
(unfortunately we went busted with this approach due to meemory issues. We do believe a possible alternative will be to process this all in batches or to increase the memory allowance for a single node. We do not believe that this could be a sound approach given the size of the dataset and the solution we need to deliver.)

In [68]:
import numpy as np
features = train.select('features')
label = train.select('label')
#features = features.collect()

In [69]:
#Non-linear support vector machine (But not Spark ML, either).
from sklearn.svm import SVC

svc = SVC(kernel="sigmoid", random_state=0, gamma=1, C=1)
model = svc.fit(features, label)

ValueError: setting an array element with a sequence.

In [77]:
spark.stop()