# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [5]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, regexp_extract,sum,isnan,when,count
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session cfb204e9-1265-4d58-afc5-63c9b4489ec1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session cfb204e9-1265-4d58-afc5-63c9b4489ec1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session cfb204e9-1265-4d58-afc5-63c9b4489ec1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session cfb204e9-1265-4d58-afc5-63c9b4489ec1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [52]:
dyf_a = glueContext.create_dynamic_frame.from_options(connection_type="s3",connection_options={"paths":["s3://bunny970/Datasets/athletes.csv"]},
                                                   format="csv",format_options={"withHeader":True,"optimizePerformance":True})
dyf_c = glueContext.create_dynamic_frame.from_options(connection_type="s3",connection_options={"paths":["s3://bunny970/Datasets/coaches.csv"]},
                                                   format="csv",format_options={"withHeader":True,"optimizePerformance":True})
dyf_m = glueContext.create_dynamic_frame.from_options(connection_type="s3",connection_options={"paths":["s3://bunny970/Datasets/medals.csv"]},
                                                   format="csv",format_options={"withHeader":True,"optimizePerformance":True})
dyf_mt = glueContext.create_dynamic_frame.from_options(connection_type="s3",connection_options={"paths":["s3://bunny970/Datasets/medals_total.csv"]},
                                                   format="csv",format_options={"withHeader":True,"optimizePerformance":True})
dyf_t = glueContext.create_dynamic_frame.from_options(connection_type="s3",connection_options={"paths":["s3://bunny970/Datasets/technical_officials.csv"]},
                                                   format="csv",format_options={"withHeader":True,"optimizePerformance":True})

dyf_a.printSchema()

root
|-- name: string
|-- short_name: string
|-- gender: string
|-- birth_date: string
|-- birth_place: string
|-- birth_country: string
|-- country: string
|-- country_code: string
|-- discipline: string
|-- discipline_code: string
|-- residence_place: string
|-- residence_country: string
|-- height_m/ft: string
|-- url: string


In [53]:
dyf_c.printSchema()

root
|-- name: string
|-- short_name: string
|-- gender: string
|-- birth_date: string
|-- country_code: string
|-- discipline: string
|-- function: string
|-- event: string
|-- url: string


In [54]:
dyf_m.printSchema()

root
|-- medal_type: string
|-- medal_code: string
|-- medal_date: string
|-- athlete_short_name: string
|-- athlete_name: string
|-- athlete_sex: string
|-- athlete_link: string
|-- country_code: string
|-- discipline_code: string
|-- event: string
|-- country: string
|-- discipline: string


In [55]:
dyf_mt.printSchema()

root
|-- Rank: string
|-- Country Code: string
|-- Gold Medal: string
|-- Silver Medal: string
|-- Bronze Medal: string
|-- Total: string
|-- Country: string


In [56]:
dyf_t.printSchema()

root
|-- name: string
|-- short_name: string
|-- gender: string
|-- birth_date: string
|-- country: string
|-- discipline: string
|-- function: string
|-- url: string


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [58]:
df_a = dyf_a.toDF()
df_c=dyf_c.toDF()
df_m=dyf_m.toDF()
df_mt=dyf_mt.toDF()
df_t=dyf_t.toDF()




In [42]:
df_null = df_a.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ),c 
                           )).alias(c)
                    for c in df_a.columns])
df_null.show()

+----+----------+------+----------+-----------+-------------+-------+------------+----------+---------------+---------------+-----------------+-----------+---+
|name|short_name|gender|birth_date|birth_place|birth_country|country|country_code|discipline|discipline_code|residence_place|residence_country|height_m/ft|url|
+----+----------+------+----------+-----------+-------------+-------+------------+----------+---------------+---------------+-----------------+-----------+---+
|   0|         0|   159|       159|       4048|         3336|      0|           0|       159|              0|           4407|             5111|       7001|  0|
+----+----------+------+----------+-----------+-------------+-------+------------+----------+---------------+---------------+-----------------+-----------+---+


In [43]:
df1_a = df_a.filter((col("gender") != "") & (col("birth_date") != "") & (col("discipline") != ""))
df1_a = df1_a.drop('birth_place', 'birth_country', 'residence_place', 'residence_country', 'height_m/ft')




