In [3]:

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Environmental Data Analysis") \
    .getOrCreate()
def load_data_spark(file_path):
    return spark.read.csv(file_path, header=True, inferSchema=True)

data = load_data_spark('data_dictionary.csv')



In [5]:
# Load the CalEnviroScreen data dictionary
calenviroscreen_data_dictionary_path = 'calenviroscreen40resultsdatadictionary_f_2021.csv'
calenviroscreen_data_dictionary = load_data_spark(calenviroscreen_data_dictionary_path)
calenviroscreen_data_dictionary.show(5)

24/05/06 19:11:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------------+----------------+-----------------+-----+--------------------+------------+----------+-------------+------------------+------------------------+-----+----------+-----------+----------+---------+--------------+--------------+-------------------+-----+---------+----------+---------------+------------+-----------------+-----------+------------+-------------+------------------+-------------------+------------------------+----------+---------------+-----------------+----------------------+-----------+----------------+----------------+----------------------+---------------------+------+-----------+----------------+---------------------+----------------------+---------------------------+---------+--------------+--------------------+-------------------------+-------+------------+------------+-----------------+--------------+-------------------+-----------+----------------+---------------+
|Census Tract|Total Population|California County|  ZIP|Approximate Location|   Longitude|  L

In [10]:
cal_env_df = calenviroscreen_data_dictionary.select(
    'Census Tract', 'Total Population', 'California County', 'ZIP', 'Approximate Location',
    'Longitude', 'Latitude', 'Ozone', '`PM2.5`', 'Diesel PM', 'Drinking Water', 'Lead', 'Pesticides',
    '`Tox. Release`', 'Traffic', 'Cleanup Sites', 'Groundwater Threats', '`Haz. Waste`', '`Imp. Water Bodies`',
    'Solid Waste', 'Pollution Burden', 'Asthma', 'Low Birth Weight', 'Cardiovascular Disease',
    'Education', 'Linguistic Isolation', 'Poverty', 'Unemployment', 'Housing Burden'
)


cal_env_df.show(5)

+------------+----------------+-----------------+-----+--------------------+------------+----------+-----+-----------+---------+--------------+-----+----------+------------+-----------+-------------+-------------------+----------+-----------------+-----------+----------------+------+----------------+----------------------+---------+--------------------+-------+------------+--------------+
|Census Tract|Total Population|California County|  ZIP|Approximate Location|   Longitude|  Latitude|Ozone|      PM2.5|Diesel PM|Drinking Water| Lead|Pesticides|Tox. Release|    Traffic|Cleanup Sites|Groundwater Threats|Haz. Waste|Imp. Water Bodies|Solid Waste|Pollution Burden|Asthma|Low Birth Weight|Cardiovascular Disease|Education|Linguistic Isolation|Poverty|Unemployment|Housing Burden|
+------------+----------------+-----------------+-----+--------------------+------------+----------+-----+-----------+---------+--------------+-----+----------+------------+-----------+-------------+-----------------

In [11]:
from pyspark.sql import SparkSession

def load_data_spark(file_path):
    return spark.read.csv(file_path, header=True, inferSchema=True)

# Load the demographic profile data using Spark
demographic_profile_path = 'demographic_profile.csv'
demographic_profile = load_data_spark(demographic_profile_path)

# Display the first few rows of the demographic profile data to understand its structure
demographic_profile.show(5)

# Merge cal_env_df with demographic_profile on 'Census Tract'
merged_df = cal_env_df.join(
    demographic_profile.select('Census Tract', 'Children < 10 years (%)', 'Pop 10-64 years (%)', 'Elderly > 64 years (%)'),
    on='Census Tract',
    how='left' 
)

# Print the columns of the merged DataFrame
print(merged_df.columns)

# Display unique values in 'California County' column
merged_df.select('California County').distinct().show()


+------------+-------------+------------------+------------------------+-----------------+----------------+-----------------------+-------------------+----------------------+------------+---------+--------------------+-------------------+------------------+------------------+
|Census Tract|CES 4.0 Score|CES 4.0 Percentile|CES 4.0 Percentile Range|California County|Total Population|Children < 10 years (%)|Pop 10-64 years (%)|Elderly > 64 years (%)|Hispanic (%)|White (%)|African American (%)|Native American (%)|Asian American (%)|Other/Multiple (%)|
+------------+-------------+------------------+------------------------+-----------------+----------------+-----------------------+-------------------+----------------------+------------+---------+--------------------+-------------------+------------------+------------------+
|  6001400100|         4.85|               2.8|    1-5% (lowest scores)|         Alameda |            3120|                   7.82|              66.12|                 2

In [12]:
from pyspark.sql.functions import col

# Define the list of Bay Area counties, ensure no trailing spaces in the names
bay_area_counties = ['Alameda', 'Contra Costa', 'Marin', 'Napa', 'San Francisco', 'San Mateo', 'Santa Clara', 'Solano']

# Filter rows where 'California County' is in the list of Bay Area counties
ces_bay_df = merged_df.filter(col('California County').isin(bay_area_counties))

# Display unique values in 'California County' column from the filtered DataFrame
ces_bay_df.select('California County').distinct().show()


+-----------------+
|California County|
+-----------------+
|    San Francisco|
|      Santa Clara|
|     Contra Costa|
|        San Mateo|
+-----------------+



In [14]:

# Print the schema of the DataFrame
ces_bay_df.printSchema()



