# **Q1:**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Install java
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz ## Install Apache Spark
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5baccb352fcbb159c9b9d6e42eeff13f40d46cde493f256862de5e2ddb2a9d0c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [1]:
# Reinstall PySpark pinned to exactly 3.1.2: without this, PySpark and Spark SQL did not work together in this runtime.
# NOTE(review): this cell installs Java 11 and points JAVA_HOME at it, but the next cell resets JAVA_HOME to the
# Java 8 path and SPARK_HOME to the 3.4.1 distribution — confirm which JDK / Spark version is actually intended.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
!pip install pyspark==3.1.2




In [2]:
import os
from pyspark.sql import SparkSession

# Point Java and findspark at the JDK and the Spark distribution unpacked under /content (Google Colab paths).
# NOTE: this replaces the Java 11 JAVA_HOME assigned in the reinstall cell above with the Java 8 path.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.4.1-bin-hadoop3",
)

In [3]:
import findspark
# Make the Spark distribution importable; with no arguments findspark reads SPARK_HOME (set in the previous cell).
findspark.init()
from pyspark.sql import SparkSession

# from pyspark import SparkConf, SparkContext
from datetime import datetime, date, timedelta
from dateutil import relativedelta
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql import DataFrame
# NOTE(review): this wildcard import rebinds Python built-ins such as max, min, abs, sum and round to their
# Spark Column-function versions for the rest of the notebook; prefer the `F.` namespace imported below.
from pyspark.sql.functions import *
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_list, collect_set, concat, first, array_distinct, col, size, expr
import random

In [4]:
# Session for the flight-data walkthrough; getOrCreate returns an already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("Flight Data Analysis")
    .getOrCreate()
)


In [6]:
# Load the 2015 flight summary: column names come from the header row, column types are inferred by Spark.
flightData2015 = spark.read.csv("2015-summary.csv", header=True, inferSchema=True)

In [7]:
# Peek at the first three rows (a list of Row objects).
flightData2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [8]:
# Physical plan for a global sort on `count`; note the range-partitioning Exchange (shuffle) it requires.
flightData2015.orderBy("count").explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=33]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [9]:
# Lower the shuffle partition count (200 in the plan above) to 5, then fetch the two rows with the smallest count.
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.orderBy("count").take(2)