In [44]:
df1_a = df1_a.withColumn("athlete_id", regexp_extract(col("url"), r'n(\d+)-', 1))
df1_a = df1_a.drop('url')




In [45]:
df1_a = df1_a.dropDuplicates(['name'])
df1_a.select([col(c).alias(c + "_null") for c in df1_a.columns]).show()
df1_a.groupBy("athlete_id").count().filter("count > 1").show()

+--------------------+---------------+-----------+---------------+--------------------+-----------------+-------------------+--------------------+---------------+
|           name_null|short_name_null|gender_null|birth_date_null|        country_null|country_code_null|    discipline_null|discipline_code_null|athlete_id_null|
+--------------------+---------------+-----------+---------------+--------------------+-----------------+-------------------+--------------------+---------------+
|         ABAD Nestor|         ABAD N|       Male|     1993-03-29|               Spain|              ESP|Artistic Gymnastics|                 GAR|        1355250|
|       ABASS Abobakr|        ABASS A|       Male|     1998-11-01|               Sudan|              SUD|           Swimming|                 SWM|        1344792|
|       ABBOTT Monica|       ABBOTT M|     Female|     1985-07-28|United States of ...|              USA|  Baseball/Softball|                 BSB|        1310126|
|ABDALLA Abubaker ...|

In [46]:
df1_null = df1_a.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ),c 
                           )).alias(c)
                    for c in df1_a.columns])
df1_null.show()

+----+----------+------+----------+-------+------------+----------+---------------+----------+
|name|short_name|gender|birth_date|country|country_code|discipline|discipline_code|athlete_id|
+----+----------+------+----------+-------+------------+----------+---------------+----------+
|   0|         0|     0|         0|      0|           0|         0|              0|         0|
+----+----------+------+----------+-------+------------+----------+---------------+----------+


In [59]:
total_nulls = df_c.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ),c 
                           )).alias(c)
                    for c in df_c.columns])
total_nulls.show()

+----+----------+------+----------+------------+----------+--------+-----+---+
|name|short_name|gender|birth_date|country_code|discipline|function|event|url|
+----+----------+------+----------+------------+----------+--------+-----+---+
|   0|         0|     0|         0|           0|         0|       0|  126|  0|
+----+----------+------+----------+------------+----------+--------+-----+---+


In [60]:
df1_c = df_c.drop('event')
df1_c = df1_c.withColumn("coach_id", regexp_extract(col("url"), r'n(\d+)-', 1))
df1_c = df1_c.drop('url')
df1_c = df1_c.dropDuplicates(['coach_id'])




In [61]:
df1_c.groupBy("coach_id").count().filter("count > 1").show()

+--------+-----+
|coach_id|count|
+--------+-----+
+--------+-----+


In [62]:
df1_c.show()

+--------------------+------------------+------+----------+------------+-----------------+----------+--------+
|                name|        short_name|gender|birth_date|country_code|       discipline|  function|coach_id|
+--------------------+------------------+------+----------+------------+-----------------+----------+--------+
|    SHAHINTAB Mehran|       SHAHINTAB M|  Male|1966-03-21|         IRI|       Basketball|     Coach| 1281792|
|         DANCER Sean|          DANCER S|  Male|1975-08-04|         IRL|           Hockey|Head Coach| 1282059|
|       GRAINGER Adam|        GRAINGER A|  Male|1980-07-06|         IRL|           Hockey|     Coach| 1282063|
|     LORRAINE Andrew|        LORRAINE A|  Male|1972-08-11|         ISR|Baseball/Softball|     Coach| 1282219|
|        MULBERG Nate|         MULBERG N|  Male|1992-01-05|         ISR|Baseball/Softball|     Coach| 1282237|
|         FULI Saiasi|            FULI S|  Male|1978-03-16|         FIJ|     Rugby Sevens|Head Coach| 1282562|
|

In [63]:
df_m = df_m.withColumn("athlete_id", regexp_extract(col("athlete_link"), r'n(\d+)-', 1))
df_m = df_m.drop('athlete_link')




In [64]:
df1_m = df_m.join(df1_a.select("athlete_id"), "athlete_id", "inner")
df1_m = df1_m.join(df1_a.select("athlete_id", "gender"), "athlete_id", "inner")
df1_m = df1_m.drop('athlete_sex')
df1_m.show()

