# Large Dataset Loading and Preparation Scripts

Just for demonstration purposes, do not run!

In [None]:
import ee
import geemap
import os
import requests
from datetime import datetime
import geopandas as gpd
import pandas as pd
import cenpy
import pygris
import warnings
warnings.filterwarnings('ignore')

## 1. Preparation

In [None]:
ee.Authenticate()
ee.Initialize(project="gee-emilyzhou0112") # replace with your own project name

In [None]:
pa_tracts = gpd.read_file('PATH')
pa_bound = pa_tracts.dissolve() # dissolve geometry to get the boundary
pa_geom= ee.Geometry.Polygon(list(pa_bound['geometry'].iloc[0].exterior.coords)) # convert the geometry into a format suitable for gee
aoi = ee.FeatureCollection(pa_geom)

In [None]:
tolerance = 0.01
pa_tracts['geometry'] = pa_tracts['geometry'].simplify(tolerance, preserve_topology=True)
pa_tracts_ee = geemap.geopandas_to_ee(pa_tracts)

## 2. Landsat Data

In [None]:
## Define Time Period
startSpring = datetime(2022, 3, 1) # spring
endSpring = datetime(2022, 5, 31)
startSummer = datetime(2022, 6, 1) # summer
endSummer = datetime(2022, 8, 31)
startFall = datetime(2022, 9, 1) # fall
endFall = datetime(2022, 11, 30)
startWinter = datetime(2022, 12, 1) # winter
endWinter = datetime(2023, 2, 28)

# Format dates into strings that Earth Engine expects ("YYYY-MM-DD")
startSpring= startSpring.strftime('%Y-%m-%d')
endSpring = endSpring.strftime('%Y-%m-%d')
startSummer = startSummer.strftime('%Y-%m-%d')
endSummer = endSummer.strftime('%Y-%m-%d')
startFall = startFall.strftime('%Y-%m-%d')
endFall = endFall.strftime('%Y-%m-%d')
startWinter = startWinter.strftime('%Y-%m-%d')
endWinter = endWinter.strftime('%Y-%m-%d')

In [None]:
## Helper Function - Scale Bands
def apply_scale_factors(image):
    # Scale and offset values for optical bands
    optical_bands = image.select('SR_B.').multiply(0.0000275).add(-0.2)

    # Scale and offset values for thermal bands
    thermal_bands = image.select('ST_B.*').multiply(0.00341802).add(149.0)

    # Add scaled bands to the original image
    return image.addBands(optical_bands, None, True) \
                .addBands(thermal_bands, None, True)


In [None]:
## Helper Function - Mask Clouds
def cloud_mask(image):
    # Define cloud shadow and cloud bitmask (Bits 3 and 5)
    cloud_shadow_bit_mask = 1 << 3
    cloud_bit_mask = 1 << 5

    # Select the Quality Assessment (QA) band for pixel quality information
    qa = image.select('QA_PIXEL')

    # Create a binary mask to identify clear conditions (both cloud and cloud shadow bits set to 0)
    mask = qa.bitwiseAnd(cloud_shadow_bit_mask).eq(0) \
                .And(qa.bitwiseAnd(cloud_bit_mask).eq(0))

    # Update the original image, masking out cloud and cloud shadow-affected pixels
    return image.updateMask(mask)

In [None]:
def calculate_seasonal_indices(image_collection, aoi, season_name):
    """
    Calculate NDVI, SAVI, EVI, Fraction of Vegetation (FV),
    Emissivity (EM), and Land Surface Temperature (LST) for a season.

    Parameters:
    - image_collection: ee.ImageCollection, the collection of images for the season.
    - aoi: ee.Geometry, the area of interest.
    - season_name: str, name of the season (for debugging/logging purposes).

    Returns:
    - ee.Image, containing the calculated indices and LST.
    """

    # Calculate NDVI
    ndvi = image_collection.normalizedDifference(['SR_B5', 'SR_B4']).rename('NDVI')

    # Calculate SAVI
    savi = image_collection.expression(
        '1.5 * (NIR - RED) / (NIR + RED + 0.5)', {
            'NIR': image_collection.select('SR_B5'),
            'RED': image_collection.select('SR_B4')
        }
    ).rename('SAVI')

    # Calculate EVI
    evi = image_collection.expression(
        '2.5 * ((NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1))', {
            'NIR': image_collection.select('SR_B5'),
            'RED': image_collection.select('SR_B4'),
            'BLUE': image_collection.select('SR_B2')
        }
    ).rename('EVI')

    # NDVI min and max for Fraction of Vegetation (FV) calculation
    ndvi_min = ndvi.reduceRegion(
        reducer=ee.Reducer.min(),
        geometry=aoi,
        scale=30,
        maxPixels=1e9
    ).get('NDVI')

    ndvi_max = ndvi.reduceRegion(
        reducer=ee.Reducer.max(),
        geometry=aoi,
        scale=30,
        maxPixels=1e9
    ).get('NDVI')

    # Convert NDVI_min and NDVI_max to ee.Number
    ndvi_min = ee.Number(ndvi_min)
    ndvi_max = ee.Number(ndvi_max)

    # Fraction of Vegetation (FV)
    fv = ndvi.subtract(ndvi_min).divide(ndvi_max.subtract(ndvi_min)).pow(2).rename('FV')

    # Emissivity (EM)
    em = fv.multiply(0.004).add(0.986).rename('EM')

    # Thermal band (Band 10)
    thermal = image_collection.select('ST_B10').rename('thermal')

    # Land Surface Temperature (LST)
    lst = thermal.expression(
        '(TB / (1 + (0.00115 * (TB / 1.438)) * log(em))) - 273.15',
        {
            'TB': thermal.select('thermal'),  # Thermal band temperature in Kelvin
            'em': em  # Emissivity
        }
    ).rename('LST')

    seasonal_image = ndvi.addBands([savi, evi, fv, em, lst])
    return seasonal_image

