# Group 3 Project: LMU CMSI 620 Database Systems
# Fall 2023, Professor Latif, November 20, 2023

- Presentation and Project Database Engines 
    * Postgres
    * MySQL
    * Spark



1. Dataset 1 from Brittany and Tanya
    * Climate Change: Earth Surface Temperature Data (Kaggle)
    * This dataset is 89 MB with global temperatures since 1750
    

1. Dataset 2 from Mariam
    * CO₂ Emissions from Our World in Data Organization 
    * This dataset is 1.3 MB with 9 columns, about 30K rows, and emissions since 1750

1. Dataset 3 from Nidhi and Krutik
    * Sea Level Change: Change in Mean Sea Level from IMF.org (CSV)
    * The dataset is 14.6 MB with 13 columns and around 35K rows.
    
    
Notes: 
1. Spark does not enforce constraints such as primary or foreign keys, so uniqueness and referential integrity have to be checked in queries.
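
Because no key constraint is enforced, a duplicate check has to be run by hand before a column set is treated as a key. The block below is a minimal plain-Python sketch of that check, not part of the project code; `find_duplicate_keys` and the sample rows are hypothetical. In Spark the same idea is usually expressed as a group-by on the key columns followed by a filter on counts greater than one.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return the key tuples that occur more than once in `rows` (a list of dicts)."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


# Illustrative rows only; the third row repeats the (city, country) key of the first.
sample_rows = [
    {"city": "Abadan", "country": "Iran"},
    {"city": "Baneh", "country": "Iran"},
    {"city": "Abadan", "country": "Iran"},
]

duplicates = find_duplicate_keys(sample_rows, ["city", "country"])
print(duplicates)
```

An empty result means the chosen columns are safe to use as a join key for the data at hand.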

# Spark Database Engine #3

Apache Spark is a unified engine designed for large-scale distributed data processing, on premises in data centers or in the cloud.

Initial Steps Taken:
1. Download Apache Spark from https://spark.apache.org/downloads.html
2. Install PySpark: %pip install pyspark
3. Upgrade pip (%pip install --upgrade pip) and Homebrew

In [83]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
import warnings
warnings.filterwarnings('ignore')

In [84]:
import pyspark
import datetime

from os.path import abspath
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext


from pyspark.sql.functions import col, concat, date_format, dayofmonth, expr, lpad, month, regexp_replace, to_date, when, year
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType

In [2]:
warehouse_location = abspath('spark-warehouse')
dataset_size_in_bytes = 1073741824

def calculate_partitions(dataset_size_in_bytes, bytes_per_partition=10000000):
    return max(1, int(dataset_size_in_bytes / bytes_per_partition))

spark = (SparkSession \
    .builder \
    .appName("CMSI 620: Database Engine") \
    .config("spark.executor.memory", "11g") \
    .config("spark.executor.memoryOverhead", "8g") \
    .config("spark.driver.memory", "6g") \
    .config("spark.driver.memoryOverhead", "4g") \
    .config("spark.memory.fraction", "0.1") \
    .config("spark.executor.instances", "5") \
    .config("spark.executor.cores", "5") \
    .config("spark.sql.autoBroadcastJoinThreshold", "100m") \
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.shuffle.partitions", calculate_partitions(dataset_size_in_bytes)) \
    .enableHiveSupport() \
    .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/26 12:04:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
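
To make the shuffle-partition setting concrete, here is the arithmetic behind `calculate_partitions` as a small standalone sketch. The function body is copied from the cell above; the 1.3 MB figure is only a rounded stand-in for the CO2 file size quoted at the top of the notebook.

```python
def calculate_partitions(dataset_size_in_bytes, bytes_per_partition=10_000_000):
    # Same formula as the notebook cell: whole partitions, never fewer than one.
    return max(1, int(dataset_size_in_bytes / bytes_per_partition))


one_gib = 1_073_741_824      # the value assigned to dataset_size_in_bytes above
small_file = 1_300_000       # roughly 1.3 MB, used here only for contrast

large_partitions = calculate_partitions(one_gib)     # 1073741824 / 10000000 = 107.37 -> 107
small_partitions = calculate_partitions(small_file)  # 0.13 -> 0, raised to the minimum of 1
print(large_partitions, small_partitions)
```

With the 1 GiB value used in this notebook, `spark.sql.shuffle.partitions` is therefore set to 107.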


In [9]:
threshold = 10 * 1024 * 1024  # 10 MB
if dataset_size_in_bytes < threshold:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", dataset_size_in_bytes)
else:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
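
The branch above can be read as a small pure function. The sketch below (the name `broadcast_threshold` is hypothetical) returns the value that would be assigned to `spark.sql.autoBroadcastJoinThreshold`; a value of -1 disables broadcast joins.

```python
def broadcast_threshold(dataset_size_in_bytes, limit=10 * 1024 * 1024):
    """Return the threshold the cell above would set: the size itself if under 10 MB, else -1."""
    return dataset_size_in_bytes if dataset_size_in_bytes < limit else -1


chosen_for_1_gib = broadcast_threshold(1_073_741_824)  # 1 GiB is over the limit
chosen_for_small = broadcast_threshold(1_300_000)      # about 1.3 MB is under it
print(chosen_for_1_gib, chosen_for_small)
```

Since `dataset_size_in_bytes` is 1 GiB in this notebook, the `else` branch runs, and the runtime setting of -1 replaces the "100m" value passed to the session builder.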

In [7]:
# spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLogLevel("OFF")
# spark.sparkContext.setLogLevel("ERROR")

In [3]:
spark.sparkContext

## Methods and Variables 

In [101]:
DATABASE = "cmsi620_gpr3_db"  

GLOBAL_TEMP_CSV_FILE_LOCATION = 'GlobalLandTemperaturesByCity.csv'
EMISSIONS_CSV_FILE_LOCATION = 'CO2.csv'
SEA_LEVEL_CSV_FILE_LOCATION = 'sea_level_new.csv'
SEA_COUNTRY_CSV_FILE_LOCATION = 'country_sea.csv'

In [6]:
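def get_update_cols(df, schema_cols):
    # Rename df's columns, by position, to the field names in schema_cols.
    # Assumes df.columns and schema_cols.fields are in the same order.
    for old_name, field in zip(df.columns, schema_cols.fields):
        print(old_name, field.name)
        df = df.withColumnRenamed(old_name, field.name)
    return df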

def get_tables(db):
    tables = spark.catalog.listTables(db)
    print(tables)
    for table in tables:
        print(table.name)
    return None

def get_databases():
    databases = spark.catalog.listDatabases()
    for db in databases:
        print(db.name)
    return None

## Create A Database

In [11]:
spark.sql('create database CMSI620_GPR3_db')

In [62]:
spark.sql('describe database extended CMSI620_GPR3_db')

DataFrame[info_name: string, info_value: string]

## Check Database

In [63]:
spark.catalog.listDatabases()

[Database(name='cmsi620_gpr3_db', catalog='spark_catalog', description='', locationUri='file:/Users/mariamjoan/Desktop/Spark/spark-warehouse/cmsi620_gpr3_db.db'),
 Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/Users/mariamjoan/Desktop/Spark/spark-warehouse')]

In [10]:
spark.sql('show databases').show()

+---------------+
|      namespace|
+---------------+
|cmsi620_gpr3_db|
|        default|
+---------------+



In [58]:
spark.catalog.currentDatabase()

'cmsi620_gpr3_db'

In [7]:
spark.sql("use cmsi620_gpr3_db")

23/11/26 11:57:15 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/11/26 11:57:15 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/11/26 11:57:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/11/26 11:57:17 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore mariamjoan@192.168.1.53
23/11/26 11:57:17 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


DataFrame[]

In [8]:
spark.catalog.currentDatabase()

'cmsi620_gpr3_db'

In [None]:
spark.catalog.listTables()

In [12]:
spark.catalog.listTables('default')

[Table(name='src', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False)]

In [14]:
spark.sql('show tables from default').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|      src|      false|
+---------+---------+-----------+



In [8]:
spark.sql('show tables from cmsi620_gpr3_db').show(truncate=False) 

+---------------+----------------------------------+-----------+
|namespace      |tableName                         |isTemporary|
+---------------+----------------------------------+-----------+
|cmsi620_gpr3_db|global_co2_emissions_country_f    |false      |
|cmsi620_gpr3_db|global_co2_emissions_country_sum_f|false      |
|cmsi620_gpr3_db|global_temp_city_f                |false      |
|cmsi620_gpr3_db|global_temp_f                     |false      |
|cmsi620_gpr3_db|sea_level_country_f               |false      |
|cmsi620_gpr3_db|sea_level_f                       |false      |
+---------------+----------------------------------+-----------+



# Dataset #1: Global Temperature
### Climate Change: Earth Surface Temperature Data (Kaggle)

In [12]:
global_city_schema = StructType([
    StructField("dt", DateType(), True),
    StructField("AverageTemperature", FloatType(), True),
    StructField("AverageTemperatureUncertainty", FloatType(), True),
    StructField("City", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Latitude", StringType(), True),
    StructField("Longitude", StringType(), True)
])

In [13]:
df = spark.read.csv(
    path=GLOBAL_TEMP_CSV_FILE_LOCATION,
    sep=",",
    header=True,
    quote='"',
    encoding="UTF-8",
    schema=global_city_schema
)

In [14]:
city = df.selectExpr(
    "City as city",
    "Country as country",
    "Latitude as latitude",
    "Longitude as longitude"
).dropna()

In [15]:
city = city.dropDuplicates(["city", "country", "latitude", "longitude"])

In [20]:
temp = df.selectExpr(
    "City as city",
    "Country as country",
    "dt as date_measured",
    "AverageTemperature as avg_temp",
    "AverageTemperatureUncertainty as avg_temp_uncertainty"
)

In [21]:
temp = temp.filter(col("avg_temp").isNotNull())

In [18]:
city.show()
city.printSchema()
city.count()

+---------------+--------------+--------+---------+
|           city|       country|latitude|longitude|
+---------------+--------------+--------+---------+
|       Ayacucho|          Peru|  13.66S|   73.49W|
|         Bantou|         China|  24.92N|  118.82E|
|       Ajdabiya|         Libya|  31.35N|   20.73E|
|         Ambala|         India|  29.74N|   77.54E|
|       Akishima|         Japan|  36.17N|  139.23E|
|      Amagasaki|         Japan|  34.56N|  136.22E|
|         Arnhem|   Netherlands|  52.24N|    5.26E|
|          Baneh|          Iran|  36.17N|   45.75E|
|      Barcelona|     Venezuela|  10.45N|   64.64W|
|       Acarigua|     Venezuela|   8.84N|   68.92W|
|     Almetyevsk|        Russia|  55.45N|   51.02E|
|         Awassa|      Ethiopia|   7.23N|   38.11E|
|         Abadan|          Iran|  29.74N|   48.00E|
|       Aberdeen|United Kingdom|  57.05N|    1.48W|
|       Adilabad|         India|  20.09N|   78.48E|
|          Aktau|    Kazakhstan|  44.20N|   51.43E|
|        Ana

3510

In [22]:
temp.show()
temp.printSchema()
temp.count()

+-----+-------+-------------+--------+--------------------+
| city|country|date_measured|avg_temp|avg_temp_uncertainty|
+-----+-------+-------------+--------+--------------------+
|Århus|Denmark|   1743-11-01|   6.068|               1.737|
|Århus|Denmark|   1744-04-01|   5.788|               3.624|
|Århus|Denmark|   1744-05-01|  10.644|               1.283|
|Århus|Denmark|   1744-06-01|  14.051|               1.347|
|Århus|Denmark|   1744-07-01|  16.082|               1.396|
|Århus|Denmark|   1744-09-01|  12.781|               1.454|
|Århus|Denmark|   1744-10-01|    7.95|                1.63|
|Århus|Denmark|   1744-11-01|   4.639|               1.302|
|Århus|Denmark|   1744-12-01|   0.122|               1.756|
|Århus|Denmark|   1745-01-01|  -1.333|               1.642|
|Århus|Denmark|   1745-02-01|  -2.732|               1.358|
|Århus|Denmark|   1745-03-01|   0.129|               1.088|
|Århus|Denmark|   1745-04-01|   4.042|               1.138|
|Århus|Denmark|   1750-01-01|   1.699|  

                                                                                

8235082

## Dataset #1 Quality

In [23]:
date_col = "date_measured"
temp = temp.withColumn(date_col, regexp_replace(col(date_col), "-", ""))

In [24]:
temp.take(1)

[Row(city='Århus', country='Denmark', date_measured='17431101', avg_temp=6.067999839782715, avg_temp_uncertainty=1.7369999885559082)]

In [25]:
temp.describe().show()
temp.explain()

23/11/26 10:50:34 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+--------+-----------+-------------------+------------------+--------------------+
|summary|    city|    country|      date_measured|          avg_temp|avg_temp_uncertainty|
+-------+--------+-----------+-------------------+------------------+--------------------+
|  count| 8235082|    8235082|            8235082|           8235082|             8235082|
|   mean|    null|       null|1.910898935810014E7| 16.72743263617215|  1.0285747414276099|
| stddev|    null|       null|  641959.9052579077|10.353442482004715|  1.1297332887072418|
|    min|A Coruña|Afghanistan|           17431101|           -42.704|               0.034|
|    max|  Ürümqi|   Zimbabwe|           20130901|            39.651|              15.396|
+-------+--------+-----------+-------------------+------------------+--------------------+

== Physical Plan ==
*(1) Project [City#3 AS city#84, Country#4 AS country#85, regexp_replace(cast(dt#0 as string), -, , 1) AS date_measured#131, AverageTemperature#1 AS avg_temp#87

                                                                                

## Save Dataset #1 as Table in Spark Database

In [81]:
city.write.mode("overwrite").saveAsTable(f"{DATABASE}.global_temp_city_f")

                                                                                

In [26]:
temp.write.mode("overwrite").saveAsTable(f"{DATABASE}.global_temp_f")

23/11/26 10:51:40 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/11/26 10:51:40 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
23/11/26 10:51:40 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/11/26 10:51:40 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


# Dataset #2: CO2 Emissions
## Global CO2 Emissions Data By Year and Country Across Multiple Domains, e.g., Oil, Gas, Coal

In [31]:
emissions_schema = StructType([
    StructField("Entity", StringType(), True),
    StructField("Code", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Annual_CO2_other_industry_other", FloatType(), True),
    StructField("Annual_CO2_other_industry_flaring", FloatType(), True),
    StructField("Annual_CO2_other_industry_cement", FloatType(), True),
    StructField("Annual_CO2_other_industry_gas", FloatType(), True),
    StructField("Annual_CO2_other_industry_oil", FloatType(), True),
    StructField("Annual_CO2_other_industry_coal", FloatType(), True)
])

In [32]:
co2 = spark.read.csv(
    path=EMISSIONS_CSV_FILE_LOCATION,
    sep=",",
    header=True,
    quote='"',
    encoding="UTF-8",
    schema=emissions_schema
)

In [33]:
co2tx = co2.selectExpr(
    "Entity as country",
    "Year as year_measured",
    "Annual_CO2_other_industry_other as co2_other_yr",
    "Annual_CO2_other_industry_flaring as co2_flaring_yr",
    "Annual_CO2_other_industry_cement as co2_cement_yr",
    "Annual_CO2_other_industry_gas as co2_gas_yr",
    "Annual_CO2_other_industry_oil as co2_oil_yr",
    "Annual_CO2_other_industry_coal as co2_coal_yr"
)

In [35]:
co2tx.show()
co2tx.printSchema()
co2tx.count()

+-----------+-------------+------------+--------------+-------------+----------+----------+-----------+
|    country|year_measured|co2_other_yr|co2_flaring_yr|co2_cement_yr|co2_gas_yr|co2_oil_yr|co2_coal_yr|
+-----------+-------------+------------+--------------+-------------+----------+----------+-----------+
|Afghanistan|         1949|        null|           0.0|          0.0|       0.0|       0.0|    14656.0|
|Afghanistan|         1950|        null|           0.0|          0.0|       0.0| 63203.996|    21068.0|
|Afghanistan|         1951|        null|           0.0|          0.0|       0.0|   65952.0|    25648.0|
|Afghanistan|         1952|        null|           0.0|          0.0|       0.0|   59892.0|  31707.998|
|Afghanistan|         1953|        null|           0.0|          0.0|       0.0|   68307.0|    37949.0|
|Afghanistan|         1954|        null|           0.0|          0.0|       0.0|   63754.0|    42502.0|
|Afghanistan|         1955|        null|           0.0|         

23/11/26 10:53:29 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Entity, Year, Annual_CO2_industry_other, Annual_CO2_flaring, Annual_CO2_other_cement, Annual_CO2_other_gas, Annual_CO2_other_oil, Annual_CO2_other_coal
 Schema: Entity, Year, Annual_CO2_other_industry_other, Annual_CO2_other_industry_flaring, Annual_CO2_other_industry_cement, Annual_CO2_other_industry_gas, Annual_CO2_other_industry_oil, Annual_CO2_other_industry_coal
Expected: Annual_CO2_other_industry_other but found: Annual_CO2_industry_other
CSV file: file:///Users/mariamjoan/Desktop/Spark/CO2.csv


29457

## Dataset #2 Quality

In [40]:
co2tx.describe().show()
co2tx.explain()


+-------+-----------+------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|summary|    country|     year_measured|        co2_other_yr|      co2_flaring_yr|      co2_cement_yr|          co2_gas_yr|          co2_oil_yr|         co2_coal_yr|
+-------+-----------+------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+
|  count|      29457|             29457|                2386|               24875|              24974|               24943|               25042|               25069|
|   mean|       null|1937.0758393590659|1.8430048331300262E7|   3493834.438078785|  8392489.821707202|   5.0080670938516E7|1.1138560219004871E8|1.5439769972069743E8|
| stddev|       null| 67.96719006257644| 4.186320591625684E7|2.0297655092618626E7|6.300943499326838E7|3.0987496653119504E8| 6.250209486950399E8| 7.514527249915178E8|
|   

## Save Dataset #2 as Table in Spark Database

In [41]:
co2tx.write.mode("overwrite").saveAsTable(f"{DATABASE}.global_co2_emissions_country_f")


### Build Emissions Table to Fit Schema and ERD

In [42]:
co2_dm = spark.sql(""" 
select
     monotonically_increasing_id() as emissions_id 
    ,country
    ,year_measured
    ,co2_coal_yr
    ,co2_oil_yr
    ,co2_gas_yr
    ,co2_cement_yr
    ,co2_flaring_yr
    ,co2_other_yr
    ,sum(co2_coal_yr + co2_oil_yr + co2_gas_yr + co2_cement_yr + co2_flaring_yr + co2_other_yr) cumulative_co2
from cmsi620_gpr3_db.global_co2_emissions_country_f
group by 1, 2, 3, 4, 5, 6, 7, 8, 9
""")

In [43]:
co2_dm.write.mode("overwrite").saveAsTable(f"{DATABASE}.global_co2_emissions_country_sum_f")
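
One property of the `cumulative_co2` expression in the query above is worth seeing on a small example: in SQL, adding NULL to a number yields NULL, so a row with any missing source column gets a NULL total. That matches the describe output earlier, where `co2_other_yr` is populated in only 2,386 of 29,457 rows. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL (an assumption made only so the example runs without a Spark session; the table and its values are made up) and contrasts the plain sum with a `coalesce`-based variant.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table emissions (country text, coal real, oil real, other real)")
conn.executemany(
    "insert into emissions values (?, ?, ?, ?)",
    [
        ("A", 10.0, 5.0, None),  # 'other' is missing, as in most rows of the CO2 data
        ("B", 1.0, 2.0, 3.0),
    ],
)

# Plain addition: any NULL operand makes the whole total NULL.
plain_totals = conn.execute(
    "select country, coal + oil + other from emissions order by country"
).fetchall()

# coalesce(x, 0) treats a missing source as zero before adding.
safe_totals = conn.execute(
    "select country, coalesce(coal, 0) + coalesce(oil, 0) + coalesce(other, 0) "
    "from emissions order by country"
).fetchall()

print(plain_totals)
print(safe_totals)
conn.close()
```

Whether a missing source should count as zero is a modeling decision; the point here is only that the plain sum silently drops such rows from the total.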

# Dataset #3 Sea Level Changes by Country

In [44]:
sea_level_schema = StructType([
    StructField("ObjectId", IntegerType(), True),
    StructField("Country", StringType(), True),
#     StructField("ISO2", IntegerType(), True),
#     StructField("ISO3", FloatType(), True),
#     StructField("Indicator", FloatType(), True),
#     StructField("Unit", FloatType(), True),
#     StructField("Source", FloatType(), True),
#     StructField("CTS_Code", FloatType(), True),
#     StructField("CTS_Name", FloatType(), True),
#     StructField("CTS_Full_Descriptor", FloatType(), True),
    StructField("Measure", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Value", FloatType(), True)
])

In [56]:
sea = spark.read.csv(
    path=SEA_LEVEL_CSV_FILE_LOCATION,
    sep=",",
    header=True,
    quote='"',
    encoding="UTF-8",
    schema=sea_level_schema
)

sea_country = spark.read.csv(
    path=SEA_COUNTRY_CSV_FILE_LOCATION,
    sep=",",
    header=True,
    quote='"',
    encoding="UTF-8"
)

In [57]:
seat = sea.selectExpr(
    "ObjectId as id",
    "Country as country",
    "Measure as sea_name",
    "Date    as date_measured",
    "Value as sea_level"
)

In [58]:
seat = seat.withColumn(
    "date_measured",
    date_format(to_date(col("date_measured"), 'MM/dd/yyyy'), 'yyyyMMdd')
)

In [59]:
seat = seat.withColumn(
    "year_measured",
    year(to_date("date_measured", 'yyyyMMdd'))
)
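
The two date steps above, reformatting `MM/dd/yyyy` into `yyyyMMdd` and then extracting the year, can be sketched in plain Python with `datetime`. The input string "12/17/1992" is an assumption inferred from the format pattern and from the `19921217` value in the output below; `to_yyyymmdd` is a hypothetical helper.

```python
from datetime import datetime


def to_yyyymmdd(us_date):
    """Parse an MM/dd/yyyy string and return (yyyyMMdd string, year)."""
    parsed = datetime.strptime(us_date, "%m/%d/%Y")
    return parsed.strftime("%Y%m%d"), parsed.year


date_key, year_measured = to_yyyymmdd("12/17/1992")
print(date_key, year_measured)
```

Storing the date as a zero-padded `yyyyMMdd` string keeps lexical order equal to chronological order, which is what lets the later joins compare `date_measured` values across tables.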

In [60]:
sea_country.show(1)
sea_country.printSchema()
sea_country.count()

+-------+-------------+
|country|          sea|
+-------+-------------+
| Belize|Caribbean Sea|
+-------+-------------+
only showing top 1 row

root
 |-- country: string (nullable = true)
 |-- sea: string (nullable = true)



243

In [61]:
seat.show(3)
seat.printSchema()
seat.count()

+---+-------+--------------+-------------+---------+-------------+
| id|country|      sea_name|date_measured|sea_level|year_measured|
+---+-------+--------------+-------------+---------+-------------+
|  1|  World|   Andaman Sea|     19921217|   -10.34|         1992|
|  2|  World|   Arabian Sea|     19921217|   -18.46|         1992|
|  3|  World|Atlantic Ocean|     19921217|   -15.41|         1992|
+---+-------+--------------+-------------+---------+-------------+
only showing top 3 rows

root
 |-- id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- sea_name: string (nullable = true)
 |-- date_measured: string (nullable = true)
 |-- sea_level: float (nullable = true)
 |-- year_measured: integer (nullable = true)



35604

## Save Dataset #3 as Table in Spark Database

In [62]:
seat.write.mode("overwrite").saveAsTable(f"{DATABASE}.sea_level_f")

In [65]:
sea_country.write.mode("overwrite").saveAsTable(f"{DATABASE}.sea_level_country_f")

23/11/25 23:13:36 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/11/25 23:13:36 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
23/11/25 23:13:36 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/11/25 23:13:36 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


# Performance 
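1. Timed with IPython's `%%timeit` cell magic. `spark.sql(...)` returns a lazy DataFrame, so each timing below covers only query parsing and planning; no rows are read until an action such as `count()` or `show()` runs.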

## Dataset #1 

In [70]:
%%timeit
query = "select * from cmsi620_gpr3_db.global_temp_f"
result = spark.sql(query)

6.03 ms ± 105 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


## Dataset #2 

In [71]:
%%timeit
query = f"select * from {DATABASE}.global_co2_emissions_country_sum_f"
result = spark.sql(query)

6.46 ms ± 206 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


## Dataset #3 

In [73]:
%%timeit
query = f"select * from {DATABASE}.sea_level_f"
result = spark.sql(query)

6.11 ms ± 334 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
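
The three timings above are all close to 6 ms even though the tables range from about 35 thousand to more than 8 million rows. That is what one would expect if only the construction of the lazy DataFrame is being timed. To time the actual scan, the timed code has to end in an action. The helper below is a minimal sketch using only the standard library; `time_action` is a hypothetical name, and the Spark call in the comment is shown for orientation, not executed here.

```python
import time
from statistics import mean


def time_action(action, repeat=3):
    """Call the zero-argument `action` `repeat` times; return mean wall-clock seconds."""
    durations = []
    for _ in range(repeat):
        start = time.perf_counter()
        action()
        durations.append(time.perf_counter() - start)
    return mean(durations)


# In the notebook, the callable would wrap an action so that rows are really read, e.g.:
#   time_action(lambda: spark.sql(query).count())
# Here a plain-Python workload stands in so the sketch runs anywhere.
elapsed = time_action(lambda: sum(range(100_000)), repeat=3)
print(f"{elapsed:.6f} s per run")
```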


## All Datasets #4

In [124]:
%%timeit
query = spark.sql("""
with base as ( 
select
     coalesce( f.country, e.country, f.country )  country  
    ,s.sea_name 
    ,cast( f.date_measured as int ) as date_measured
    ,year( to_date( f.date_measured, 'yyyyMMdd')) as year_measured
    
    ,max( f.avg_temp  )  avg_temp_celsius
    ,max( round( ((f.avg_temp * 9/5) + 32) ,2)) avg_temp_faranheit
    ,max( cast( e.cumulative_co2 as int ))  cumulative_co2
    ,max( s.sea_level )  sea_level_mm 
    ,max( (s.sea_level *1.0 / 25.4 ) ) as sea_level_inches
from cmsi620_gpr3_db.global_temp_f f
    left join cmsi620_gpr3_db.global_temp_city_f c on f.country = c.country    
    left join cmsi620_gpr3_db.sea_level_country_f l on f.country  = l.country
    left join cmsi620_gpr3_db.sea_level_f s         on s.sea_name = l.sea and f.date_measured = s.date_measured
    left join cmsi620_gpr3_db.global_co2_emissions_country_sum_f e on f.country = e.country and year( to_date( f.date_measured, 'yyyyMMdd')) = e.year_measured
group by 1, 2, 3, 4
)

select 
     country
    ,sea_name
    ,year_measured
    ,date_measured
    ,avg_temp_faranheit
    ,avg_temp_celsius
    ,cumulative_co2
    ,sea_level_mm
    ,sea_level_inches
    ,row_number() over( partition by country order by year_measured, date_measured desc ) as rn 
from base
order by 1
""")

55.3 ms ± 11.6 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
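
The unit conversions inside the query are simple enough to check by hand. The sketch below repeats them in plain Python; the sample inputs (18.577 °C and 5.99 mm) are taken from the first Albania row of the combined query output shown later in this notebook, and the function names are hypothetical.

```python
def celsius_to_fahrenheit(celsius):
    # Same expression as the SQL: round(((avg_temp * 9/5) + 32), 2)
    return round(celsius * 9 / 5 + 32, 2)


def mm_to_inches(mm):
    # Same expression as the SQL: sea_level * 1.0 / 25.4
    return mm / 25.4


fahrenheit = celsius_to_fahrenheit(18.577)
inches = mm_to_inches(5.99)
print(fahrenheit, inches)
```

The results agree with the `avg_temp_faranheit` (65.44) and `sea_level_inches` (about 0.2358) values in that row.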


In [123]:
%%timeit
query = spark.sql("""
with base as ( 
select
     coalesce( f.country, e.country, f.country )  country  
    ,s.sea_name 
    ,cast( f.date_measured as int ) as date_measured
    ,year( to_date( f.date_measured, 'yyyyMMdd')) as year_measured
    
    ,f.avg_temp    avg_temp_celsius
    ,round( ((f.avg_temp * 9/5) + 32) ,2) avg_temp_faranheit
    ,cast( e.cumulative_co2 as int )  cumulative_co2
    ,s.sea_level   sea_level_mm 
    ,(s.sea_level *1.0 / 25.4 )  as sea_level_inches
from cmsi620_gpr3_db.global_temp_f f
    left join cmsi620_gpr3_db.global_temp_city_f c on f.country = c.country    
    left join cmsi620_gpr3_db.sea_level_country_f l on f.country  = l.country
    left join cmsi620_gpr3_db.sea_level_f s         on s.sea_name = l.sea and f.date_measured = s.date_measured
    left join cmsi620_gpr3_db.global_co2_emissions_country_sum_f e on f.country = e.country and year( to_date( f.date_measured, 'yyyyMMdd')) = e.year_measured
)

select 
     country
    ,sea_name
    ,year_measured
    ,date_measured
    ,avg_temp_faranheit
    ,avg_temp_celsius
    ,cumulative_co2
    ,sea_level_mm
    ,sea_level_inches
    ,row_number() over( partition by country order by year_measured, date_measured desc ) as rn 
from base
order by 1
""")

51.9 ms ± 2.44 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


# Querying All (3) Datasets
- LEFT JOIN, GROUP BY, and a window function (ROW_NUMBER)

In [78]:
fq = spark.sql("""
with base as ( 
select
     coalesce( f.country, e.country, f.country )  country  
    ,s.sea_name 
    ,cast( f.date_measured as int ) as date_measured
    ,year( to_date( f.date_measured, 'yyyyMMdd')) as year_measured
    
    ,max( f.avg_temp  )  avg_temp_celsius
    ,max( round( ((f.avg_temp * 9/5) + 32) ,2)) avg_temp_faranheit
    ,max( cast( e.cumulative_co2 as int ))  cumulative_co2
    ,max( s.sea_level )  sea_level_mm 
    ,max( (s.sea_level *1.0 / 25.4 ) ) as sea_level_inches
from cmsi620_gpr3_db.global_temp_f f
    left join cmsi620_gpr3_db.global_temp_city_f c on f.country = c.country    
    left join cmsi620_gpr3_db.sea_level_country_f l on f.country  = l.country
    left join cmsi620_gpr3_db.sea_level_f s         on s.sea_name = l.sea and f.date_measured = s.date_measured
    left join cmsi620_gpr3_db.global_co2_emissions_country_sum_f e on f.country = e.country and year( to_date( f.date_measured, 'yyyyMMdd')) = e.year_measured
where 1=1
    and sea_name is not null
group by 1, 2, 3, 4
)

select 
     country
    ,sea_name
    ,year_measured
    ,date_measured
    ,avg_temp_faranheit
    ,avg_temp_celsius
    ,cumulative_co2
    ,sea_level_mm
    ,sea_level_inches
    ,row_number() over( partition by country order by year_measured, date_measured desc ) as rn 
from base
order by 1
""")

In [79]:
fq.show(100, truncate=False)



+---------+--------------+-------------+-------------+------------------+----------------+--------------+------------+---------------------+---+
|country  |sea_name      |year_measured|date_measured|avg_temp_faranheit|avg_temp_celsius|cumulative_co2|sea_level_mm|sea_level_inches     |rn |
+---------+--------------+-------------+-------------+------------------+----------------+--------------+------------+---------------------+---+
|Albania  |Adriatic Sea  |1993         |19931001     |65.44             |18.577          |null          |5.99        |0.23582676264244742  |1  |
|Albania  |Adriatic Sea  |1993         |19930901     |69.92             |21.068          |null          |-11.91      |-0.46889763178787836 |2  |
|Albania  |Adriatic Sea  |1993         |19930701     |75.13             |23.961          |null          |-89.61      |-3.527952779935101   |3  |
|Albania  |Adriatic Sea  |1994         |19941201     |49.58             |9.764           |null          |24.19       |0.9523622257

                                                                                

In [80]:
fq.describe().show()
fq.explain()

23/11/26 17:10:31 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+-------+------------+------------------+-------------------+------------------+------------------+--------------------+------------------+-------------------+-----------------+
|summary|country|    sea_name|     year_measured|      date_measured|avg_temp_faranheit|  avg_temp_celsius|      cumulative_co2|      sea_level_mm|   sea_level_inches|               rn|
+-------+-------+------------+------------------+-------------------+------------------+------------------+--------------------+------------------+-------------------+-----------------+
|  count|   2758|        2758|              2758|               2758|              2758|              2758|                 693|              2758|               2758|             2758|
|   mean|   null|        null|2005.1145757795505|2.005176521319797E7| 72.53365482233514|22.518680928124045|2.6457746055122656E8|16.312461903814476| 0.6422229095989934|15.41841914430747|
| stddev|   null|        null|5.7626174524599225|  57677.27896205512|1

# CRUD Operations

### 1. Create: 

In [None]:
spark.sql("""
CREATE TABLE global_temp_city_d (
    dt STRING, 
    AverageTemperature FLOAT, 
    AverageTemperatureUncertainty FLOAT, 
    City STRING, 
    Latitude STRING, 
    Longitude STRING
)
-- Hive-format table: partition columns are typed here and left out of the column list
PARTITIONED BY (snapshot_date STRING, country STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '"',
  'escapeChar' = '\\\\'
)
STORED AS TEXTFILE
LOCATION 'file:///Users/mariamjoan/Desktop/Spark/spark-warehouse'
""")

### 2. Read

In [18]:
spark.sql(f"select max(avg_temp) max_avg_temp from {DATABASE}.global_temp_f ").show(3)

+------------+
|max_avg_temp|
+------------+
|      39.651|
+------------+



### 3. **Update** 

Spark has no UPDATE in the traditional RDBMS sense. DataFrames and RDDs are immutable, so an "update" is a transformation that returns a new DataFrame, and the change only reaches the database once that DataFrame is written back to a table.

In [41]:
update_df = spark.sql(f"select * from {DATABASE}.global_temp_f ")

In [46]:
update_df = update_df.withColumn("year", year("date_collected"))

### 4. Delete 
Spark has no DELETE because Spark DataFrames are immutable, so rows cannot be deleted "in place". Instead, the rows to keep are selected into a new DataFrame.

In [86]:
ddf = spark.sql(f""" select * from {DATABASE}.global_temp_f where 1=1 and date_measured > 20000101""")

In [93]:
us_df = ddf.filter('country = "United States"')
us_df.show(4)

+-----------+-------------+-------------+--------+--------------------+
|       city|      country|date_measured|avg_temp|avg_temp_uncertainty|
+-----------+-------------+-------------+--------+--------------------+
|Baton Rouge|United States|     20000201|  16.108|               0.265|
|Baton Rouge|United States|     20000301|  19.299|               0.327|
|Baton Rouge|United States|     20000401|   20.13|               0.194|
|Baton Rouge|United States|     20000501|  26.323|               0.271|
+-----------+-------------+-------------+--------+--------------------+
only showing top 4 rows



# ACID Operations

### 1. Atomicity 

In [39]:
pwd_path = !pwd

In [55]:
update_df.withColumn("avg_temp_farenheit", expr("(avg_temp * 9/5) + 32")).show(2)

+-----+-------+--------------+--------+--------------------+---------------------+----+------------------+
| city|country|date_collected|avg_temp|avg_temp_uncertainty|date_collected_format|year|avg_temp_farenheit|
+-----+-------+--------------+--------+--------------------+---------------------+----+------------------+
|Namur|Belgium|    1866-07-01|  16.911|               0.652|             18660701|1866|62.439797973632814|
|Namur|Belgium|    1866-08-01|  15.738|               0.742|             18660801|1866|60.328399658203125|
+-----+-------+--------------+--------+--------------------+---------------------+----+------------------+
only showing top 2 rows



In [57]:
try:
    # Atomic Operation 1: Transform the DataFrame
    update_fdf = update_df.withColumn("avg_temp_farenheit", expr("(avg_temp * 9/5) + 32"))

    print("Transformed DataFrame:")
    update_fdf.show(3)

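    # Atomic Operation 2: Write the transformed DataFrame to a temporary file
    # NOTE: pwd_path comes from `!pwd`, which returns a list-like object rather than a string,
    # so this call raises the Py4J error shown in the output; parquet() needs a single str path.
    update_fdf.write.mode("overwrite").parquet(pwd_path)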

    # Atomic Operation 3: Simulate a failure (for demonstration purposes)
    raise Exception("Simulated failure")

except Exception as e:
    # Log the exception (simulating recovery or rollback)
    print(f"Exception: {e}")
    print("Rolling back changes...")

finally:
    # Atomic Operation 4: Recovery (the in-memory DataFrame is reused; nothing is read back from the temporary file)
    recovered_update_fdf = update_fdf

    # Show the recovered DataFrame
    print("Recovered DataFrame:")
    recovered_update_fdf.show(3)

Transformed DataFrame:
+-----+-------+--------------+--------+--------------------+---------------------+----+------------------+
| city|country|date_collected|avg_temp|avg_temp_uncertainty|date_collected_format|year|avg_temp_farenheit|
+-----+-------+--------------+--------+--------------------+---------------------+----+------------------+
|Namur|Belgium|    1866-07-01|  16.911|               0.652|             18660701|1866|62.439797973632814|
|Namur|Belgium|    1866-08-01|  15.738|               0.742|             18660801|1866|60.328399658203125|
|Namur|Belgium|    1866-09-01|  14.752|               0.793|             18660901|1866| 58.55360107421875|
+-----+-------+--------------+--------+--------------------+---------------------+----+------------------+
only showing top 3 rows

Exception: An error occurred while calling o170.parquet. Trace:
py4j.Py4JException: Method parquet([class java.util.ArrayList]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEng

### 2. Consistency, 3. Isolation or Idempotency, 4. Durability 

Spark is designed primarily for distributed data processing, so transactional guarantees are handled at the storage layer: by Hadoop or another distributed file system, or by a database used in conjunction with Spark, rather than by Spark alone. Spark does, by its nature, address parts of consistency, atomicity, and durability. It cannot, however, require commits or writes to be isolated from other transactions without a negative impact. Spark commits at multiple levels, task and job, and these commits can happen at different times.

# Replication

The `spark.sql.files.maxRecordsPerFile` configuration sets the maximum number of records written to a single file when a DataFrame is saved to a distributed file system.

In [120]:
spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000")
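# Note: spark.default.parallelism governs RDD operations; DataFrame shuffles use spark.sql.shuffle.partitions.
spark.conf.set("spark.default.parallelism", "5")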

In [121]:
ddf_repartition_replicated = ddf.repartition(8)  # split into 8 partitions; cache() below keeps them in memory for faster repeated access
ddf_repartition_replicated.cache()

DataFrame[city: string, country: string, date_measured: string, avg_temp: float, avg_temp_uncertainty: float]

In [122]:
ddf_repartition_replicated.show()

+-----------+--------------------+-------------+--------+--------------------+
|       city|             country|date_measured|avg_temp|avg_temp_uncertainty|
+-----------+--------------------+-------------+--------+--------------------+
|      Cagua|           Venezuela|     20000401|  26.758|               0.369|
|   Borujerd|                Iran|     20050601|  21.506|               0.534|
|Bournemouth|      United Kingdom|     20120501|  12.128|               0.253|
|    Bristol|      United Kingdom|     20010601|    14.4|               0.236|
|    Bologna|               Italy|     20060201|   4.156|               0.299|
|    Brikama|              Gambia|     20100801|  28.567|               0.244|
|    Bikaner|               India|     20040101|  15.014|               0.436|
|     Brugge|             Belgium|     20061001|  14.137|               0.148|
|     Bozhou|               China|     20090501|  21.443|               0.485|
|     Bukavu|Congo (Democratic...|     20130801|  21