In [0]:
from pyspark.sql import Row
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev, log, log10
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.functions import lit

#additional imports
from pyspark.sql.functions import when 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors, Vector
from pyspark.ml.functions import vector_to_array

In [0]:
spark = SparkSession(sc)
sqlc=SQLContext(sc)
print(sc.getConf().getAll())

In [0]:
#import table to dataframe
train_df = spark.read.csv('/FileStore/tables/train_data.csv', header='true')
train_df.show()
train_df.cache()

In [0]:
#neo and pha are both catagorical, turn Y to 1, N to 0
distinctDF = train_df.select('neo').distinct()
distinctDF.show()
distinctDF = train_df.select('pha').distinct()
distinctDF.show()
countDF = (train_df.groupBy('neo').count())
countDF.show()
countDF = (train_df.groupBy('pha').count())
countDF.show()

In [0]:
df1 = train_df.withColumn('neo', when(train_df.neo.isNull(),lit('N')).otherwise(train_df.neo))
countDF = (df1.groupBy('neo').count())
countDF.show()

In [0]:
df1 = df1.withColumn('neo', regexp_replace('neo', 'N', '0'))
df1 = df1.withColumn('neo', regexp_replace('neo', 'Y', '1'))
df1 = df1.withColumn('pha', regexp_replace('pha', 'N', '0'))
df1 = df1.withColumn('pha', regexp_replace('pha', 'Y', '1'))

In [0]:
distinctDF = df1.select('neo').distinct()
distinctDF.show()
distinctDF = df1.select('pha').distinct()
distinctDF.show()
countDF = (df1.groupBy('neo').count())
countDF.show()
countDF = (df1.groupBy('pha').count())
countDF.show()

In [0]:
#class feature has 13 catagories, turn into index and onehot
distinctDF = df1.select('class').distinct()
distinctDF.show()
countDF = (df1.groupBy('class').count())
countDF.show()

In [0]:
indexer = StringIndexer(inputCol = 'class', outputCol = 'class_index')
model = indexer.fit(df1)
indexed = model.transform(df1)
distinctDF = indexed.select('class','class_index').distinct()
distinctDF.show()
countDF = (indexed.groupBy('class','class_index').count())
countDF.show()

In [0]:
encoder = OneHotEncoder(inputCol = 'class_index', outputCol = 'class_binary')
model = encoder.fit(indexed)
encoded = model.transform(indexed)
distinctDF = encoded.select('class','class_index','class_binary').distinct()
distinctDF.show()
countDF = (encoded.groupBy('class','class_index','class_binary').count())
countDF.show()

In [0]:
#useful catagorical strings have been converted, change all schema to double
df3 = encoded.withColumn("diameter",col("diameter").cast(DoubleType())).withColumn("diameter_sigma",col("diameter_sigma").cast(DoubleType())).withColumn("albedo",col("albedo").cast(DoubleType())).withColumn("epoch",col("epoch").cast(DoubleType())).withColumn("epoch_mjd",col("epoch_mjd").cast(DoubleType())).withColumn("epoch_cal",col("epoch_cal").cast(DoubleType())).withColumn("e",col("e").cast(DoubleType())).withColumn("a",col("a").cast(DoubleType())).withColumn("i",col("i").cast(DoubleType())).withColumn("q",col("q").cast(DoubleType())).withColumn("om",col("om").cast(DoubleType())).withColumn("w",col("w").cast(DoubleType())).withColumn("ma",col("ma").cast(DoubleType())).withColumn("ad",col("ad").cast(DoubleType())).withColumn("n",col("n").cast(DoubleType())).withColumn("tp",col("tp").cast(DoubleType())).withColumn("tp_cal",col("tp_cal").cast(DoubleType())).withColumn("per",col("per").cast(DoubleType())).withColumn("per_y",col("per_y").cast(DoubleType())).withColumn("moid",col("moid").cast(DoubleType())).withColumn("moid_ld",col("moid_ld").cast(DoubleType())).withColumn("sigma_e",col("sigma_e").cast(DoubleType())).withColumn("sigma_a",col("sigma_a").cast(DoubleType())).withColumn("sigma_q",col("sigma_q").cast(DoubleType())).withColumn("sigma_i",col("sigma_i").cast(DoubleType())).withColumn("sigma_om",col("sigma_om").cast(DoubleType())).withColumn("sigma_w",col("sigma_w").cast(DoubleType())).withColumn("sigma_ma",col("sigma_ma").cast(DoubleType())).withColumn("sigma_ad",col("sigma_ad").cast(DoubleType())).withColumn("sigma_n",col("sigma_n").cast(DoubleType())).withColumn("sigma_tp",col("sigma_tp").cast(DoubleType())).withColumn("sigma_per",col("sigma_per").cast(DoubleType())).withColumn("rms",col("rms").cast(DoubleType())).withColumn("class_index",col("class_index").cast(DoubleType())).withColumn("H",col("H").cast(DoubleType())).withColumn("neo",col("neo").cast(DoubleType())).withColumn("pha",col("pha").cast(DoubleType()))