In [None]:
seasons = {
    'spring': (startSpring, endSpring),
    'summer': (startSummer, endSummer),
    'fall': (startFall, endFall),
    'winter': (startWinter, endWinter)
}

seasonal_results = {}
for season, (start_date, end_date) in seasons.items():
    image_collection = ee.ImageCollection("LANDSAT/LC08/C02/T1_L2") \
        .filterBounds(aoi) \
        .filterDate(start_date, end_date) \
        .map(apply_scale_factors) \
        .map(cloud_mask) \
        .median() \
        .clip(aoi)

    seasonal_results[season] = calculate_seasonal_indices(image_collection, aoi, season)


In [None]:
# Function to export zonal stats to Google Drive
def export_zonal_stats(image, reducer, file_name, folder_name="FILE NAME"):
    """
    Exports zonal statistics of an image band to Google Drive as a CSV.

    Parameters:
    - image: ee.Image, the image containing the band to export.
    - reducer: ee.Reducer, the reducer to aggregate data (e.g., mean, median).
    - file_name: str, name of the file (e.g., 'ndvi_spring.csv').
    - folder_name: str, Google Drive folder to save the file in.
    """
    zonal_stats = image.reduceRegions(
        collection=pa_tracts_ee,
        reducer = ee.Reducer.mean()
        scale=30  # Resolution of the analysis
    )

    task = ee.batch.Export.table.toDrive(
        collection=zonal_stats,
        fileFormat='CSV',
        fileNamePrefix=file_name.replace('.csv', ''),
        folder=folder_name
    )
    task.start()
    print(f"Export started for {file_name}. Check Google Drive for the results.")


In [None]:
# Seasonal results containing all seasonal images with bands
seasonal_results = {
    "spring": seasonal_results['spring'],
    "summer": seasonal_results['summer'],
    "fall": seasonal_results['fall'],
    "winter": seasonal_results['winter']
}

# List of bands to process
bands = ['NDVI', 'EVI', 'SAVI', 'LST']

# Export each band for every season
for season, image in seasonal_results.items():
    for band in bands:
        band_image = image.select(band)  # Extract specific band
        file_name = f"{band.lower()}_{season}.csv"  # File name e.g., ndvi_spring.csv
        export_zonal_stats(image=band_image, reducer=reducer, file_name=file_name)

## 3. Land Cover Data

In [None]:
dataset = ee.ImageCollection('USGS/NLCD_RELEASES/2021_REL/NLCD')
nlcd2021 = dataset.filter(ee.Filter.eq('system:index', '2021')).first()
landcover = nlcd2021.select('landcover')
pa_landcover = landcover.clip(aoi)

In [None]:
high_density = pa_landcover.eq(23).Or(pa_landcover.eq(24))
low_density = pa_landcover.eq(21).Or(pa_landcover.eq(22))
forest = pa_landcover.eq(41).Or(pa_landcover.eq(42)).Or(pa_landcover.eq(43))
grasses = pa_landcover.eq(52).Or(pa_landcover.eq(71)).Or(pa_landcover.eq(81)).Or(pa_landcover.eq(82))
wetlands = pa_landcover.eq(90).Or(pa_landcover.eq(95))
open_water = pa_landcover.eq(11)

