# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


####  Run this cell to set up and start your interactive session.


In [1]:
#AWS Glue Studio Notebook
###Run this cell to set up and start your interactive session
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::398712513676:role/AdminRole
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 4502466e-efaf-4d9b-8634-830868fbf5e8
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 4502466e-efaf-4d9b-86

In [2]:
### Libraries
import time
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

## ARIMA model
import pmdarima as pm
from pmdarima import auto_arima
from statsmodels.tsa.arima.model import ARIMA

## Evaluation metrics
from sklearn.metrics import mean_squared_error




In [3]:
## Start Time 
start_time = int(time.time())
start_time = datetime.fromtimestamp(start_time)
print('Start_time',start_time)

Start_time 2023-04-23 19:31:32


#### Example: Create a DynamicFrame from a CSV file in Amazon S3


In [4]:
### Create a DynamicFrame from a CSV file in Amazon S3
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://project.final/weather_data.csv"], "recurse": False},
    format="csv",
    format_options={"withHeader": True},
    transformation_ctx="source_transformation_context",
)




#### Example: Convert the DynamicFrame to a Spark DataFrame, then to a pandas DataFrame


In [31]:
### Convert the DynamicFrame to a Spark DataFrame
df = dyf.toDF()
### Convert the Spark DataFrame to a pandas DataFrame (collects all rows to the driver)
pandasDF = df.toPandas()




In [32]:
# Keep only the state name and temperature columns
pandasDF = pandasDF[['name', 'temp']]
# Convert the "temp" column to a numeric dtype
pandasDF['temp'] = pd.to_numeric(pandasDF['temp'])
# Drop rows with missing values
pandasDF = pandasDF.dropna()




In [33]:
pandasDF.dtypes

name     object
temp    float64
dtype: object


In [38]:
# Collect one forecast DataFrame per state, then concatenate them once after the loop
state_results = []

# Iterate over each state in pandasDF
for state in pandasDF['name'].unique():

    # Filter data for the current state; .copy() avoids the SettingWithCopyWarning
    # that assigning to a filtered slice triggers
    state_data = pandasDF[pandasDF['name'] == state].copy()

    # Convert temperature data to float
    state_data['temp'] = state_data['temp'].astype(float)

    # Train/test split: hold out the last 30 observations
    train = state_data.iloc[:-30]
    test = state_data.iloc[-30:]

    # Fit an ARIMA(1, 1, 3) model on the training data
    model = ARIMA(train['temp'], order=(1, 1, 3)).fit()

    # Future predictions: 31 daily values, indexed 2023-04-15 through 2023-05-15.
    # typ='levels' is omitted because this ARIMA class already predicts on the original scale.
    index_future_dates = pd.date_range(start='2023-04-15', end='2023-05-15')
    pred = model.predict(start=len(state_data), end=len(state_data) + 30).rename('ARIMA Predictions')
    pred.index = index_future_dates

    # Move the dates into a 'Date' column and add the state name
    future_predictions = pred.reset_index().rename(columns={'index': 'Date'})
    future_predictions['name'] = state

    # Evaluate MSE and RMSE against the held-out test data.
    # Note: the predictions start at len(state_data), i.e. after the test window,
    # so these scores compare future forecasts with the last 30 observations.
    mse = mean_squared_error(test['temp'], future_predictions['ARIMA Predictions'].iloc[-30:])
    rmse = np.sqrt(mse)
    future_predictions['RMSE'] = rmse
    future_predictions['MSE'] = mse

    state_results.append(future_predictions)

# Combine all states into one DataFrame with a fixed column order
all_states_results = pd.concat(state_results, ignore_index=True)[
    ['name', 'Date', 'ARIMA Predictions', 'RMSE', 'MSE']
]


In [39]:
all_states_results.head()

      name       Date  ARIMA Predictions      RMSE        MSE
0  Georgia 2023-04-15          11.555208  6.269621  39.308142
1  Georgia 2023-04-16          11.555208  6.269621  39.308142
2  Georgia 2023-04-17          11.555208  6.269621  39.308142
3  Georgia 2023-04-18          11.555208  6.269621  39.308142
4  Georgia 2023-04-19          11.555208  6.269621  39.308142
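
The RMSE and MSE columns repeat within a state because each score is computed once per state and copied onto every forecast row, and RMSE is simply the square root of MSE (6.269621² ≈ 39.308142). The sketch below, in plain Python with no Glue or pandas dependency, spells out both metrics and shows a holdout evaluation in which the forecast covers exactly the held-out window. The naive "repeat the last value" forecaster, the helper names, and the sample temperatures are illustrative assumptions, not part of the notebook.

```python
import math


def mse(actual, predicted):
    """Mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


def rmse(actual, predicted):
    """Root mean squared error: the square root of the MSE."""
    return math.sqrt(mse(actual, predicted))


def naive_forecast(train, steps):
    """Hypothetical stand-in for a fitted model: repeat the last training value."""
    return [train[-1]] * steps


# Made-up daily temperatures, for illustration only
temps = [10.0, 11.0, 12.5, 12.0, 13.0, 14.0, 13.5, 15.0]
horizon = 3

# Hold out the last `horizon` observations
train, test = temps[:-horizon], temps[-horizon:]

# Forecast exactly the held-out window, so forecast[i] lines up with test[i]
forecast = naive_forecast(train, steps=len(test))

holdout_mse = mse(test, forecast)
holdout_rmse = rmse(test, forecast)
print(f"MSE={holdout_mse:.4f} RMSE={holdout_rmse:.4f}")
```

With a fitted statsmodels ARIMA result, the analogous aligned evaluation would forecast `len(test)` steps from the end of the training data, for example with `forecast(steps=len(test))`, and score those values against `test`.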


In [40]:
all_states_results = spark.createDataFrame(all_states_results)




In [41]:
## End time
end_time = datetime.fromtimestamp(int(time.time()))
print('End_time', end_time)

End_time 2023-04-23 20:09:34
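
The two printed timestamps bracket the whole run. A minimal sketch of turning them into an elapsed duration with the standard library follows; the variable names are illustrative, and the timestamps are copied from the Start_time and End_time outputs in this notebook.

```python
from datetime import datetime

# Timestamps copied from the Start_time and End_time cell outputs
start_time = datetime.strptime("2023-04-23 19:31:32", "%Y-%m-%d %H:%M:%S")
end_time = datetime.strptime("2023-04-23 20:09:34", "%Y-%m-%d %H:%M:%S")

# Subtracting two datetimes yields a timedelta
elapsed = end_time - start_time
minutes, seconds = divmod(int(elapsed.total_seconds()), 60)
print(f"Elapsed: {minutes} min {seconds} s")
```

For timing work inside a single Python process, `time.perf_counter()` is usually a better fit than wall-clock timestamps because it is monotonic.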


In [42]:
## Upload the future prediction output to S3
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(all_states_results, glueContext, "output"),
    connection_type="s3",
    connection_options={"path": "s3://project.final/output/Worker_1/"},
    format="csv",
    format_options={"writeHeader": True},
)

<awsglue.dynamicframe.DynamicFrame object at 0x7fb9533e3210>


In [43]:
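# Configure an S3 sink for the results. This call only creates the sink; no data is
# written until a format is set and writeFrame() is called on it.
s3output = glueContext.getSink(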
  path="s3://project.final/output/Worker_6/",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)