In [0]:
#!!!!!UDF not working... simplify to function later
#fill in remaining null with avg of repsective column
df_stats = df3.select(_mean(col('diameter')).alias('mean')).collect()
mean_dia = df_stats[0]['mean']
df4 = df3.withColumn('diameter', when(df3.diameter.isNull(),lit(mean_dia)).otherwise(df3.diameter))
print("mean of diameter",mean_dia)

df_stats = df3.select(_mean(col('diameter_sigma')).alias('mean')).collect()
mean_diasig = df_stats[0]['mean']
df4 = df4.withColumn('diameter_sigma', when(df3.diameter_sigma.isNull(),lit(mean_diasig)).otherwise(df3.diameter_sigma))
print("mean of diameter uncertainty",mean_diasig)

df_stats = df3.select(_mean(col('albedo')).alias('mean')).collect()
mean_alb = df_stats[0]['mean']
df4 = df4.withColumn('albedo', when(df3.albedo.isNull(),lit(mean_alb)).otherwise(df3.albedo))
print("mean of albedo",mean_alb)

df_stats = df3.select(_mean(col('H')).alias('mean')).collect()
mean_H = df_stats[0]['mean']
df4 = df4.withColumn('H', when(df3.H.isNull(),lit(mean_H)).otherwise(df3.H))
print("mean of H",mean_H)