In [None]:
def neighboring_landcover_metrics(landcover_mask, file_name):
    """
    Function to calculate total and neighboring land cover metrics and export them as a CSV.

    Args:
        landcover_mask (ee.Image): Binary mask of the landcover categories to analyze.
        pa_tracts_ee (ee.FeatureCollection): The census tracts FeatureCollection.
        description (str): Description for the export task.
        file_name (str): File name prefix for the exported CSV.
    """
    # Define the kernel for neighboring pixels
    kernel = ee.Kernel.square(radius=1, units='pixels')  # 3x3 neighborhood
    neighbors = landcover_mask.convolve(kernel).gte(1)  # At least one neighbor

    # Calculate total landcover pixels
    total_landcover = landcover_mask.reduceRegions(
        collection=pa_tracts_ee,
        reducer=ee.Reducer.sum(),
        scale=30
    ).select(['sum'], ['total_landcover'])

    # Calculate neighboring landcover pixels
    neighbor_landcover = neighbors.reduceRegions(
        collection=pa_tracts_ee,
        reducer=ee.Reducer.sum(),
        scale=30
    ).select(['sum'], ['neighbor_landcover'])

    # Merge FeatureCollections and retain geoid
    merged_fc = total_landcover.map(lambda feature:
        feature.set(
            'neighbor_landcover',
            neighbor_landcover.filter(ee.Filter.eq('system:index', feature.get('system:index')))
                              .first()
                              .get('neighbor_landcover')
        ).set(
            'geoid', pa_tracts_ee.filter(ee.Filter.eq('system:index', feature.get('system:index')))
                                 .first()
                                 .get('GEOID')
        )
    )

    # Export the merged FeatureCollection
    export_task = ee.batch.Export.table.toDrive(
        collection=merged_fc.select(['geoid', 'total_landcover', 'neighbor_landcover']),
        folder='FOLDER NAME',
        fileNamePrefix=file_name,
        fileFormat='CSV'
    )
    export_task.start()
    print(f"Export task started: {file_name}")

In [None]:
neighboring_landcover_metrics(
    landcover_mask=forest,
    file_name='forest_landcover_metrics'
)

neighboring_landcover_metrics(
    landcover_mask=high_density,
    file_name='high_density_landcover_metrics'
)

In [None]:
def summarize_landcover_pixels(landcover_mask, file_name):
    """
    Function to summarize total landcover pixels for each tract and export as a CSV.

    Args:
        landcover_mask (ee.Image): Binary mask of the landcover categories to analyze.
        pa_tracts_ee (ee.FeatureCollection): The census tracts FeatureCollection.
        description (str): Description for the export task.
        file_name (str): File name prefix for the exported CSV.
    """
    # Calculate total landcover pixels
    total_landcover = landcover_mask.reduceRegions(
        collection=pa_tracts_ee,
        reducer=ee.Reducer.sum(),
        scale=30
    ).map(lambda feature: feature.set(
        'geoid', feature.get('GEOID')
    ))

    # Export the results to Drive
    export_task = ee.batch.Export.table.toDrive(
        collection=total_landcover.select(['geoid', 'sum']),
        folder='FOLDER NAME',
        fileNamePrefix=file_name,
        fileFormat='CSV'
    )
    export_task.start()
    print(f"Export task started: {file_name}")

In [None]:
landcover_list = [
    {'mask': grasses, 'file_name': 'grasses_landcover'},
    {'mask': low_density, 'file_name': 'low_density_landcover'},
    {'mask': wetlands, 'file_name': 'wetlands_landcover'},
    {'mask': open_water, 'file_name': 'open_water_landcover'}
]

for landcover in landcover_list:
    summarize_landcover_pixels(landcover['mask'], landcover['file_name'])

## 4. Tobacco Retailer Data

In [None]:
all_retailers = pd.read_csv('PATH')
pa_retailers = all_retailers[all_retailers['state'] == 'PA']
pa_retailers = pa_retailers[["county", "license_type", "lat", "lon"]]

In [None]:
pa_retailers.to_csv('PATH', index=False)

## 5. CDC Data

In [None]:
cdc_data = pd.read_csv("PATH")

In [None]:
# process CRD data
PA_Asthma = cdc_data[(cdc_data['Measure'] == "Current asthma among adults") & (cdc_data['StateAbbr'] == "PA")]
PA_COP = cdc_data[(cdc_data['Measure'] == "Chronic obstructive pulmonary disease among adults") & (cdc_data['StateAbbr'] == "PA")]
PA_Chronic = PA_Asthma.merge(
    PA_COP[['LocationName', 'Data_Value']],
    on="LocationName",
    how="left"
).rename(columns={"Data_Value_x": "Asthma", "Data_Value_y": "COP"})