+----------+------------+----------+--------------------+------------------+--------------------+------------+---------------+--------------------+--------------------+-----------------+------+
|athlete_id|  medal_type|medal_code|          medal_date|athlete_short_name|        athlete_name|country_code|discipline_code|               event|             country|       discipline|gender|
+----------+------------+----------+--------------------+------------------+--------------------+------------+---------------+--------------------+--------------------+-----------------+------+
|   1310126|Silver Medal|         2|2021-07-27 00:00:...|          ABBOTT M|       ABBOTT Monica|         USA|            BSB|            Softball|United States of ...|Baseball/Softball|Female|
|   1444160|  Gold Medal|         1|2021-07-31 00:00:...|        ANDERSON F|      ANDERSON Freya|         GBR|            SWM|Mixed 4 x 100m Me...|       Great Britain|         Swimming|Female|
|   1324437|Silver Medal|     

In [65]:
total_nulls = df1_m.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ),c 
                           )).alias(c)
                    for c in df1_m.columns])
total_nulls.show()

+----------+----------+----------+----------+------------------+------------+------------+---------------+-----+-------+----------+------+
|athlete_id|medal_type|medal_code|medal_date|athlete_short_name|athlete_name|country_code|discipline_code|event|country|discipline|gender|
+----------+----------+----------+----------+------------------+------------+------------+---------------+-----+-------+----------+------+
|         0|         0|         0|         0|                 0|           0|           0|              0|    0|      0|         0|     0|
+----------+----------+----------+----------+------------------+------------+------------+---------------+-----+-------+----------+------+


In [66]:
total_nulls = df_mt.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ),c 
                           )).alias(c)
                    for c in df_mt.columns])
total_nulls.show()

+----+------------+----------+------------+------------+-----+-------+
|Rank|Country Code|Gold Medal|Silver Medal|Bronze Medal|Total|Country|
+----+------------+----------+------------+------------+-----+-------+
|   0|           0|         0|           0|           0|    0|      0|
+----+------------+----------+------------+------------+-----+-------+


In [70]:
total_nulls = df_t.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ),c 
                           )).alias(c)
                    for c in df_t.columns])
total_nulls.show()
df_t = df_t.withColumn("to_id", regexp_extract(col("url"), r'n(\d+)-', 1))
df_t=df_t.drop('url')

+----+----------+------+----------+-------+----------+--------+---+-----+
|name|short_name|gender|birth_date|country|discipline|function|url|to_id|
+----+----------+------+----------+-------+----------+--------+---+-----+
|   0|         0|     0|         0|      0|         0|       0|  0|    0|
+----+----------+------+----------+-------+----------+--------+---+-----+


In [71]:
df_t.show()

+-------------------+-----------+------+----------+--------------------+----------------+--------------------+-------+
|               name| short_name|gender|birth_date|             country|      discipline|            function|  to_id|
+-------------------+-----------+------+----------+--------------------+----------------+--------------------+-------+
|           ZWI Noam|      ZWI N|  Male|1958-04-20|              Israel|      Water Polo|               Judge|1571974|
|           ZWI Noam|      ZWI N|  Male|1958-04-20|              Israel|          Diving|               Judge|1571974|
|      ZWART Michiel|    ZWART M|  Male|1986-11-08|         Netherlands|      Water Polo|             Referee|1572083|
|        ZOURAK Adil|   ZOURAK A|  Male|1978-08-25|             Morocco|        Football|Video Assistant R...|1562057|
|      ZINGER Edward|   ZINGER E|  Male|1969-07-21|              Canada|       Wrestling|               Judge|1542965|
|        ZHU Linfang|      ZHU L|Female|1982-09-

In [50]:
df1_a.repartition(1).write.option("header", "true").mode("append").csv("s3://bunny970/Dataset_ETL_target/Athletes_updated.csv")
df1_c.repartition(1).write.option("header", "true").mode("append").csv("s3://bunny970/Dataset_ETL_target/Coaches_updated.csv")
df1_m.repartition(1).write.option("header", "true").mode("append").csv("s3://bunny970/Dataset_ETL_target/Medals_updated.csv")
df_mt.repartition(1).write.option("header", "true").mode("append").csv("s3://bunny970/Dataset_ETL_target/Medals_total_updated.csv")
df_t.repartition(1).write.option("header", "true").mode("append").csv("s3://bunny970/Dataset_ETL_target/Technical_updated.csv")


