In [2]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark==3.5.1

In [3]:
from pyspark import SparkContext
import pyspark, re


sc = SparkContext(appName="bigdatahw4")

print("Spark version:", pyspark.__version__)

Spark version: 3.5.1


In [6]:
spark = SparkSession.builder \
    .appName("CS4371_Homework4") \
    .getOrCreate()

print("sp")

✅ SparkSession active — ready for DataFrames!


In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
import pandas as pd
from pyspark.sql.functions import avg, col, trim, broadcast, udf
from pyspark.sql.types import BooleanType, StringType



In [7]:
city_temp = spark.read.csv("city_temperature.csv", header=True, inferSchema=True)
country_list = spark.read.csv("country-list.csv", header=True, inferSchema=True)

# Check schemas and preview a few rows
print("City Temperature Schema:")
city_temp.printSchema()
city_temp.show(5)

print("Country List Schema:")
country_list.printSchema()
country_list.show(5)

City Temperature Schema:
root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- State: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- AvgTemperature: double (nullable = true)

+------+-------+-----+-------+-----+---+----+--------------+
|Region|Country|State|   City|Month|Day|Year|AvgTemperature|
+------+-------+-----+-------+-----+---+----+--------------+
|Africa|Algeria| NULL|Algiers|    1|  1|1995|          64.2|
|Africa|Algeria| NULL|Algiers|    1|  2|1995|          49.4|
|Africa|Algeria| NULL|Algiers|    1|  3|1995|          48.8|
|Africa|Algeria| NULL|Algiers|    1|  4|1995|          46.4|
|Africa|Algeria| NULL|Algiers|    1|  5|1995|          47.9|
+------+-------+-----+-------+-----+---+----+--------------+
only showing top 5 rows

