# In [19]:
import urllib.request
import json
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from awsglue.context import GlueContext
import botocore

# Reuse the active Spark session if one exists (otherwise start one), then
# build the Glue context on top of that session's SparkContext.
spark = SparkSession.builder.getOrCreate()
glueContext = GlueContext(spark.sparkContext)

# Keep the bare SparkContext available under its conventional short name.
sc = spark.sparkContext

# Column layout of the episode DataFrame, as (column name, Spark type) pairs.
# Order is significant: it is the column order of the resulting schema.
# Date/time columns (airdate, airtime, airstamp) are declared as plain strings.
_COLUMN_TYPES = [
    # Episode-level columns
    ("id", IntegerType),
    ("url", StringType),
    ("name", StringType),
    ("season", IntegerType),
    ("number", IntegerType),
    ("type", StringType),
    ("airdate", StringType),
    ("airtime", StringType),
    ("airstamp", StringType),
    ("runtime", IntegerType),
    ("rating/average", DoubleType),
    ("image", StringType),
    ("summary", StringType),
    ("_links_self_href", StringType),
    ("_links_show_href", StringType),
    ("_links_show_name", StringType),
    # Embedded show columns
    ("_embedded_show_id", IntegerType),
    ("_embedded_show_url", StringType),
    ("_embedded_show_name", StringType),
    ("_embedded_show_type", StringType),
    ("_embedded_show_language", StringType),
    ("_embedded_show_genres_0", StringType),
    ("_embedded_show_genres_1", StringType),
    ("_embedded_show_status", StringType),
    ("_embedded_show_runtime", IntegerType),
    ("_embedded_show_averageRuntime", IntegerType),
    ("_embedded_show_premiered", StringType),
    ("_embedded_show_ended", StringType),
    ("_embedded_show_officialSite", StringType),
    ("_embedded_show_schedule_time", StringType),
    ("_embedded_show_rating_average", DoubleType),
    ("_embedded_show_weight", IntegerType),
    ("_embedded_show_network_id", IntegerType),
    ("_embedded_show_network_name", StringType),
    ("_embedded_show_network_country_name", StringType),
    ("_embedded_show_network_country_code", StringType),
    ("_embedded_show_network_country_timezone", StringType),
    ("_embedded_show_network_officialSite", StringType),
    ("_embedded_show_webChannel", StringType),
    ("_embedded_show_dvdCountry", StringType),
    ("_embedded_show_externals_tvrage", StringType),
    ("_embedded_show_externals_thetvdb", StringType),
    ("_embedded_show_externals_imdb", StringType),
    ("_embedded_show_image_medium", StringType),
    ("_embedded_show_image_original", StringType),
    ("_embedded_show_summary", StringType),
    ("_embedded_show_updated", IntegerType),
    ("_embedded_show__links_self_href", StringType),
    ("_embedded_show__links_previousepisode_href", StringType),
    ("_embedded_show__links_previousepisode_name", StringType),
    ("_embedded_show__links_nextepisode_href", StringType),
    ("_embedded_show__links_nextepisode_name", StringType),
    ("_embedded_show_genres_2", StringType),
    ("_embedded_show_schedule_days_0", StringType),
    ("_embedded_show_schedule_days_1", StringType),
    ("_embedded_show_network", StringType),
    ("_embedded_show_webChannel_id", IntegerType),
    ("_embedded_show_webChannel_name", StringType),
    ("_embedded_show_webChannel_country_name", StringType),
    ("_embedded_show_webChannel_country_code", StringType),
    ("_embedded_show_webChannel_country_timezone", StringType),
    ("_embedded_show_webChannel_officialSite", StringType),
    ("_embedded_show_schedule_days_2", StringType),
    ("_embedded_show_schedule_days_3", StringType),
    ("_embedded_show_schedule_days_4", StringType),
    ("_embedded_show_schedule_days_5", StringType),
    ("_embedded_show_schedule_days_6", StringType),
    ("image_medium", StringType),
    ("image_original", StringType),
    ("_embedded_show_image", StringType),
    ("_embedded_show_webChannel_country", StringType),
    ("_embedded_show_genres_3", StringType),
    ("_embedded_show_dvdCountry_name", StringType),
    ("_embedded_show_dvdCountry_code", StringType),
    ("_embedded_show_dvdCountry_timezone", StringType),
]

# Every column uses StructField's default nullability.
schema = StructType(
    [StructField(column, spark_type()) for column, spark_type in _COLUMN_TYPES]
)

# Root S3 location; each day's data is written beneath it under its own
# date-named prefix (e.g. <s3_path>/2024-01-01).
s3_path = "s3://myfinalprojectstorage/test"

# Run mode: 1 = backfill every day from start_date through today,
# any other value = load today's schedule only.
flag = 1

# First day (YYYY-MM-DD) of the backfill window; only read when flag == 1.
start_date = "2024-01-01"

_DATE_FORMAT = "%Y-%m-%d"

# Upper bound on a single API call so one stalled request cannot hang the run.
_REQUEST_TIMEOUT_SECONDS = 30


def _fetch_schedule(date_str):
    """Return the decoded JSON body of the TVMaze web schedule for one day.

    Args:
        date_str: Day to request, formatted as YYYY-MM-DD.

    Raises:
        urllib.error.URLError: If the request fails or times out.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    api_url = f"https://api.tvmaze.com/schedule/web?date={date_str}&country=US"
    with urllib.request.urlopen(api_url, timeout=_REQUEST_TIMEOUT_SECONDS) as response:
        return json.load(response)


def _ingest_date(date_str):
    """Fetch one day's schedule and overwrite that day's Parquet data in S3.

    Args:
        date_str: Day to load, formatted as YYYY-MM-DD. It is also used as the
            S3 prefix under ``s3_path``.

    Returns:
        The Spark DataFrame that was written.
    """
    records = _fetch_schedule(date_str)

    # NOTE(review): the records are handed to Spark exactly as decoded from the
    # response. The schema uses flattened column names (e.g. `_links_self_href`),
    # which presumably expect a flattening step that is not visible here --
    # confirm the columns are actually populated.
    frame = spark.createDataFrame(records, schema=schema)

    # The original code interpolated an undefined `prefix` here (NameError).
    # Using the date as the prefix means "overwrite" replaces only that day's
    # data and leaves other days untouched.
    frame.write.mode("overwrite").parquet(f"{s3_path}/{date_str}")
    return frame


today = datetime.now().date()
current_date = today.strftime(_DATE_FORMAT)

if flag == 1:
    # No bulk delete of the bucket is performed; each day's prefix is
    # overwritten individually as it is loaded.
    first_day = datetime.strptime(start_date, _DATE_FORMAT).date()

    # Inclusive range: both start_date and today are loaded. If start_date is
    # in the future the range is empty and nothing is fetched.
    for offset in range((today - first_day).days + 1):
        day = first_day + timedelta(days=offset)
        current_date_data = _ingest_date(day.strftime(_DATE_FORMAT))
else:
    # Incremental mode: refresh only today's data.
    current_date_data = _ingest_date(current_date)



