# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

In [14]:
!pip install matplotlib

[33mDEPRECATION: Python 2.7 reached the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 is no longer maintained. pip 21.0 will drop support for Python 2.7 in January 2021. More details about Python 2 support in pip can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support[0m
Defaulting to user installation because normal site-packages is not writeable
Collecting matplotlib
  Downloading matplotlib-2.2.5-cp27-cp27mu-manylinux1_x86_64.whl (12.8 MB)
[K     |████████████████████████████████| 12.8 MB 38.2 MB/s eta 0:00:01
[?25hCollecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Downloading pyparsing-2.4.7-py2.py3-none-any.whl (67 kB)
[K     |████████████████████████████████| 67 kB 7.4 MB/s  eta 0:00:01
[?25hCollecting backports.functools-lru-cache
  Downloading backports.functools_lru_cache-1.6.4-py2.py3-none-any.whl (5.9 kB)
Collecting pytz
  Using cached pytz-2023.3-py2.py3-none-any.whl (502 kB)
Collecting six>=1.10


####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 4

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import to_timestamp, from_utc_timestamp
from pyspark.sql.functions import *
#from pyspark.sql.functions import avg, mean, min, max, last, stddev, window, round, cast
  
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 4
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::321914074467:role/Tekraj-AWS-Roles
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 4
Session ID: 06936e71-d8dd-4e9e-a745-58b15240d5d6
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.3
--enable-glue-datacatalog true
Waiting for session 06936e71-d8dd-

In [2]:
s3_bucket = 'tekraj-test2'
preffix = 'sparkCognition_Data_Analysis/Datasets/DataSet/Device1_2020_07_01_00_00_02.969_2020_07_31_23_59_58.110.csv'
s3_path = 's3://'+s3_bucket+'/'+preffix

print('s3_path is : ', s3_path)

df = spark.read.csv(s3_path, header= True, inferSchema= True)

df = df.withColumn("TimeStamp", to_timestamp("TimeStamp"))

#df = df.withColumn("TimeStamp",to_timestamp("TimeStamp",'yyyy-MM-dd HH:mm:ss'))
df.show(10)
df.printSchema()


s3_path is :  s3://tekraj-test2/sparkCognition_Data_Analysis/Datasets/DataSet/Device1_2020_07_01_00_00_02.969_2020_07_31_23_59_58.110.csv
+--------------------+---------+-----+-------+
|           TimeStamp| variable|value| device|
+--------------------+---------+-----+-------+
|2020-07-01 00:00:...|MMXN1_Amp| null|Device1|
|2020-07-01 00:00:...|MMXN1_Amp| null|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|846.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp| null|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|769.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|732.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp| null|Device1|
|2020-07-01 00:00:...|MMXN1_Amp| null|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|644.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp| null|Device1|
+--------------------+---------+-----+-------+
only showing top 10 rows

root
 |-- TimeStamp: timestamp (nullable = true)
 |-- variable: string (nullable = true)
 |-- value: double (nullable = true)
 |-- device: string (nullable = true)


In [5]:
no_of_rows = df.count()




In [11]:
# Counting the number of Null values in 'value' columns 
null_counts = df.filter(col("value").isNull()).count()
print(null_counts)

34164004


In [12]:
percent_of_nulls = null_counts*100.0/(no_of_rows)
print(percent_of_nulls)

88.20707851136501


In [3]:
# Handle NaN values by replacing them with the Median of each variable as the percenatge of Missing values is very High

median_value = df.selectExpr("percentile_approx(value, 0.5)").collect()[0][0]

df_new = df.withColumn("value", when(col("value").isNull(), median_value).otherwise(col("value")))

df_new.show()

+--------------------+---------+-----+-------+
|           TimeStamp| variable|value| device|
+--------------------+---------+-----+-------+
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|846.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|769.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|732.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|644.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|605.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|609.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 00:00:...|MMXN1_Amp|232.0|Device1|
|2020-07-01 0

In [16]:
import matplotlib.pyplot as plt

# Plot the distribution before handling NaN values
variable_dist_before = df.select("variable", "value").toPandas()
plt.scatter(variable_dist_before["variable"], variable_dist_before["value"], alpha=0.5)
plt.xlabel("Variable")
plt.ylabel("Value")
plt.title("Distribution of Variables Before Handling NaN")
plt.show()

# # Save the scatter plot image to an in-memory buffer, as AWS Glue Notebook is
# buffer = io.BytesIO()
# plt.savefig(buffer, format='png')
# buffer.seek(0)

# # Upload the image buffer to S3
# s3 = boto3.client('s3')
# s3.upload_fileobj(buffer, s3_bucket, 'distribution_before.png')

Error: Interpreter died:



In [17]:
# Plot the distribution after handling NaN values
variable_dist_after = df_new.select("variable", "value").toPandas()
plt.scatter(variable_dist_after["variable"], variable_dist_after["value"], alpha=0.5)
plt.xlabel("Variable")
plt.ylabel("Value")
plt.title("Distribution of Variables After Handling NaN")
plt.show()

Error: Interpreter died:



In [8]:
#Counting distinct values in 'variable' column

distinct_variable = df_new.select("variable").distinct()
distinct_variable.count()

30


In [4]:
# Perform 10-minute aggregations for each column

window_duration = "10 minutes"

aggregated_df = df_new.groupBy(window("TimeStamp", window_duration).alias("TimeStamp"), "device", "variable").agg(
    round(avg("value"), 2).alias("avg"),
    round(min("value"), 2).alias("min"),
    max("value").alias("max"),
    last("value").alias("last"),
    round(stddev("value"), 2).alias("stddev")
)


# for column_name in aggregated_df.columns:
#     if column_name not in ["window", "device"]:
#         variable = column_name.split("_")[0]
#         new_column_name = f"{variable}_{column_name}"
#         aggregated_df = aggregated_df.withColumnRenamed(column_name, new_column_name)


# # Show the resulting aggregated dataframe
# aggregated_df.show(truncate=False)


aggregated_df.sort("variable", "device", "window")
aggregated_df.show()

+--------------------+-------+---------+------+-----+---------------+------+------+
|           TimeStamp| device| variable|   avg|  min|            max|  last|stddev|
+--------------------+-------+---------+------+-----+---------------+------+------+
|{2020-07-01 00:00...|Device1|MMXN1_Amp|701.32|232.0|         1936.0|1728.0|551.25|
|{2020-07-01 00:40...|Device1|MMXN1_Amp|480.09|232.0|         1155.0| 232.0|297.68|
|{2020-07-01 03:20...|Device1|MMXN1_Amp| 363.5|232.0|          616.0| 332.0|129.18|
|{2020-07-01 05:40...|Device1|MMXN1_Amp|346.13|232.0|          628.0| 232.0|129.29|
|{2020-07-01 06:50...|Device1|MMXN1_Amp|406.57|232.0|992.42163085938| 232.0|197.48|
|{2020-07-01 07:20...|Device1|MMXN1_Amp| 454.4|232.0|1182.5799560547| 232.0|252.49|
|{2020-07-01 08:10...|Device1|MMXN1_Amp|608.42|232.0|         1251.0| 924.0|378.73|
|{2020-07-01 09:40...|Device1|MMXN1_Amp| 689.5|232.0|         1551.0|1093.0|470.17|
|{2020-07-01 11:10...|Device1|MMXN1_Amp|558.14|232.0|         1344.0| 232.0|

In [13]:
columns = ['TimeStamp', 'device']

for i in df_new.select("variable").distinct().collect():
    variable = i.variable
    
    df2 = df_new.filter(col("variable") == variable).distinct()
    
    # Adding new column with prefix 'variable_' for avg, min, max, last, and stddev
    df2 = df2.withColumn(variable + '_' + 'avg', avg("value"))
    df2 = df2.withColumn(variable + '_' + 'min', min("value"))
    df2 = df2.withColumn(variable + '_' + 'max', max("value"))
    df2 = df2.withColumn(variable + '_' + 'last', last("value"))
    df2 = df2.withColumn(variable + '_' + 'stddev', stddev("value"))
    
    columns.extend([variable + '_avg', variable + '_min', variable + '_max', variable + '_last', variable + '_stddev'])
    

# Select the desired columns and sort the DataFrame
df3 = df_new.select(columns).distinct().sort("variable", "device", "TimeStamp")

df3.show()


AnalysisException: grouping expressions sequence is empty, and '`TimeStamp`' is not an aggregate function. Wrap '(avg(`value`) AS `WROT1_RotSpd_avg`)' in windowing function(s) or wrap '`TimeStamp`' in first() (or first_value) if you don't care which value you get.;
Aggregate [TimeStamp#48, variable#41, value#90, device#43, avg(value#90) AS WROT1_RotSpd_avg#379]
+- Deduplicate [TimeStamp#48, variable#41, value#90, device#43]
   +- Filter (variable#41 = WROT1_RotSpd)
      +- Project [TimeStamp#48, variable#41, CASE WHEN isnull(value#42) THEN 232.0 ELSE value#42 END AS value#90, device#43]
         +- Project [to_timestamp('TimeStamp, None) AS TimeStamp#48, variable#41, value#42, device#43]
            +- Relation[TimeStamp#40,variable#41,value#42,device#43] csv



In [5]:
columns = ['TimeStamp', 'device']

for i in df_new.select("variable").distinct().collect():
    variable = i.variable
    
    df2 = df_new.filter(col("variable") == variable).distinct()
    
    # Adding new column with prefix 'variable_' for avg, min, max, last, and stddev
    df2 = df2.withColumn(variable + '_' + 'avg', avg("value"))
    df2 = df2.withColumn(variable + '_' + 'min', min("value"))
    df2 = df2.withColumn(variable + '_' + 'max', max("value"))
    df2 = df2.withColumn(variable + '_' + 'last', last("value"))
    df2 = df2.withColumn(variable + '_' + 'stddev', stddev("value"))
    
    columns.extend([variable + '_avg', variable + '_min', variable + '_max', variable + '_last', variable + '_stddev'])
    

# Select the desired columns and sort the DataFrame
df3 = df_new.groupBy("TimeStamp", "device", *columns).agg().sort("variable", "device", "TimeStamp")

df3.show()


AnalysisException: grouping expressions sequence is empty, and '`TimeStamp`' is not an aggregate function. Wrap '(avg(`value`) AS `WNAC1_WdSpd2_avg`)' in windowing function(s) or wrap '`TimeStamp`' in first() (or first_value) if you don't care which value you get.;
Aggregate [TimeStamp#24, variable#17, value#57, device#19, avg(value#57) AS WNAC1_WdSpd2_avg#207]
+- Deduplicate [TimeStamp#24, variable#17, value#57, device#19]
   +- Filter (variable#17 = WNAC1_WdSpd2)
      +- Project [TimeStamp#24, variable#17, CASE WHEN isnull(value#18) THEN 232.0 ELSE value#18 END AS value#57, device#19]
         +- Project [to_timestamp('TimeStamp, None) AS TimeStamp#24, variable#17, value#18, device#19]
            +- Relation[TimeStamp#16,variable#17,value#18,device#19] csv



In [None]:
grouped_df = df_new.groupBy("TimeStamp", "device", "variable")

# Generate a dynamic list of aggregation expressions for each distinct value in 'variable'
aggregation_exprs = []
distinct_variables = df.select("variable").distinct().rdd.flatMap(lambda x: x).collect()
for variable in distinct_variables:
    aggregation_exprs.extend([
        round(avg(df["value"]).alias(variable + "_avg"), 2),
        round(min(df["value"]).alias(variable + "_min"), 2),
        max(df["value"]).alias(variable + "_max")
    ])

# Perform the aggregations using the generated expressions
aggregated_df = grouped_df.agg(*aggregation_exprs)

# Print the schema of the aggregated DataFrame
aggregated_df.printSchema()

# Show the contents of the aggregated DataFrame
aggregated_df.show()





In [None]:
# General function for getting n-minute (window duration) aggregate 

def get_aggregates(df, window_duration, columns):
    return df.groupBy(window("TimeStamp", window_duration)).agg(
        avg(*columns).alias("avg"),
        min(*columns).alias("min"),
        max(*columns).alias("max"),
        last(*columns).alias("last"),
        stddev(*columns).alias("stddev")
    )


#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [None]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='database_name', table_name='table_name')
dyf.printSchema()

#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [None]:
df = dyf.toDF()
df.show()

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [None]:
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)