# Local Example

This notebook experiments with Spark using data from https://github.com/astronexus/HYG-Database.
For your convenience, the data is copied into this repository, but it is quite a large file.

This setup assumes everything lives locally: all the data you are working with sits inside your local development environment.
This is a great way to get started on a large-scale job: pull a subset of the data onto your machine to experiment with, or to fix an issue with a specific file that is causing you trouble.

## System Args

In a real Glue execution, a few parameters are passed in to your Python script.
Since we are running in a local environment, we fake these parameters.
The following code passes in the job name, which is the only parameter this notebook reads.

In [1]:
import sys
sys.argv += ['--JOB_NAME', 'glue_script']
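
One thing to watch in a notebook: every rerun of the cell above appends the same flag to `sys.argv` again. That is probably harmless, but the argument list keeps growing. A minimal sketch of a rerun-safe variant follows; `ensure_arg` is a hypothetical helper name, not part of Glue, and the demo works on a stand-in list rather than the real `sys.argv`.

```python
def ensure_arg(argv, name, value):
    """Append `name value` to argv unless `name` is already present."""
    if name not in argv:
        argv.extend([name, value])
    return argv


# Stand-in for sys.argv so the demo does not touch the real process arguments.
argv = ['script.py']

# Simulate running the notebook cell three times.
for _ in range(3):
    ensure_arg(argv, '--JOB_NAME', 'glue_script')

job_name_count = argv.count('--JOB_NAME')
```

In the notebook itself you would call `ensure_arg(sys.argv, '--JOB_NAME', 'glue_script')` instead of using `+=`.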

## Import Libs

The first thing we do is import the libraries our application needs.
Since Spark and Glue are installed in the Docker container, we can load the Spark and Glue modules here.


In [2]:
# Import all of the pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, StructField, StructType, DataType, DecimalType, DoubleType
from pyspark.context import SparkContext

# Import the Glue module components
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

If you executed the above inside the container, you should not get any errors and everything should work!

## Configuration

The first portion specifies where the input and output directories are. This container puts your working directory under the root `/workspaces` folder. If you changed the default name of the checked-out git repository, update the two lines below.

In [3]:
INPUT_PATH = "file:///workspaces/aws-glue-job-jupyter-starter/data"
OUTPUT_PATH = "file:///workspaces/aws-glue-job-jupyter-starter/output/"
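
If you rename the checkout often, one option is to build both paths from a single repository name so there is only one value to change. This is a small sketch, not what the notebook does; `REPO_NAME`, `WORKSPACE_ROOT`, and `local_uri` are names introduced here for illustration.

```python
import posixpath

WORKSPACE_ROOT = "/workspaces"                 # where the container mounts your work
REPO_NAME = "aws-glue-job-jupyter-starter"     # change this if you renamed the checkout


def local_uri(*parts):
    """Build a file:// URI for a path inside the checked-out repository."""
    return "file://" + posixpath.join(WORKSPACE_ROOT, REPO_NAME, *parts)


INPUT_PATH = local_uri("data")
OUTPUT_PATH = local_uri("output") + "/"
```

With the default repository name this yields the same two strings as the cell above.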

Finally, we load the command-line arguments and convert them into the format Glue expects.

In [4]:
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
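
To make the shape of `args` concrete, here is a rough stand-in built on the standard library's `argparse`. It is only an illustration of the idea (named flags in, dictionary out) and is not Glue's implementation; the real `getResolvedOptions` may handle additional options that this sketch ignores. `resolve_options` is a hypothetical name.

```python
import argparse


def resolve_options(argv, option_names):
    """Parse `--NAME value` pairs from argv into a plain dict keyed by NAME."""
    parser = argparse.ArgumentParser(add_help=False)
    for name in option_names:
        parser.add_argument('--' + name, required=True)
    # parse_known_args tolerates extra flags that we did not ask for.
    known, _unknown = parser.parse_known_args(argv[1:])
    return vars(known)


fake_argv = ['script.py', '--JOB_NAME', 'glue_script', '--other', 'ignored']
resolved = resolve_options(fake_argv, ['JOB_NAME'])
```

The result is looked up by name without the leading dashes, the same way the notebook later reads `args['JOB_NAME']`.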

## Spark/Glue Context

Now create the Spark and Glue contexts.
The following is pretty standard in most Glue examples you will find.
The only difference is the `getOrCreate()` method we are using.
This method allows us to rerun this block multiple times within this notebook.
In a real Glue job, you can simply create the context since you would only ever run it once.

You will also notice that `AWS_REGION` is set here as an environment variable.
Since the Glue library uses the AWS SDK, it will raise an error if this environment variable is missing.

In [5]:
%env AWS_REGION=us-east-1

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sc.getConf().getAll()

env: AWS_REGION=us-east-1


[('spark.driver.port', '35925'),
 ('spark.executor.extraClassPath', '/opt/aws-glue-libs/jarsv1/*'),
 ('spark.app.id', 'local-1597587066157'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.driver.extraClassPath', '/opt/aws-glue-libs/jarsv1/*'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.host', '4e108c19a2ce')]
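
The `%env` line is an IPython magic, so it only works inside a notebook. If you move this code into a plain Python script, a sketch of the equivalent is below. Note one difference: `setdefault` keeps a region that is already configured and only fills in a fallback, whereas `%env` always overwrites. The region value is the one used above.

```python
import os

# Run this before the Glue context is created, so the variable is already
# in place when the library looks for it.
os.environ.setdefault("AWS_REGION", "us-east-1")

region = os.environ["AWS_REGION"]
```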

## Define Structure

Usually you wouldn't have this block of code.
The method I use most often is to run a Glue Crawler and let it figure out most of this for us,
then use `ApplyMapping` to adjust any fields that are wrong.

In this section we configure what the Glue Crawler would have given us and create a DataFrame from this information.

In [6]:
# Create the schema structure
schema = StructType([
    StructField("id", IntegerType(), False),              # 1
    StructField("hip", IntegerType(), True),              # 2
    StructField("hd", IntegerType(), True),               # 3
    StructField("hr", IntegerType(), True),               # 4
    StructField("gl", StringType(), True),                # 5
    StructField("bf", StringType(), True),                # 6
    StructField("proper", StringType(), True),            # 7
    StructField("ra", DoubleType(), False),               # 8
    StructField("dec", DoubleType(), False),              # 9
    StructField("dist", DoubleType(), False),             # 10
    StructField("pmra", DoubleType(), False),             # 11
    StructField("pmdec", DoubleType(), False),            # 12
    StructField("rv", DoubleType(), False),               # 13
    StructField("mag", DoubleType(), False),              # 14
    StructField("absmag", DoubleType(), False),           # 15
    StructField("spect", StringType(), False),            # 16
    StructField("ci", DoubleType(), False),               # 17
    StructField("x", DoubleType(), False),                # 18
    StructField("y", DoubleType(), False),                # 19
    StructField("z", DoubleType(), False),                # 20
    StructField("vx", DoubleType(), False),               # 21
    StructField("vy", DoubleType(), False),               # 22
    StructField("vz", DoubleType(), False),               # 23
    StructField("rarad", DoubleType(), False),            # 24
    StructField("decrad", DoubleType(), False),           # 25
    StructField("pmrarad", DoubleType(), False),          # 26
    StructField("pmdecrad", DoubleType(), False),         # 27
    StructField("bayer", StringType(), True),             # 28
    StructField("flam", IntegerType(), True),             # 29
    StructField("con", StringType(), True),               # 30
    StructField("comp", IntegerType(), False),            # 31
    StructField("comp_primary", IntegerType(), True),     # 32
    StructField("base", StringType(), True),              # 33
    StructField("lum", DoubleType(), False),              # 34
    StructField("var", StringType(), True),               # 35
    StructField("var_min", DoubleType(), True),           # 36
    StructField("var_max", DoubleType(), True),           # 37
])


# Read the CSV files with the explicit schema defined above.
# .csv() already selects the CSV reader, so no separate format() call is needed.
hygdata_v3_df = (
    spark.read
    .option("header", "true")
    .option("quote", '"')
    .option("encoding", "UTF-8")
    .option("multiLine", "true")
    .option("escape", '"')
    .option("columnNameOfCorruptRecord", "_corrupt_column")
    .option("mode", "PERMISSIVE")
    .schema(schema)
    .csv(INPUT_PATH)
)
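
As far as I understand Spark's CSV reader, an explicit schema is applied to the columns by position by default, so a schema whose order drifts from the file would silently mislabel columns. The following standard-library sketch compares a header line against the field names from the schema above. `header_mismatches` is a hypothetical helper, and the header strings in the demo are built from the schema names rather than read from the real file.

```python
import csv
import io

# Field names in the same order as the schema above.
EXPECTED_COLUMNS = [
    "id", "hip", "hd", "hr", "gl", "bf", "proper", "ra", "dec", "dist",
    "pmra", "pmdec", "rv", "mag", "absmag", "spect", "ci", "x", "y", "z",
    "vx", "vy", "vz", "rarad", "decrad", "pmrarad", "pmdecrad", "bayer",
    "flam", "con", "comp", "comp_primary", "base", "lum", "var",
    "var_min", "var_max",
]


def header_mismatches(header_line, expected=EXPECTED_COLUMNS):
    """Return (position, expected_name, actual_name) for every differing column."""
    actual = next(csv.reader(io.StringIO(header_line)))
    mismatches = []
    for index in range(max(len(actual), len(expected))):
        got = actual[index] if index < len(actual) else None
        want = expected[index] if index < len(expected) else None
        if got != want:
            mismatches.append((index + 1, want, got))  # 1-based, like the comments
    return mismatches


# Demo headers: one matching, one with "ra" and "dec" swapped.
good_header = ",".join(EXPECTED_COLUMNS)
swapped = list(EXPECTED_COLUMNS)
swapped[7], swapped[8] = swapped[8], swapped[7]
bad_header = ",".join(swapped)

good_result = header_mismatches(good_header)
bad_result = header_mismatches(bad_header)
```

To check the real data, you would pass the first line of the CSV file in the `data` folder instead of the demo strings.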

## Create DynamicFrame
 
Finally, we create a DynamicFrame from the Spark DataFrame 🎉.
You now have a DynamicFrame running locally without spending those pretty pennies!
Everything after this point is more or less the same as what you would run inside your Glue job.

In [7]:
hygdata_v3_table = DynamicFrame.fromDF(hygdata_v3_df, glueContext, 'hygdata_v3')

## Manipulate Data
   
As an example using the data we have loaded, let's say we only want to keep stars whose luminosity is close to our sun's (that is, a `lum` value close to 1.0).
We can do this with the DynamicFrame's `filter` method.

In [8]:
print("Original {}".format(hygdata_v3_table.toDF().count()))
filtered_data = hygdata_v3_table.filter(f = lambda x: x['lum'] >= 0.99 and x['lum'] <= 1.1, transformation_ctx="filtered_data")
print("Filtered {}".format(filtered_data.toDF().count()))

Original 119614
Filtered 705
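
The lambda is easier to test in isolation when it is written as a named function. The sketch below uses plain dictionaries in place of Glue records, and the sample values are made up for illustration. `is_sun_like`, `LUM_MIN`, and `LUM_MAX` are names introduced here; the bounds are the ones from the cell above.

```python
LUM_MIN = 0.99
LUM_MAX = 1.1


def is_sun_like(record):
    """True when the record's luminosity falls within [LUM_MIN, LUM_MAX]."""
    lum = record['lum']
    # Guard against missing values before comparing.
    return lum is not None and LUM_MIN <= lum <= LUM_MAX


# Made-up records, only to exercise the boundaries.
sample = [
    {'id': 'a', 'lum': 1.0},
    {'id': 'b', 'lum': 0.5},
    {'id': 'c', 'lum': 1.1},
    {'id': 'd', 'lum': None},
]
kept = [r['id'] for r in sample if is_sun_like(r)]
```

Because the function only uses `record['lum']`, the same item access the lambda relies on, it could in principle be passed to `filter` as `f=is_sun_like`.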


Let's calculate the distance in light years.
The readme for the data mentions that a distance of 100000 or more, or a negative distance, is bad data.
We first filter out the items we don't care about, then calculate the light years.

In [9]:
removed_bad_data = filtered_data.filter(f = lambda x: x['dist'] is not None and x['dist'] < 100000 and x['dist'] >=0, transformation_ctx="removed_bad_data")
print("Filtered bad data: {}".format(removed_bad_data.toDF().count()))

Filtered bad data: 705
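
The count did not change, so none of the 705 rows were rejected. When a filter like this does drop rows, it can help to see which rule was responsible. Here is a plain-Python sketch that labels each distance instead of just keeping or dropping it. `classify_distance` and the label strings are hypothetical, and the sample distances are made up.

```python
from collections import Counter

MAX_VALID_DIST = 100000  # same threshold as the filter above


def classify_distance(dist):
    """Label a distance value according to the rules used by the filter."""
    if dist is None:
        return 'missing'
    if dist < 0:
        return 'negative'
    if dist >= MAX_VALID_DIST:
        return 'too_far'
    return 'ok'


# Made-up values covering each branch.
sample_distances = [0.0, 12.0322, 100000.0, -3.5, None]
tally = Counter(classify_distance(d) for d in sample_distances)
```

Only the values labelled `'ok'` would pass the filter in the cell above.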


In [10]:
def add_light_year(d):
    # dist is in parsecs; 1 parsec is roughly 3.262 light years
    d['light_year'] = d['dist'] * 3.262
    return d

with_light_year = removed_bad_data.map(f=add_light_year, transformation_ctx="with_light_year")
with_light_year.select_fields(["id", "hr", "bf", "dist", "light_year"]).toDF().orderBy("light_year").show()

+------+----+---------+-------+------------------+
|    id|  hr|       bf|   dist|        light_year|
+------+----+---------+-------+------------------+
|     0|null|     null|    0.0|               0.0|
| 15333|1010|  Zet2Ret|12.0322|        39.2490364|
| 80095|6094|     null|12.7779|        41.6815098|
| 79431|6060|18    Sco|13.9005|45.343430999999995|
| 42319|3391| 3Pi 1UMa|14.3554|46.827314799999996|
| 74952|5699|  Nu 2Lup|14.8126|        48.3187012|
| 19028|1262|39    Tau|16.9377|        55.2507774|
| 62014|4845|10    CVn|17.3762|        56.6811644|
| 43602|3538|     null|17.3853|56.710848600000006|
| 98643|7644|     null|17.7274|        57.8267788|
| 98605|7683|     null|18.7899|        61.2926538|
| 84783|6465|     null|19.5236|63.685983199999995|
|  1799|  88| 9    Cet|20.8638| 68.05771560000001|
|119398|null|     null|20.9205|         68.242671|
|101710|7914|     null|20.9468|        68.3284616|
|117998|null|     null|21.0482|        68.6592284|
| 18366|null|     null|21.9829|

The first entry is our sun, so it is no surprise that it shows up at the top.
Next comes the closest star with a light output similar to our sun's (https://en.wikipedia.org/wiki/Zeta_Reticuli).
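
The conversion itself is easy to check by hand against the table. The sketch below works on a plain dictionary rather than a Glue record and returns a copy instead of modifying its input, which is a design choice of this example and not how the notebook's `add_light_year` works. `with_light_year_field` and `LIGHT_YEARS_PER_PARSEC` are names introduced here.

```python
LIGHT_YEARS_PER_PARSEC = 3.262  # rounded factor used in the notebook


def with_light_year_field(record):
    """Return a copy of `record` with a `light_year` field added."""
    enriched = dict(record)
    enriched['light_year'] = enriched['dist'] * LIGHT_YEARS_PER_PARSEC
    return enriched


# id and dist taken from the Zet2Ret row of the table above.
zeta2_ret = {'id': 15333, 'dist': 12.0322}
result = with_light_year_field(zeta2_ret)
```

The computed value agrees with the `light_year` column for that row, and the original dictionary is left untouched.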

## Saving

Finally, we want to save the file somewhere so we can use it.

Since we are working purely in a local environment, we simply use Spark to save the data.
As far as I know, the DynamicFrame does not have a way to save to local disk.

In [11]:
column_order_fixed = with_light_year.toDF().select(hygdata_v3_df.columns + ['light_year'])
column_order_fixed.write.mode("overwrite").csv(OUTPUT_PATH)
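
Spark writes the result as a directory of `part-*` files plus a `_SUCCESS` marker, not as a single CSV file. A small sketch for locating the data files afterwards is shown below. `list_part_files` is a hypothetical helper, and the demo runs against a throwaway directory with stand-in file names, not against the real output.

```python
import glob
import os
import tempfile


def list_part_files(output_uri):
    """Return the sorted part files inside a local Spark output directory."""
    prefix = "file://"
    local_dir = output_uri[len(prefix):] if output_uri.startswith(prefix) else output_uri
    return sorted(glob.glob(os.path.join(local_dir, "part-*")))


# Demo: create stand-in files that mimic the layout of a Spark output folder.
with tempfile.TemporaryDirectory() as demo_dir:
    for name in ("part-00000-demo.csv", "_SUCCESS"):
        open(os.path.join(demo_dir, name), "w").close()
    found = [os.path.basename(p) for p in list_part_files("file://" + demo_dir)]
```

In the notebook you would call `list_part_files(OUTPUT_PATH)` after the write completes.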