In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create (or reuse) the Spark session used by the rest of this script
spark = (
    SparkSession.builder
    .appName("Homework2_Spark_Chapter2")
    .getOrCreate()
)

# Report the Spark version of the active session
print("Spark version:", spark.version)

# Read every flight-summary CSV matched by the glob (file names starting with
# "201"; the data set is described as covering 2010-2015). The header row
# supplies the column names and column types are inferred from the data.
flights_reader = spark.read.option("header", True).option("inferSchema", True)
df = flights_reader.csv("gs://msba-tasmania-svtayade/201*-summary.csv")

# Quick look at the data: schema, a few sample rows, summary statistics
df.printSchema()
df.show(5)
summary_stats = df.describe()
summary_stats.show()

# Force the 'count' column to an integer type
count_as_int = F.col("count").cast("int")
df = df.withColumn("count", count_as_int)

# Ten destinations with the most rows, largest first.
# NOTE: groupBy(...).count() counts rows per destination (it does not add up
# the 'count' column); its output column is also named "count".
rows_per_destination = df.groupBy("DEST_COUNTRY_NAME").count()
rows_per_destination.orderBy(F.desc("count")).show(10)

# Tag every row with the path of the file it was read from, then derive the
# year from that path.
df_with_year = df.withColumn("source_file", F.input_file_name())

# Anchor the pattern to the "<year>-summary.csv" file name. An unanchored
# four-digit match would pick up the first digit run anywhere in the path, so
# digits in a bucket or folder name could be mistaken for the year.
df_with_year = df_with_year.withColumn(
    "year",
    F.regexp_extract("source_file", r"(\d{4})-summary\.csv", 1),
)

df_with_year.select("year", "DEST_COUNTRY_NAME", "count").show(5)

# Sum of 'count' per (year, destination), listed year by year with the
# largest totals first within each year.
(
    df_with_year
    .groupBy("year", "DEST_COUNTRY_NAME")
    .sum("count")
    .orderBy("year", F.desc("sum(count)"))
    .show()
)

# Read the per-country land temperature CSV (header row, inferred types)
temp_csv_path = "gs://msba-tasmania-svtayade/GlobalLandTemperatures_GlobalLandTemperaturesByCountry.csv"
df_temp = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(temp_csv_path)
)

# Add a calendar-year column taken from 'dt' and register the result as the
# SQL view "temp" so it can be queried with spark.sql
df_temp = df_temp.withColumn("Year", F.year("dt"))
df_temp.createOrReplaceTempView("temp")

# a. Country/year pair with the highest mean of AverageTemperature
hottest_country_year = spark.sql("""
    SELECT Country, Year, AVG(AverageTemperature) as AvgTemp
    FROM temp
    GROUP BY Country, Year
    ORDER BY AvgTemp DESC
    LIMIT 1
""")
hottest_country_year.show()

# b. Top 10 countries with highest temp change
# Yearly mean temperature per country, queried from the "temp" view.
# This DataFrame is also reused further down when joining with the CO2 data,
# so it is left unfiltered here.
df_country_year = spark.sql("""
    SELECT Country, Year, AVG(AverageTemperature) as AvgTemp
    FROM temp
    GROUP BY Country, Year
""")

# AVG() yields NULL for a year in which every reading is missing. Drop those
# years so the first/last year chosen for a country always carries a usable
# temperature and TempChange does not come out NULL.
yearly_with_temp = df_country_year.filter(F.col("AvgTemp").isNotNull())

# Structs are compared field by field, so min/max over (Year, AvgTemp) return
# the earliest and latest year together with that year's temperature. One
# aggregation replaces two window passes followed by distinct().
year_and_temp = F.struct("Year", "AvgTemp")

first_last = (
    yearly_with_temp
    .groupBy("Country")
    .agg(
        F.min(year_and_temp).alias("earliest"),
        F.max(year_and_temp).alias("latest"),
    )
    .select(
        "Country",
        F.col("earliest.Year").alias("FirstYear"),
        F.col("earliest.AvgTemp").alias("FirstYearTemp"),
        F.col("latest.Year").alias("LastYear"),
        F.col("latest.AvgTemp").alias("LastYearTemp"),
    )
    # Change = temperature in the last recorded year minus the first one
    .withColumn("TempChange", F.col("LastYearTemp") - F.col("FirstYearTemp"))
)

first_last.orderBy(F.desc("TempChange")).show(10, truncate=False)

# Load CO2 data (wide layout: one row per country, one column per year)
co2_path = "gs://msba-tasmania-svtayade/CO2 emissions per capita per country.csv"

co2_raw = (
    spark.read.option("header", True).option("inferSchema", True).csv(co2_path)
)

# Convert wide to long format.
# Only unpivot the 1960-2014 year columns that are actually present in the
# file, so a missing year does not abort the whole reshape.
available_columns = set(co2_raw.columns)
years = [str(y) for y in range(1960, 2015) if str(y) in available_columns]
if not years:
    raise ValueError(
        "No year columns between 1960 and 2014 found in the CO2 data"
    )

# stack() requires every value feeding the same output column to share one
# type. Schema inference may type individual year columns differently (for
# example an empty column), so each one is cast to double explicitly.
stack_args = ", ".join(f"'{y}', CAST(`{y}` AS DOUBLE)" for y in years)
expr = f"stack({len(years)}, {stack_args}) as (Year, CO2_per_capita)"

# One row per (Country, Year) with a non-null CO2 value; Year becomes an int
# so it can be compared and joined with the temperature data's Year column.
co2_long = (
    co2_raw.selectExpr("`Country Name` as Country", expr)
           .filter(F.col("CO2_per_capita").isNotNull())
           .withColumn("Year", F.col("Year").cast("int"))
)

# Keep only the temperature years that overlap the CO2 data (1960-2014)
temp_60_14 = (
    df_country_year.filter((F.col("Year") >= 1960) & (F.col("Year") <= 2014))
)

# Inner-join temperature and CO2 on country name and year.
# The result is cached because several separate actions consume it (the count
# and show below, and the correlation computations later in the script);
# without caching each action would re-read the CSVs and redo the join.
merged = (
    temp_60_14.join(co2_long, on=["Country", "Year"])
              .select("Country", "Year", "AvgTemp", "CO2_per_capita")
              .cache()
)

print(f"Rows after merge (1960–2014): {merged.count()}")
merged.show(5, truncate=False)

# Pearson correlation between AvgTemp and CO2_per_capita over all merged rows
corr_val = merged.stat.corr("AvgTemp", "CO2_per_capita")
print(f"Pearson correlation (AvgTemp vs CO2 per capita): {corr_val:.4f}")

# Same correlation computed separately for each country, highest first
per_country_corr = F.corr("AvgTemp", "CO2_per_capita").alias("Corr")
corr_by_country = merged.groupBy("Country").agg(per_country_corr)
corr_by_country = corr_by_country.orderBy(F.desc("Corr"))

corr_by_country.show(5, truncate=False)

# Shut down the Spark session now that all results have been displayed
spark.stop()
