<a href="https://colab.research.google.com/github/SharveshSp04/sxs210399-sharvesh-subapalaniraj/blob/main/q2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Q2: Temperature Analysis - AUTO EXECUTION

# Initialize the SparkSession and SparkContext used by the Q2 code below.
try:
    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("TemperatureAnalysis").getOrCreate()
    sc = spark.sparkContext
except Exception as e:
    print(f"Error initializing Spark: {e}")
    # Leave both handles defined (as None) so later code can detect the
    # failure explicitly instead of hitting a NameError on ``spark``.
    spark = None
    sc = None


def clean_temperature(temp, min_valid=-50.0):
    """Convert a raw temperature field to ``float``, or ``None`` if unusable.

    Missing readings are assumed to be encoded as -99 (a common sentinel in
    temperature datasets). Rather than matching that exact value, any reading
    at or below ``min_valid`` is treated as missing.

    Args:
        temp: Raw field value, usually a string from a CSV row.
        min_valid: Readings ``<= min_valid`` are rejected. The default of
            -50.0 preserves the original cutoff.

    Returns:
        The reading as a finite ``float``, or ``None`` when it cannot be
        parsed, is NaN/infinite, or is at or below ``min_valid``.
    """
    try:
        value = float(temp)
    except (ValueError, TypeError):
        return None
    # The chained comparison rejects NaN (all comparisons are False) and
    # +/-inf. Either would otherwise silently corrupt the averages.
    if min_valid < value < float("inf"):
        return value
    return None

def create_mock_q2_data(temp_path="city_temperature.csv",
                        country_path="country-list.csv"):
    """Write small mock temperature and country-capital CSVs for Q2 testing.

    Any existing file at either path is overwritten.

    Args:
        temp_path: Destination of the temperature CSV, with columns
            Region, Country, State, City, Month, Day, Year, AvgTemperature.
        country_path: Destination of the country CSV, with columns
            country, capital, type.
    """

    # Mock temperature data
    temp_data = """Region,Country,State,City,Month,Day,Year,AvgTemperature
Asia,Japan,Tokyo,Tokyo,1,15,2020,5.5
Asia,Japan,Tokyo,Tokyo,2,15,2020,6.5
Asia,China,Beijing,Beijing,1,15,2020,-2.0
Asia,China,Beijing,Beijing,2,15,2020,1.5
Europe,Germany,Berlin,Berlin,1,15,2020,2.0
Europe,Germany,Berlin,Berlin,2,15,2020,4.0
Europe,Germany,Munich,Munich,1,15,2020,1.5
Europe,Germany,Munich,Munich,2,15,2020,3.5
Europe,France,Paris,Paris,1,15,2020,4.0
Europe,France,Paris,Paris,2,15,2020,6.0
North America,USA,California,Los Angeles,1,15,2020,15.0
North America,USA,California,Los Angeles,2,15,2020,16.0
Africa,Egypt,Cairo,Cairo,1,15,2020,14.0
Africa,Egypt,Cairo,Cairo,2,15,2020,16.0"""

    # Mock country capital data
    country_data = """country,capital,type
Japan,Tokyo,primary
China,Beijing,primary
Germany,Berlin,primary
France,Paris,primary
Egypt,Cairo,primary
USA,Washington,primary"""

    # Explicit encoding, so the files do not depend on the platform locale.
    with open(temp_path, "w", encoding="utf-8") as f:
        f.write(temp_data)

    with open(country_path, "w", encoding="utf-8") as f:
        f.write(country_data)

    print("Created mock Q2 datasets")

def _parse_csv_partition(lines):
    """Parse an iterator of raw CSV lines into lists of whitespace-stripped fields.

    The csv module is used instead of ``str.split(",")`` so that quoted fields
    containing commas stay intact.
    """
    import csv

    for row in csv.reader(lines):
        yield [field.strip() for field in row]


def _load_text_with_fallback(path):
    """Return a non-empty text RDD for *path*, writing the mock datasets first if needed.

    ``sc.textFile`` is lazy, so the ``isEmpty()`` action is what actually
    surfaces a missing input path. ``isEmpty()`` only inspects the first
    element, unlike ``count()``, which scans the whole file.
    """
    try:
        rdd = sc.textFile(path)
        if not rdd.isEmpty():
            return rdd
    except Exception as err:  # a missing path surfaces as a Py4J/Java error
        print(f"Could not read {path} ({type(err).__name__}); using mock data")
    create_mock_q2_data()
    return sc.textFile(path)


def _drop_header_and_parse(rdd, min_fields):
    """Drop the header line, parse CSV rows, and keep rows with >= *min_fields* fields."""
    header = rdd.first()
    return (rdd.filter(lambda line: line != header)
               .mapPartitions(_parse_csv_partition)
               # Short or malformed rows would otherwise raise IndexError later.
               .filter(lambda fields: len(fields) >= min_fields))


def _average_by_key(pairs):
    """Average the values of a ``(key, value)`` RDD per key, ignoring ``None`` values."""
    return (pairs.filter(lambda kv: kv[1] is not None)
                 .mapValues(lambda value: (value, 1))
                 .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                 .mapValues(lambda total_count: total_count[0] / total_count[1]))


def _save_output(rdd, path):
    """Write *rdd* as a single text part file, replacing earlier output at *path*.

    ``saveAsTextFile`` refuses to write into an existing directory, which
    made re-running the cell fail. Only a local directory is removed here;
    non-local URIs are left untouched.
    """
    import shutil

    shutil.rmtree(path, ignore_errors=True)
    rdd.coalesce(1).saveAsTextFile(path)


