# County wise concatenation of parquet files using Spark

- We use spark to join our county wise segmented files. 
- This method is computationally intensive due to the union operations

In [2]:
!pip install netCDF4



In [41]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession    
spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello ")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



# importing libraries

In [42]:
import os
from netCDF4 import Dataset
import numpy as np
import pandas as pd 
from datetime import datetime, timedelta
import pickle


In [33]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType, IntegerType
from pyspark.sql import DataFrame

#### Reading netcdf files

In [43]:
def unpack(spark, path, lat_location, lon_location, id):
    # data = Dataset(path, 'r')
    try:
        data = Dataset(path, 'r')
    except Exception as e:
        print(f"An error occurred while opening the file: {str(e)}")
        return None  # Return None to indicate failure
    variable_name = list(data.variables.keys())[-1]
    # Storing the lat and lon data into the variables 
    lat = data.variables['lat'][:]
    lon = data.variables['lon'][:]


    # Squared difference of lat and lon 
    sq_diff_lat = (lat - lat_location)**2
    sq_diff_lon = (lon - lon_location)**2

    # Identifying the index of the minimum value for lat and lon 
    min_index_lat = sq_diff_lat.argmin()
    min_index_lon = sq_diff_lon.argmin()

    feature = data.variables[variable_name]


    days = data.variables['day']
    start_date = datetime(1900, 1, 1)  # Start date in the 1900 system
    dates = [start_date + timedelta(days=int(day)) for day in days]


    schema = StructType([
            StructField("Date", DateType(), True),
            StructField(variable_name, FloatType(), True),
            StructField("ID", IntegerType(), True)
        ])
    # df = pd.DataFrame(columns=['Date', variable_name])
    # df['Date'] = dates
    data_list = []

    dt = np.arange(0, data.variables['day'].size)
    for time_index in dt:
        # Use numpy.ma.getdata to get unmasked values
        feature_values = float(feature[time_index, min_index_lat, min_index_lon])
        # data_list.append((dates[time_index], feature_values))
        data_list.append((dates[time_index], feature_values, id))  # Include the "county" value


        # Now, you can assign the unmasked values to the 'Temperature' column
        # df.at[time_index, variable_name] = feature_values
    
    
    spark_df = spark.createDataFrame(data_list, schema)

    return spark_df

In [44]:
coordinates_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/One Touch/BigDataProject-Capstone/dataCollection/county_cooridnates.csv")

In [45]:
coordinates_df.printSchema()

root
 |-- county: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- ID: integer (nullable = true)



In [17]:
coordinates_df.show()

+----------+----------+-----------+---+
|    county|       lat|       long| ID|
+----------+----------+-----------+---+
|     adams|39.9787786|-91.2110065|  0|
| alexander|37.1801529|-89.3502834|  1|
|      bond|38.8630331|-89.4391416|  2|
|     boone| 42.321246|-88.8235511|  3|
|     brown|39.9498214|-90.7485656|  4|
|    bureau|41.4016294|-89.5341179|  5|
|   calhoun|39.1397507|-90.6506113|  6|
|   carroll|42.0647352|-89.9556785|  7|
|      cass|39.9682127| -90.250722|  8|
| champaign|40.1164205|-88.2433829|  9|
| christian|39.5212598|-89.2829783| 10|
|     clark|39.3260541|-87.7838526| 11|
|      clay|38.7340694|-88.4910693| 12|
|   clinton|38.5896187| -89.420064| 13|
|     coles|39.5266741|-88.2184999| 14|
|      cook|41.8197385| -87.756525| 15|
|  crawford|39.0131644|-87.7291001| 16|
|cumberland|39.2744299|-88.2423795| 17|
|   de witt|40.1734773|-88.9003733| 18|
|   douglas|39.7628415|-88.2170516| 19|
+----------+----------+-----------+---+
only showing top 20 rows



In [37]:
path = '/Volumes/One Touch/BigDataProject-Capstone/gridMET/ Minimum Near-Surface Relative Humidity/rmin_1980.nc'

In [28]:
schema = StructType([
    StructField("Date", DateType(), True),
    StructField("relative_humidity", FloatType(), True),
    StructField("ID", IntegerType(), True)
])

In [47]:
import time

# Spark union for concatenation

In [48]:
start_time = time.time()
concatenated_df = None

for row in coordinates_df.rdd.collect():
    ID = row['ID']
    lat_location = row['lat']
    lon_location = row['long']
    
    # Call the unpack function for the current row
    result_df = unpack(spark, path, lat_location, lon_location, ID)
    
    if isinstance(result_df, DataFrame):
        if concatenated_df is None:
            concatenated_df = result_df
        else:
            concatenated_df = concatenated_df.unionAll(result_df)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")


  feature_values = float(feature[time_index, min_index_lat, min_index_lon])


Execution time: 97.86711406707764 seconds


In [27]:
concatenated_df.show()


23/10/26 02:38:27 WARN DAGScheduler: Broadcasting large task binary with size 5.6 MiB


+----------+-----------------+---+
|      Date|relative_humidity| ID|
+----------+-----------------+---+
|1980-01-01|             44.8|  0|
|1980-01-02|             40.2|  0|
|1980-01-03|             43.7|  0|
|1980-01-04|             52.5|  0|
|1980-01-05|             55.5|  0|
|1980-01-06|             67.8|  0|
|1980-01-07|             50.4|  0|
|1980-01-08|             56.1|  0|
|1980-01-09|             49.1|  0|
|1980-01-10|             58.7|  0|
|1980-01-11|             46.2|  0|
|1980-01-12|             43.8|  0|
|1980-01-13|             59.4|  0|
|1980-01-14|             55.1|  0|
|1980-01-15|             68.1|  0|
|1980-01-16|             60.3|  0|
|1980-01-17|             59.7|  0|
|1980-01-18|             61.1|  0|
|1980-01-19|             58.9|  0|
|1980-01-20|             54.1|  0|
+----------+-----------------+---+
only showing top 20 rows



In [13]:
concatenated_df.select('county').distinct().count()

23/10/26 02:07:44 WARN DAGScheduler: Broadcasting large task binary with size 5.0 MiB
23/10/26 02:11:52 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

694