[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [10]:
# Expose the DataFrame to spark.sql() under the name flight_data_2015.
flightData2015.createOrReplaceTempView(name="flight_data_2015")

In [11]:
# The same per-destination count written in SQL and with the DataFrame API; both explain() to the same plan.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=55]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=68]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-su

In [12]:
# SQL version: the largest value in the `count` column.
spark.sql("SELECT max(count) from flight_data_2015").head(1)

[Row(max(count)=370002)]

In [13]:
# DataFrame version of the previous query. This import rebinds the name `max` to Spark's aggregate function.
from pyspark.sql.functions import max
flightData2015.select(max(col("count"))).head(1)


[Row(max(count)=370002)]

In [14]:
# Top five destination countries by total flights, queried through the temp view.
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show(n=20, truncate=True)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [15]:
from pyspark.sql.functions import desc

# DataFrame-API equivalent of the top-five SQL query above.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(col("destination_total").desc())
    .limit(5)
    .show()
)


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [16]:
# Plan for the top-five query: sort + limit collapse into a single TakeOrderedAndProject step.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
    .explain()
)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#113L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#17,destination_total#113L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[sum(count#19)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=238]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_sum(count#19)])
            +- FileScan csv [DEST_COUNTRY_NAME#17,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




# **Q2:**

# **(1) - a:**

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, col

# Q2 (1)-a: report the record(s) with the highest AverageTemperature in the country-level dataset.
spark = SparkSession.builder \
    .appName("Highest Average Temperature Analysis") \
    .getOrCreate()

temperature_df = spark.read.csv("GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv", header=True, inferSchema=True)

# Year = first four characters of dt. Column.substr positions are 1-based, so the year starts at position 1.
temperature_df = temperature_df.withColumn("year", col("dt").substr(1, 4))

# Drop rows that have no temperature reading.
temperature_df = temperature_df.na.drop(subset=["AverageTemperature"])

# Two passes: find the maximum value, then keep every record equal to it (so ties are all shown).
max_temp = temperature_df.select(max("AverageTemperature")).collect()[0][0]
max_temp_record = temperature_df.filter(col("AverageTemperature") == max_temp)
max_temp_record.select("Country", "year", "AverageTemperature").show()

# Stop the Spark session
spark.stop()

+-------+----+------------------+
|Country|year|AverageTemperature|
+-------+----+------------------+
| Kuwait|2012| 38.84200000000001|
+-------+----+------------------+



# **(1) - b:**

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, min, col, abs

# Q2 (1)-b: rank countries by the spread between their highest and lowest recorded AverageTemperature.
spark = SparkSession.builder \
    .appName("Temperature Change Analysis") \
    .getOrCreate()

# Number of countries to report.
TOP_N = 10

temperature_df = spark.read.csv("GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv", header=True, inferSchema=True)

# Drop rows that have no temperature reading. (No year column is needed: the spread is over each country's full record.)
temperature_df = temperature_df.na.drop(subset=["AverageTemperature"])

# Per-country extremes.
temp_range_df = temperature_df.groupBy("Country").agg(
    max("AverageTemperature").alias("MaxTemperature"),
    min("AverageTemperature").alias("MinTemperature")
)

# Spread between the extremes; max >= min, so abs() is only a safeguard.
temp_range_df = temp_range_df.withColumn("TemperatureChange", abs(col("MaxTemperature") - col("MinTemperature")))
top_countries = temp_range_df.orderBy(col("TemperatureChange").desc()).limit(TOP_N)
top_countries.select("Country", "TemperatureChange").show()

spark.stop()

+-----------+------------------+
|    Country| TemperatureChange|
+-----------+------------------+
| Kazakhstan|            49.163|
|   Mongolia|48.157999999999994|
|     Canada|            43.532|
|    Finland|            40.332|
|    Belarus|            39.338|
|    Estonia|38.882999999999996|
| Kyrgyzstan| 38.43599999999999|
|North Korea|            38.342|
|     Latvia|            38.063|
|    Moldova|            38.012|
+-----------+------------------+



# **(2) - a, b:**

In [22]:
!pip uninstall pyspark py4j
!pip install pyspark


Found existing installation: pyspark 3.1.2
Uninstalling pyspark-3.1.2:
  Would remove:
    /usr/local/bin/beeline
    /usr/local/bin/beeline.cmd
    /usr/local/bin/docker-image-tool.sh
    /usr/local/bin/find-spark-home
    /usr/local/bin/find-spark-home.cmd
    /usr/local/bin/find_spark_home.py
    /usr/local/bin/load-spark-env.cmd
    /usr/local/bin/load-spark-env.sh
    /usr/local/bin/pyspark
    /usr/local/bin/pyspark.cmd
    /usr/local/bin/pyspark2.cmd
    /usr/local/bin/run-example
    /usr/local/bin/run-example.cmd
    /usr/local/bin/spark-class
    /usr/local/bin/spark-class.cmd
    /usr/local/bin/spark-class2.cmd
    /usr/local/bin/spark-shell
    /usr/local/bin/spark-shell.cmd
    /usr/local/bin/spark-shell2.cmd
    /usr/local/bin/spark-sql
    /usr/local/bin/spark-sql.cmd
    /usr/local/bin/spark-sql2.cmd
    /usr/local/bin/spark-submit
    /usr/local/bin/spark-submit.cmd
    /usr/local/bin/spark-submit2.cmd
    /usr/local/bin/sparkR
    /usr/local/bin/sparkR.cmd
    /usr/lo

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, lit, max, min
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Q2 (2): correlate each country's within-year temperature spread with its CO2 emissions per capita.
spark = SparkSession.builder.appName("HW2_part2").getOrCreate()
co2_data_path = 'CO2 emissions per capita per country.csv'
temp_data_path = 'GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv'
# Inclusive range of years kept from the temperature data.
FIRST_YEAR, LAST_YEAR = 1960, 2014

# read and process global temperature data
globalLandTemp = spark.read.option("inferSchema", "true").option("header", "true").csv(temp_data_path)
globalLandTemp = globalLandTemp.withColumn("Year", year(globalLandTemp.dt))
globalLandTemp.createOrReplaceTempView("temp_data")

# CO2 emission data: one row per country, one column per year.
# NOTE(review): assumes the first two columns are identifiers and every later column is a year — confirm.
co2 = spark.read.option("inferSchema", "true").option("header", "true").csv(co2_data_path)

# reshape CO2 data into a long format: (Country, Year, CO2_per_capita)
schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Year", StringType(), True),
    StructField("CO2_per_capita", DoubleType(), True)
])
co2_long = spark.createDataFrame([], schema)

