## Setup and Data Loading

In [None]:
# Install PySpark if not already installed (uncomment the next line to run it)
# !pip install pyspark
# Mount Google Drive at /content/drive; the data files loaded later live under this path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
# Make the homework folder the working directory. `%cd` is an IPython magic and
# os.chdir() is the plain-Python call; both target the same path here.
%cd /content/drive/MyDrive/hw5
os.chdir('/content/drive/MyDrive/hw5')
# Last expression in the cell, so the directory listing becomes the cell output.
os.listdir()

/content/drive/MyDrive/hw5


['country-db', 'hw5-551-fa25.pdf', 'hw5_spark.ipynb']

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fc
# Reuse the active Spark session, or start one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Load each JSON file of the country database into its own DataFrame.
country = spark.read.json('/content/drive/MyDrive/hw5/country-db/country.json')
city = spark.read.json('/content/drive/MyDrive/hw5/country-db/city.json')
cl = spark.read.json('/content/drive/MyDrive/hw5/country-db/cl.json')

# Register the DataFrames as temp views so the SQL queries can refer to them by name.
for view_name, frame in (("city", city), ("country", country), ("cl", cl)):
    frame.createOrReplaceTempView(view_name)

print("Data loaded successfully!")

Data loaded successfully!


## Query 1: Countries in the Americas with GNP > 100,000

In [None]:
# Query 1: Using Spark SQL
# Selects Name and GNP from the `country` view for rows whose Continent
# contains "America" and whose GNP is greater than 100000.
query1_sql = """
select Name, GNP
from country
where Continent like '%America%' and GNP > 100000;
"""

result1 = spark.sql(query1_sql)
print("Query 1 Results (SQL):")
# Display at most 20 rows.
result1.show(20)

In [None]:
# Query 1: Using DataFrame API
# Keep countries whose continent name contains "America" and whose GNP is
# above 100000, then project the two columns of interest.
in_americas = fc.col('Continent').like('%America%')
high_gnp = fc.col('GNP') > 100000
result1 = country.where(in_americas & high_gnp).select('Name', 'GNP')

print("Query 1 Results (DataFrame API):")
result1.show(20)

In [None]:
# Query 1: Using RDD API
# SQL silently drops rows whose Continent or GNP is NULL, but in Python
# `'America' in None` and `None > 100000` raise TypeError, so the null
# checks are made explicit to match the SQL/DataFrame versions.
rdd_country = country.rdd
result1_rdd = (rdd_country
               .filter(lambda row: row.Continent is not None
                       and row.GNP is not None
                       and 'America' in row.Continent
                       and row.GNP > 100000)
               .map(lambda row: (row.Name, row.GNP)))

# RDD transformations are lazy; an action is needed for the query to run.
# take(20) mirrors the show(20) used by the SQL and DataFrame versions.
print("Query 1 Results (RDD API):")
print(result1_rdd.take(20))

## Query 2: Average GNP of countries in North America with population > 100,000

In [None]:
# Query 2: Using Spark SQL
# Averages GNP over the rows of the `country` view whose Continent is
# 'North America' and whose Population is greater than 100000.
query2_sql = """
select avg(GNP)

from country

where Continent = 'North America' and Population > 100000;
"""

result2 = spark.sql(query2_sql)
print("Query 2 Results (SQL):")
result2.show()

In [None]:
# Query 2: Using DataFrame API
# Restrict to North American countries with more than 100000 inhabitants,
# then collapse the remaining rows into a single average of GNP.
populous_north_american = (
    (fc.col('Continent') == 'North America')
    & (fc.col('Population') > 100000)
)
result2 = (
    country
    .where(populous_north_american)
    .agg(fc.avg('GNP').alias('Average_GNP'))
)

print("Query 2 Results (DataFrame API):")
result2.show()

In [None]:
# Query 2: Using RDD API
rdd_country = country.rdd
# SQL drops rows with a NULL Population; in Python `None > 100000` raises
# TypeError, so the null check is explicit.
filtered_rdd = rdd_country.filter(
    lambda row: row.Continent == 'North America'
    and row.Population is not None
    and row.Population > 100000)
# SQL avg() ignores NULLs, so missing GNP values are removed before averaging.
gnp_values = filtered_rdd.map(lambda row: row.GNP).filter(lambda gnp: gnp is not None)
# Accumulate (sum, count) in one pass; separate sum() and count() actions
# would each re-run the whole filter/map pipeline.
total_gnp, count = gnp_values.aggregate(
    (0.0, 0),
    lambda acc, gnp: (acc[0] + gnp, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1]))
