In [None]:
import pandas as pd


# Load the three raw CSV extracts, in a fixed order: finance, industry, education.
# NOTE(review): 'Educationv.csv' looks like a possible typo for 'Education.csv' —
# confirm the actual file name on disk.
finance_df, industry_df, education_df = (
    pd.read_csv(csv_path)
    for csv_path in ('./Finance.csv', './Industry.csv', './Educationv.csv')
)

# Missing-value audit: print one Series of per-column null counts per frame.
for raw_frame in (finance_df, industry_df, education_df):
    print(raw_frame.isnull().sum())
from pymongo import MongoClient

# Handles to the local MongoDB database whose collections the Spark cell reads.
client = MongoClient("mongodb://localhost:27017/")
db = client["regional_economy"]
finance_collection = db["finance"]
industry_collection = db["industry"]
education_collection = db["education"]

# Set to True to wipe every collection and re-import it from the CSV frames.
# When False, only collections that are currently empty get seeded. Re-running
# this cell therefore never duplicates documents, and a fresh database still
# ends up populated for the Spark analysis cell.
RELOAD_MONGO = False

for collection, frame in (
    (finance_collection, finance_df),
    (industry_collection, industry_df),
    (education_collection, education_df),
):
    if RELOAD_MONGO:
        collection.delete_many({})
    elif collection.count_documents({}, limit=1) > 0:
        continue  # already populated; leave existing documents alone
    records = frame.to_dict("records")
    # insert_many rejects an empty document list, so skip frames with no rows.
    if records:
        collection.insert_many(records)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import time
# Build (or reuse) the Spark session with the MongoDB Spark connector on the
# classpath. The session-wide default input/output URI targets the
# regional_economy.finance collection; individual reads may override it.
_mongo_finance_uri = "mongodb://127.0.0.1/regional_economy.finance"
_spark_settings = {
    "spark.mongodb.input.uri": _mongo_finance_uri,
    "spark.mongodb.output.uri": _mongo_finance_uri,
    "spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.12:2.4.2,org.mongodb:mongodb-driver-core:3.12.10",
}

_session_builder = SparkSession.builder.appName("RegionalEconomyAnalysis")
for _setting_key, _setting_value in _spark_settings.items():
    _session_builder = _session_builder.config(_setting_key, _setting_value)
spark = _session_builder.getOrCreate()


# Read each MongoDB collection into a Spark DataFrame through the "mongo" source.
# `finance` keeps the session's default input URI and only names the
# collection; industry and education pass a full connection URI instead.
# Each read starts from a fresh `spark.read`, so options never leak between them.
finance_df = spark.read.load(format="mongo", collection="finance")

industry_df = spark.read.load(
    format="mongo",
    uri="mongodb://127.0.0.1/regional_economy.industry",
)

education_df = spark.read.load(
    format="mongo",
    uri="mongodb://127.0.0.1/regional_economy.education",
)

# Compare one aggregation (per-Year sum of Less_than_5000) written two ways:
# Spark SQL versus the DataFrame API. Each timing covers query construction
# plus the `.show()` action, which is what actually executes the job.
# NOTE(review): the first timed query also pays one-off costs (connector
# start-up, first Mongo scan), so read these numbers as a rough comparison.
finance_df.createOrReplaceTempView("finance")
start_time = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
sql_df = spark.sql("SELECT Year, SUM(Less_than_5000) as total_income_under_5000 FROM finance GROUP BY Year")
sql_df.show()
sql_time = time.perf_counter() - start_time
print("Spark SQL takes: {:.3f} s".format(sql_time))

# Spark Implementation (Transformations, SQL Queries, Performance Optimization)
start_time = time.perf_counter()
df_api = finance_df.groupBy("Year").sum("Less_than_5000") \
    .withColumnRenamed("sum(Less_than_5000)", "total_income_under_5000")
df_api.show()
df_api_time = time.perf_counter() - start_time
print("DataFrame API takes: {:.3f} s".format(df_api_time))

# Print the plans of the two queries being compared. Explaining the raw
# `finance_df` would only show the Mongo scan, not the aggregation.
sql_df.explain()
df_api.explain()
# Quick look at the two remaining collections: inferred schema plus a 5-row sample.
for _schema_heading, _preview_frame in (
    ("Industry DataFrame Schema:", industry_df),
    ("Education DataFrame Schema:", education_df),
):
    print(_schema_heading)
    _preview_frame.printSchema()
    _preview_frame.show(5)


# Normalise types and fill gaps before the join and re-aggregation that follow.
# NOTE(review): these casts run after the timed SQL/DataFrame-API aggregations
# earlier in this cell, so those queries saw the column types as loaded.
for _bracket_column in ("Less_than_5000", "5000_to_9999", "10000_to_14999"):
    finance_df = finance_df.withColumn(
        _bracket_column, col(_bracket_column).cast("integer")
    )

# na.fill with a number only fills numeric columns; with a string it only fills
# string columns. Other column types are left untouched.
industry_df = industry_df.na.fill(0)
education_df = education_df.na.fill("Unknown")


# Inner-join finance rows with education rows for the same Year.
# Documents read from MongoDB normally include an `_id` field on both sides.
# Drop it first so the joined frame does not end up with two ambiguous `_id`
# columns (`drop` is a no-op when the column is absent).
# NOTE(review): if a Year appears in several rows on both sides, this join fans
# out into one row per matching pair — confirm Year is unique per collection.
joined_df = finance_df.drop("_id").join(
    education_df.drop("_id"), on="Year", how="inner"
)
joined_df.show(5)

# Per-Year total of Less_than_5000, sorted by Year. groupBy gives no ordering
# guarantee, and the line plot below joins points in row order. An unsorted
# result would therefore draw a zig-zag instead of a trend line.
agg_finance = (
    finance_df.groupBy("Year")
    .sum("Less_than_5000")
    .withColumnRenamed("sum(Less_than_5000)", "total_income_under_5000")
    .orderBy("Year")
)
agg_finance.show()

# One row per Year, so the aggregate is small enough to collect to the driver.
pandas_df = agg_finance.toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(pandas_df["Year"], pandas_df["total_income_under_5000"], marker='o')
ax.set_xlabel("Year")
# NOTE(review): the title speaks of "Population" while this label says "Total
# Income" — confirm what the Less_than_5000 column actually counts.
ax.set_ylabel("Total Income (Under $5000)")
ax.set_title("Trend of Low Income Population Over Years")
ax.grid(True)
plt.show()

# Release the Spark session now that the analysis is finished.
spark.stop()