#### Assignment: Using Spark to mine Astro signals

###### Goal: To find the longest repeating cosmological signal in the captured data

###### The datset has a series of signals stored as: ascension (degrees), declination (degrees), time (seconds), frequency (MHz). These are radiofrequency signals captured by an array of instruments scanning a solid angle of the sky. The data is, like almost all real data, a little noisy and has sources of errors. In our case the angular coordinates have 0.1 degrees error, the signal frequency has 0.1 MHZ error and the timestamp error is <0.01s (that is one STD or standard deviation).

###### Your job is to find the source with the most signals (that is "blips", or entries in the datalog). Your target will be found in the same location (within error) of the sky, on the same frequency (within error) chirping for the most blips, regularly spaced in time during that active period.


In [2]:
# Create a Spark session
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.appName("BigDataAnalysis").getOrCreate()
sc = SparkContext.getOrCreate()

In [46]:
#importing necessary libraries
from pyspark.sql.functions import udf, col, desc
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [4]:
# Loading and reading the dataset
rdd = sc.textFile("pulsar.dat")

# Transforming the RDD to a structured format
rdd2 = rdd.map(lambda x: x.split(' ')).map(lambda x: [float(x[0]), float(x[1]), float(x[2]), float(x[3])])
rounded_rdd = rdd2.map(lambda sublist: [round(sublist[0], 1), round(sublist[1], 1), round(sublist[2], 2), round(sublist[3], 1)])

In [19]:
sort_freq = rounded_rdd.sortBy(lambda x:(x[3]))
sort_freq.take(10)

[[88.7, 79.8, 932.71, 1001.0],
 [90.8, 79.1, 2462.08, 1002.2],
 [90.9, 78.9, 2457.41, 1002.2],
 [96.3, 80.4, 803.96, 1002.4],
 [89.2, 75.9, 916.75, 1004.3],
 [88.9, 75.7, 921.66, 1004.4],
 [61.7, 110.4, 1958.6, 1004.4],
 [61.9, 110.1, 1954.87, 1004.8],
 [79.6, 60.1, 2984.53, 1008.6],
 [79.7, 60.1, 2969.28, 1008.7]]

In [20]:
#creating a dataframe                                                                          
df = spark.createDataFrame(sort_freq, ["ascension", "declination", "time", "frequency"])

In [21]:
#creating an user_defined function where we iterate through a sorted column, and group values that are within 4 standard deviations (0.4) of each other
def assign_pivot(value):
    global pivot
    if round(value, 1) <= round(pivot, 1) + 0.4:
            return round(pivot, 1)
 
    else:
        pivot = round(value, 1)
        return round(pivot, 1)
 
assign_pivot_udf = udf(assign_pivot, DoubleType())

In [23]:
#starting for “frequency” first
 
pivot = df.select("frequency").first()[0]
df1 = df.withColumn("new_frequency", assign_pivot_udf(df["frequency"]))
df1.show()

+---------+-----------+-------+---------+-------------+
|ascension|declination|   time|frequency|new_frequency|
+---------+-----------+-------+---------+-------------+
|     88.7|       79.8| 932.71|   1001.0|       1001.0|
|     90.8|       79.1|2462.08|   1002.2|       1002.2|
|     90.9|       78.9|2457.41|   1002.2|       1002.2|
|     96.3|       80.4| 803.96|   1002.4|       1002.2|
|     89.2|       75.9| 916.75|   1004.3|       1004.3|
|     88.9|       75.7| 921.66|   1004.4|       1004.3|
|     61.7|      110.4| 1958.6|   1004.4|       1004.3|
|     61.9|      110.1|1954.87|   1004.8|       1004.8|
|     79.6|       60.1|2984.53|   1008.6|       1008.6|
|     79.7|       60.1|2969.28|   1008.7|       1008.6|
|     79.8|       60.1|2976.91|   1008.7|       1008.6|
|    118.5|       69.5|1556.05|   1009.2|       1009.2|
|     68.9|       72.2|2268.91|   1009.2|       1009.2|
|    118.7|       69.2|1563.76|   1009.4|       1009.2|
|    108.7|      110.4| 299.14|   1009.6|       

In [24]:
#trying for “ascension” next

df1 = df1.sort("ascension")
pivot = df1.select("ascension").first()[0]
assign_pivot_udf = udf(assign_pivot, DoubleType())
 
df2 = df1.withColumn("new_ascension", assign_pivot_udf(df1["ascension"]))
df2 = df2.cache()
df2.show()