Country List Schema:
root
 |-- country: string (nullable = true)
 |-- capital: string (n

## Q1A

In [10]:
q1a = (
    city_temp
    .groupBy("Region")
    .agg(avg(col("AvgTemperature")).alias("Avg_AvgTemperature"))
    .orderBy("Region")
)

q1a.show(truncate=False)

(
    q1a
    .coalesce(1)  #single output
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", "\t")
    .csv("output_Q1A")
)

print("files saved under /content/output_Q1A")

+---------------------------------+------------------+
|Region                           |Avg_AvgTemperature|
+---------------------------------+------------------+
|Africa                           |53.54951656193528 |
|Asia                             |62.56864868961511 |
|Australia/South Pacific          |61.180869127275976|
|Europe                           |46.69628524306878 |
|Middle East                      |68.3845217196125  |
|North America                    |55.300932625245395|
|South/Central America & Carribean|62.189438801074665|
+---------------------------------+------------------+

files saved under /content/output_Q1A


## Q1B

In [11]:
q1b = (
    city_temp
    .filter(col("Region") == "Africa")
    .groupBy("Year")
    .agg(avg(col("AvgTemperature")).alias("Avg_AvgTemperature"))
    .orderBy("Year")
)

q1b.show(10, truncate=False)

(
    q1b
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", "\t")
    .csv("output_Q1B")
)

print("files saved under /content/output_Q1B")

+----+------------------+
|Year|Avg_AvgTemperature|
+----+------------------+
|201 |-99.0             |
|1995|52.97673713055389 |
|1996|48.348380171740835|
|1997|37.28249510763201 |
|1998|30.327075252133515|
|1999|34.153868682097425|
|2000|30.25645374034299 |
|2001|39.14619744922062 |
|2002|34.032092583845056|
|2003|34.28053849787433 |
+----+------------------+
only showing top 10 rows

files saved under /content/output_Q1B


## Q1C

In [13]:

q1c = (
    city_temp
    .withColumn("Country", trim(col("Country")))
    .withColumn("City", trim(col("City")))
    .filter(col("Country") == "Jordan")
    .groupBy("City")
    .agg(avg(col("AvgTemperature")).alias("Avg_AvgTemperature"))
    .orderBy(col("Avg_AvgTemperature").desc())  # or .orderBy("City") if you prefer alpha
)

q1c.show(20, truncate=False)

(
    q1c
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", "\t")
    .csv("output_Q1C")
)

print("files saved under /content/output_Q1C")

+-----+------------------+
|City |Avg_AvgTemperature|
+-----+------------------+
|Amman|64.16010144614734 |
+-----+------------------+

files saved under /content/output_Q1C


## Q1D

In [15]:
ct = (
    city_temp
    .withColumn("City", trim(col("City")))
    .withColumn("Country", trim(col("Country")))
    .alias("t")
)
cl = (
    country_list
    .withColumn("country", trim(col("country")))
    .withColumn("capital", trim(col("capital")))
    .alias("c")
)

#join on capital-city and country
joined = ct.join(
    cl,
    (col("t.City") == col("c.capital")) & (col("t.Country") == col("c.country")),
    "inner"
)

#group and aggregate using fully-qualified column refs
q1d = (
    joined
    .groupBy(col("c.capital").alias("Capital"), col("c.country").alias("Country"))
    .agg(avg(col("t.AvgTemperature")).alias("Avg_AvgTemperature"))
    .orderBy("Country")
)

#preview
q1d.show(truncate=False)

(
    q1d
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", "\t")
    .csv("output_Q1D")
)

print("files saved under /content/output_Q1D")

+------------+------------------------+------------------+
|Capital     |Country                 |Avg_AvgTemperature|
+------------+------------------------+------------------+
|Tirana      |Albania                 |33.172922512410985|
|Algiers     |Algeria                 |63.755439240232846|
|Buenos Aires|Argentina               |62.304899633067144|
|Canberra    |Australia               |55.57968918627248 |
|Vienna      |Austria                 |51.04722641916694 |
|Nassau      |Bahamas                 |76.57303831624395 |
|Manama      |Bahrain                 |80.63559248866856 |
|Dhaka       |Bangladesh              |10.10987951807226 |
|Bridgetown  |Barbados                |77.00251697494743 |
|Minsk       |Belarus                 |41.82123272884281 |
|Brussels    |Belgium                 |51.057047269587684|
|Hamilton    |Bermuda                 |66.97127603710837 |
|La Paz      |Bolivia                 |44.86714871573494 |
|Sofia       |Bulgaria                |45.20334556442906

## Q1E

In [17]:
t = (
    city_temp
    .withColumn("City", trim(col("City")))
    .withColumn("Country", trim(col("Country")))
    .alias("t")
)
c = (
    country_list
    .withColumn("country", trim(col("country")))
    .withColumn("capital", trim(col("capital")))
    .alias("c")
)

joined_b = t.join(
    broadcast(c),
    (col("t.City") == col("c.capital")) & (col("t.Country") == col("c.country")),
    "inner"
)

q1e = (
    joined_b
    .groupBy(col("c.capital").alias("Capital"), col("c.country").alias("Country"))
    .agg(avg(col("t.AvgTemperature")).alias("Avg_AvgTemperature"))
    .orderBy("Country")
)

#preview
q1e.show(truncate=False)

(
    q1e
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", "\t")
    .csv("output_Q1E")
)

print("files saved under /content/output_Q1E")

+------------+------------------------+------------------+
|Capital     |Country                 |Avg_AvgTemperature|
+------------+------------------------+------------------+
|Tirana      |Albania                 |33.172922512410985|
|Algiers     |Algeria                 |63.755439240232846|
|Buenos Aires|Argentina               |62.304899633067144|
|Canberra    |Australia               |55.57968918627248 |
|Vienna      |Austria                 |51.04722641916694 |
|Nassau      |Bahamas                 |76.57303831624395 |
|Manama      |Bahrain                 |80.63559248866856 |
|Dhaka       |Bangladesh              |10.10987951807226 |
|Bridgetown  |Barbados                |77.00251697494743 |
|Minsk       |Belarus                 |41.82123272884281 |
|Brussels    |Belgium                 |51.057047269587684|
|Hamilton    |Bermuda                 |66.97127603710837 |
|La Paz      |Bolivia                 |44.86714871573494 |
|Sofia       |Bulgaria                |45.20334556442906

## Q1F

In [21]:
year_filter_udf = udf(lambda y: y >= 2012 if y is not None else False, BooleanType())

#apply UDF filter
filtered = city_temp.filter(year_filter_udf(col("Year")))

format_udf = udf(
    lambda capital, country, temp: f"{capital} is the capital of {country} and its average temperature is {round(temp,2)}",
    StringType()
)
t = (
    filtered
    .withColumn("City", trim(col("City")))
    .withColumn("Country", trim(col("Country")))
    .alias("t")
)
c = (
    country_list
    .withColumn("country", trim(col("country")))
    .withColumn("capital", trim(col("capital")))
    .alias("c")
)
joined_f = t.join(
    c,
    (col("t.City") == col("c.capital")) & (col("t.Country") == col("c.country")),
    "inner"
)

#. group
avg_df = (
    joined_f
    .groupBy(col("c.capital").alias("Capital"), col("c.country").alias("Country"))
    .agg(avg(col("t.AvgTemperature")).alias("Avg_AvgTemperature"))
)

#formatting
final_q1f = avg_df.withColumn(
    "Description",
    format_udf(col("Capital"), col("Country"), col("Avg_AvgTemperature"))
).select("Description")

final_q1f.show(10, truncate=False)

# Saving both outputs
(
    filtered
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("output_Q1F_part1")
)
(
    final_q1f
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", "\t")
    .csv("output_Q1F_part2")
)

print("files saved to /content/output_Q1F_part1 and /content/output_Q1F_part2")

+-------------------------------------------------------------------------------------+
|Description                                                                          |
+-------------------------------------------------------------------------------------+
|Zagreb is the capital of Croatia and its average temperature is 54.3                 |
|Manama is the capital of Bahrain and its average temperature is 80.87                |
|Abu Dhabi is the capital of United Arab Emirates and its average temperature is 82.38|
|Addis Ababa is the capital of Ethiopia and its average temperature is 54.15          |
|Bishkek is the capital of Kyrgyzstan and its average temperature is 52.4             |
|Skopje is the capital of Macedonia and its average temperature is 54.19              |
|Helsinki is the capital of Finland and its average temperature is 42.37              |
|Algiers is the capital of Algeria and its average temperature is 63.53               |
|Damascus is the capital of Syri

In [23]:
import glob
from google.colab import files

def download_outputs(folder_pattern):
    for folder in sorted(glob.glob(folder_pattern)):
        files_in_folder = glob.glob(f"{folder}/*.csv") + glob.glob(f"{folder}/*.tsv")
        if not files_in_folder:
            files_in_folder = glob.glob(f"{folder}/part-*")
        for f in files_in_folder:
            print(f"⬇️ Downloading {f} ...")
            files.download(f)

#download outputs for all questions
download_outputs("output_Q1A")
download_outputs("output_Q1B")
download_outputs("output_Q1C")
download_outputs("output_Q1D")
download_outputs("output_Q1E")
download_outputs("output_Q1F_part1")
download_outputs("output_Q1F_part2")

print("c")

ModuleNotFoundError: No module named 'util'

In [24]:
import glob, shutil
output_map = {
    "output_Q1A": "Q1A_output.csv",
    "output_Q1B": "Q1B_output.csv",
    "output_Q1C": "Q1C_output.csv",
    "output_Q1D": "Q1D_output.csv",
    "output_Q1E": "Q1E_output.csv",
    "output_Q1F_part1": "Q1F_part1_filtered.csv",
    "output_Q1F_part2": "Q1F_part2_formatted.csv",
}

for folder, new_name in output_map.items():
    # Find Spark’s generated part file (it may be .csv or no extension)
    part_files = glob.glob(f"{folder}/part-*")
    if not part_files:
        print(f"⚠️ No file found in {folder}")
        continue

    part_file = part_files[0]
    new_path = f"/content/{new_name}"

    # Move and rename
    shutil.move(part_file, new_path)
    print(f"✅ Renamed: {part_file} → {new_path}")

    # Download
    files.download(new_path)

✅ Renamed: output_Q1A/part-00000-76004f08-12b6-42aa-8096-d4a5ad3fca3e-c000.csv → /content/Q1A_output.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Renamed: output_Q1B/part-00000-125f1493-3193-486f-acd7-86ae40f9564f-c000.csv → /content/Q1B_output.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Renamed: output_Q1C/part-00000-51ff11cb-c8ca-4d0e-abb3-9a7377ca99f2-c000.csv → /content/Q1C_output.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Renamed: output_Q1D/part-00000-91819710-496a-4434-b366-6e42295202da-c000.csv → /content/Q1D_output.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Renamed: output_Q1E/part-00000-7ee0de72-4e21-4f73-9e05-57f38538acfb-c000.csv → /content/Q1E_output.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Renamed: output_Q1F_part1/part-00000-8619ea62-1487-48e0-b737-a7e7aca4fdc1-c000.csv → /content/Q1F_part1_filtered.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Renamed: output_Q1F_part2/part-00000-cf3828d9-a605-4d3f-a4b7-2559ff60b3a6-c000.csv → /content/Q1F_part2_formatted.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>