part a

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("AvgTemperatureByRegion").getOrCreate()

# Load the CSV file into a Spark DataFrame
file_path = "/FileStore/tables/city_temperature.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# using Spark DataFrame
avg_temp_by_region_df = df.groupBy("Region").agg({"AvgTemperature": "avg"}).withColumnRenamed("avg(AvgTemperature)", "AvgTemperature")

display(avg_temp_by_region_df)

Region,AvgTemperature
Africa,53.54951656193528
Asia,62.56865184754707
Europe,46.69628524306841
Australia/South Pacific,61.180869127275976
North America,55.30093262524632
Middle East,68.38455378399779
South/Central America & Carribean,62.18943880107512


part b

In [0]:
# Load the CSV file into a Spark DataFrame
file_path = "/FileStore/tables/city_temperature.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

#getting a dataframe with just countries in Asia

asia_df = df.filter(col("Region") == "Asia")
# display(asia_df)

avgTempByYearAsia = asia_df.groupBy("Year").agg({"AvgTemperature": "avg"}).withColumnRenamed("avg(AvgTemperature)", "AvgTemperature")
display(avgTempByYearAsia)

Year,AvgTemperature
2003,63.26089236790604
2007,62.56406543519104
2018,66.41154836031548
2015,67.69533035270744
2006,62.95917808219164
2013,67.47865504358647
1997,57.96391769150392
2014,67.26617683686162
2019,63.41934412619359
2004,62.38467603434807


part c

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("AvgTemperatureByCitySpain").getOrCreate()

# Load the CSV file into a Spark DataFrame
file_path = "/FileStore/tables/city_temperature.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Filter data for cities in the Country "Spain"
spain_df = df.filter(col("Country") == "Spain")

# Register the DataFrame as a temporary table to use Spark SQL
spain_df.createOrReplaceTempView("spain_temperature_table")

# Using Spark SQL to find the average temperature by City for cities in Spain
AvgTempCitySpain = spark.sql("SELECT City, AVG(AvgTemperature) AS AvgTemperature FROM spain_temperature_table GROUP BY City")

# Show the result
display(AvgTempCitySpain)


City,AvgTemperature
Madrid,58.4405352903085
Bilbao,58.69515432764961
Barcelona,61.26926397582565


part d

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("AvgTemperatureByCountry").getOrCreate()

# Load city temperature data
temperature_file_path = "/FileStore/tables/city_temperature.csv"
temperature_df = spark.read.csv(temperature_file_path, header=True, inferSchema=True)

# Load country capitals data
capitals_file_path = "/FileStore/tables/country_list-2.csv"
capitals_df = spark.read.csv(capitals_file_path, header=True, inferSchema=True)

# Join the temperature data with the country capitals data
joined_df = temperature_df.join(capitals_df, on="Country")

# table
joined_df.createOrReplaceTempView("joined_table")

# find the capital and average temperature for each country
avgTempCountry = spark.sql(
    "SELECT Country, Capital, AVG(AvgTemperature) AS AvgTemperature "
    "FROM joined_table "
    "WHERE City = Capital "
    "GROUP BY Country, Capital"
)

# Show the result
display(avgTempCountry)


Country,Capital,AvgTemperature
Namibia,Windhoek,57.990341031729066
China,Beijing,54.71997625728461
Mauritania,Nouakchott,73.41478705686403
Sierra Leone,Freetown,-9.820260911579975
Tunisia,Tunis,66.47864234837016
Central African Republic,Bangui,67.01951219512179
Madagascar,Antananarivo,63.44589898553858
Ethiopia,Addis Ababa,25.45525551371705
Uganda,Kampala,44.1425256341069
Guinea-Bissau,Bissau,2.3921217353767306


part e

In [0]:
from pyspark.sql.functions import broadcast

# same as part d, except use a broadcast variable

broadcast_capitals = broadcast(capitals_df)

# Join the temperature data with the broadcasted country capitals data
joined_df = temperature_df.join(broadcast_capitals, on="Country")

# table
joined_df.createOrReplaceTempView("joined_temperature_table")

avgTempCountry = spark.sql(
    "SELECT Country, Capital, AVG(AvgTemperature) AS AvgTemperature "
    "FROM joined_temperature_table "
    "WHERE City = Capital "
    "GROUP BY Country, Capital"
)

display(avgTempCountry)

Country,Capital,AvgTemperature
Namibia,Windhoek,57.990341031729066
China,Beijing,54.71997625728461
Mauritania,Nouakchott,73.41478705686403
Sierra Leone,Freetown,-9.820260911579975
Tunisia,Tunis,66.47864234837016
Central African Republic,Bangui,67.01951219512179
Madagascar,Antananarivo,63.44589898553858
Ethiopia,Addis Ababa,25.45525551371705
Uganda,Kampala,44.1425256341069
Guinea-Bissau,Bissau,2.3921217353767306


1fi

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import broadcast