+---------+-----------+-------+---------+-------------+-------------+
|ascension|declination|   time|frequency|new_frequency|new_ascension|
+---------+-----------+-------+---------+-------------+-------------+
|     59.9|      110.8|1113.25|   1116.7|       1116.7|         59.9|
|     59.9|       64.9|1546.98|   1243.1|       1242.9|         59.9|
|     59.9|       70.5|2175.44|   1306.2|       1306.2|         59.9|
|     59.9|      107.6|3254.65|   2502.8|       2502.8|         59.9|
|     59.9|      110.6|1093.51|   3334.7|       3334.4|         59.9|
|     59.9|       66.5|1941.39|   3546.1|       3546.0|         59.9|
|     59.9|      119.0|3581.48|   4525.6|       4525.6|         59.9|
|     59.9|      116.7| 1124.8|   5629.3|       5629.3|         59.9|
|     59.9|       95.3|2385.33|   6413.9|       6413.7|         59.9|
|     59.9|       89.5|3489.34|   7152.2|       7152.0|         59.9|
|     59.9|       96.5|1790.79|   7531.6|       7531.6|         59.9|
|     60.0|      110

In [25]:
#trying for declination next

df2 = df2.sort("declination")
pivot = df2.select("declination").first()[0]
assign_pivot_udf = udf(assign_pivot, DoubleType())
 
df3 = df2.withColumn("new_declination", assign_pivot_udf(df2["declination"]))
df3 = df3.cache()
df3.show()

+---------+-----------+-------+---------+-------------+-------------+---------------+
|ascension|declination|   time|frequency|new_frequency|new_ascension|new_declination|
+---------+-----------+-------+---------+-------------+-------------+---------------+
|     76.4|       59.9|2601.51|   6051.2|       6051.2|         76.4|           59.9|
|     64.7|       60.0|1750.62|   5334.4|       5334.2|         64.4|           59.9|
|     74.7|       60.0|2413.95|   7426.7|       7426.4|         74.4|           59.9|
|     86.6|       60.0| 452.18|   5466.6|       5466.6|         86.4|           59.9|
|    102.7|       60.0| 807.69|   3186.2|       3186.2|        102.4|           59.9|
|    118.8|       60.0|2495.26|   4536.9|       4536.8|        118.4|           59.9|
|     70.8|       60.1|2014.15|   6850.1|       6850.1|         70.4|           59.9|
|     74.8|       60.1|2408.05|   7426.6|       7426.4|         74.4|           59.9|
|     75.1|       60.1|3472.79|   3696.5|       3696.5

In [27]:
#using groupBy function to group signals with similar frequency, ascension, and declination
grouped_df = df3.groupBy("new_frequency", "new_ascension", "new_declination").count().sort(desc("count"))
grouped_df.show()

+-------------+-------------+---------------+-----+
|new_frequency|new_ascension|new_declination|count|
+-------------+-------------+---------------+-----+
|       2231.1|        104.4|           81.9|   26|
|       7877.6|        117.9|           62.9|   18|
|       4999.8|         93.9|          114.9|   17|
|       6925.4|         86.9|           66.4|   12|
|       1413.4|        105.4|           96.4|    9|
|       2874.4|         97.9|          107.9|    9|
|       3277.0|         60.9|           60.4|    9|
|       5448.5|        109.9|           75.4|    8|
|       3782.1|         62.9|          105.9|    7|
|       7591.8|        118.9|          103.9|    6|
|       5783.0|        113.9|           97.9|    6|
|       7121.0|        110.4|          108.4|    6|
|       5813.2|         98.9|           79.4|    6|
|       7886.1|        115.4|           87.9|    6|
|       1542.3|         64.9|           85.4|    6|
|       4783.8|        117.9|           99.4|    6|
|       4915

From the output of grouped_df, we can see that signals that appear at 2231.1 frequency with 104.4 ascension and 82.1 declination occur the most times (26 times). The next highest number of signals occur around 18 times at 2 different combinations of frequency, ascension, and declination. 

As long as we can show that there are more than 18 signals happening at the first combination of parameters, in equal intervals, we can confirm that, the first signal will form the longest signal. 

In [29]:
filtered_df = df3.filter((col("new_frequency") == "2231.1") & (col("new_ascension") == "104.4") & (col("new_declination") == "81.9"))
filtered_df.sort("time").show()

+---------+-----------+-----+---------+-------------+-------------+---------------+
|ascension|declination| time|frequency|new_frequency|new_ascension|new_declination|
+---------+-----------+-----+---------+-------------+-------------+---------------+
|    104.5|       82.1|800.0|   2231.5|       2231.1|        104.4|           81.9|
|    104.6|       82.1|802.2|   2231.4|       2231.1|        104.4|           81.9|
|    104.5|       82.0|804.4|   2231.5|       2231.1|        104.4|           81.9|
|    104.6|       81.9|808.8|   2231.4|       2231.1|        104.4|           81.9|
|    104.5|       82.2|811.0|   2231.3|       2231.1|        104.4|           81.9|
|    104.5|       82.1|813.2|   2231.4|       2231.1|        104.4|           81.9|
|    104.5|       82.1|815.4|   2231.5|       2231.1|        104.4|           81.9|
|    104.7|       82.0|817.6|   2231.5|       2231.1|        104.4|           81.9|
|    104.6|       82.0|819.8|   2231.4|       2231.1|        104.4|         

