In [0]:
#Question1 A
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

# Create a SparkSession
spark = SparkSession.builder.appName("AvgTempByRegionDataFrame").getOrCreate()

# Load the CSV file into a DataFrame
hw3_df = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/city_temperature.csv", header=True, inferSchema=True)

# Filter out invalid temperature entries
filtered_df = hw3_df.filter(col("AvgTemperature") != -99)

# Compute the average temperature for each region
average_by_region_df = filtered_df.groupBy("Region").agg(avg("AvgTemperature").alias("Average Temperature"))

# Display the results
average_by_region_df.show()


+--------------------+-------------------+
|              Region|Average Temperature|
+--------------------+-------------------+
|              Africa|  74.40260231125495|
|                Asia|   68.1097225987458|
|              Europe|  51.94717142841552|
|Australia/South P...|  62.30369323842191|
|       North America|  56.15019771858279|
|         Middle East|  73.84068255374054|
|South/Central Ame...|  72.20202379397276|
+--------------------+-------------------+



In [0]:
#Question1 B
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

# Create a SparkSession
spark = SparkSession.builder.appName("AvgTempByYearInAsiaDataFrame").getOrCreate()

# Load the CSV file into a DataFrame
hw3_df = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/city_temperature.csv", header=True, inferSchema=True)

# Filter rows based on valid AvgTemperature and "Asia" region
valid_temps_in_asia_df = hw3_df.filter((col("AvgTemperature") != -99.0) & (col("Region") == "Asia"))

# Compute average temperature for each year
average_by_year_df = valid_temps_in_asia_df.groupBy("Year").agg(avg("AvgTemperature").alias("Average Temperature"))

# Sort the results by year
sorted_results_df = average_by_year_df.orderBy("Year")

# Display the results
sorted_results_df.show()


+----+-------------------+
|Year|Average Temperature|
+----+-------------------+
|1995|   67.2629851012579|
|1996|  67.86967758228818|
|1997|  68.34836746936332|
|1998|  69.06291989664082|
|1999|  68.02917794316642|
|2000|  67.71886102847435|
|2001|  67.73726484541146|
|2002|  68.05932217366207|
|2003|   67.9122232063773|
|2004|  68.31447879572664|
|2005|  67.78168187744457|
|2006|  68.70920807327536|
|2007|  68.87105562784646|
|2008|   68.0895085896924|
|2009|  68.72838223632029|
|2010|  68.82422924901184|
|2011|  67.65085130533478|
|2012|  67.23867893253757|
|2013|  67.83878858474075|
|2014|   67.8062718640678|
+----+-------------------+
only showing top 20 rows



In [0]:
#Question1 C
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

# Create a SparkSession
spark = SparkSession.builder.appName("AvgTempByCityInSpainDataFrame").getOrCreate()

# Load the CSV file into a DataFrame
hw3_df = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/city_temperature.csv", header=True, inferSchema=True)

# Filter rows based on the "Spain" country and valid AvgTemperature entries
valid_temps_in_spain_df = hw3_df.filter((col("Country") == "Spain") & (col("AvgTemperature") != -99.0))

# Compute average temperature for each city in Spain
average_by_city_df = valid_temps_in_spain_df.groupBy("City").agg(avg("AvgTemperature").alias("Average Temperature"))

# Display the results
average_by_city_df.show()


+---------+-------------------+
|     City|Average Temperature|
+---------+-------------------+
|   Madrid|  58.91773111062988|
|   Bilbao| 59.173121887854684|
|Barcelona|  61.78984408835005|
+---------+-------------------+



In [0]:
#Question1 D
from pyspark.sql.functions import col, avg

#Load the CSV files into DataFrames
df_temp = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/city_temperature.csv", header=True, inferSchema=True) # Change path as per your file location
df_countries = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/country_list.csv", header=True, inferSchema=True) 

#Preprocessing on temperature data:
#1. Remove rows where AvgTemperature is None or a non-numeric value
#2. Handle specific values that are known to be erroneous for temperature (e.g., -99)
df_temp_cleaned = df_temp.filter(
    col("AvgTemperature").isNotNull() & 
    (col("AvgTemperature") != -99)
)

#Join the two DataFrames to get only the rows of capital cities
joined_df = df_temp_cleaned.join(df_countries, (df_temp_cleaned["Country"] == df_countries["Country"]) & (df_temp_cleaned["City"] == df_countries["Capital"]))

#Group by Country and Capital City to compute average for AvgTemperature
avg_temp_by_capital = joined_df.groupBy(df_countries["Country"], df_countries["Capital"].alias("Capital City")).agg(avg("AvgTemperature").alias("Average_Temperature"))

#Display the results
avg_temp_by_capital.orderBy("Country").show()

+--------------------+------------+-------------------+
|             Country|Capital City|Average_Temperature|
+--------------------+------------+-------------------+
|             Albania|      Tirana| 61.135237970711394|
|             Algeria|     Algiers|  64.37253818654507|
|           Argentina|Buenos Aires|  62.91649875419788|
|           Australia|    Canberra|   56.4014755343388|
|             Austria|      Vienna|  51.46943722943731|
|             Bahamas|      Nassau|  78.56622639449847|
|             Bahrain|      Manama|  81.21907752273744|
|          Bangladesh|       Dhaka|  78.72032520325234|
|            Barbados|  Bridgetown|  81.07108635764779|
|             Belarus|       Minsk|  45.39047039291643|
|             Belgium|    Brussels|   51.5118640398354|
|             Bermuda|    Hamilton|  71.59983807124873|
|             Bolivia|      La Paz|  45.50655826558265|
|            Bulgaria|       Sofia|   51.6243039116221|
|             Burundi|   Bujumbura|  73.68981900