In [None]:
PA_Chronic.to_csv('PATH', index=False)

In [None]:
# process HRB data
PA_Smoking = cdc_data[(cdc_data['Measure'] == "Current cigarette smoking among adults") & (cdc_data['StateAbbr'] == "PA")]
PA_Drinking = cdc_data[(cdc_data['Measure'] == "Binge drinking among adults") & (cdc_data['StateAbbr'] == "PA")]
PA_Physical_Activity = cdc_data[(cdc_data['Measure'] == "No leisure-time physical activity among adults") & (cdc_data['StateAbbr'] == "PA")]
PA_Short_Sleep = cdc_data[(cdc_data['Measure'] == "Short sleep duration among adults") & (cdc_data['StateAbbr'] == "PA")]

PA_HRB = PA_Smoking.merge(
    PA_Drinking[['LocationName', 'Data_Value']], on='LocationName', how='left'
).rename(columns={"Data_Value_x": "Smoking", "Data_Value_y": "Drinking"})

PA_HRB = PA_HRB.merge(
    PA_Physical_Activity[['LocationName', 'Data_Value']], on='LocationName', how='left'
).rename(columns={'Data_Value': 'Physical_Activity'})

PA_HRB = PA_HRB.merge(
    PA_Short_Sleep[['LocationName', 'Data_Value']], on='LocationName', how='left'
).rename(columns={'Data_Value': 'Short_Sleep'})
PA_HRB[['LocationName', 'Smoking', 'Drinking', 'Physical_Activity', 'Short_Sleep']]

In [None]:
PA_HRB.to_csv('PATH', index=False)

## 6. Census Data

In [None]:
acs = cenpy.remote.APIConnection("ACSDT5Y2022")

In [None]:
census_var = ["NAME",
              "B02001_001E", # total
              "B02001_002E", # white
              "B02001_003E", # black
              "B02001_004E", # native american
              "B02001_005E", # asian
              "B03002_012E", # hispanic
              'B01001_020E', # male 65-66
              'B01001_021E', # male 67-69
              'B01001_022E', # male 70-74
              'B01001_023E', # male 75-79
              'B01001_024E', # male 80-84
              'B01001_025E', # male over 85
              'B01001_044E', # female 65-66
              'B01001_045E', # female 67-69
              'B01001_046E', # female 70-74
              'B01001_047E', # female 75-79
              'B01001_048E', # female 80-84
              'B01001_049E', # female over 85
              'B18101_007E', # Male 5 to 17 years With a disability
              'B18101_010E', # Male 18 to 34 years With a disability
              'B18101_013E', # Male 35 to 64 years With a disability
              'B18101_016E', # Male 65 to 74 years With a disability
              'B18101_019E', # Male over 75 years With a disability
              'B18101_026E', # Female 5 to 17 years With a disability
              'B18101_029E', # Female 18 to 34 years With a disability
              'B18101_032E', # Female 35 to 64 years With a disability
              'B18101_035E', # Female 65 to 74 years With a disability
              'B18101_038E'
             ]

In [None]:
pa_state_code = "42"
census_data = acs.query(
    cols=census_var,
    geo_unit="tract",
    geo_filter={"state": pa_state_code}
)
for variable in census_var:
    if variable != "NAME":
        census_data[variable] = census_data[variable].astype(float)

In [None]:
census_data['minority'] = (
    (census_data['B02001_001E'] - census_data['B02001_002E']) / census_data['B02001_001E']
)
census_data['aging'] = (
    census_data[[
        'B01001_020E', 'B01001_021E', 'B01001_022E', 'B01001_023E',
        'B01001_024E', 'B01001_025E', 'B01001_044E', 'B01001_045E',
        'B01001_046E', 'B01001_047E', 'B01001_048E', 'B01001_049E'
    ]].sum(axis=1) / census_data['B02001_001E']
)
census_data['disability'] = (
    census_data[[
        'B18101_007E', 'B18101_010E', 'B18101_013E', 'B18101_016E',
        'B18101_019E', 'B18101_026E', 'B18101_029E', 'B18101_032E',
        'B18101_035E', 'B18101_038E'
    ]].sum(axis=1) / census_data['B02001_001E']
)

In [None]:
census_data = census_data[["NAME", "county", "tract", "minority", "aging", "disability"]]
tracts = pygris.tracts(state=pa_state_code, year=2022)
pa_census_data = tracts.merge(census_data, left_on=["COUNTYFP", "TRACTCE"], right_on=["county", "tract"],)
pa_census_data = pa_census_data[["GEOID", "minority", "aging", "disability", "geometry"]]

In [None]:
pa_census_data.to_csv('PATH', index=False)