When we filter through our dataset with our first combination of parameters, we can see that there is a nice time interval of 2.2 seconds between each signal, but they are not consistent, and we can see some missing timestamps in the data frame. 

Hence, we are going to expand our range to see if there are any other points that fall in this expanded range(which is just plus/minus 1 of the combinations we have found above).

In [37]:
filtered_df1 = df3.filter((col("new_frequency") >= 2230.0) & (col("new_frequency") <= 2232.0) & (col("new_ascension") >= 103.0) & (col("new_ascension") <= 105.0) & (col("new_declination") >= 81.0) & (col("new_declination") <= 83.0))
filtered_df1.count()

34

This expansion of range gave 34 counts, which is more than 26 counts we got initially. This is where most of our missing timestamps could been hidden/can be found. 

In [45]:
filtered_df1 = df3.filter(
    (col("new_frequency") >= 2230.0) & (col("new_frequency") <= 2232.0) &
    (col("new_ascension") >= 103.0) & (col("new_ascension") <= 105.0) &
    (col("new_declination") >= 81.0) & (col("new_declination") <= 83.0)
)

filtered_df1.show()

+---------+-----------+-----+---------+-------------+-------------+---------------+
|ascension|declination| time|frequency|new_frequency|new_ascension|new_declination|
+---------+-----------+-----+---------+-------------+-------------+---------------+
|    104.4|       81.9|841.8|   2231.2|       2231.1|        104.4|           81.9|
|    104.6|       81.9|808.8|   2231.4|       2231.1|        104.4|           81.9|
|    104.4|       82.0|863.8|   2231.5|       2231.1|        104.4|           81.9|
|    104.5|       82.0|859.4|   2231.5|       2231.1|        104.4|           81.9|
|    104.5|       82.0|804.4|   2231.5|       2231.1|        104.4|           81.9|
|    104.5|       82.0|837.4|   2231.6|       2231.6|        104.4|           81.9|
|    104.6|       82.0|819.8|   2231.4|       2231.1|        104.4|           81.9|
|    104.7|       82.0|817.6|   2231.5|       2231.1|        104.4|           81.9|
|    104.4|       82.1|857.2|   2231.5|       2231.1|        104.4|         

Since we are using Pyspark, we are going to use an in-built Pyspark function (F.lag) to calculate the time interval between each timestamp

In [44]:
#define UDF
def calculate_interval(value, previous_value):
    if previous_value is not None:
        return round(value - previous_value, 1)
    else:
        return None

calculate_interval_udf = udf(calculate_interval, DoubleType())
window_spec = Window.orderBy("time")

+---------+-----------+-----+---------+-------------+-------------+---------------+-------------+
|ascension|declination| time|frequency|new_frequency|new_ascension|new_declination|time_interval|
+---------+-----------+-----+---------+-------------+-------------+---------------+-------------+
|    104.5|       82.1|800.0|   2231.5|       2231.1|        104.4|           81.9|         NULL|
|    104.6|       82.1|802.2|   2231.4|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.0|804.4|   2231.5|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.1|806.6|   2231.7|       2231.6|        104.4|           81.9|          2.2|
|    104.6|       81.9|808.8|   2231.4|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.2|811.0|   2231.3|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.1|813.2|   2231.4|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82

In [49]:
# Apply the withColumn operation
final_df = filtered_df1.withColumn(
    "time_interval",
    calculate_interval_udf(col("time"), F.lag(col("time")).over(window_spec))
)

# Show the resulting DataFrame
final_df.show()

+---------+-----------+-----+---------+-------------+-------------+---------------+-------------+
|ascension|declination| time|frequency|new_frequency|new_ascension|new_declination|time_interval|
+---------+-----------+-----+---------+-------------+-------------+---------------+-------------+
|    104.5|       82.1|800.0|   2231.5|       2231.1|        104.4|           81.9|         NULL|
|    104.6|       82.1|802.2|   2231.4|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.0|804.4|   2231.5|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.1|806.6|   2231.7|       2231.6|        104.4|           81.9|          2.2|
|    104.6|       81.9|808.8|   2231.4|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.2|811.0|   2231.3|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82.1|813.2|   2231.4|       2231.1|        104.4|           81.9|          2.2|
|    104.5|       82

In [50]:
final_df.count()

34

From the output for final_df above, we can see that the time difference is 2.2 for all the timestamps (look at the "time interval" column). This shows us that these signals are all occurring at equal time intervals (i.e. they are all the same signals)

Final Answer: There are 34blips at location 104, 82 degrees with a frequency of 2231MHZ and a period of 2.2 seconds interval 