year_columns = co2.columns[2:]
# The loop variable must not be named `year`: that would overwrite the `year` function imported above
# with a plain string for every later cell in the notebook.
for year_col in year_columns:
    df_year = co2.select(
        col("Country Name").alias("Country"),
        lit(year_col).alias("Year"),
        col(year_col).cast(DoubleType()).alias("CO2_per_capita"),
    )
    co2_long = co2_long.unionByName(df_year)

# TemperatureChange = max - min of AverageTemperature within one country-year, i.e. the spread of that
# year's readings (not a change between years). Sorting here would be discarded by the join, so it is omitted.
filtered_globalLandTemp = globalLandTemp.filter(col("Year").between(FIRST_YEAR, LAST_YEAR))
df_temp_change = (
    filtered_globalLandTemp
    .groupBy('Country', 'Year')
    .agg((max('AverageTemperature') - min('AverageTemperature')).alias('TemperatureChange'))
    # Year comes out of year() as an integer but is a string in co2_long; cast so the join keys match.
    .withColumn("Year", col("Year").cast(StringType()))
)

merged_df = df_temp_change.join(co2_long, ["Country", "Year"], "inner")
sorted_merged_df = merged_df.orderBy("Year", "Country")
# Keep only rows where both correlation inputs are present; na.drop removes NaN as well as null,
# either of which would otherwise corrupt the correlation.
non_null = sorted_merged_df.na.drop(subset=["CO2_per_capita", "TemperatureChange"])

# make sure all data is numeric before correlation calculation
non_null = non_null.withColumn('CO2_per_capita', col('CO2_per_capita').cast(DoubleType()))
non_null = non_null.withColumn('TemperatureChange', col('TemperatureChange').cast(DoubleType()))

print("Sorted and Cleaned Merged DataFrame:")
non_null.show()

# Pearson correlation (the only method DataFrame.stat.corr supports)
correlation = non_null.stat.corr('CO2_per_capita', 'TemperatureChange')
print("Correlation coefficient:", correlation)

spark.stop()

Sorted and Cleaned Merged DataFrame:
+--------------+----+------------------+--------------+
|       Country|Year| TemperatureChange|CO2_per_capita|
+--------------+----+------------------+--------------+
|   Afghanistan|1960|            24.789|   0.046059897|
|       Albania|1960|            18.404|   1.258194928|
|       Algeria|1960|22.242000000000004|   0.553763777|
|        Angola|1960| 5.777000000000001|   0.097471604|
|     Argentina|1960|            14.945|   2.367473032|
|     Australia|1960|            14.335|   8.582936643|
|       Austria|1960|            19.263|   4.373318828|
|       Bahrain|1960|            16.924|   3.544478443|
|      Barbados|1960|2.0559999999999974|   0.746296641|
|       Belgium|1960|            14.815|   9.941594074|
|        Belize|1960| 4.963999999999999|   0.477971846|
|         Benin|1960|             4.073|   0.066354063|
|       Bolivia|1960|             5.279|    0.27203787|
|        Brazil|1960|             2.314|   0.649630979|
|      Bulg