df_stats = df3.select(_mean(col('ad')).alias('mean')).collect()
mean_ad = df_stats[0]['mean']
df4 = df4.withColumn('ad', when(df3.ad.isNull(),lit(mean_ad)).otherwise(df3.ad))
print("mean of ad",mean_ad)
df_stats = df3.select(_mean(col('per')).alias('mean')).collect()
mean_per = df_stats[0]['mean']
df4 = df4.withColumn('per', when(df3.per.isNull(),lit(mean_per)).otherwise(df3.per))
print("mean of per",mean_per)
df_stats = df3.select(_mean(col('per_y')).alias('mean')).collect()
mean_per_y = df_stats[0]['mean']
df4 = df4.withColumn('per_y', when(df3.per_y.isNull(),lit(mean_per_y)).otherwise(df3.per_y))
print("mean of per_y",mean_per_y)
df_stats = df3.select(_mean(col('sigma_ad')).alias('mean')).collect()
mean_sigma_ad = df_stats[0]['mean']
df4 = df4.withColumn('sigma_ad', when(df3.sigma_ad.isNull(),lit(mean_sigma_ad)).otherwise(df3.sigma_ad))
print("mean of sigma_ad",mean_sigma_ad)
df_stats = df3.select(_mean(col('sigma_per')).alias('mean')).collect()
mean_sigma_per = df_stats[0]['mean']
df4 = df4.withColumn('sigma_per', when(df3.sigma_per.isNull(),lit(mean_sigma_per)).otherwise(df3.sigma_per))
print("mean of sigma_per",mean_sigma_per)
df_stats = df3.select(_mean(col('sigma_e')).alias('mean')).collect()
mean_sigma_e = df_stats[0]['mean']
df4 = df4.withColumn('sigma_e', when(df3.sigma_e.isNull(),lit(mean_sigma_e)).otherwise(df3.sigma_e))
print("mean of sigma_e",mean_sigma_e)
df_stats = df3.select(_mean(col('sigma_a')).alias('mean')).collect()
mean_sigma_a = df_stats[0]['mean']
df4 = df4.withColumn('sigma_a', when(df3.sigma_a.isNull(),lit(mean_sigma_a)).otherwise(df3.sigma_a))
print("mean of sigma_a",mean_sigma_a)
df_stats = df3.select(_mean(col('sigma_q')).alias('mean')).collect()
mean_sigma_q = df_stats[0]['mean']
df4 = df4.withColumn('sigma_q', when(df3.sigma_q.isNull(),lit(mean_sigma_q)).otherwise(df3.sigma_q))
print("mean of sigma_q",mean_sigma_q)
df_stats = df3.select(_mean(col('sigma_i')).alias('mean')).collect()
mean_sigma_i = df_stats[0]['mean']
df4 = df4.withColumn('sigma_i', when(df3.sigma_i.isNull(),lit(mean_sigma_i)).otherwise(df3.sigma_i))
print("mean of sigma_i",mean_sigma_i)
df_stats = df3.select(_mean(col('sigma_om')).alias('mean')).collect()
mean_sigma_om = df_stats[0]['mean']
df4 = df4.withColumn('sigma_om', when(df3.sigma_om.isNull(),lit(mean_sigma_om)).otherwise(df3.sigma_om))
print("mean of sigma_om",mean_sigma_om)
df_stats = df3.select(_mean(col('sigma_w')).alias('mean')).collect()
mean_sigma_w = df_stats[0]['mean']
df4 = df4.withColumn('sigma_w', when(df3.sigma_w.isNull(),lit(mean_sigma_w)).otherwise(df3.sigma_w))
print("mean of sigma_w",mean_sigma_w)
df_stats = df3.select(_mean(col('sigma_ma')).alias('mean')).collect()
mean_sigma_ma = df_stats[0]['mean']
df4 = df4.withColumn('sigma_ma', when(df3.sigma_ma.isNull(),lit(mean_sigma_ma)).otherwise(df3.sigma_ma))
print("mean of sigma_ma",mean_sigma_ma)
df_stats = df3.select(_mean(col('sigma_n')).alias('mean')).collect()
mean_sigma_n = df_stats[0]['mean']
df4 = df4.withColumn('sigma_n', when(df3.sigma_n.isNull(),lit(mean_sigma_n)).otherwise(df3.sigma_n))
print("mean of sigma_n",mean_sigma_n)
df_stats = df3.select(_mean(col('sigma_tp')).alias('mean')).collect()
mean_sigma_tp = df_stats[0]['mean']
df4 = df4.withColumn('sigma_tp', when(df3.sigma_tp.isNull(),lit(mean_sigma_tp)).otherwise(df3.sigma_tp))
print("mean of sigma_tp",mean_sigma_tp)
df_stats = df3.select(_mean(col('rms')).alias('mean')).collect()
mean_rms = df_stats[0]['mean']
df4 = df4.withColumn('rms', when(df3.rms.isNull(),lit(mean_rms)).otherwise(df3.rms))
print("mean of rms",mean_rms)


In [0]:
#Additional features "diameter_est" and "psedo_target"
df4.select('diameter','albedo','H').show()