root
 |-- Census Tract: long (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- California County: string (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- Approximate Location: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Ozone: double (nullable = true)
 |-- PM2.5: double (nullable = true)
 |-- Diesel PM: double (nullable = true)
 |-- Drinking Water: string (nullable = true)
 |-- Lead: string (nullable = true)
 |-- Pesticides: double (nullable = true)
 |-- Tox. Release: double (nullable = true)
 |-- Traffic: string (nullable = true)
 |-- Cleanup Sites: double (nullable = true)
 |-- Groundwater Threats: double (nullable = true)
 |-- Haz. Waste: double (nullable = true)
 |-- Imp. Water Bodies: integer (nullable = true)
 |-- Solid Waste: double (nullable = true)
 |-- Pollution Burden: double (nullable = true)
 |-- Asthma: string (nullable = true)
 |-- Low Birth Weight: string (nullable = true)

In [17]:
from pyspark.sql.functions import col, sum as spark_sum

na_counts = ces_bay_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) if " " not in c and "." not in c else spark_sum(col(f"`{c}`").isNull().cast("int")).alias(c)
    for c in ces_bay_df.columns
])

na_counts_row = na_counts.collect()[0]

# Iterate through each column and check for nulls
na_cols = []
for c in ces_bay_df.columns:
    na_count = na_counts_row[c]
    if na_count != 0:
        na_cols.append(c)
    print(c, na_count)


# Save the DataFrame to CSV
ces_bay_df.write.csv('ces_bay.csv', mode='overwrite', header=True)


Census Tract 0
Total Population 0
California County 0
ZIP 0
Approximate Location 0
Longitude 0
Latitude 0
Ozone 0
PM2.5 0
Diesel PM 0
Drinking Water 0
Lead 0
Pesticides 0
Tox. Release 0
Traffic 0
Cleanup Sites 0
Groundwater Threats 0
Haz. Waste 0
Imp. Water Bodies 0
Solid Waste 0
Pollution Burden 0
Asthma 0
Low Birth Weight 0
Cardiovascular Disease 0
Education 0
Linguistic Isolation 0
Poverty 0
Unemployment 0
Housing Burden 0
Children < 10 years (%) 0
Pop 10-64 years (%) 0
Elderly > 64 years (%) 0


In [18]:
from pyspark.sql import Window
from pyspark.sql.functions import col, expr, coalesce

# Define a window specification
windowSpec = Window.partitionBy('California County')

# Calculate medians and update DataFrame
for c in na_cols:
    median_df = ces_bay_df.withColumn(c + "_median", expr(f"percentile_approx({c}, 0.5)").over(windowSpec))
    ces_bay_df = ces_bay_df.join(
        median_df.select("California County", c + "_median"),
        on="California County",
        how="left"
    )
    ces_bay_df = ces_bay_df.withColumn(c, coalesce(col(c), col(c + "_median"))).drop(c + "_median")

# Show the DataFrame to confirm changes
ces_bay_df.show()


+------------+----------------+-----------------+-----+--------------------+------------+----------+-----+-----------+---------+--------------+-----+----------+------------+-----------+-------------+-------------------+----------+-----------------+-----------+----------------+------+----------------+----------------------+---------+--------------------+-------+------------+--------------+-----------------------+-------------------+----------------------+
|Census Tract|Total Population|California County|  ZIP|Approximate Location|   Longitude|  Latitude|Ozone|      PM2.5|Diesel PM|Drinking Water| Lead|Pesticides|Tox. Release|    Traffic|Cleanup Sites|Groundwater Threats|Haz. Waste|Imp. Water Bodies|Solid Waste|Pollution Burden|Asthma|Low Birth Weight|Cardiovascular Disease|Education|Linguistic Isolation|Poverty|Unemployment|Housing Burden|Children < 10 years (%)|Pop 10-64 years (%)|Elderly > 64 years (%)|
+------------+----------------+-----------------+-----+--------------------+------

In [22]:
from pyspark.sql.functions import col, sum as spark_sum, expr

# Create a list to correctly handle column names with special characters in expressions
null_counts = ces_bay_df.select([
    spark_sum(expr(f"`{c}`").isNull().cast("int")).alias(c) if '.' in c or ' ' in c else
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in ces_bay_df.columns
])

# Show the count of null values in each column
null_counts.show()


+------------+----------------+-----------------+---+--------------------+---------+--------+-----+-----+---------+--------------+----+----------+------------+-------+-------------+-------------------+----------+-----------------+-----------+----------------+------+----------------+----------------------+---------+--------------------+-------+------------+--------------+-----------------------+-------------------+----------------------+
|Census Tract|Total Population|California County|ZIP|Approximate Location|Longitude|Latitude|Ozone|PM2.5|Diesel PM|Drinking Water|Lead|Pesticides|Tox. Release|Traffic|Cleanup Sites|Groundwater Threats|Haz. Waste|Imp. Water Bodies|Solid Waste|Pollution Burden|Asthma|Low Birth Weight|Cardiovascular Disease|Education|Linguistic Isolation|Poverty|Unemployment|Housing Burden|Children < 10 years (%)|Pop 10-64 years (%)|Elderly > 64 years (%)|
+------------+----------------+-----------------+---+--------------------+---------+--------+-----+-----+---------+---

In [26]:
from pyspark.sql.functions import col, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

window_spec = Window.partitionBy([col('`' + c + '`') if '.' in c or ' ' in c else col(c) for c in ces_bay_df.columns]).orderBy(monotonically_increasing_id())
ces_bay_df = ces_bay_df.withColumn("row_number", row_number().over(window_spec))
ces_bay_df = ces_bay_df.filter(col("`PM2.5`") > 10).select(["`PM2.5`", "`Tox. Release`"])
duplicate_count = ces_bay_df.filter(col("row_number") > 1).count()
print("Number of duplicate rows: ", duplicate_count)


ces_bay_df = ces_bay_df.drop("row_number")


Number of duplicate rows:  0


In [27]:
# Save the DataFrame to a CSV file
ces_bay_df.write.csv('ces_bay_cleaned.csv', mode='overwrite', header=True)