# No matching rows -> None, mirroring the NULL that SQL avg() returns.
average_gnp = total_gnp / count if count > 0 else None

print("Query 2 Results (RDD API):")
print(average_gnp)

## Query 3: Join country and city on capital; top 5 North American countries by GNP with GNP > 10,000

In [None]:
# Query 3: Using Spark SQL
# Joins each country to its capital city (country.Capital = city.ID), keeps
# North American countries with GNP above 10000, and returns the five with
# the highest GNP.
# The string literal uses single quotes: double quotes denote identifiers in
# standard SQL (and in Spark when spark.sql.ansi.doubleQuotedIdentifiers is
# enabled), and single quotes match the other queries in this notebook.
# Filter columns are qualified with their table so the join stays unambiguous.
query3_sql = """
select country.Name as Country, city.Name as Capital, country.GNP

from country join city on country.Capital = city.ID

where country.Continent = 'North America' and country.GNP > 10000

order by country.GNP desc

limit 5;
"""

result3 = spark.sql(query3_sql)
print("Query 3 Results (SQL):")
result3.show()

In [None]:
# Query 3: Using DataFrame API
# Pair every country with its capital city, keep the North American ones with
# GNP above 10000, and report the five with the highest GNP.
capital_match = fc.col('c.Capital') == fc.col('ci.ID')
north_american = fc.col('c.Continent') == 'North America'
gnp_above_threshold = fc.col('c.GNP') > 10000
result3 = (
    country.alias('c')
    .join(city.alias('ci'), capital_match)
    .where(north_american & gnp_above_threshold)
    .select(
        fc.col('c.Name').alias('Country'),
        fc.col('ci.Name').alias('Capital'),
        fc.col('c.GNP'),
    )
    .orderBy(fc.col('c.GNP').desc())
    .limit(5)
)

print("Query 3 Results (DataFrame API):")
result3.show()

## Query 4: Languages spoken in more than 20 countries

In [None]:
# Query 4: Using Spark SQL
# Counts the rows of the `cl` view per Language and keeps only the languages
# whose row count is greater than 20.
query4_sql = """
select Language, count(*)

from cl

group by Language

having count(*) > 20;
"""

result4 = spark.sql(query4_sql)
print("Query 4 Results (SQL):")
result4.show()

In [None]:
# Query 4: Using DataFrame API
# Tally the rows per language, then keep the languages with more than 20 rows.
language_totals = cl.groupBy('Language').count()
result4 = language_totals.where(fc.col('count') > 20)

print("Query 4 Results (DataFrame API):")
result4.show()

In [None]:
# Query 4: Using RDD API
# Emit (language, 1) per row, sum the ones per language, and keep the
# languages whose total is greater than 20.
cl_rdd = cl.rdd
language_count = (cl_rdd
                  .map(lambda row: (row.Language, 1))
                  .reduceByKey(lambda a, b: a + b)
                  .filter(lambda x: x[1] > 20))

# RDD transformations are lazy; an action is needed for the query to run.
# take(20) mirrors the 20-row default of DataFrame.show() used by the other versions.
print("Query 4 Results (RDD API):")
print(language_count.take(20))

## Query 5: Average city population of districts in USA with more than 10 cities

In [None]:
# Query 5: Using Spark SQL
# For cities whose CountryCode is 'USA', groups by District and reports the
# average Population of the districts that contain more than 10 city rows.
# The string literal uses single quotes: double quotes denote identifiers in
# standard SQL (and in Spark when spark.sql.ansi.doubleQuotedIdentifiers is
# enabled), and single quotes match the other queries in this notebook.
query5_sql = """
select District, avg(Population)

from city

where CountryCode = 'USA'

group by District

having count(*) > 10;
"""

result5 = spark.sql(query5_sql)
print("Query 5 Results (SQL):")
result5.show()

In [None]:
# Query 5: Using DataFrame API
# Work only with USA cities, compute per-district average population and
# city count, then keep districts with more than 10 cities and drop the
# helper count column from the output.
usa_cities = city.where(fc.col('CountryCode') == 'USA')
district_stats = usa_cities.groupBy('District').agg(
    fc.avg('Population').alias('Average_Population'),
    fc.count('*').alias('city_count'),
)
result5 = (
    district_stats
    .where(fc.col('city_count') > 10)
    .select('District', 'Average_Population')
)

print("Query 5 Results (DataFrame API):")
result5.show()