This is a dataset of unidentified flying object (UFO) reports collected by a US-based ufology organisation. The data has been scraped from the National UFO Reporting Center (NUFORC) and is quite raw.

Source: https://data.world/timothyrenner/ufo-sightings  


The data primarily, though not exclusively, covers US sightings or events in the period 1969-2019.

Though the source data is in CSV format, it is not well formatted and some columns of interest are ambiguous, so clean-up is required.

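The column named stats is a misnomer: despite the name, it does not hold statistics. Judging by the sample row shown further down, it holds a text summary of the report (occurred, reported and posted dates, location, shape and duration).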
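
As a rough illustration of what the stats column contains, the sketch below splits one stats string into labelled fields using plain Python. The label list is taken from the single sample row printed later in this notebook, so treating it as a fixed set is an assumption, and `parse_stats` is a hypothetical helper rather than part of the original pipeline.

```python
import re

# Labels observed in the one sample `stats` value shown in this notebook.
# Assuming this set is complete is a guess, not a documented format.
LABELS = ["Occurred", "Reported", "Posted", "Location", "Shape", "Duration"]

# Matches "<label>:" or "<label> :" for any of the known labels.
LABEL_RE = re.compile(r"(" + "|".join(LABELS) + r")\s*:")


def parse_stats(stats):
    """Split a stats string into a {label: value} dict (hypothetical helper)."""
    if not stats:
        return {}
    matches = list(LABEL_RE.finditer(stats))
    fields = {}
    for i, m in enumerate(matches):
        # Each value runs from the end of its label to the start of the next label.
        end = matches[i + 1].start() if i + 1 < len(matches) else len(stats)
        fields[m.group(1)] = stats[m.end():end].strip()
    return fields


sample = ("Occurred : 12/12/2019 18:43  (Entered as : 12/12/19 18:43) "
          "Reported: 12/19/2019 7:19:31 PM 19:19 Posted: 12/22/2019 "
          "Location: Chester, VA Shape: Light Duration:5 seconds")
parsed = parse_stats(sample)
print(parsed["Location"], "|", parsed["Duration"])
```

A value that happens to contain one of the labels followed by a colon would be split incorrectly, so this is only suitable for a first look at the column.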

City may be a city or a combination of city and country  
Location fields need realigning, since the data is skewed towards a US audience  
Lat/Long is available in many of the records, though the level of accuracy is not clear  
Duration column is free text rather than a usable duration; the aim is to create a usable duration field for calculations and future analysis  

The report link column could function as an index if required, though the leading URL component will be stripped out.
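
A minimal plain-Python sketch of that stripping step is shown here. The prefix and suffix come from the clean-up cell in this notebook; the full sample URL is reconstructed from the cleaned value `151/S151739` and is therefore an assumption about the raw link format, and `report_id` is a hypothetical helper name.

```python
import re

# Prefix and suffix as used in this notebook's clean-up cell.
# Dots are escaped and the patterns anchored so they only match
# the literal prefix at the start and ".html" at the end.
PREFIX_RE = re.compile(r"^http://www\.nuforc\.org/webreports/")
SUFFIX_RE = re.compile(r"\.html$")


def report_id(link):
    """Reduce a report URL to its short identifier (hypothetical helper)."""
    if link is None:
        return None
    return SUFFIX_RE.sub("", PREFIX_RE.sub("", link.strip()))


# Reconstructed example URL (assumed form: prefix + id + ".html").
print(report_id("http://www.nuforc.org/webreports/151/S151739.html"))
```

Anchoring matters because an unescaped `.html` pattern would also match sequences such as `xhtml` anywhere in the string.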


In [13]:
import re
import string
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace,col, lower

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import functions as F

from pyspark.sql.functions import udf
from pyspark.sql.functions import expr

sparkSession = SparkSession.builder.appName("ufo-data").getOrCreate()

In [14]:
sourcefile = '/datain/nuforc_reports.csv' 
#sourcefile = '/datain/subset.csv'

uin = sparkSession.read.csv(sourcefile , header=True)

#Review the schema
uin.printSchema()

# Quick look at the data
uin.show()


root
 |-- summary: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- shape: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- stats: string (nullable = true)
 |-- report_link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- city_latitude: string (nullable = true)
 |-- city_longitude: string (nullable = true)

+--------------------+--------------------+-----+-------------------+---------+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             summary|                city|state|          date_time|    shape|      duration|               stats|         report_link|                text|              posted|       city_latitude|      city_longitude|
+--------------------+--------------------+-----+-------------------+--------

In [15]:
# Review the schema prior to clean up
uin.printSchema()
uin = uin.drop("summary")
uin = uin.drop("posted")

# After
uin.printSchema()

# Remove the long URL prefix and the .html suffix from the link
# (dots are escaped so they match literally rather than any character)
uin = uin.withColumn('report_link', regexp_replace('report_link', r'^http://www\.nuforc\.org/webreports/', ''))
uin = uin.withColumn('report_link', regexp_replace('report_link', r'\.html$', ''))

uin.show()


root
 |-- summary: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- shape: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- stats: string (nullable = true)
 |-- report_link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- posted: string (nullable = true)
 |-- city_latitude: string (nullable = true)
 |-- city_longitude: string (nullable = true)

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- date_time: string (nullable = true)
 |-- shape: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- stats: string (nullable = true)
 |-- report_link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- city_latitude: string (nullable = true)
 |-- city_longitude: string (nullable = true)

+--------------------+-----+-------------------+---------+--------------+--------------------+-----------+---------

In [16]:
# Note: a dictionary is not used because the substitutions need to be applied in a specific order.
# The idea is to normalise the data into a reasonable format, then evaluate the resulting
# expression to create a seconds-based duration value.

# First step is to normalise seconds, minutes and hours
dictionary_list = (
    'second:sec',
    'secs:sec',
    'asec:1sec',
    'sec:/zec/',
    's:',
    'ahr:1hr',
    'hour:hr',
    'ahr:1hr',
    'minute:min',
    'amin:1min',
    'hr:/hr/',
    'min:/min/',
    'zec:sec',
    'one:1',
    'two:2',
    'three:3',
    'four:4',
    'five:5',
    'six:6',
    'seven:7',
    'eight:8',
    'nine:9',
    'couple:2',
    'few:3',
    'several:4',
    'approx:1'
)

def duration_clean(a):

    # Use try/except to handle non-conforming data (including nulls), setting it to blank
    try:
        # Some initial cleaning & normalisation
        a = a.replace(" ", "")
        a = a.replace("-", "_")
        # a = a.lower()
        # Protect decimal points as "__" while other punctuation is stripped
        a = re.sub(r"[.]", "__", a)
        # \W matches any non-word character (equal to [^a-zA-Z0-9_])
        a = re.sub(r"\W", "", a)
        a = a.replace("__", ".")

        # Substitutions, applied in list order
        for i in dictionary_list:
            r = i.split(":")
            a = a.replace(r[0], r[1])

        # Remove x-y approximations in favour of y
        a = re.sub(r"\d+_", "", a)

    except Exception:
        a = ''

    return a


# register user defined function
duration_clean_udf = udf(duration_clean)

# spark.table('uin_table').select(F.lower('duration').alias('duration')).show()
uin = uin.withColumn("dur2", duration_clean_udf(uin.duration)) 

#example
#uin.show()
#uin.printSchema()

uin.show()


+--------------------+-----+-------------------+---------+--------------+--------------------+-----------+--------------------+--------------------+--------------------+----------+
|                city|state|          date_time|    shape|      duration|               stats|report_link|                text|       city_latitude|      city_longitude|      dur2|
+--------------------+-----+-------------------+---------+--------------+--------------------+-----------+--------------------+--------------------+--------------------+----------+
|             Chester|   VA|2019-12-12T18:43:00|    light|     5 seconds|Occurred : 12/12/...|151/S151739|My wife was drivi...|  37.343151515151526|    -77.408581818182|    5/sec/|
|          Rocky Hill|   CT|2019-03-22T18:30:00|   circle|   3-5 seconds|Occurred : 3/22/2...|145/S145297|I think that I ma...|   41.66480000000001|   -72.6392999999999|    5/sec/|
|                null| null|               null|     null|          null|Occurred : 4/1/20...|1

In [17]:
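# Set up the evaluation by creating a calculation

def calc_duration(a):
    try:
        # Turn each unit marker into a multiplier, e.g. '5/sec/' -> '5 * 1 +'
        b = a.replace('/min/', ' * 1 * 60 +')
        b = b.replace('/sec/', ' * 1 +')
        b = b.replace('/hr/', ' * 1 * 3600 +')
        # Drop the trailing '+' so the expression is valid
        b = b.strip('+')
        # Note: eval executes arbitrary expressions, so only use it on trusted input
        b = eval(b)
    except Exception:
        # Anything that does not evaluate is treated as zero seconds
        b = 0

    return b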

calc_duration_udf = udf(calc_duration)

# duration = uin.select("dur2")
uin = uin.withColumn("calc_duration", calc_duration_udf("dur2")) 

uin.show()


+--------------------+-----+-------------------+---------+--------------+--------------------+-----------+--------------------+--------------------+--------------------+----------+-------------+
|                city|state|          date_time|    shape|      duration|               stats|report_link|                text|       city_latitude|      city_longitude|      dur2|calc_duration|
+--------------------+-----+-------------------+---------+--------------+--------------------+-----------+--------------------+--------------------+--------------------+----------+-------------+
|             Chester|   VA|2019-12-12T18:43:00|    light|     5 seconds|Occurred : 12/12/...|151/S151739|My wife was drivi...|  37.343151515151526|    -77.408581818182|    5/sec/|            5|
|          Rocky Hill|   CT|2019-03-22T18:30:00|   circle|   3-5 seconds|Occurred : 3/22/2...|145/S145297|I think that I ma...|   41.66480000000001|   -72.6392999999999|    5/sec/|            5|
|                null| nu
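
The calculation above relies on `eval`, which will run whatever expression it is given. As an alternative sketch (not the approach used in this notebook), the normalised string can be scanned for `<number>/<unit>/` tokens and summed directly. The function name `duration_seconds` and the token pattern are assumptions based on the normalised values shown in the output, such as `5/sec/`.

```python
import re

# Seconds represented by each unit marker produced by the normalisation step.
SECONDS_PER_UNIT = {"sec": 1, "min": 60, "hr": 3600}

# A number (optionally with a decimal part) followed by a unit marker, e.g. "5/sec/".
TOKEN_RE = re.compile(r"(\d+(?:\.\d+)?)/(sec|min|hr)/")


def duration_seconds(normalised):
    """Sum every <number>/<unit>/ token in seconds; 0.0 if none is found."""
    if not normalised:
        return 0.0
    total = 0.0
    for number, unit in TOKEN_RE.findall(normalised):
        total += float(number) * SECONDS_PER_UNIT[unit]
    return total


print(duration_seconds("5/sec/"))
print(duration_seconds("2/min/30/sec/"))
```

One design difference to be aware of: the `eval` version returns 0 whenever any leftover text makes the expression invalid, whereas this sketch ignores the leftover text and sums the tokens it does recognise.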

Run a brief sanity check on the duration.  

This is done by removing all numeric characters and grouping the outcome. Several cycles of this may be needed to identify problems. It is done this way in order to surface any erroneous durations that may have been missed by the process above.  

If that is the case, the offending string should be fed back into the dictionary/list above for rationalisation.  
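
The same idea can be tried on a handful of values in plain Python before running it in Spark. This is only a sketch: `residue_counts` is a hypothetical helper, and the sample values (including the unrecognised `/wk/` marker) are made up for illustration.

```python
import re
from collections import Counter


def residue_counts(values):
    """Strip digits from each non-null value and count what is left over."""
    residues = (re.sub(r"\d", "", v) for v in values if v is not None)
    return Counter(residues).most_common()


# Made-up sample: three integers, one decimal, one null, one unhandled unit.
sample = ["5", "10", "300", "90.0", None, "3/wk/"]
counts = residue_counts(sample)
print(counts)
```

An empty residue means the value was purely numeric and a lone `.` means it was a decimal; anything else is a candidate for a new substitution rule.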



In [18]:
# This is a manual process, repeated until stripping the digits leaves nothing unexpected in calc_duration.

temp = uin.select("calc_duration")

temp = temp.withColumn('calc_duration', regexp_replace('calc_duration', r'\d', ''))
#temp = temp.withColumn('dur2', regexp_replace('dur2', r'(\d+)', ''))

temp = temp.filter(temp.calc_duration.isNotNull())

temp.groupBy('calc_duration').count().sort('count', ascending=False).show()


+-------------+-----+
|calc_duration|count|
+-------------+-----+
|             |87233|
|            .|  894|
+-------------+-----+



In [19]:
# A similar process is required for the lat/long co-ordinate data, except here we simply remove non-numeric characters.
# The minus sign is kept for both columns so southern latitudes and western longitudes stay negative.

uin = uin.withColumn('city_latitude', regexp_replace('city_latitude', "[^0-9.-]", ''))
uin = uin.withColumn('city_longitude', regexp_replace('city_longitude', "[^0-9.-]", ''))

# Inspect the first row
uin.head()

Row(city='Chester', state='VA', date_time='2019-12-12T18:43:00', shape='light', duration='5 seconds', stats='Occurred : 12/12/2019 18:43  (Entered as : 12/12/19 18:43) Reported: 12/19/2019 7:19:31 PM 19:19 Posted: 12/22/2019 Location: Chester, VA Shape: Light Duration:5 seconds', report_link='151/S151739', text='My wife was driving southeast on a fairly populated main side road, it was dark out side at about 6:43pm, And my wife exclaimed” falling star baby look quick!” When I looked up I saw not a falling star but a bright ball of light , one that was closer than any shooting star I have ever seen, it had a blue glow  changing into green colors of light as it fell from the sky. It fell as if falling from an invisible opening in the sky... the night was a crystal clear night sky  so no clouds or precipitation to obstruct our view and the object was closer than any I have ever witnessed before. The way the object fell was too slow to be a meteor or falling star, also noting that there wa
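
Stripping characters still leaves the co-ordinates as strings, and it does not guarantee that the result is a valid number. A possible follow-up, sketched here in plain Python under the assumption that latitude should lie within ±90 and longitude within ±180 degrees, is to convert and range-check each value. `to_coordinate` is a hypothetical helper, not part of the original notebook.

```python
import re


def to_coordinate(raw, limit):
    """Return raw as a float within [-limit, limit], or None if unusable."""
    if raw is None:
        return None
    # Keep digits, the decimal point and the minus sign only.
    cleaned = re.sub(r"[^0-9.\-]", "", raw)
    try:
        value = float(cleaned)
    except ValueError:
        # Empty or malformed after cleaning, e.g. "" or "1-2".
        return None
    return value if -limit <= value <= limit else None


print(to_coordinate("37.343151515151526", 90))
print(to_coordinate(" -77.408581818182", 180))
```

Returning None for out-of-range values keeps bad co-ordinates out of later distance or mapping calculations without dropping the whole row.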

In [20]:
# Get rid of dur2, as no longer required
uin = uin.drop("dur2")

uin.describe().show()

uin.count()

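# Lower-case the shape values (renaming the column to its own name had no effect) and strip any digits
uin = uin.withColumn('shape', lower(col('shape')))
uin = uin.withColumn('shape', regexp_replace('shape', r'(\d+)', ''))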

uin.groupBy('shape').count().sort('count', ascending=False).show(30)

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|summary|                city|               state|           date_time|               shape|            duration|               stats|         report_link|                text|       city_latitude|      city_longitude|    calc_duration|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+
|  count|               87890|               82913|               86931|               85625|               84964|               88077|               88121|               88069|               73575|               73366|            88127|
|   mean|                null|                nu

In [21]:
# Retain only the first word
uin = uin.withColumn(
    'word_shape', 
    expr(r"regexp_extract(shape, '\\w+', 0)")
)

uin.count()


88127

In [22]:

shape_array = ['light','circle','triangle','fireball','unknown','sphere','other','disk','oval','formation','0','changing','cigar','flash','rectangle','cylinder','diamond','chevron','teardrop','egg','cone','cross']

uin = uin.filter(uin.word_shape.isin(shape_array))

uin = uin.drop("shape")
uin = uin.withColumnRenamed("word_shape", "shape")

uin.count()


85238
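
The shape steps (strip digits, keep the first word, keep only recognised shapes) can be summarised as one plain-Python function, which may be handy for testing individual values. This is a sketch only: `normalise_shape` is a hypothetical name, the lower-casing is an assumption about the intended behaviour, and the `'0'` entry from the list above is left out because digits are removed before the lookup.

```python
import re

# Recognised shapes, copied from shape_array above (minus the '0' entry).
KNOWN_SHAPES = {
    "light", "circle", "triangle", "fireball", "unknown", "sphere", "other",
    "disk", "oval", "formation", "changing", "cigar", "flash", "rectangle",
    "cylinder", "diamond", "chevron", "teardrop", "egg", "cone", "cross",
}


def normalise_shape(raw):
    """Return the first word of raw if it is a known shape, else None."""
    if raw is None:
        return None
    # Remove digits and lower-case before taking the first word.
    text = re.sub(r"\d+", "", raw).lower()
    match = re.search(r"\w+", text)
    if not match:
        return None
    word = match.group(0)
    return word if word in KNOWN_SHAPES else None


print(normalise_shape("Triangle 2"))
print(normalise_shape("banana"))
```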

In [23]:
# Review the data now that shape has been dealt with in a similar fashion

uin.show()

+--------------------+-----+-------------------+--------------+--------------------+-----------+--------------------+------------------+-------------------+-------------+---------+
|                city|state|          date_time|      duration|               stats|report_link|                text|     city_latitude|     city_longitude|calc_duration|    shape|
+--------------------+-----+-------------------+--------------+--------------------+-----------+--------------------+------------------+-------------------+-------------+---------+
|             Chester|   VA|2019-12-12T18:43:00|     5 seconds|Occurred : 12/12/...|151/S151739|My wife was drivi...|37.343151515151526|   -77.408581818182|            5|    light|
|          Rocky Hill|   CT|2019-03-22T18:30:00|   3-5 seconds|Occurred : 3/22/2...|145/S145297|I think that I ma...| 41.66480000000001|  -72.6392999999999|            5|   circle|
|              Ottawa|   ON|2019-04-17T02:00:00|    10 seconds|Occurred : 4/17/2...|145/S145697

Change ordering of columns  

It's not always possible to easily manage very inconsistent data like the text in this dataset. The aim is to gather the more consistently formed and formatted columns at the front, leaving the text column as the final column.  

The benefit comes in later processing: if the text data is problematic for any reason, it may be possible to split it from the non-problem data so each can be treated separately for different types of analysis.
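
One way to express this ordering rule without hard-coding every column name is sketched below. `structured_first` is a hypothetical helper, and treating `stats` and `text` as the free-text columns is an assumption based on the column order chosen in this notebook.

```python
def structured_first(columns, free_text=("stats", "text")):
    """Return columns with the free-text ones moved to the end, order kept."""
    head = [c for c in columns if c not in free_text]
    tail = [c for c in free_text if c in columns]
    return head + tail


# Column order as it stands before the reordering cell.
current = ["city", "state", "date_time", "duration", "stats", "report_link",
           "text", "city_latitude", "city_longitude", "calc_duration", "shape"]
ordered = structured_first(current)
print(ordered)
```

The resulting list could be passed to `uin.select(...)`; it keeps the existing relative order of the structured columns rather than the hand-picked order used in the next cell.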



In [24]:

uin = uin.select(["city", "state", "date_time", "shape" , "duration", "calc_duration", "report_link", "city_latitude", "city_longitude", "stats", "text"])

# Display data snippet
uin.show()

# Write the data out to staging 
outfile = '/datain/ufo_stg_files' 

uin.write.format('csv').mode('overwrite').option("header", "true").save(outfile)



+--------------------+-----+-------------------+---------+--------------+-------------+-----------+------------------+-------------------+--------------------+--------------------+
|                city|state|          date_time|    shape|      duration|calc_duration|report_link|     city_latitude|     city_longitude|               stats|                text|
+--------------------+-----+-------------------+---------+--------------+-------------+-----------+------------------+-------------------+--------------------+--------------------+
|             Chester|   VA|2019-12-12T18:43:00|    light|     5 seconds|            5|151/S151739|37.343151515151526|   -77.408581818182|Occurred : 12/12/...|My wife was drivi...|
|          Rocky Hill|   CT|2019-03-22T18:30:00|   circle|   3-5 seconds|            5|145/S145297| 41.66480000000001|  -72.6392999999999|Occurred : 3/22/2...|I think that I ma...|
|              Ottawa|   ON|2019-04-17T02:00:00| teardrop|    10 seconds|           10|145/S145

The data has been cleaned up somewhat.  

Some columns have been removed as they were superfluous.

A new duration field has been calculated using a user defined function that could be reused and/or modified to deal with new situations depending on the data.  

The data structure has been modified in consideration of future processing.  

The data has been written out as a staging file.  

