# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [47]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import requests
from datetime import datetime
import boto3
import json
from io import StringIO

# Reuse the SparkContext of the running session if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext.
glueContext = GlueContext(sc)
# Spark session exposed by the GlueContext.
spark = glueContext.spark_session
# Job handle bound to this GlueContext.
# NOTE(review): job.init()/job.commit() are not called in the visible cells — confirm whether they are needed.
job = Job(glueContext)

You are already connected to a glueetl session 55e212f5-059b-46bb-a8ed-d86f6f022ef7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 55e212f5-059b-46bb-a8ed-d86f6f022ef7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session 55e212f5-059b-46bb-a8ed-d86f6f022ef7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 55e212f5-059b-46bb-a8ed-d86f6f022ef7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



#### Function to get data from the API

In [42]:
#Get CSV data from the API
def get_api_data(api_url, timeout=30):
    """Fetch the body of ``api_url`` with an HTTP GET and return it as text.

    Args:
        api_url: URL to request.
        timeout: Seconds passed to ``requests.get`` as its ``timeout`` so that
            a stalled endpoint cannot block the notebook indefinitely.

    Returns:
        The response body as a string when the server answers with status
        code 200. In every other case (non-200 status, or any exception raised
        while making the request) a message is printed and ``None`` is returned.
    """
    try:
        # Without an explicit timeout, requests waits forever on a silent server.
        response = requests.get(api_url, timeout=timeout)

        if response.status_code == 200:
            return response.text

        # Non-success status: report it and fall through to the None return.
        print(f"Error: {response.status_code}")
    except Exception as e:
        # Best-effort fetch: report the failure so the caller can skip this dataset.
        print(f"An error occurred: {str(e)}")
    return None




#### Write the data to a location in Amazon S3


In [43]:
#Write the data into the right folder in S3 bucket
def upload_to_s3(data, bucket_name, object_key):
    """Parse ``data`` as CSV, re-serialise it and store it as an S3 object.

    Args:
        data: CSV text to upload.
        bucket_name: Name of the destination S3 bucket.
        object_key: Key of the object to write inside the bucket.

    Returns:
        None. A success message is printed after the upload; any exception
        raised while parsing or uploading is caught and printed instead.
    """
    # Local import so the function does not rely on pandas having been
    # imported by an earlier cell (otherwise the NameError would be swallowed
    # by the except below and nothing would be uploaded).
    import pandas as pd

    # Create an S3 client
    s3 = boto3.client('s3')

    try:
        # Round-trip through pandas: a parse failure stops the upload.
        # NOTE(review): read_csv applies default type inference, so values may
        # be re-formatted on the way out — confirm this is intended for raw data.
        df = pd.read_csv(StringIO(data))

        # Serialise the frame back to CSV text without the index column.
        csv_buffer = StringIO()
        df.to_csv(csv_buffer, index=False)
        csv_content = csv_buffer.getvalue()

        # Upload the CSV text as a single S3 object.
        s3.put_object(Body=csv_content, Bucket=bucket_name, Key=object_key)

        print(f"CSV data uploaded successfully to s3://{bucket_name}/{object_key}")
    except Exception as e:
        # Best-effort upload: report the failure rather than aborting the notebook.
        print(f"An error occurred: {str(e)}")




In [44]:
#Get the API URL based on the dataset name
def get_api_url(dataset_name):
    """Return the City of Chicago CSV endpoint for ``dataset_name``.

    Recognised names are "Crashes", "People" and "Vehicles"; any other value
    yields ``None``.
    """
    endpoints = (
        ("Crashes", "https://data.cityofchicago.org/resource/85ca-t3if.csv"),
        ("People", "https://data.cityofchicago.org/resource/u6pd-qa9d.csv"),
        ("Vehicles", "https://data.cityofchicago.org/resource/68nd-jvt3.csv"),
    )
    # Scan in declaration order and compare by equality.
    return next(
        (url for known_name, url in endpoints if dataset_name == known_name),
        None,
    )




In [49]:
# Destination bucket and the datasets to pull on each incremental run.
s3_bucket_name = "uncharted-s3"
datasets = ["Crashes", "People", "Vehicles"]

# One timestamp is taken up front and shared by every object written in this run.
current_datetime_stamp = datetime.now().strftime("%Y%m%d%H%M%S")

for dataset_name in datasets:
    # Resolve the endpoint for this dataset and download its CSV payload.
    api_url = get_api_url(dataset_name)
    api_data = get_api_data(api_url)

    # Nothing usable came back: skip the upload for this dataset.
    if not api_data:
        continue

    # Objects land under raw/incremental/<dataset>/<dataset>_<timestamp>.csv
    s3_object_key = f"raw/incremental/{dataset_name}/{dataset_name}_{current_datetime_stamp}.csv"
    upload_to_s3(api_data, s3_bucket_name, s3_object_key)

CSV data uploaded successfully to s3://uncharted-s3/raw/incremental/Crashes/Crashes_20231126213815.csv
CSV data uploaded successfully to s3://uncharted-s3/raw/incremental/People/People_20231126213815.csv
CSV data uploaded successfully to s3://uncharted-s3/raw/incremental/Vehicles/Vehicles_20231126213815.csv