In [0]:
#Question1 E
from pyspark.sql.functions import col, avg, broadcast

# Load the CSV files into DataFrames
df_temp = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/city_temperature.csv", header=True, inferSchema=True)  # Change path as per your file location
df_countries = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/country_list.csv", header=True, inferSchema=True)

# Preprocessing on temperature data:
# 1. Remove rows where AvgTemperature is None or a non-numeric value
# 2. Handle specific values that are known to be erroneous for temperature (e.g., -99)
df_temp_cleaned = df_temp.filter(
    col("AvgTemperature").isNotNull() &
    (col("AvgTemperature") != -99)
)

# Join the two DataFrames to get only the rows of capital cities
# Use broadcast join for optimization
joined_df = df_temp_cleaned.join(broadcast(df_countries), (df_temp_cleaned["Country"] == df_countries["Country"]) & (df_temp_cleaned["City"] == df_countries["Capital"]))

# Group by Country and Capital City to compute average for AvgTemperature
avg_temp_by_capital = joined_df.groupBy(df_countries["Country"], df_countries["Capital"].alias("Capital City")).agg(avg("AvgTemperature").alias("Average_Temperature"))

# Display the results
avg_temp_by_capital.orderBy("Country").show()


+--------------------+------------+-------------------+
|             Country|Capital City|Average_Temperature|
+--------------------+------------+-------------------+
|             Albania|      Tirana| 61.135237970711394|
|             Algeria|     Algiers|  64.37253818654507|
|           Argentina|Buenos Aires|  62.91649875419788|
|           Australia|    Canberra|   56.4014755343388|
|             Austria|      Vienna|  51.46943722943731|
|             Bahamas|      Nassau|  78.56622639449847|
|             Bahrain|      Manama|  81.21907752273744|
|          Bangladesh|       Dhaka|  78.72032520325234|
|            Barbados|  Bridgetown|  81.07108635764779|
|             Belarus|       Minsk|  45.39047039291643|
|             Belgium|    Brussels|   51.5118640398354|
|             Bermuda|    Hamilton|  71.59983807124873|
|             Bolivia|      La Paz|  45.50655826558265|
|            Bulgaria|       Sofia|   51.6243039116221|
|             Burundi|   Bujumbura|  73.68981900

In [0]:
#Question1 F
from pyspark.sql.functions import col, avg, udf
from pyspark.sql.types import BooleanType, StringType

# Define the UDF for year filtering
@udf(BooleanType())
def filter_year(year):
    return year >= 2000

# Define the UDF for output formatting
@udf(StringType())
def format_output(capital, country, average_temperature):
    return f"{capital} is the capital of {country} and its average temperature is {average_temperature}"

# Load the CSV files into DataFrames
df_temp = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/city_temperature.csv", header=True, inferSchema=True)
df_countries = spark.read.csv("/FileStore/shared_uploads/hyeongjinjoo82@gmail.com/country_list.csv", header=True, inferSchema=True)

# Preprocess the temperature data
df_temp_cleaned = df_temp.filter(
    col("AvgTemperature").isNotNull() & 
    (col("AvgTemperature") != -99)
).filter(filter_year(col("Year")))

# Join the two DataFrames to get only the rows of capital cities, aliasing the columns from df_countries
df_countries_aliased = df_countries.withColumnRenamed("Country", "CountryName").withColumnRenamed("Capital", "CapitalName")
joined_df = df_temp_cleaned.join(broadcast(df_countries_aliased), (df_temp_cleaned["Country"] == df_countries_aliased["CountryName"]) & (df_temp_cleaned["City"] == df_countries_aliased["CapitalName"]))

# Group by Country and Capital City to compute the average temperature, using the aliased names to avoid ambiguity
avg_temp_by_capital = joined_df.groupBy("CountryName", "CapitalName").agg(avg("AvgTemperature").alias("Average_Temperature"))

# Format the final output using the UDF
final_output = avg_temp_by_capital.withColumn("Formatted_Output", format_output(col("CapitalName"), col("CountryName"), col("Average_Temperature")))

# Select only the formatted output and order by CountryName
final_output.select("Formatted_Output").orderBy("CountryName").show(truncate=False)

+--------------------------------------------------------------------------------------------------+
|Formatted_Output                                                                                  |
+--------------------------------------------------------------------------------------------------+
|Tirana is the capital of Albania and its average temperature is 61.01525893104418                 |
|Algiers is the capital of Algeria and its average temperature is 64.39437550579977                |
|Buenos Aires is the capital of Argentina and its average temperature is 63.08454999325328         |
|Canberra is the capital of Australia and its average temperature is 56.64297665632144             |
|Vienna is the capital of Austria and its average temperature is 51.83600215662492                 |
|Nassau is the capital of Bahamas and its average temperature is 78.89768864717884                 |
|Manama is the capital of Bahrain and its average temperature is 81.3842346594743          