def q2_solution():
    """Run the four Q2 temperature analyses, save each result, and print a summary.

    Reads ``city_temperature.csv`` and ``country-list.csv`` from the working
    directory. It falls back to ``create_mock_q2_data()`` when a file is
    missing or empty. Results are written with ``saveAsTextFile`` to
    ``q2a_region_avg_output``, ``q2b_asia_month_avg_output``,
    ``q2c_germany_city_avg_output`` and ``q2d_capital_avg_output``, and each
    directory is replaced if it already exists.

    Returns:
        ``(region_avg, asia_month_avg, germany_city_avg, capital_avg)`` RDDs of:

        - ``(Region, avg)``
        - ``(Month, Year, avg)`` for Asia
        - ``(City, avg)`` for Germany
        - ``"Country<TAB>Year<TAB>avg"`` strings for capital cities

    Raises:
        RuntimeError: If the global SparkContext ``sc`` was not initialized.
    """
    if sc is None:
        raise RuntimeError("SparkContext is not available; Spark initialization failed")

    print("\n" + "="*50)
    print("EXECUTING QUESTION 2")
    print("="*50)

    # Columns: Region, Country, State, City, Month, Day, Year, AvgTemperature.
    # Cached because every part below, plus the final printing, reuses it.
    temp_data = _drop_header_and_parse(
        _load_text_with_fallback("city_temperature.csv"), min_fields=8
    ).cache()

    # A. Average temperature for each Region
    region_avg = _average_by_key(
        temp_data.map(lambda fields: (fields[0], clean_temperature(fields[7])))
    )
    _save_output(region_avg, "q2a_region_avg_output")

    # B. Average temperature by Month for Asia region
    asia_month_avg = _average_by_key(
        temp_data.filter(lambda fields: fields[0] == "Asia")
                 .map(lambda fields: ((fields[4], fields[6]), clean_temperature(fields[7])))
    ).map(lambda kv: (kv[0][0], kv[0][1], kv[1]))  # (Month, Year, AvgTemp)
    _save_output(asia_month_avg, "q2b_asia_month_avg_output")

    # C. Average temperature by City for Germany
    germany_city_avg = _average_by_key(
        temp_data.filter(lambda fields: fields[1] == "Germany")
                 .map(lambda fields: (fields[3], clean_temperature(fields[7])))
    )
    _save_output(germany_city_avg, "q2c_germany_city_avg_output")

    # D. Capital city temperatures by country
    # NOTE(review): if country-list.csv is missing, create_mock_q2_data() also
    # rewrites city_temperature.csv. The cache on temp_data above usually
    # keeps the rows that were already loaded, but caching is best-effort.
    country_data = _drop_header_and_parse(
        _load_text_with_fallback("country-list.csv"), min_fields=2
    )
    # Both sides must be keyed by city name for the join to match anything.
    capital_by_city = country_data.map(lambda fields: (fields[1], fields[0]))  # (Capital, Country)
    city_readings = temp_data.map(
        lambda fields: (fields[3], (fields[1], fields[6], clean_temperature(fields[7])))
    )  # (City, (Country, Year, Temp))

    capital_pairs = (
        capital_by_city.join(city_readings)  # (City, (CapitalOf, (Country, Year, Temp)))
        # Drop same-named cities that belong to a different country.
        .filter(lambda kv: kv[1][0] == kv[1][1][0])
        .map(lambda kv: ((kv[1][0], kv[1][1][1]), kv[1][1][2]))  # ((Country, Year), Temp)
    )
    capital_avg = _average_by_key(capital_pairs).map(
        lambda kv: f"{kv[0][0]}\t{kv[0][1]}\t{kv[1]}"
    )
    _save_output(capital_avg, "q2d_capital_avg_output")

    # Print results for verification
    print("=== Q2 Results ===")
    print("\nA. Region averages:")
    for region, temp in region_avg.collect():
        print(f"{region}: {temp:.2f}°C")

    print("\nB. Asia month averages:")
    for month, year, temp in asia_month_avg.collect():
        print(f"Year {year}, Month {month}: {temp:.2f}°C")

    print("\nC. Germany city averages:")
    for city, temp in germany_city_avg.collect():
        print(f"{city}: {temp:.2f}°C")

    print("\nD. Capital averages:")
    for result in capital_avg.collect():
        print(result)

    return region_avg, asia_month_avg, germany_city_avg, capital_avg

# AUTO EXECUTE Q2: run the whole pipeline now and keep its four result RDDs
# as module-level names for later cells.
(
    q2_region_avg,
    q2_asia_month_avg,
    q2_germany_city_avg,
    q2_capital_avg,
) = q2_solution()


EXECUTING QUESTION 2
=== Q2 Results ===

A. Region averages:
Asia: 2.88°C
Europe: 3.50°C
Africa: 15.00°C
North America: 15.50°C

B. Asia month averages:
Year 2020, Month 1: 1.75°C
Year 2020, Month 2: 4.00°C

C. Germany city averages:
Berlin: 3.00°C
Munich: 2.50°C

D. Capital averages:


In [None]:
# Mount the user's Google Drive into the Colab filesystem at /content/drive.
# google.colab is only importable inside a Colab runtime.
# NOTE(review): this cell runs after Q2, which reads and writes relative paths
# (e.g. city_temperature.csv). If the datasets live on Drive, mount first and
# point those paths under /content/drive — confirm intended data location.
from google.colab import drive
drive.mount('/content/drive')