In [0]:
#estimated diameter based on JPL method. link: https://cneos.jpl.nasa.gov/tools/ast_size_est.html
df5 = df4.withColumn('temp_albedo',3.1236 - (0.5 * log10(col('albedo'))))
df5 = df5.withColumn('temp_H', 0.2 * col('H'))
df5 = df5.withColumn('diameter_est', pow(10,(col('temp_albedo')-col('temp_H'))))
df5.select('diameter_est').show()

In [0]:
'''
Among NEOs, the definition of “potentially hazardous
asteroids” (PHAs) is reserved for those objects with absolute
magnitudes H  22 (i.e., diameter above ∼140 m, assuming
the average NEO albedo of 0.14 found by Mainzer et al. 2011a)
and whose minimum orbit intersection distance (MOID) is
within 0.05 au (7.5 million km) of the Earthʼs orbit.
'''

#pseudo_target based on rough classification from literature
df5 = df5.withColumn('pseudo_target', when((col('diameter_est') >=0.13) & (col('H') <=22) & (col('moid') <= 0.05), 1).otherwise(0))
distinctDF = df5.select('pseudo_target','pha').distinct()
distinctDF.show()
countDF = (df5.groupBy('pseudo_target','pha').count())
countDF.show()

In [0]:
#drop unnecessary columns
df5 = df5.drop(col('equinox'))
df5 = df5.drop(col('temp_albedo'))
train_clean_df = df5.drop(col('temp_H'))

In [0]:
#FINAL DF TO BE USED
# train_clean_df.show()
train_clean_df.printSchema()

In [0]:
# train_clean_df.write.format("orc").save("savetable/train_clean.orc")

In [0]:
train_clean_df.schema.names

In [0]:
# Section 2.4 Feature Selection
# first drop all columns that have correlation 1 with some other explanatory variables
# and drop all string column
df6 = train_clean_df.drop(col("epoch_mjd"))
df6 = df6.drop(col("epoch_cal"))
df6 = df6.drop(col("moid"))
df6 = df6.drop(col("q"))
df6 = df6.drop(col("per_y"))
df6 = df6.drop(col("tp_cal"))
df6 = df6.drop(col("sigma_ad"))
df6 = df6.drop(col("sigma_ma"))
df6 = df6.drop(col("sigma_tp"))

df6 = df6.drop(col('id'))
df6 = df6.drop(col('spkid'))
df6 = df6.drop(col('orbit_id'))
df6 = df6.drop(col('full_name'))
df6 = df6.drop(col('pdes'))
df6 = df6.drop(col('name'))
df6 = df6.drop(col('prefix'))
df6 = df6.drop(col('class')) # converted into both one hot and index

In [0]:
df6.printSchema()

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
colList = ["neo","H","diameter","albedo","diameter_sigma","epoch","e","a","i","om","w","ma","ad","n","tp",
           "per","moid_ld","sigma_e","sigma_a","sigma_q","sigma_i","sigma_om","sigma_w",
           "sigma_n","sigma_per","rms","diameter_est","pseudo_target","class_binary"]
assembler = VectorAssembler(inputCols=colList, outputCol="features")
df7 = assembler.transform(df6)

In [0]:
from pyspark.ml.regression import LinearRegression
#from pyspark.mllib.regression import LabeledPoint
#from pyspark.mllib.util import MLUtils

lr = LinearRegression(featuresCol = 'features', labelCol='pha', maxIter=10, regParam=0.00001, elasticNetParam=0.2)

# Fit the model
lrModel = lr.fit(df7)

# Print the weights and intercept for linear regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

In [0]:
# drop variables with coefficients 0. change
df8 = df6
for i in range(len(colList)):
  if lrModel.coefficients[i] == 0 and i <= len(colList)-2:
    df8 = df8.drop(col(colList[i]))

df8 = df8.drop(col("class_index")) # keep one hot vars
train_df_varsel = df8
train_df_varsel.printSchema()