
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0).                               |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [47]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)




In [48]:
data=spark.read.csv("s3://sravan-capstone/ugs /usgs_main.csv",header=True,inferSchema=True)





In [49]:
data.describe()

DataFrame[summary: string, latitude: string, longitude: string, depth: string, mag: string, magType: string, nst: string, gap: string, dmin: string, rms: string, net: string, id: string, place: string, type: string, horizontalError: string, depthError: string, magError: string, magNst: string, status: string, locationSource: string, magSource: string]


In [50]:
data.groupby("type").count().show()

+------------------+-----+
|              type|count|
+------------------+-----+
|         explosion|  376|
|         ice quake|   11|
|      quarry blast|  665|
|       other event|    5|
|        earthquake|74752|
|chemical explosion|    1|
+------------------+-----+


In [51]:
data.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- nst: double (nullable = true)
 |-- gap: double (nullable = true)
 |-- dmin: double (nullable = true)
 |-- rms: double (nullable = true)
 |-- net: string (nullable = true)
 |-- id: string (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- place: string (nullable = true)
 |-- type: string (nullable = true)
 |-- horizontalError: double (nullable = true)
 |-- depthError: double (nullable = true)
 |-- magError: double (nullable = true)
 |-- magNst: double (nullable = true)
 |-- status: string (nullable = true)
 |-- locationSource: string (nullable = true)
 |-- magSource: string (nullable = true)


In [53]:
data.show(5,False)

+-----------------------+----------+------------+-----+----+-------+----+-----+--------+----+---+------------+-----------------------+----------------------------------+----------+---------------+----------+--------+------+---------+--------------+---------+
|time                   |latitude  |longitude   |depth|mag |magType|nst |gap  |dmin    |rms |net|id          |updated                |place                             |type      |horizontalError|depthError|magError|magNst|status   |locationSource|magSource|
+-----------------------+----------+------------+-----+----+-------+----+-----+--------+----+---+------------+-----------------------+----------------------------------+----------+---------------+----------+--------+------+---------+--------------+---------+
|2022-03-04 21:28:02.44 |38.7596664|-122.7196655|1.61 |1.24|md     |14.0|115.0|0.004494|0.04|nc |nc73701241  |2022-03-04 21:29:36.906|3km SW of Anderson Springs, CA    |earthquake|0.3            |0.36      |0.1     |5.0   |

In [54]:
data=data.na.drop()




In [55]:
data=data.drop("place","time","magSource")




In [56]:
data=data.withColumnRenamed("updated","time")




In [57]:
data=data.withColumnRenamed("locationSource","source")




In [58]:
data.show()

+-----------------+-------------------+-----+----+-------+----+-----+--------+----+---+----------+--------------------+------------+---------------+----------+-----------------+------+---------+------+
|         latitude|          longitude|depth| mag|magType| nst|  gap|    dmin| rms|net|        id|                time|        type|horizontalError|depthError|         magError|magNst|   status|source|
+-----------------+-------------------+-----+----+-------+----+-----+--------+----+---+----------+--------------------+------------+---------------+----------+-----------------+------+---------+------+
|       38.7596664|       -122.7196655| 1.61|1.24|     md|14.0|115.0|0.004494|0.04| nc|nc73701241|2022-03-04 21:29:...|  earthquake|            0.3|      0.36|              0.1|   5.0|automatic|    nc|
|       38.8338318|       -122.8154984| 1.82|1.13|     md|22.0| 66.0| 0.01632|0.02| nc|nc73701236|2022-03-04 21:29:...|  earthquake|           0.19|      0.53|             0.14|   4.0|automati

In [59]:
from pyspark.sql.functions import *





In [60]:
from pyspark.ml.feature import StringIndexer





In [61]:
l=["magType","net","type","source"]
#indexer=StringIndexer(inputCol=["magType","net","type","source"],outputCol=["magType1","net1","type1","source1"],handleInvalid="keep",stringOrderType="frequencyDesc")





In [62]:
indexer = [
StringIndexer(inputCol=c, outputCol="{0}1".format(c))
for c in l
]
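
By default, StringIndexer orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common value gets index 0.0. The plain-Python sketch below mimics that rule on the `type` counts printed in In [75], assuming those counts reflect the rows the indexer is fit on. `index_by_frequency` is a hypothetical helper written for illustration, not a Spark API.

```python
from collections import Counter

def index_by_frequency(counts):
    """Map each label to a float index, most frequent first (ties broken alphabetically)."""
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return {label: float(i) for i, (label, _) in enumerate(ordered)}

# Counts copied from the In [75] output of this notebook.
type_counts = Counter({
    "explosion": 370,
    "quarry blast": 501,
    "other event": 2,
    "earthquake": 35899,
    "chemical explosion": 1,
})

type_index = index_by_frequency(type_counts)
for label, idx in type_index.items():
    print(f"{label:>18} -> {idx}")
```

Under this rule `type1 == 0.0` corresponds to `earthquake`, which agrees with the earthquake rows shown in the In [70] output.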




In [63]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import unix_timestamp





In [64]:
# unix_timestamp() already returns whole seconds since the Unix epoch;
# dividing by 1000 only rescales the value to thousands of seconds
data = data.withColumn("time", unix_timestamp(data["time"]) / 1000)
va=VectorAssembler(inputCols=["latitude","longitude","depth","magType1","net1","mag","nst","time"],outputCol="features")

pipeline = Pipeline(stages=indexer + [va])
df_tfm=pipeline.fit(data).transform(data)
train, test = df_tfm.randomSplit([0.7, 0.3])
num_rows_train = train.count()
num_cols_train = len(train.columns)
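
To make the scale of the resulting `time` feature concrete, here is a small standard-library sketch of the same arithmetic. It takes one `updated` value from the earlier output and assumes the timestamp is interpreted as UTC (the session time zone is not shown in this notebook).

```python
from datetime import datetime, timezone

# Value taken from the `updated` column shown earlier, with fractional seconds
# dropped because unix_timestamp() returns whole seconds. UTC is an assumption.
updated = datetime(2022, 3, 4, 21, 29, 36, tzinfo=timezone.utc)

epoch_seconds = int(updated.timestamp())  # what unix_timestamp() would yield under UTC
scaled_time = epoch_seconds / 1000        # what the notebook stores in "time"

print(epoch_seconds, scaled_time)
```

The result has the same magnitude as the `time` values in the later outputs (about 1.6 million), confirming that the column holds kiloseconds rather than seconds.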




In [65]:
print("Training:",num_rows_train,"x",num_cols_train)


Training: 25645 x 24


In [66]:
num_rows_test = test.count()
num_cols_test = len(test.columns)




In [67]:
print("Test:",num_rows_test,"x",num_cols_test)


Test: 11128 x 24


In [68]:
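df_tfm.columns
# Scale the assembled feature vector (defaults: divide by the standard deviation, no centering)
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
# NOTE: the scaler is fit on the full df_tfm, and `train` is rebuilt from df_tfm rather than
# from the 70% split, so from here on `train` also contains every row of `test`
# (the In [75] counts sum to 36773 = 25645 + 11128).
scaler_model = scaler.fit(df_tfm)
train=scaler_model.transform(df_tfm)
test=scaler_model.transform(test)
train.show(3, False)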

+----------+------------+-----+----+-------+----+-----+--------+----+---+----------+-----------+----------+---------------+----------+--------+------+---------+------+--------+----+-----+-------+------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|latitude  |longitude   |depth|mag |magType|nst |gap  |dmin    |rms |net|id        |time       |type      |horizontalError|depthError|magError|magNst|status   |source|magType1|net1|type1|source1|features                                                    |scaled_features                                                                                                                                            |
+----------+------------+-----+----+-------+----+-----+--------+----+---+----------+-----------+----------+---------------+----------+--------+------+---------+------+-------
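
With its default settings, StandardScaler divides each feature by its sample standard deviation and does not subtract the mean. The sketch below shows that operation on a single made-up column in plain Python, with the statistic taken from the training rows only and then reused for the test rows. The helper names and the numbers are illustrative assumptions, not values from this dataset.

```python
from statistics import stdev

def fit_std(values):
    """Return the sample standard deviation (n - 1 denominator) of one feature column."""
    return stdev(values)

def scale(values, std):
    """Divide every value by the fitted standard deviation (no centering)."""
    return [v / std for v in values]

# Made-up depth values standing in for a train/test split.
train_depth = [1.0, 3.0, 5.0, 7.0]
test_depth = [2.0, 10.0]

depth_std = fit_std(train_depth)            # statistic comes from the training rows only
train_scaled = scale(train_depth, depth_std)
test_scaled = scale(test_depth, depth_std)  # the same statistic is reused for the test rows

print(depth_std, train_scaled, test_scaled)
```

Fitting on the training rows alone keeps information about the held-out rows out of the preprocessing step.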

In [69]:
from pyspark.ml.classification import LogisticRegression





In [70]:
log=LogisticRegression(featuresCol='scaled_features',labelCol='type1')
lrmodel=log.fit(train)
prediction=lrmodel.transform(test)
test.show(3)

+--------+---------+-----+----+-------+----+-----+------+----+---+------------+-----------+----------+---------------+----------+--------+------+--------+------+--------+----+-----+-------+--------------------+--------------------+
|latitude|longitude|depth| mag|magType| nst|  gap|  dmin| rms|net|          id|       time|      type|horizontalError|depthError|magError|magNst|  status|source|magType1|net1|type1|source1|            features|     scaled_features|
+--------+---------+-----+----+-------+----+-----+------+----+---+------------+-----------+----------+---------------+----------+--------+------+--------+------+--------+----+-----+-------+--------------------+--------------------+
| 17.1625|  -68.232| 57.0|3.97|     md|23.0|258.0|1.3595|0.54| pr|pr2022081001|1647942.683|earthquake|           3.35|     10.75|     0.1|   7.0|reviewed|    pr|     1.0| 6.0|  0.0|    6.0|[17.1625,-68.232,...|[1.24474005280910...|
| 17.9153| -66.8736| 13.0|3.66|     md|23.0|195.0|0.0601|0.15| pr|pr2022

In [73]:
from pyspark.ml.evaluation import RegressionEvaluator





In [74]:
test.groupby("type").count().show()


+------------------+-----+
|              type|count|
+------------------+-----+
|         explosion|   94|
|      quarry blast|  147|
|       other event|    1|
|        earthquake|10885|
|chemical explosion|    1|
+------------------+-----+


In [75]:
train.groupby("type").count().show()


+------------------+-----+
|              type|count|
+------------------+-----+
|         explosion|  370|
|      quarry blast|  501|
|       other event|    2|
|        earthquake|35899|
|chemical explosion|    1|
+------------------+-----+


In [76]:
# Use the MulticlassClassificationEvaluator to evaluate the model's accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator




In [77]:
evaluator = MulticlassClassificationEvaluator(labelCol="type1", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Accuracy:", accuracy)

Accuracy: 0.9758267433501079
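
Because almost every event is an earthquake, it helps to compare this accuracy with what a model that always predicts the majority class would score. The sketch below computes that baseline from the test-set counts printed in In [74]; it uses only the standard library and the variable names are illustrative.

```python
# Test-set class counts copied from the In [74] output.
test_counts = {
    "explosion": 94,
    "quarry blast": 147,
    "other event": 1,
    "earthquake": 10885,
    "chemical explosion": 1,
}

total = sum(test_counts.values())
majority_label = max(test_counts, key=test_counts.get)
baseline_accuracy = test_counts[majority_label] / total

print(f"{majority_label}: {baseline_accuracy:.4f} of {total} rows")
```

The always-earthquake baseline is about 0.9782, slightly above the 0.9758 reported here. The cross-validated model at the end of the notebook reports 0.9781631919482386, which matches this baseline and suggests it may be predicting `earthquake` for every test row.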


In [79]:
# Select the predicted class and the true label ("type1")
predictions_df = prediction.select(["prediction", "type1"])




In [80]:
# Convert the predictions and labels to Pandas dataframes for easier inspection
predictions_pd = predictions_df.toPandas()




In [81]:
# Print the first 10 predictions and their corresponding true labels
print(predictions_pd.head(10))

   prediction  type1
0         0.0    0.0
1         0.0    0.0
2         0.0    0.0
3         0.0    0.0
4         0.0    0.0
5         0.0    0.0
6         0.0    0.0
7         0.0    0.0
8         0.0    0.0
9         0.0    0.0
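
The first ten rows all belong to class 0.0, so they say nothing about the rare classes. Per-class recall is one way to see how those are handled; the sketch below computes it in plain Python on made-up labels. `per_class_recall` is a hypothetical helper, and the label lists are not taken from this notebook's predictions.

```python
from collections import Counter

def per_class_recall(y_true, y_pred):
    """Return {label: fraction of rows with that true label that were predicted correctly}."""
    totals = Counter(y_true)
    hits = Counter(t for t, p in zip(y_true, y_pred) if t == p)
    return {label: hits[label] / n for label, n in totals.items()}

# Made-up labels: class 0.0 dominates, classes 1.0 and 2.0 are rare.
y_true = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 2.0, 2.0]
y_pred = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0]

recall = per_class_recall(y_true, y_pred)
toy_accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
print(toy_accuracy, recall)
```

In this toy case the overall accuracy is 0.7 while class 2.0 is never predicted correctly, which is the kind of gap a single accuracy number hides.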


In [82]:
# Logistic regression with default hyperparameters, this time on the unscaled features
lr = LogisticRegression(labelCol='type1', featuresCol='features')




In [83]:
# Fit the model to the training data
lr_model = lr.fit(train)




In [84]:
# Make predictions on the test data
predictions = lr_model.transform(test)
# Save the model to a file
#lr_model.save("logistic_regression_model1")





In [None]:
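# Load the saved model (a fitted model is loaded through LogisticRegressionModel, not LogisticRegression)
#from pyspark.ml.classification import LogisticRegressionModel
#loaded_model = LogisticRegressionModel.load("/content/logistic_regression_model1")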

In [85]:

accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.9757368799424874


In [86]:
from pyspark.ml.classification import RandomForestClassifier
rand=RandomForestClassifier(featuresCol='scaled_features',labelCol='type1')
rmodel=rand.fit(train)
predictionrand=rmodel.transform(test)




In [87]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="type1", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictionrand)
print("Accuracy:", accuracy)

Accuracy: 0.9883177570093458


In [88]:
# Select the predicted class and the true label ("type1")
predictions_df = predictionrand.select(["prediction", "type1"])





In [90]:
# Convert the predictions and labels to Pandas dataframes for easier inspection
predictions_pd = predictions_df.toPandas()




In [91]:
# Print the first 10 predictions and their corresponding true labels
print(predictions_pd.head(10))

   prediction  type1
0         0.0    0.0
1         0.0    0.0
2         0.0    0.0
3         0.0    0.0
4         0.0    0.0
5         0.0    0.0
6         0.0    0.0
7         0.0    0.0
8         0.0    0.0
9         0.0    0.0


In [92]:
# Set the hyperparameters for the random forest classifier (unscaled features)
regrand = RandomForestClassifier(labelCol='type1', featuresCol='features',numTrees=100,maxDepth=5)




In [93]:
# Fit the model to the training data
regmodel = regrand.fit(train)

# Make predictions on the test data
predictions = regmodel.transform(test)




In [94]:
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.985981308411215


In [95]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator





In [96]:
# Define the hyperparameters to tune (only hyperparameters[0] is used in the grid below)
hyperparameters = [
    {'regParam': [0.1, 0.01, 0.001], 'elasticNetParam': [0.0, 0.5, 1.0]},
    {'regParam': [0.1, 0.01, 0.001], 'elasticNetParam': [0.0, 0.5, 1.0], 'maxIter': [10, 50, 100]}
]
param_grid = ParamGridBuilder().addGrid(log.regParam, hyperparameters[0]['regParam'])\
                               .addGrid(log.elasticNetParam, hyperparameters[0]['elasticNetParam'])\
                               .build()
cv = CrossValidator(estimator=log, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=2)
model = cv.fit(train)
model.params
model.bestModel
predictions = model.transform(test)

accuracy = evaluator.evaluate(predictions)
print("Accuracy: ", accuracy)

Accuracy:  0.9781631919482386
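
The grid above is the Cartesian product of the `regParam` and `elasticNetParam` lists, and CrossValidator trains each candidate once per fold before refitting the best one on all the data it was given. The standard-library sketch below counts the candidates and fits implied by the values in this cell; it does not call Spark.

```python
from itertools import product

reg_params = [0.1, 0.01, 0.001]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 2

# ParamGridBuilder-style Cartesian product: one dict per candidate setting.
grid = [
    {"regParam": r, "elasticNetParam": e}
    for r, e in product(reg_params, elastic_net_params)
]

# Each candidate is trained once per fold; the best one is then refit on all the data.
fits = len(grid) * num_folds + 1
print(len(grid), fits)
```

Adding the `maxIter` list from the second, unused dictionary would triple the number of candidates, so the cost of a wider search grows quickly.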