# Initialize Spark session
spark = SparkSession.builder.appName("AvgTemperatureByCountryUDF").getOrCreate()

# Load city temperature data
temperature_file_path = "/FileStore/tables/city_temperature.csv"
temperature_df = spark.read.csv(temperature_file_path, header=True, inferSchema=True)

# Load country capitals data
capitals_file_path = "/FileStore/tables/country_list-2.csv"
capitals_df = spark.read.csv(capitals_file_path, header=True, inferSchema=True)

# UDF to filter years (Year >= 2000)
filter_year_udf = udf(lambda year: year if year >= 2000 else None, IntegerType())

# Apply UDF to create a new column "FilteredYear"
temperature_df = temperature_df.withColumn("FilteredYear", filter_year_udf("Year"))

# Broadcast the capitals data
broadcast_capitals = broadcast(capitals_df)

# Join the temperature data with the broadcasted country capitals data
joined_df = temperature_df.join(broadcast_capitals, on="Country")

# Register the joined DataFrame as a temporary table to use Spark SQL
joined_df.createOrReplaceTempView("joined_temperature_table")

# Use Spark SQL to find the capital, filtered year, and average temperature for each country
avg_temp_by_country = spark.sql(
    "SELECT Country, Capital, FilteredYear, AVG(AvgTemperature) AS AvgTemperature "
    "FROM joined_temperature_table "
    "WHERE City = Capital AND FilteredYear IS NOT NULL "
    "GROUP BY Country, Capital, FilteredYear"
)

# Show the result
display(avg_temp_by_country)


Country,Capital,FilteredYear,AvgTemperature
Egypt,Cairo,2003,71.87999999999997
Laos,Vientiane,2017,80.4443835616438
Mauritania,Nouakchott,2015,77.83442622950817
Senegal,Dakar,2003,75.94383561643835
North Korea,Pyongyang,2008,51.5325136612022
Kenya,Nairobi,2015,55.10081967213115
Sierra Leone,Freetown,2011,3.239726027397261
Mauritania,Nouakchott,2001,74.31452054794518
China,Beijing,2012,53.6983606557377
Guinea-Bissau,Bissau,2013,73.53205479452053


1F ii

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, broadcast
from pyspark.sql.types import IntegerType, StringType

# Initialize Spark session
spark = SparkSession.builder.appName("AvgTemperatureByCountryUDF").getOrCreate()

# Load city temperature data
temperature_file_path = "/FileStore/tables/city_temperature.csv"
temperature_df = spark.read.csv(temperature_file_path, header=True, inferSchema=True)

# Load country capitals data
capitals_file_path = "/FileStore/tables/country_list-2.csv"
capitals_df = spark.read.csv(capitals_file_path, header=True, inferSchema=True)

# Define the UDF to filter years (Year >= 2000)
filter_year_udf = udf(lambda year: year if year >= 2000 else None, IntegerType())

# Define the UDF to format the output
format_output_udf = udf(lambda capital, country, avg_temp: f"{capital} is the capital of {country} and its average temperature is {avg_temp}", StringType())

# Apply the UDF to create a new column "FilteredYear"
temperature_df = temperature_df.withColumn("FilteredYear", filter_year_udf("Year"))

# Broadcast the capitals data
broadcast_capitals = broadcast(capitals_df)

# Join the temperature data with the broadcasted country capitals data
joined_df = temperature_df.join(broadcast_capitals, on="Country")

# Register the joined DataFrame as a temporary table to use Spark SQL
joined_df.createOrReplaceTempView("joined_temperature_table")

# Use Spark SQL to find the capital, filtered year, and average temperature for each country
avg_temp_by_country = spark.sql(
    "SELECT Country, Capital, FilteredYear, AVG(AvgTemperature) AS AvgTemperature "
    "FROM joined_temperature_table "
    "WHERE City = Capital AND FilteredYear IS NOT NULL "
    "GROUP BY Country, Capital, FilteredYear"
)

# Apply the UDF to create a new column "Output"
avg_temp_by_country = avg_temp_by_country.withColumn("Output", format_output_udf("Capital", "Country", "AvgTemperature"))

display(avg_temp_by_country.select("Output"))
# display(avg_temp_by_country.select("Output").show(truncate=False))


Output
Cairo is the capital of Egypt and its average temperature is 71.87999999999997
Vientiane is the capital of Laos and its average temperature is 80.4443835616438
Nouakchott is the capital of Mauritania and its average temperature is 77.83442622950817
Dakar is the capital of Senegal and its average temperature is 75.94383561643835
Pyongyang is the capital of North Korea and its average temperature is 51.5325136612022
Nairobi is the capital of Kenya and its average temperature is 55.10081967213115
Freetown is the capital of Sierra Leone and its average temperature is 3.239726027397261
Nouakchott is the capital of Mauritania and its average temperature is 74.31452054794518
Beijing is the capital of China and its average temperature is 53.6983606557377
Bissau is the capital of Guinea-Bissau and its average temperature is 73.53205479452053
